Coverage for distro_tracker/signon/providers.py: 100%
129 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2020-2023 Enrico Zini <enrico@debian.org>
2# Copyright 2023 The Debusine Developers
3# See the COPYRIGHT file at the top-level directory of this distribution
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
11"""
12Support for external signon providers.
14This is configured by the SIGNON_PROVIDERS variable in django settings.
16SIGNON_PROVIDERS is expected to be a sequence of `Provider` instances, one for
17each supported signon provider.
19Example::
21 # Configure salsa.debian.org as identity provider
22 SIGNON_PROVIDERS=[
23 providers.GitlabProvider(
24 name="salsa",
25 label="Salsa",
26 icon="signon/gitlabian.svg",
27 client_id="123client_id",
28 client_secret="123client_secret",
29 url="https://salsa.debian.org",
30 restrict=["email-verified", "group:debian"],
31 ),
32 ]
34The `restrict` setting can be used to restrict local account creation to
35remote users that have a specific set of claims. It can be set to a sequence of
36strings, each specifying a restriction:
38* `group:name` the given group name must be present
39* `email-verified` the primary email needs to be reported as verified
41All listed restrictions must be met (i.e. they are AND-ed together).
42"""
44import functools
45import json
46from collections.abc import Sequence
47from typing import Any, TYPE_CHECKING
49from django.core.exceptions import ImproperlyConfigured
50from django.utils.crypto import constant_time_compare
52if TYPE_CHECKING: # pragma: no cover
53 import django.http
54 import jwcrypto.jwk
56 from distro_tracker.signon.models import Identity
58# Note: this module is supposed to be imported from settings.py
59#
60# Its module-level import list for the case of defining providers should be
61# kept accordingly minimal
64def get(name: str) -> "Provider":
65 """
66 Look up a provider by name.
68 :param name: name of the provider to look up, matching Provider.name
69 :raises ImproperlyConfigured: if no provider with that name has been
70 defined in settings
71 :return: the Provider instance
72 """
73 from django.conf import settings
74 from django.core.exceptions import ImproperlyConfigured
76 providers = getattr(settings, "SIGNON_PROVIDERS", None)
77 if providers is None:
78 raise ImproperlyConfigured(
79 f"signon provider {name} requested,"
80 " but SIGNON_PROVIDERS is not defined in settings"
81 )
83 # Lookup provider by name
84 for p in providers:
85 if p.name == name:
86 if not isinstance(p, Provider):
87 raise ImproperlyConfigured(
88 f"signon provider {name} requested,"
89 f" but its entry in SIGNON_PROVIDERS setting is not a"
90 f" Provider"
91 )
92 return p
94 raise ImproperlyConfigured(
95 f"signon provider {name} requested,"
96 " but not found in SIGNON_PROVIDERS setting"
97 )
100class BoundProvider:
101 """
102 Request-aware proxy for Provider.
104 This class provides provider-specific functionality based on the current
105 Django request object.
106 """
108 def __init__(
109 self, provider: "Provider", request: "django.http.HttpRequest"
110 ) -> None:
111 """
112 Construct a BoundProvider from a Provider and a HttpRequest.
114 :param provider: provider to bind to a request
115 :param request: current Django request
116 """
117 self.provider = provider
118 self.request = request
120 def __getattr__(self, name: str) -> Any:
121 """Proxy attribute access to the provider definition."""
122 return getattr(self.provider, name)
124 def logout(self) -> None:
125 """Log out an externally authenticated identity."""
126 from distro_tracker.signon.middleware import RequestSignonProtocol
128 assert isinstance(self.request, RequestSignonProtocol)
129 self.request.signon.identities.pop(self.provider.name, None)
130 self.request.session.pop(f"signon_identity_{self.provider.name}", None)
133class Provider:
134 """Information about a signon identity provider."""
136 #: Identifier to reference the provider in code and configuration
137 name: str
138 #: User-visible description
139 label: str
140 #: Optional user-visible icon, resolved via ``{% static %}`` in templates
141 icon: str | None
142 #: Freeform options used to configure behaviour for Signon subclasses
143 options: dict[str, Any]
144 #: Class used to create a request-bound version
145 bound_class: type["BoundProvider"] = BoundProvider
147 def __init__(
148 self,
149 name: str,
150 label: str,
151 *,
152 icon: str | None = None,
153 options: dict[str, Any] | None = None,
154 ) -> None:
155 """
156 Define an external authentication provider.
158 Provider implementation subclasses can definer further keyword
159 arguments.
160 """
161 self.name = name
162 self.label = label
163 self.icon = icon
164 self.options: dict[str, Any] = options or {}
166 def bind(self, request: "django.http.HttpRequest") -> "BoundProvider":
167 """Create a BoundProvider for this session."""
168 return self.bound_class(self, request)
170 def validate_claims(self, identity: "Identity") -> list[str]: # noqa: U100
171 """
172 Check if the claims in identity are acceptable by this provider.
174 :return: a list of error messages, or an empty list if validation passed
175 """
176 return []
179class OIDCValidationError(Exception):
180 """Exception raised when OIDC authentication fails validation."""
183class BoundOIDCProvider(BoundProvider):
184 """Bound version of the OpenID Connect provider."""
186 provider: "OIDCProvider"
188 def __init__(self, *args: Any, **kwargs: Any) -> None:
189 """Construct a BoundProvider for OpenID Connect."""
190 super().__init__(*args, **kwargs)
191 from django.urls import reverse
192 from requests_oauthlib import OAuth2Session
194 self.oauth = OAuth2Session(
195 self.provider.client_id,
196 scope=self.provider.scope,
197 redirect_uri=self.request.build_absolute_uri(
198 # FIXME: the URL name is currently dependent on the way this
199 # module is deployed. This needs rethinking if we want to
200 # publish this as a reusable module
201 reverse("oidc_callback", args=(self.name,))
202 ),
203 )
204 self.tokens = None
205 self.id_token_claims: dict[str, Any] | None = None
206 self.options: Sequence[str] | None = None
208 def get_authorization_url(self, *options: str) -> str:
209 """Return an authorization URL for this provider."""
210 url, state = self.oauth.authorization_url(self.provider.url_authorize)
211 self.request.session[f"signon_state_{self.provider.name}"] = state
212 self.request.session[f"signon_state_{self.provider.name}_options"] = (
213 options
214 )
215 assert isinstance(url, str)
216 return url
218 @functools.cached_property
219 def keyset(self) -> "jwcrypto.jwk.JWKSet":
220 """Load crypto keys from settings."""
221 # TODO: better key caching can be considered in the future.
222 #
223 # This caches the server keys forever in process memory.
224 #
225 # If usage scales up, this can be changed to introduce a way to cache
226 # the keys into persistent storage, to avoid one call to url_jwks per
227 # process.
228 #
229 # Also, if the way this is deployed means that processes are very long
230 # lived, more attention should be paid to deal with key rotation.
231 #
232 # See https://openid.net/specs/openid-connect-core-1_0.html#RotateSigKeys # noqa: E501
233 import jwcrypto.jwk
235 key_response = self.oauth.get(self.provider.url_jwks)
236 key_response.raise_for_status()
237 return jwcrypto.jwk.JWKSet.from_json(key_response.text)
239 def load_tokens(self) -> None:
240 """Fetch and validate access_token and id_token from OIDC provider."""
241 import jwcrypto.jwt
243 expected_state = self.request.session.pop(
244 f"signon_state_{self.provider.name}", None
245 )
246 options = self.request.session.pop(
247 f"signon_state_{self.provider.name}_options", None
248 )
250 if expected_state is None:
251 raise OIDCValidationError(
252 "Request state mismatch: expected state not found in session"
253 )
254 if options is None:
255 raise OIDCValidationError(
256 "Request state mismatch: options not found in session"
257 )
259 remote_state = self.request.GET.get("state")
260 if remote_state is None:
261 raise OIDCValidationError(
262 "Request state mismatch: state not found in remote response"
263 )
265 if not constant_time_compare(remote_state, expected_state):
266 raise OIDCValidationError(
267 "Request state mismatch:"
268 f" remote: {remote_state!r},"
269 f" expected: {expected_state!r}"
270 )
272 tokens = self.oauth.fetch_token(
273 self.url_token,
274 authorization_response=self.request.build_absolute_uri(),
275 client_secret=self.client_secret,
276 )
278 # See https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation # noqa: E501
280 id_token = tokens["id_token"]
282 tok = jwcrypto.jwt.JWT(key=self.keyset, jwt=id_token)
283 id_token_claims = json.loads(tok.claims)
285 if not constant_time_compare(
286 id_token_claims["iss"], self.provider.url_issuer
287 ):
288 raise OIDCValidationError(
289 f"Issuer mismatch: remote: {id_token_claims['iss']!r},"
290 f" expected: {self.provider.url_issuer!r}"
291 )
293 if not constant_time_compare(
294 id_token_claims["aud"], self.provider.client_id
295 ):
296 raise OIDCValidationError(
297 f"Audience mismatch: remote: {id_token_claims['aud']!r},"
298 f" expected: {self.provider.client_id!r}"
299 )
301 # Note: the 'exp' claim is checked by default by JWT
303 # TODO: potential extra checks to be implemented if needed:
304 #
305 # * assert tok["iat"] to be not too old
306 #
307 # OpenID Connect Core 1.0 states:
308 #
309 # > The iat Claim can be used to reject tokens that were issued too far
310 # > away from the current time, limiting the amount of time that nonces
311 # > need to be stored to prevent attacks. The acceptable range is
312 # > Client specific.
313 #
314 # * honor tok["auth_time"]
315 #
316 # OpenID Connect Core 1.0 states:
317 #
318 # > If the auth_time Claim was requested, either through a specific
319 # > request for this Claim or by using the max_age parameter, the
320 # > Client SHOULD check the auth_time Claim value and request
321 # > re-authentication if it determines too much time has elapsed since
322 # > the last End-User authentication.
324 self.tokens = tokens
325 self.id_token_claims = id_token_claims
326 self.options = options
329class OIDCProvider(Provider):
330 """OpenID Connect identity provider."""
332 bound_class = BoundOIDCProvider
334 def __init__(
335 self,
336 *args: Any,
337 client_id: str,
338 client_secret: str,
339 url_issuer: str,
340 url_authorize: str,
341 url_token: str,
342 url_userinfo: str,
343 url_jwks: str,
344 scope: str | Sequence[str],
345 restrict: Sequence[str] = ("email-verified",),
346 **kwargs: Any,
347 ) -> None:
348 """
349 Define an OpenID Connect provider.
351 :param client_id: client identifier configured in the authentication
352 server
353 :param client_secret: client_secret provided by the authentication
354 server
355 :param url_issuer: URL identifying the authentication server
356 :param url_authorize: OIDC authorization endpoint
357 :param url_token: OIDC token endpoint
358 :param url_userinfo: OIDC userinfo endpoint
359 :param url_jwks: OIDC jwks_uri to retrieve the authentication server
360 signing keys
361 :param restrict: list of restrictions for mapping remote accounts to
362 local ones
364 See https://openid.net/specs/openid-connect-core-1_0.html for details
365 """
366 super().__init__(*args, **kwargs)
367 self.client_id = client_id
368 self.client_secret = client_secret
369 self.url_issuer = url_issuer
370 self.url_authorize = url_authorize
371 self.url_token = url_token
372 self.url_userinfo = url_userinfo
373 self.url_jwks = url_jwks
374 self.scope: list[str]
375 if isinstance(scope, str):
376 self.scope = [scope]
377 else:
378 self.scope = list(scope)
379 self.restrict: Sequence[str] = tuple(restrict)
381 def validate_claims(self, identity: "Identity") -> list[str]:
382 """Check that the claims in identity match restrict."""
383 res = super().validate_claims(identity)
385 for restriction in self.restrict:
386 if restriction == "email-verified":
387 # Work only with verified emails
388 if not identity.claims.get("email_verified", False):
389 res.append(
390 f"identity {identity} does not have a verified email"
391 )
392 elif restriction.startswith("group:"):
393 group_name = restriction[6:]
394 if group_name not in identity.claims.get("groups_direct", ()):
395 res.append(
396 f"identity {identity} is not in group {group_name}"
397 )
398 else:
399 raise ImproperlyConfigured(
400 f"unsupported signon restriction: {restriction!r}"
401 )
403 return res
406class GitlabProvider(OIDCProvider):
407 """Gitlab OIDC identity provider."""
409 def __init__(self, *args: Any, url: str, **kwargs: Any) -> None:
410 """
411 Define a GitLab-based OIDC Connect provider.
413 :param url: URL to the root of the GitLab server. It will be used to
414 automatically generate all ``url_*`` arguments for OIDCProvider
415 """
416 kwargs.setdefault("scope", "openid")
417 kwargs["url_issuer"] = url
418 kwargs["url_authorize"] = f"{url}/oauth/authorize"
419 kwargs["url_token"] = f"{url}/oauth/token"
420 kwargs["url_userinfo"] = f"{url}/oauth/userinfo"
421 kwargs["url_jwks"] = f"{url}/oauth/discovery/keys"
422 super().__init__(*args, **kwargs)