1# Copyright 2013-2019 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10"""Various utilities for the distro-tracker project.""" 

11import datetime 

12import json 

13import logging 

14import os 

15 

16from django.conf import settings 

17from django.core.exceptions import ValidationError 

18from django.core.validators import EmailValidator 

19from django.db import models 

20from django.http import HttpResponse 

21from django.template.loader import render_to_string 

22 

23import gpg 

24 

25from distro_tracker import vendor 

26 

27# Re-export some functions 

28from .email_messages import extract_email_address_from_header # noqa 

29from .email_messages import get_decoded_message_payload # noqa 

30from .email_messages import message_from_bytes # noqa 

31 

# Logger bound to the 'distro_tracker.input' channel; this module uses it to
# report signing keys for which no UID with a valid email could be selected.
logger_input = logging.getLogger("distro_tracker.input")

33 

34 

def get_or_none(model, **kwargs):
    """
    Looks up a single Django Model object, yielding ``None`` instead of
    raising when there is no such object.

    :param model: The model class to query; its ``objects`` manager and its
        ``DoesNotExist`` exception are used.
    :param kwargs: Lookup parameters passed verbatim to ``objects.get()``.

    :returns: The matching object, or ``None`` if ``model.DoesNotExist`` was
        raised. Any other exception raised by the lookup is propagated.
    """
    try:
        match = model.objects.get(**kwargs)
    except model.DoesNotExist:
        match = None
    return match

44 

45 

def distro_tracker_render_to_string(template_name, context=None):
    """
    Renders a template to a string after adding the distro-tracker specific
    extras (``context_processors.DISTRO_TRACKER_EXTRAS``) to the context.

    Django's template context processors only run when a
    :class:`RequestContext <django.template.RequestContext>` is used, whereas
    this function can be called independently from any HTTP request.

    :param template_name: Name of the template, handed to
        :func:`render_to_string <django.template.loader.render_to_string>`.
    :param context: Optional context for the template. When given, it is
        updated in place with the extras, so the extras override any keys
        with the same name and the caller's object is modified.

    :returns: The rendered template.
    """
    # Imported here rather than at module level, as in the original code.
    from distro_tracker.core import context_processors

    render_context = {} if context is None else context
    render_context.update(context_processors.DISTRO_TRACKER_EXTRAS)

    return render_to_string(template_name, render_context)

65 

66 

def render_to_json_response(response):
    """
    Builds an :class:`HttpResponse <django.http.HttpResponse>` whose body is
    the given ``response`` object serialized to a JSON string.

    The resulting HTTP response has Content-Type set to application/json.

    :param response: The object to be serialized in the response. It must be
        serializable by the :mod:`json` module.
    :rtype: :class:`HttpResponse <django.http.HttpResponse>`
    """
    payload = json.dumps(response)
    return HttpResponse(payload, content_type='application/json')

82 

83 

class PrettyPrintList(object):
    """
    A class which wraps the built-in :class:`list` object so that when it is
    converted to a string, its contents are printed using the given
    :attr:`delimiter`.

    The default delimiter is a space.

    The list passed to the constructor is wrapped by reference, not copied.
    Attributes not defined on this class are looked up on the wrapped list.

    >>> a = PrettyPrintList([1, 2, 3])
    >>> print(a)
    1 2 3
    >>> print(PrettyPrintList([u'one', u'2', u'3']))
    one 2 3
    >>> print(PrettyPrintList([1, 2, 3], delimiter=', '))
    1, 2, 3
    >>> # Still acts as a list
    >>> a == [1, 2, 3]
    True
    >>> a == ['1', '2', '3']
    False
    """
    def __init__(self, the_list=None, delimiter=' '):
        """
        :param the_list: The list to wrap; a new empty list when ``None``.
        :param delimiter: The string placed between items by ``str()``.
        """
        if the_list is None:
            self._list = []
        else:
            self._list = the_list
        self.delimiter = delimiter

    def __getattr__(self, name, *args, **kwargs):
        """
        Delegates lookups of attributes missing on this object to the
        wrapped list.
        """
        # ``_list`` is only looked up here when it has not been set yet, e.g.
        # on the bare instance created by copy/pickle before its state is
        # restored. Delegating would then re-enter this method without end,
        # so report the attribute as missing instead.
        if name == '_list':
            raise AttributeError(name)
        return getattr(self._list, name)

    def __len__(self):
        return len(self._list)

    def __getitem__(self, pos):
        return self._list[pos]

    def __iter__(self):
        return self._list.__iter__()

    def __str__(self):
        # Each item is converted with str() and joined using the delimiter.
        return self.delimiter.join(map(str, self._list))

    def __repr__(self):
        return str(self)

    def __eq__(self, other):
        # Compare the wrapped lists when both sides are wrappers; otherwise
        # compare the wrapped list with the other object directly.
        if isinstance(other, PrettyPrintList):
            return self._list == other._list
        return self._list == other

