Coverage for distro_tracker/signon/views.py: 92%
63 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2020-2023 Enrico Zini <enrico@debian.org>
2# Copyright 2023 The Debusine Developers
3# See the COPYRIGHT file at the top-level directory of this distribution
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
11"""
12Views needed to interact with external authentication providers.
14Login and logout hooks are implemented as mixing for the normal
15django.contrib.auth.LoginView/LogoutView.
16"""
18from collections.abc import Sequence
19from typing import Any
21from django import http
22from django.conf import settings
23from django.core.exceptions import ImproperlyConfigured, PermissionDenied
24from django.core.handlers.exception import response_for_exception
25from django.db import transaction
26from django.http import HttpRequest, HttpResponse, HttpResponseBase
27from django.shortcuts import redirect
28from django.utils import timezone
29from django.utils.decorators import method_decorator
30from django.views.decorators.cache import never_cache
31from django.views.generic import View
33from distro_tracker.signon import providers
34from distro_tracker.signon.middleware import RequestSignonProtocol
35from distro_tracker.signon.models import Identity
38class SignonLogoutMixin:
39 """Mixin to logout external signon providers in a logout view."""
41 @method_decorator(never_cache)
42 def dispatch(
43 self, request: HttpRequest, *args: Any, **kwargs: Any
44 ) -> HttpResponse:
45 """Wrap the normal logout to also log out identifiers."""
46 if signon := getattr(request, "signon", None):
47 signon.logout_identities()
48 assert isinstance(self, View)
49 return super().dispatch(request, *args, **kwargs)
52class BindIdentityView(View):
53 """
54 Bind an external identity to the current user.
56 This will initiate an external authentication, setting things up so that on
57 success the identity is bound to the current user
58 """
60 def get(
61 self, request: HttpRequest, *args: Any, **kwargs: Any
62 ) -> HttpResponseBase:
63 """Check, setup, and redirect to the external identity provider."""
64 if not request.user.is_authenticated:
65 raise PermissionDenied
67 try:
68 provider = providers.get(self.kwargs["name"])
69 except ImproperlyConfigured:
70 raise PermissionDenied
71 bound_provider = provider.bind(request)
72 url = bound_provider.get_authorization_url("bind")
73 return redirect(url)
76class OIDCAuthenticationCallbackView(View):
77 """
78 Handle a callback from an external ODIC authentication provider.
80 If successful, this activates the identity related to the provider,
81 creating it if missing
82 """
84 def _validate(
85 self, request: http.HttpRequest
86 ) -> tuple[dict[str, Any], Sequence[str]]:
87 """
88 Validate the information from the remote OIDC provider.
90 :return: the claims dict and the options passed to
91 BoundProvider.get_authorization_url
92 """
93 name = self.kwargs["name"]
94 try:
95 provider = providers.get(name)
96 except ImproperlyConfigured:
97 raise http.Http404
99 bound_provider = provider.bind(request)
100 bound_provider.load_tokens()
102 return bound_provider.id_token_claims, bound_provider.options
104 def get(
105 self, request: HttpRequest, *args: Any, **kwargs: Any
106 ) -> HttpResponseBase:
107 """
108 Notify successful authentication from from the external OIDC server.
110 This is called by the external OIDC server.
112 Validate the server information, activate the relevant Identity and
113 recompute authentication information with the new information.
114 """
115 name = self.kwargs["name"]
116 claims, options = self._validate(request)
118 try:
119 identity = Identity.objects.get(issuer=name, subject=claims["sub"])
120 except Identity.DoesNotExist:
121 identity = Identity.objects.create(
122 issuer=name,
123 subject=claims["sub"],
124 )
126 # Remove the audience claim, which we don't need to store
127 claims.pop("aud", None)
129 identity.claims = claims
130 identity.last_used = timezone.now()
131 identity.save()
133 with transaction.atomic():
134 # Handle the exception ourselves, since we want to save the
135 # identity's last_used state even if this fails.
136 try:
137 assert isinstance(request, RequestSignonProtocol)
138 request.signon.activate_identity(identity, *options)
139 except Exception as exc:
140 return response_for_exception(request, exc)
142 return redirect(getattr(
143 settings, "SIGNON_DEFAULT_REDIRECT", "dtracker-accounts-profile"
144 ))