1# Copyright 2013 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""Authentication with the Debian SSO service."""
12import json
13import logging
15from django.contrib import auth
16from django.contrib.auth.backends import RemoteUserBackend
17from django.contrib.auth.middleware import RemoteUserMiddleware
18from django.core.exceptions import ImproperlyConfigured, ValidationError
19from django.utils.http import urlencode
21from distro_tracker.accounts.models import User, UserEmail
22from distro_tracker.core.utils.http import get_resource_content
24logger = logging.getLogger(__name__)
class DebianSsoUserMiddleware(RemoteUserMiddleware):
    """
    Middleware that initiates user authentication based on the REMOTE_USER
    field provided by Debian's SSO system, or based on the SSL_CLIENT_S_DN_CN
    field provided by the validation of the SSL client certificate generated
    by sso.debian.org.

    If the currently logged in user is a DD (as identified by having a
    @debian.org address), they are forcefully logged out if the header
    is no longer found or is invalid.
    """
    dacs_header = 'REMOTE_USER'
    cert_header = 'SSL_CLIENT_S_DN_CN'

    @staticmethod
    def dacs_user_to_email(username):
        """
        Converts a DACS identity into an email address.

        The identity is split on ``:`` and empty components are dropped.
        The first two components must be ``DEBIANORG`` and ``DEBIAN`` and
        the last component is taken as the user name.

        :param username: The raw value of the DACS header.
        :return: The user name itself when it already contains ``@``,
            otherwise the user name with ``@debian.org`` appended.
            ``None`` if the identity is malformed or belongs to another
            federation/jurisdiction.
        """
        parts = [part for part in username.split(':') if part]
        # A usable identity needs federation, jurisdiction and a user name.
        # Without this guard a short value would either fail to unpack or
        # have its jurisdiction mistaken for the user name.
        if len(parts) < 3:
            return None
        federation, jurisdiction = parts[:2]
        if (federation, jurisdiction) != ('DEBIANORG', 'DEBIAN'):
            return None
        username = parts[-1]
        if '@' in username:
            return username  # Full email already
        return username + '@debian.org'

    @staticmethod
    def is_debian_member(user):
        """
        Tells whether any of the emails associated with the user ends
        with ``@debian.org``.
        """
        return any(
            email.email.endswith('@debian.org')
            for email in user.emails.all()
        )

    def _logout_debian_member(self, request):
        """
        Logs out the current user if they are authenticated and identified
        as a Debian member; any other user is left untouched.
        """
        # Debian developers can only authenticate via SSO/SSL certs
        # so log them out now if they no longer have the proper META
        # variable
        if request.user.is_authenticated:
            if self.is_debian_member(request.user):
                auth.logout(request)

    def process_request(self, request):
        """
        Authenticates the request from the SSO headers found in
        ``request.META``.

        The client certificate header takes precedence over the DACS
        header. When no usable identity can be derived from them, a
        currently logged in Debian member is logged out and nothing else
        happens.

        :raises ImproperlyConfigured: if ``request.user`` is missing.
        """
        # AuthenticationMiddleware is required so that request.user exists.
        if not hasattr(request, 'user'):
            raise ImproperlyConfigured(
                "The Django remote user auth middleware requires the"
                " authentication middleware to be installed. Edit your"
                " MIDDLEWARE setting to insert"
                " 'django.contrib.auth.middleware.AuthenticationMiddleware'"
                " before the DebianSsoUserMiddleware class.")

        dacs_user = request.META.get(self.dacs_header)
        cert_user = request.META.get(self.cert_header)
        if cert_user is not None:
            remote_user = cert_user
        elif dacs_user is not None:
            remote_user = self.dacs_user_to_email(dacs_user)
        else:
            remote_user = None

        if not remote_user:
            # Header missing or invalid: there is nobody to authenticate.
            self._logout_debian_member(request)
            return

        if request.user.is_authenticated:
            if request.user.emails.filter(email__iexact=remote_user).exists():
                # The currently logged in user matches the one given by the
                # headers.
                return

        if remote_user.endswith('@users.alioth.debian.org'):
            # Disallow logins with Alioth certs
            return

        # This will create the user if it doesn't exist
        user = auth.authenticate(remote_user=remote_user)
        if user:
            # User is valid. Set request.user and persist user in the session
            # by logging the user in.
            request.user = user
            auth.login(request, user)
class DebianSsoUserBackend(RemoteUserBackend):
    """
    The authentication backend which authenticates the provided remote
    user (identified by their @debian.org email) in Distro Tracker. If
    a matching User model instance does not exist, one is
    automatically created. In that case the DDs first and last name
    are pulled from Debian's NM REST API.
    """
    def authenticate(self, request=None, remote_user=None):
        """
        Returns the user associated with the ``remote_user`` email,
        creating it when the email is not associated with any user yet.

        :param request: Unused.
        :param remote_user: The email identifying the user.
        :return: The user, or ``None`` if ``remote_user`` is empty or is
            rejected as an invalid email.
        """
        if not remote_user:
            return None

        email = remote_user

        try:
            user_email, _ = UserEmail.objects.get_or_create(
                email__iexact=email, defaults={'email': email}
            )
        except ValidationError:
            logger.error('remote_user="%s" is not a valid email.',
                         remote_user)
            return None

        if user_email.user:
            # Fetched through User so that the returned instance is of the
            # User class imported by this module.
            # NOTE(review): presumably user_email.user can be of a different
            # class -- confirm before simplifying.
            return User.objects.get(pk=user_email.user.id)

        # First login: names are only filled in when the lookup found any.
        kwargs = self.get_user_details(remote_user) or {}
        return User.objects.create_user(main_email=email, **kwargs)

    @staticmethod
    def get_uid(remote_user):
        """
        Strips off the @debian.org part of the email leaving the uid.
        Any other value is returned unchanged.
        """
        suffix = '@debian.org'
        if remote_user.endswith(suffix):
            return remote_user[:-len(suffix)]
        return remote_user

    def get_user_details(self, remote_user):
        """
        Gets the details of the given user from the Debian NM REST API.

        :return: Dict with the keys ``first_name``, ``last_name``
            ``None`` if the API lookup did not return anything, or
            returned something that could not be interpreted.
        """
        if not remote_user.endswith('@debian.org'):
            # We only know how to extract data for DD via NM API
            return None

        content = get_resource_content(
            'https://nm.debian.org/api/people?' +
            urlencode({'uid': self.get_uid(remote_user)}))
        if not content:
            # Nothing could be retrieved; the account is created without
            # names instead of failing the login.
            return None

        try:
            result = json.loads(content.decode('utf-8'))['r']
        except (ValueError, KeyError, TypeError):
            # ValueError covers both undecodable bytes and invalid JSON.
            logger.warning(
                'Unexpected response from the NM API for remote_user="%s".',
                remote_user)
            return None

        if not result:
            return None
        return {
            'first_name': result[0]['cn'],
            'last_name': result[0]['sn'],
        }