134 

135 

class SpaceDelimitedTextField(models.TextField):
    """
    A custom Django model field which stores a list of strings.

    It stores the list in a :class:`TextField <django.db.models.TextField>` as a
    space delimited list. It is marshalled back to a :class:`PrettyPrintList` in
    the Python domain.
    """

    description = "Stores a space delimited list of strings"

    def from_db_value(self, value, expression, connection):
        """Converts a value loaded from the database via :meth:`to_python`."""
        return self.to_python(value)

    def to_python(self, value):
        """
        Converts the given value to a :class:`PrettyPrintList`.

        ``None`` and :class:`PrettyPrintList` instances are returned as they
        are, a :class:`list` is wrapped, and any other value is split on
        whitespace using its ``split()`` method.
        """
        if value is None:
            return None

        if isinstance(value, PrettyPrintList):
            return value
        elif isinstance(value, list):
            return PrettyPrintList(value)

        return PrettyPrintList(value.split())

    def get_prep_value(self, value, **kwargs):
        """
        Serializes the value to the space delimited string which is stored.

        :returns: ``None`` for ``None``, the string itself for a :class:`str`,
            otherwise the ``str()`` of each item joined with single spaces.
        """
        if value is None:
            return None
        if isinstance(value, str):
            # A string is already in the serialized form (to_python splits it
            # on whitespace). Iterating over it would instead put a space
            # between each of its characters.
            return value
        # Any other iterable value can be converted into this type of field.
        return ' '.join(map(str, value))

    def get_db_prep_value(self, value, *args, **kwargs):
        """
        Returns the serialized value; see :meth:`get_prep_value`.

        The extra arguments (Django passes ``connection`` and ``prepared``,
        the former positionally from lookups) are accepted and ignored.
        Preparing an already prepared string is harmless since strings are
        passed through unchanged.
        """
        return self.get_prep_value(value)

    def value_to_string(self, obj):
        """Returns the serialized field value of the given model instance."""
        # value_from_object() is the public accessor for a field's value;
        # the private _get_val_from_obj() used previously is not available in
        # newer Django versions.
        value = self.value_from_object(obj)
        return self.get_prep_value(value)

173 

174 

#: Maps the shorthand of each known VCS to its full name.
VCS_SHORTHAND_TO_NAME = {
    "svn": "Subversion",
    "git": "Git",
    "bzr": "Bazaar",
    "cvs": "CVS",
    "darcs": "Darcs",
    "hg": "Mercurial",
    "mtn": "Monotone",
}

185 

186 

def get_vcs_name(shorthand):
    """
    Returns the full name of the VCS identified by the given shorthand.

    If the shorthand is not found in :data:`VCS_SHORTHAND_TO_NAME`, the
    shorthand itself is returned unchanged.

    :param shorthand: The shorthand of a VCS for which a name is required.

    :returns: The full name of the VCS, or ``shorthand`` when it is unknown.
    """
    if shorthand in VCS_SHORTHAND_TO_NAME:
        return VCS_SHORTHAND_TO_NAME[shorthand]
    return shorthand

198 

199 

