Coverage for distro_tracker/signon/signon.py: 99%
111 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2020-2023 Enrico Zini <enrico@debian.org>
2# Copyright 2023 The Debusine Developers
3# See the COPYRIGHT file at the top-level directory of this distribution
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
11"""Logic to authenticate a request using signon Providers."""
12import functools
13import logging
14from collections.abc import Iterable, Sequence
16import django.http
17from django.conf import settings
18from django.contrib import auth
19from django.contrib.auth import load_backend
20from django.contrib.auth.hashers import make_password
21from django.core.exceptions import PermissionDenied, ValidationError
23from distro_tracker.signon import providers
24from distro_tracker.signon.auth import SignonAuthBackend
25from distro_tracker.signon.models import Identity
26from distro_tracker.signon.utils import split_full_name
27from django_email_accounts.models import User
29log = logging.getLogger(__name__)
32class Signon:
33 """
34 Backend used to interact with external authentication providers.
36 This is setup by SignonMiddleware as request.signon.
38 The constructor needs to be as lightweight as possible, as it is called on
39 every request. Everything else is loaded only when needed.
40 """
42 def __init__(self, request: django.http.HttpRequest) -> None:
43 """Create a Signon object for a request."""
44 self.request = request
45 self.providers: Sequence[providers.Provider] = getattr(
46 settings, "SIGNON_PROVIDERS", ()
47 )
49 def status(
50 self,
51 ) -> Iterable[tuple[providers.BoundProvider, Identity | None]]:
52 """
53 Query the status of remote authentication providers.
55 :returns: an iterable of ``(bound_provider, identity | None)``
56 """
57 for provider in self.providers:
58 bound = provider.bind(self.request)
59 identity = self.identities.get(provider.name)
60 yield bound, identity
62 @functools.cached_property
63 def identities(self) -> dict[str, Identity]:
64 """Lazily populate self.identities."""
65 return self._compute_identities()
67 def logout_identities(self) -> None:
68 """Deactivate all active external identities."""
69 for provider in self.providers:
70 bound = provider.bind(self.request)
71 bound.logout()
72 self._remove_invalid_signon_user()
74 def _compute_identities(self) -> dict[str, Identity]:
75 """
76 Instantiate valid Identity entries for this request.
78 Delegate Provider objects with looking up valid Identity objects from
79 the current request.
80 """
81 identities = {}
83 for provider in self.providers:
84 pk = self.request.session.get(f"signon_identity_{provider.name}")
85 if pk is None:
86 continue
88 try:
89 identity = Identity.objects.get(pk=pk, issuer=provider.name)
90 except Identity.DoesNotExist:
91 # If the session has a broken Identity ID, remove it
92 del self.request.session[f"signon_identity_{provider.name}"]
93 continue
95 identities[provider.name] = identity
97 return identities
99 def activate_identity(self, identity: Identity, *options: str) -> None:
100 """Activate the given identity and update authentication accordingly."""
101 if self.request.user.is_authenticated:
102 if "bind" not in options:
103 raise PermissionDenied("user is already logged in")
104 # Bind the current user to the identity
105 log.info("%s: auto associated to %s", self.request.user, identity)
106 identity.user = self.request.user
107 identity.save()
108 elif identity.user is None:
109 if (user := self._map_identity_to_user(identity)) is not None:
110 log.debug("logging in autocreated user %s", user)
111 self.request.user = user
112 auth.login(
113 self.request,
114 user,
115 backend="distro_tracker.signon.auth.SignonAuthBackend",
116 )
117 else:
118 log.debug("logging in user %s", identity.user)
119 auth.login(
120 self.request,
121 identity.user,
122 backend="distro_tracker.signon.auth.SignonAuthBackend",
123 )
125 self.request.session[f"signon_identity_{identity.issuer}"] = identity.pk
127 def _remove_invalid_signon_user(self) -> None:
128 """
129 Log out an externally authenticated user.
131 This is used to invalidate credentials in case a consistency check
132 failed between active identities.
134 Log out only happens if the user was authenticated via SignonMiddleware
135 """
136 try:
137 stored_backend = load_backend(
138 self.request.session.get(auth.BACKEND_SESSION_KEY, '')
139 )
140 except ImportError:
141 # backend failed to load
142 auth.logout(self.request)
143 else:
144 if isinstance(stored_backend, SignonAuthBackend):
145 auth.logout(self.request)
147 def _map_identity_to_user(self, identity: Identity) -> User | None:
148 """Create a user model from the given identity, and bind it to it."""
149 # Validate identity claims
150 if not (provider := self.get_provider_for_identity(identity)):
151 return None
152 if claim_errors := provider.validate_claims(identity):
153 for error in claim_errors:
154 log.warning("%s", error)
155 return None
157 # First lookup an existing user
158 user = self._lookup_user_from_identity(identity)
159 if user is not None:
160 log.info("%s: user matched to identity %s", user, identity)
161 else:
162 # Else try to create a new user
163 user = self.create_user_from_identity(identity)
164 if user is not None:
165 log.info("%s: auto created from identity %s", user, identity)
166 else:
167 return None
169 # A user was found or created: bind it to the identity
170 identity.user = user
171 identity.save()
172 log.info("%s: bound to identity %s", user, identity)
173 return user
175 def get_provider_for_identity(
176 self, identity: Identity
177 ) -> providers.Provider | None:
178 """Find the Provider for an identity."""
179 for provider in self.providers:
180 if provider.name == identity.issuer:
181 return provider
183 log.warning(
184 "identity %s has unknown issuer %s", identity, identity.issuer
185 )
186 return None
188 def _lookup_user_from_identity(self, identity: Identity) -> User | None:
189 """Lookup an existing user from claims in an Identity."""
190 User = auth.get_user_model()
191 try:
192 return User.objects.get(main_email=identity.claims["email"])
193 except User.DoesNotExist:
194 return None
196 def create_user_from_identity(self, identity: Identity) -> User | None:
197 """
198 Lookup or create a user from the data in an Identity.
200 This is a default implementation. It can be customized by subclassing
201 Signon and using settings.SIGNON_CLASS
202 """
203 User = auth.get_user_model()
204 first_name, last_name = split_full_name(identity.claims["name"])
206 # Django does not run validators on create_user, so garbage in the
207 # claims can either create garbage users, or cause database transaction
208 # errors that will invalidate the current transaction.
209 #
210 # See: https://stackoverflow.com/questions/67442439/why-django-does-not-validate-email-in-customuser-model # noqa: E501
212 # Instead of calling create_user, I instead have to replicate what it
213 # does here and call validation explicitly before save.
215 # This is the equivalent of the following, with validation:
216 # user = User.objects.create_user(
217 # username=identity.claims["email"],
218 # first_name=first_name,
219 # last_name=last_name,
220 # email=identity.claims["email"],
221 # )
222 email = User.objects.normalize_email(identity.claims["email"])
223 user = User(
224 main_email=email,
225 first_name=first_name,
226 last_name=last_name,
227 )
228 user.password = make_password(None)
230 try:
231 user.clean_fields()
232 except ValidationError as e:
233 log.warning("%s: cannot create a local user", identity, exc_info=e)
234 return None
236 user.save()
237 return user