# Copyright 2013 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.

10"""Authentication with the Debian SSO service.""" 

11 

12import json 

13import logging 

14 

15from django.contrib import auth 

16from django.contrib.auth.backends import RemoteUserBackend 

17from django.contrib.auth.middleware import RemoteUserMiddleware 

18from django.core.exceptions import ImproperlyConfigured, ValidationError 

19from django.utils.http import urlencode 

20 

21from distro_tracker.accounts.models import User, UserEmail 

22from distro_tracker.core.utils.http import get_resource_content 

23 

24logger = logging.getLogger(__name__) 

25 

26 

class DebianSsoUserMiddleware(RemoteUserMiddleware):
    """
    Middleware that initiates user authentication based on the REMOTE_USER
    field provided by Debian's SSO system, or based on the SSL_CLIENT_S_DN_CN
    field provided by the validation of the SSL client certificate generated
    by sso.debian.org.

    If the currently logged in user is a DD (as identified by having a
    @debian.org address), they are forcefully logged out when neither of
    the two fields is present in the request any more.
    """
    dacs_header = 'REMOTE_USER'
    cert_header = 'SSL_CLIENT_S_DN_CN'

    @staticmethod
    def dacs_user_to_email(username):
        """
        Converts a DACS user identifier into an email address.

        The identifier is split on ``:`` and empty components are dropped;
        the first two remaining components are the federation and the
        jurisdiction, the last one is the user name.

        :param username: The value of the REMOTE_USER field.
        :return: The user name itself when it already contains an ``@``,
            otherwise the user name with ``@debian.org`` appended.
            ``None`` when the identifier does not belong to the
            ``DEBIANORG``/``DEBIAN`` federation and jurisdiction, or when
            it does not contain a federation, a jurisdiction and a user
            name.
        """
        parts = [part for part in username.split(':') if part]
        if len(parts) < 3:
            # Malformed identifier: unpacking it would raise ValueError
            # (fewer than two parts) or would mistake the jurisdiction
            # for the user name (exactly two parts).
            return None
        federation, jurisdiction = parts[:2]
        if (federation, jurisdiction) != ('DEBIANORG', 'DEBIAN'):
            return None
        username = parts[-1]
        if '@' in username:
            return username  # Full email already
        return username + '@debian.org'

    @staticmethod
    def is_debian_member(user):
        """
        Tells whether any of the emails associated with the user ends with
        ``@debian.org``.
        """
        return any(
            email.email.endswith('@debian.org')
            for email in user.emails.all()
        )

    def process_request(self, request):
        """
        Logs the user in (or out) according to the SSO fields found in
        ``request.META``.

        The client certificate field takes precedence over the DACS field.

        :raises ImproperlyConfigured: when ``request.user`` is missing,
            i.e. the authentication middleware did not run before this one.
        """
        # AuthenticationMiddleware is required so that request.user exists.
        if not hasattr(request, 'user'):
            raise ImproperlyConfigured(
                "The Django remote user auth middleware requires the"
                " authentication middleware to be installed. Edit your"
                " MIDDLEWARE setting to insert"
                " 'django.contrib.auth.middleware.AuthenticationMiddleware'"
                " before the DebianSsoUserMiddleware class.")

        dacs_user = request.META.get(self.dacs_header)
        cert_user = request.META.get(self.cert_header)
        if cert_user is not None:
            remote_user = cert_user
        elif dacs_user is not None:
            # May be None when the DACS identifier is not a Debian one.
            remote_user = self.dacs_user_to_email(dacs_user)
        else:
            # Debian developers can only authenticate via SSO/SSL certs
            # so log them out now if they no longer have the proper META
            # variable
            if request.user.is_authenticated:
                if self.is_debian_member(request.user):
                    auth.logout(request)
            return

        if request.user.is_authenticated:
            if request.user.emails.filter(email=remote_user).exists():
                # The currently logged in user matches the one given by the
                # headers.
                return

        if remote_user and remote_user.endswith('@users.alioth.debian.org'):
            # Disallow logins with Alioth certs
            return

        # This will create the user if it doesn't exist
        user = auth.authenticate(remote_user=remote_user)
        if user:
            # User is valid. Set request.user and persist user in the session
            # by logging the user in.
            request.user = user
            auth.login(request, user)

101 

102 

class DebianSsoUserBackend(RemoteUserBackend):
    """
    The authentication backend which authenticates the provided remote
    user (identified by their @debian.org email) in Distro Tracker. If
    a matching User model instance does not exist, one is
    automatically created. In that case the DDs first and last name
    are pulled from Debian's NM REST API.
    """
    def authenticate(self, request=None, remote_user=None):
        """
        Returns the user associated with the ``remote_user`` email,
        creating it when the email is not associated with any user yet.

        :param request: Unused.
        :param remote_user: The email address identifying the user.
        :return: The matching or newly created user. ``None`` when
            ``remote_user`` is empty or is rejected as an invalid email.
        """
        if not remote_user:
            return None

        email = remote_user

        try:
            user_email, _ = UserEmail.objects.get_or_create(email=email)
        except ValidationError:
            logger.error('remote_user="%s" is not a valid email.',
                         remote_user)
            return None

        if not user_email.user:
            # The names are optional: the account is created without them
            # when the lookup yields nothing.
            kwargs = {}
            names = self.get_user_details(remote_user)
            if names:
                kwargs.update(names)
            user = User.objects.create_user(main_email=email, **kwargs)
        else:
            user = User.objects.get(pk=user_email.user.id)

        return user

    @staticmethod
    def get_uid(remote_user):
        """
        Strips off the ``@debian.org`` part of the email leaving the uid.
        Any other value is returned unchanged.
        """
        suffix = '@debian.org'
        if remote_user.endswith(suffix):
            return remote_user[:-len(suffix)]
        return remote_user

    def get_user_details(self, remote_user):
        """
        Gets the details of the given user from the Debian NM REST API.

        :return: Dict with the keys ``first_name``, ``last_name``
            ``None`` if the user is not a ``@debian.org`` one, if the API
            lookup did not return anything, or if its response could not
            be interpreted.
        """
        if not remote_user.endswith('@debian.org'):
            # We only know how to extract data for DD via NM API
            return None

        content = get_resource_content(
            'https://nm.debian.org/api/people?' +
            urlencode({'uid': self.get_uid(remote_user)}))
        if not content:
            # Nothing could be retrieved; without this guard the code
            # below would read a variable that was never assigned.
            return None

        try:
            result = json.loads(content.decode('utf-8'))['r']
            if not result:
                return None
            person = result[0]
            return {
                'first_name': person['cn'],
                'last_name': person['sn'],
            }
        except (ValueError, LookupError, TypeError):
            # Invalid JSON/encoding or an unexpected structure. The names
            # are optional, so do not let that break the authentication.
            logger.warning(
                'Unexpected NM API response for remote_user="%s".',
                remote_user)
            return None