def verify_signature(content):
    """
    The function extracts any possible signature information found in the given
    content.

    Uses the ``DISTRO_TRACKER_KEYRING_DIRECTORY`` setting to access the keyring.
    If this setting does not exist, no signatures can be validated.

    For each signature whose key is available, one UID of the key is selected:
    preferably one whose email is in the domain obtained by dropping the first
    label of ``DISTRO_TRACKER_FQDN``, otherwise the first UID with a valid
    email. Signatures whose key is missing are skipped, and keys without any
    usable UID are only reported through a warning.

    :param content: The signed content; a string is encoded as UTF-8 first.
    :type content: :class:`bytes` or :class:`string`

    :returns: Information about the signers of the content as a list.
        An empty list is returned when the verification reports bad
        signatures. ``None`` is returned when no keyring is configured, when
        ``content`` is ``None`` or when the verification fails with any other
        GPG error.
    :rtype: list of ``(name, email)`` pairs or ``None``
    """
    keyring_directory = getattr(settings, 'DISTRO_TRACKER_KEYRING_DIRECTORY',
                                None)
    if not keyring_directory:
        # The vendor has not provided a keyring
        return None

    if content is None:
        return None

    if isinstance(content, str):
        content = content.encode('utf-8')

    # NOTE: this changes the environment of the whole process and the
    # previous value is not restored afterwards.
    os.environ['GNUPGHOME'] = keyring_directory
    signers = []

    with gpg.Context() as ctx:

        # Try to verify the given content
        signed_data = gpg.Data()
        signed_data.new_from_mem(content)

        try:
            _, result = ctx.verify(signed_data)
        except gpg.errors.BadSignatures:
            return []
        except gpg.errors.GpgError:
            return None

        # Everything after the first dot of the FQDN, or an empty string when
        # the FQDN has no dot. It is the same for every signature so it is
        # computed only once.
        preferred_domain = "".join(
            settings.DISTRO_TRACKER_FQDN.split(".", 1)[1:2])

        # Extract signer information
        for signature in result.signatures:
            key_missing = bool(signature.summary &
                               gpg.constants.SIGSUM_KEY_MISSING)

            if key_missing:
                continue

            key = ctx.get_key(signature.fpr)

            selected_uid = _select_uid_in_key(key, domain=preferred_domain)
            if not selected_uid:
                # No usable UID in the preferred domain: accept any domain.
                selected_uid = _select_uid_in_key(key)

            if selected_uid:
                signers.append((selected_uid.name, selected_uid.email))
            else:
                # The key may carry no UID at all; do not fail on the lookup
                # of the first one while building the log message.
                first_uid = key.uids[0] if key.uids else None
                logger_input.warning(
                    'Key %s has no UID with a valid email (name=%s email=%s)',
                    signature.fpr,
                    getattr(first_uid, 'name', None),
                    getattr(first_uid, 'email', None))

    return signers

267 

268 

def _select_uid_in_key(key, domain=None):
    """
    Pick the desired UID among all the UIDs of the key.

    UIDs which are revoked or invalid, or whose email is rejected by Django's
    ``EmailValidator``, are never selected.

    :param key: The key whose ``uids`` are examined, in order.
    :param domain: When set to a non-empty value, only a UID whose email ends
        with ``@<domain>`` can be selected.

    :returns: The first UID which satisfies the conditions, or ``None`` when
        there is none.
    """
    validate_email = EmailValidator()
    required_suffix = '@' + domain if domain else None

    for uid in key.uids:
        if uid.revoked or uid.invalid:
            continue
        try:
            validate_email(uid.email)
        except ValidationError:
            continue
        if required_suffix is None or uid.email.endswith(required_suffix):
            return uid

    return None

292 

293 

def now(tz=datetime.timezone.utc):
    """
    Gives the current timestamp in the requested timezone (UTC by default).

    Having this in a dedicated function makes it easy to mock out for tests.

    :param tz: The timezone passed on to ``datetime.datetime.now()``.
    """
    current = datetime.datetime.now(tz=tz)
    return current

300 

301 

def get_developer_information_url(email):
    """
    Asks the vendor-specific ``get_developer_information_url`` function for
    the information URL of the developer with the given email.

    :param email: Passed to the vendor function as ``developer_email``.

    :returns: The URL, or ``None`` when the vendor function is not
        implemented or did not provide a URL.
    """
    info_url, implemented = vendor.call(
        'get_developer_information_url', developer_email=email)
    if not (implemented and info_url):
        return None
    return info_url

311 

312 

def add_developer_extras(general, url_only=False):
    """
    Enriches, in place, a general dict with package data with more data
    regarding that package's developers.

    The ``maintainer`` entry (if present) and each entry of ``uploaders``
    (if any) get a ``developer_info_url`` key when a URL is available for
    their ``email``. Unless ``url_only`` is set, the vendor-specific
    ``get_maintainer_extra`` and ``get_uploader_extra`` functions are also
    called with the email and ``general['name']``: the maintainer's ``extra``
    key is always set to the result, whereas an uploader's ``extra`` key is
    only set when the function is implemented and returned something.

    :param general: The dict with the package data.
    :param url_only: When true, only the URLs are added.

    :returns: The same ``general`` dict.
    """
    with_extras = not url_only

    if 'maintainer' in general:
        maintainer = general['maintainer']
        maintainer_email = maintainer['email']
        info_url = get_developer_information_url(maintainer_email)
        if info_url:
            maintainer['developer_info_url'] = info_url
        if with_extras:
            extra, _implemented = vendor.call(
                'get_maintainer_extra', maintainer_email, general['name'])
            maintainer['extra'] = extra

    for uploader in general.get('uploaders', None) or ():
        info_url = get_developer_information_url(uploader['email'])
        if info_url:
            uploader['developer_info_url'] = info_url
        if with_extras:
            # Vendor specific extras.
            extra, implemented = vendor.call(
                'get_uploader_extra', uploader['email'], general['name'])
            if implemented and extra:
                uploader['extra'] = extra

    return general