Coverage for distro_tracker/core/utils/__init__.py: 80%
151 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2013-2019 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""Various utilities for the distro-tracker project."""
11import datetime
12import json
13import logging
14import os
16from django.conf import settings
17from django.core.exceptions import ValidationError
18from django.core.validators import EmailValidator
19from django.db import models
20from django.http import HttpResponse
21from django.template.loader import render_to_string
23import gpg
25from distro_tracker import vendor
27# Re-export some functions
28from .email_messages import extract_email_address_from_header # noqa
29from .email_messages import get_decoded_message_payload # noqa
30from .email_messages import message_from_bytes # noqa
32logger_input = logging.getLogger('distro_tracker.input')
35def get_or_none(model, **kwargs):
36 """
37 Gets a Django Model object from the database or returns ``None`` if it
38 does not exist.
39 """
40 try:
41 return model.objects.get(**kwargs)
42 except model.DoesNotExist:
43 return None
46def distro_tracker_render_to_string(template_name, context=None):
47 """
48 A custom function to render a template to a string which injects extra
49 distro-tracker specific information to the context, such as the name
50 of the derivative.
52 This function is necessary since Django's
53 :data:`TEMPLATE_CONTEXT_PROCESSORS
54 <distro_tracker.project.settings.TEMPLATE_CONTEXT_PROCESSORS> only work when
55 using a :class:`RequestContext <django.template.RequestContext>`, whereas
56 this function can be called independently from any HTTP request.
57 """
58 from distro_tracker.core import context_processors
59 if context is None:
60 context = {}
61 extra_context = context_processors.DISTRO_TRACKER_EXTRAS
62 context.update(extra_context)
64 return render_to_string(template_name, context)
67def render_to_json_response(response):
68 """
69 Helper function creating an :class:`HttpResponse <django.http.HttpResponse>`
70 by serializing the given ``response`` object to a JSON string.
72 The resulting HTTP response has Content-Type set to application/json.
74 :param response: The object to be serialized in the response. It must be
75 serializable by the :mod:`json` module.
76 :rtype: :class:`HttpResponse <django.http.HttpResponse>`
77 """
78 return HttpResponse(
79 json.dumps(response),
80 content_type='application/json'
81 )
84class PrettyPrintList(object):
85 """
86 A class which wraps the built-in :class:`list` object so that when it is
87 converted to a string, its contents are printed using the given
88 :attr:`delimiter`.
90 The default delimiter is a space.
92 >>> a = PrettyPrintList([1, 2, 3])
93 >>> print(a)
94 1 2 3
95 >>> print(PrettyPrintList([u'one', u'2', u'3']))
96 one 2 3
97 >>> print(PrettyPrintList([1, 2, 3], delimiter=', '))
98 1, 2, 3
99 >>> # Still acts as a list
100 >>> a == [1, 2, 3]
101 True
102 >>> a == ['1', '2', '3']
103 False
104 """
105 def __init__(self, the_list=None, delimiter=' '):
106 if the_list is None:
107 self._list = []
108 else:
109 self._list = the_list
110 self.delimiter = delimiter
112 def __getattr__(self, name, *args, **kwargs):
113 return getattr(self._list, name)
115 def __len__(self):
116 return len(self._list)
118 def __getitem__(self, pos):
119 return self._list[pos]
121 def __iter__(self):
122 return self._list.__iter__()
124 def __str__(self):
125 return self.delimiter.join(map(str, self._list))
127 def __repr__(self):
128 return str(self)
130 def __eq__(self, other):
131 if isinstance(other, PrettyPrintList):
132 return self._list == other._list
133 return self._list == other
136class SpaceDelimitedTextField(models.TextField):
137 """
138 A custom Django model field which stores a list of strings.
140 It stores the list in a :class:`TextField <django.db.models.TextField>` as a
141 space delimited list. It is marshalled back to a :class:`PrettyPrintList` in
142 the Python domain.
143 """
145 description = "Stores a space delimited list of strings"
147 def from_db_value(self, value, expression, connection):
148 return self.to_python(value)
150 def to_python(self, value):
151 if value is None: 151 ↛ 152line 151 didn't jump to line 152, because the condition on line 151 was never true
152 return None
154 if isinstance(value, PrettyPrintList): 154 ↛ 155line 154 didn't jump to line 155, because the condition on line 154 was never true
155 return value
156 elif isinstance(value, list): 156 ↛ 157line 156 didn't jump to line 157, because the condition on line 156 was never true
157 return PrettyPrintList(value)
159 return PrettyPrintList(value.split())
161 def get_prep_value(self, value, **kwargs):
162 if value is None: 162 ↛ 163line 162 didn't jump to line 163, because the condition on line 162 was never true
163 return
164 # Any iterable value can be converted into this type of field.
165 return ' '.join(map(str, value))
167 def get_db_prep_value(self, value, **kwargs):
168 return self.get_prep_value(value)
170 def value_to_string(self, obj):
171 value = self._get_val_from_obj(obj)
172 return self.get_prep_value(value)
175#: A map of currently available VCS systems' shorthands to their names.
176VCS_SHORTHAND_TO_NAME = {
177 'svn': 'Subversion',
178 'git': 'Git',
179 'bzr': 'Bazaar',
180 'cvs': 'CVS',
181 'darcs': 'Darcs',
182 'hg': 'Mercurial',
183 'mtn': 'Monotone',
184}
187def get_vcs_name(shorthand):
188 """
189 Returns a full name for the VCS given its shorthand.
191 If the given shorthand is unknown an empty string is returned.
193 :param shorthand: The shorthand of a VCS for which a name is required.
195 :rtype: string
196 """
197 return VCS_SHORTHAND_TO_NAME.get(shorthand, shorthand)
200def verify_signature(content):
201 """
202 The function extracts any possible signature information found in the given
203 content.
205 Uses the ``DISTRO_TRACKER_KEYRING_DIRECTORY`` setting to access the keyring.
206 If this setting does not exist, no signatures can be validated.
208 :type content: :class:`bytes` or :class:`string`
210 :returns: Information about the signers of the content as a list or
211 ``None`` if there is no (valid) signature.
212 :rtype: list of ``(name, email)`` pairs or ``None``
213 :type content: :class:`bytes`
214 """
215 keyring_directory = getattr(settings, 'DISTRO_TRACKER_KEYRING_DIRECTORY',
216 None)
217 if not keyring_directory: 217 ↛ 219line 217 didn't jump to line 219, because the condition on line 217 was never true
218 # The vendor has not provided a keyring
219 return None
221 if content is None:
222 return None
224 if isinstance(content, str):
225 content = content.encode('utf-8')
227 os.environ['GNUPGHOME'] = keyring_directory
228 signers = []
230 with gpg.Context() as ctx:
232 # Try to verify the given content
233 signed_data = gpg.Data()
234 signed_data.new_from_mem(content)
236 try:
237 _, result = ctx.verify(signed_data)
238 except gpg.errors.BadSignatures:
239 return []
240 except gpg.errors.GpgError:
241 return None
243 # Extract signer information
244 for signature in result.signatures:
245 key_missing = bool(signature.summary &
246 gpg.constants.SIGSUM_KEY_MISSING)
248 if key_missing: 248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true
249 continue
251 key = ctx.get_key(signature.fpr)
252 preferred_domain = "".join(
253 settings.DISTRO_TRACKER_FQDN.split(".", 1)[1:2])
255 selected_uid = _select_uid_in_key(key, domain=preferred_domain)
256 if not selected_uid:
257 selected_uid = _select_uid_in_key(key)
259 if selected_uid:
260 signers.append((selected_uid.name, selected_uid.email))
261 else:
262 logger_input.warning(
263 'Key %s has no UID with a valid email (name=%s email=%s)',
264 signature.fpr, key.uids[0].name, key.uids[0].email)
266 return signers
269def _select_uid_in_key(key, domain=None):
270 """
271 Select the desired UID among all the available UIDs.
272 """
273 selected_uid = None
274 validate_email = EmailValidator()
276 for uid in key.uids:
277 if uid.revoked or uid.invalid:
278 continue
279 try:
280 validate_email(uid.email)
281 if domain:
282 if uid.email.endswith('@' + domain):
283 selected_uid = uid
284 break
285 else:
286 selected_uid = uid
287 break
288 except ValidationError:
289 continue
291 return selected_uid
294def now(tz=datetime.timezone.utc):
295 """
296 Returns the current timestamp in the requested timezone (UTC by default)
297 and can be easily mocked out for tests.
298 """
299 return datetime.datetime.now(tz)
302def get_developer_information_url(email):
303 """
304 Returns developer's information url based on his/her email
305 through vendor-specific function
306 """
307 info_url, implemented = vendor.call(
308 'get_developer_information_url', **{'developer_email': email, })
309 if implemented and info_url: 309 ↛ 310line 309 didn't jump to line 310, because the condition on line 309 was never true
310 return info_url
313def add_developer_extras(general, url_only=False):
314 """
315 Receives a general dict with package data and add to it more data
316 regarding that package's developers
317 """
318 if 'maintainer' in general: 318 ↛ 328line 318 didn't jump to line 328, because the condition on line 318 was never false
319 maintainer_email = general['maintainer']['email']
320 url = get_developer_information_url(maintainer_email)
321 if url: 321 ↛ 322line 321 didn't jump to line 322, because the condition on line 321 was never true
322 general['maintainer']['developer_info_url'] = url
323 if not url_only:
324 extra, implemented = vendor.call(
325 'get_maintainer_extra', maintainer_email, general['name'])
326 general['maintainer']['extra'] = extra
328 uploaders = general.get('uploaders', None)
329 if uploaders: 329 ↛ 330line 329 didn't jump to line 330, because the condition on line 329 was never true
330 for uploader in uploaders:
331 url = get_developer_information_url(uploader['email'])
332 if url:
333 uploader['developer_info_url'] = url
334 if url_only:
335 continue
336 # Vendor specific extras.
337 extra, implemented = vendor.call(
338 'get_uploader_extra', uploader['email'], general['name'])
339 if implemented and extra:
340 uploader['extra'] = extra
342 return general