Coverage for distro_tracker/signon/signon.py: 99%

111 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-01-12 09:15 +0000

1# Copyright 2020-2023 Enrico Zini <enrico@debian.org> 

2# Copyright 2023 The Debusine Developers 

3# See the COPYRIGHT file at the top-level directory of this distribution 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10 

11"""Logic to authenticate a request using signon Providers.""" 

12import functools 

13import logging 

14from collections.abc import Iterable, Sequence 

15 

16import django.http 

17from django.conf import settings 

18from django.contrib import auth 

19from django.contrib.auth import load_backend 

20from django.contrib.auth.hashers import make_password 

21from django.core.exceptions import PermissionDenied, ValidationError 

22 

23from distro_tracker.signon import providers 

24from distro_tracker.signon.auth import SignonAuthBackend 

25from distro_tracker.signon.models import Identity 

26from distro_tracker.signon.utils import split_full_name 

27from django_email_accounts.models import User 

28 

29log = logging.getLogger(__name__) 

30 

31 

class Signon:
    """
    Backend used to interact with external authentication providers.

    This is set up by SignonMiddleware as ``request.signon``.

    The constructor needs to be as lightweight as possible, as it is called on
    every request. Everything else is loaded only when needed.
    """

    # Dotted path of the auth backend recorded in the session when a user is
    # logged in through an external identity.
    _AUTH_BACKEND = "distro_tracker.signon.auth.SignonAuthBackend"

    def __init__(self, request: django.http.HttpRequest) -> None:
        """
        Create a Signon object for a request.

        Only stores the request and the configured providers
        (``settings.SIGNON_PROVIDERS``, defaulting to an empty tuple).
        """
        self.request = request
        self.providers: Sequence[providers.Provider] = getattr(
            settings, "SIGNON_PROVIDERS", ()
        )

    @staticmethod
    def _session_key(issuer: str) -> str:
        """Return the session key storing the active Identity of an issuer."""
        return f"signon_identity_{issuer}"

    def status(
        self,
    ) -> Iterable[tuple[providers.BoundProvider, Identity | None]]:
        """
        Query the status of remote authentication providers.

        :returns: an iterable of ``(bound_provider, identity | None)``, one
          entry per configured provider; the identity is ``None`` when the
          provider has no active identity in this request
        """
        for provider in self.providers:
            bound = provider.bind(self.request)
            identity = self.identities.get(provider.name)
            yield bound, identity

    @functools.cached_property
    def identities(self) -> dict[str, Identity]:
        """
        Map provider names to the Identity active for this request.

        Computed on first access and cached on the instance afterwards.
        """
        return self._compute_identities()

    def logout_identities(self) -> None:
        """
        Deactivate all active external identities.

        Calls ``logout()`` on every provider bound to the request, then logs
        out the Django user if it was authenticated via SignonAuthBackend.
        """
        for provider in self.providers:
            bound = provider.bind(self.request)
            bound.logout()
        self._remove_invalid_signon_user()

    def _compute_identities(self) -> dict[str, Identity]:
        """
        Instantiate valid Identity entries for this request.

        For each configured provider, look up the Identity whose primary key
        is stored in the session. Session entries pointing to an Identity
        that no longer exists (or belongs to another issuer) are removed.

        :returns: a dict mapping provider names to Identity objects
        """
        identities = {}

        for provider in self.providers:
            session_key = self._session_key(provider.name)
            pk = self.request.session.get(session_key)
            if pk is None:
                continue

            try:
                identity = Identity.objects.get(pk=pk, issuer=provider.name)
            except Identity.DoesNotExist:
                # If the session has a broken Identity ID, remove it
                del self.request.session[session_key]
                continue

            identities[provider.name] = identity

        return identities

    def activate_identity(self, identity: Identity, *options: str) -> None:
        """
        Activate the given identity and update authentication accordingly.

        * If a user is already logged in, the identity is bound to that user,
          but only when ``"bind"`` is passed in ``options``.
        * If the identity has no user yet, try to find or create one and log
          it in; when that fails nobody is logged in.
        * Otherwise, log in the user already bound to the identity.

        In all non-raising cases the identity is recorded in the session.

        :param identity: the Identity to activate
        :param options: option flags; only ``"bind"`` is inspected here
        :raises PermissionDenied: if a user is already logged in and
          ``"bind"`` was not requested
        """
        if self.request.user.is_authenticated:
            if "bind" not in options:
                raise PermissionDenied("user is already logged in")
            # Bind the current user to the identity
            log.info("%s: auto associated to %s", self.request.user, identity)
            identity.user = self.request.user
            identity.save()
        elif identity.user is None:
            if (user := self._map_identity_to_user(identity)) is not None:
                log.debug("logging in autocreated user %s", user)
                self.request.user = user
                auth.login(self.request, user, backend=self._AUTH_BACKEND)
        else:
            log.debug("logging in user %s", identity.user)
            auth.login(
                self.request, identity.user, backend=self._AUTH_BACKEND
            )

        # Stored after any login above, so the entry lands in the session
        # that auth.login() leaves in place.
        self.request.session[self._session_key(identity.issuer)] = identity.pk

    def _remove_invalid_signon_user(self) -> None:
        """
        Log out an externally authenticated user.

        This is used to invalidate credentials in case a consistency check
        failed between active identities.

        Log out only happens if the user was authenticated via
        SignonAuthBackend, or if the backend stored in the session cannot be
        loaded at all.
        """
        try:
            stored_backend = load_backend(
                self.request.session.get(auth.BACKEND_SESSION_KEY, '')
            )
        except ImportError:
            # The backend failed to load (this also appears to cover the case
            # of no backend being recorded in the session, where the empty
            # default path is not importable): drop the login.
            auth.logout(self.request)
        else:
            if isinstance(stored_backend, SignonAuthBackend):
                auth.logout(self.request)

    def _map_identity_to_user(self, identity: Identity) -> User | None:
        """
        Find or create the user for the given identity, and bind it to it.

        :param identity: the Identity to map to a local user
        :returns: the user now bound to the identity, or ``None`` if the
          issuer is unknown, the claims do not validate, or no user could be
          created
        """
        # Validate identity claims
        if not (provider := self.get_provider_for_identity(identity)):
            return None
        if claim_errors := provider.validate_claims(identity):
            for error in claim_errors:
                log.warning("%s", error)
            return None

        # First lookup an existing user, else try to create a new one
        user = self._lookup_user_from_identity(identity)
        if user is not None:
            log.info("%s: user matched to identity %s", user, identity)
        else:
            user = self.create_user_from_identity(identity)
            if user is None:
                return None
            log.info("%s: auto created from identity %s", user, identity)

        # A user was found or created: bind it to the identity
        identity.user = user
        identity.save()
        log.info("%s: bound to identity %s", user, identity)
        return user

    def get_provider_for_identity(
        self, identity: Identity
    ) -> providers.Provider | None:
        """
        Find the Provider for an identity.

        :returns: the configured provider whose name matches the identity's
          issuer, or ``None`` (after logging a warning) if there is none
        """
        for provider in self.providers:
            if provider.name == identity.issuer:
                return provider

        log.warning(
            "identity %s has unknown issuer %s", identity, identity.issuer
        )
        return None

    def _lookup_user_from_identity(self, identity: Identity) -> User | None:
        """
        Lookup an existing user from claims in an Identity.

        The ``email`` claim is normalized in the same way as
        create_user_from_identity does before storing it, so that a claim
        differing only by the case of its domain part matches the existing
        user instead of triggering the creation of a duplicate.

        :returns: the user whose ``main_email`` matches, or ``None``
        """
        user_model = auth.get_user_model()
        email = user_model.objects.normalize_email(identity.claims["email"])
        try:
            return user_model.objects.get(main_email=email)
        except user_model.DoesNotExist:
            return None

    def create_user_from_identity(self, identity: Identity) -> User | None:
        """
        Create a user from the data in an Identity.

        This is a default implementation. It can be customized by subclassing
        Signon and using settings.SIGNON_CLASS

        :param identity: Identity whose ``name`` and ``email`` claims are
          used to populate the new user
        :returns: the saved user, or ``None`` (after logging a warning) if
          the data from the claims does not validate
        """
        user_model = auth.get_user_model()
        first_name, last_name = split_full_name(identity.claims["name"])

        # Django does not run validators on create_user, so garbage in the
        # claims can either create garbage users, or cause database transaction
        # errors that will invalidate the current transaction.
        #
        # See: https://stackoverflow.com/questions/67442439/why-django-does-not-validate-email-in-customuser-model # noqa: E501

        # So instead of calling create_user, replicate what it does here and
        # run validation explicitly before saving.

        # This is the equivalent of the following, with validation:
        # user = User.objects.create_user(
        #     username=identity.claims["email"],
        #     first_name=first_name,
        #     last_name=last_name,
        #     email=identity.claims["email"],
        # )
        email = user_model.objects.normalize_email(identity.claims["email"])
        user = user_model(
            main_email=email,
            first_name=first_name,
            last_name=last_name,
        )
        # make_password(None) yields an unusable password
        user.password = make_password(None)

        try:
            user.clean_fields()
            # clean_fields() does not check uniqueness: do it explicitly, so
            # that a colliding user is reported as a validation error here
            # instead of as a database error at save time.
            user.validate_unique()
        except ValidationError as e:
            log.warning("%s: cannot create a local user", identity, exc_info=e)
            return None

        user.save()
        return user

237 return user