# -*- coding: utf-8 -*-

# Copyright 2014-2021 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.

"""
Distro Tracker test infrastructure.
"""

import gzip
import hashlib
import inspect
import io
import json
import lzma
import os
import os.path
import re
import shutil
import tempfile

from bs4 import BeautifulSoup as soup

import django.test
from django.conf import settings
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from django.test.signals import setting_changed

import responses

from distro_tracker.accounts.models import UserEmail
from distro_tracker.core.models import (
    Architecture,
    BinaryPackageName,
    ContributorName,
    PackageData,
    PackageName,
    Repository,
    SourcePackage,
    SourcePackageName,
)
from distro_tracker.core.utils.compression import (
    get_compressor_factory,
    guess_compression_method,
)
from distro_tracker.core.utils.packages import package_hashdir

from django_email_accounts.models import User


class TempDirsMixin(object):
    """
    Diverts all distro-tracker path settings to make them point
    to temporary directories while testing.
    """

    DISTRO_TRACKER_PATH_SETTINGS = {
        'STATIC_ROOT': 'static',
        'MEDIA_ROOT': 'media',
        'DISTRO_TRACKER_CACHE_DIRECTORY': 'cache',
        'DISTRO_TRACKER_KEYRING_DIRECTORY': 'keyring',
        'DISTRO_TRACKER_TEMPLATE_DIRECTORY': 'templates',
        'DISTRO_TRACKER_LOG_DIRECTORY': 'logs',
        'DISTRO_TRACKER_MAILDIR_DIRECTORY': 'maildir',
    }

    def _backup_settings(self, name):
        self._settings_copy[name] = getattr(settings, name)

    def _restore_settings(self):
        for key, value in self._settings_copy.items():
            setattr(settings, key, value)
            setting_changed.send(sender=self.__class__, setting=key,
                                 value=value, enter=False)

    def __call__(self, result=None):
        """
        Wrapper around __call__ that sets up the temporary directories.
        This means that user-defined test cases are not required to
        call super().setUp().
        """
        self._settings_copy = {}
        self.addCleanup(self._restore_settings)
        self._backup_settings('DISTRO_TRACKER_DATA_PATH')
        tempdir = tempfile.mkdtemp(prefix='distro-tracker-tests-')
        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
        setattr(settings, 'DISTRO_TRACKER_DATA_PATH', tempdir)
        for name, dirname in self.DISTRO_TRACKER_PATH_SETTINGS.items():
            self._backup_settings(name)
            dirname = os.path.join(tempdir, dirname)
            setattr(settings, name, dirname)
            os.mkdir(dirname)
            setting_changed.send(sender=self.__class__, setting=name,
                                 value=dirname, enter=True)

        return super(TempDirsMixin, self).__call__(result)


class TestCaseHelpersMixin(object):
    """
    Helper methods injected into distro_tracker's ``*TestCase`` objects.
    """

    def get_test_data_path(self, name):
        """
        Returns the absolute path of a file within the tests-data
        subdirectory that sits alongside the calling TestCase's module.

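        A minimal, illustrative call (the file name is an arbitrary example)::

            path = self.get_test_data_path('Sources')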
        """
        return os.path.join(os.path.dirname(inspect.getabsfile(self.__class__)),
                            'tests-data', name)

    def add_test_template_dir(self, name='tests-templates'):
        template_dir = self.get_test_data_path(name)
        settings.TEMPLATES[0]['DIRS'].append(template_dir)
        setting_changed.send(sender=self.__class__, setting='TEMPLATES',
                             value=settings.TEMPLATES, enter=True)

        def cleanup_test_template_dir():
            settings.TEMPLATES[0]['DIRS'].remove(template_dir)
            setting_changed.send(sender=self.__class__, setting='TEMPLATES',
                                 value=settings.TEMPLATES, enter=False)

        self.addCleanup(cleanup_test_template_dir)

    def get_temporary_directory(self, prefix=None, suffix=None):
        tempdir = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)

        return tempdir

    def mock_http_request(self, **kwargs):
        responses.start()
        self.addCleanup(responses.stop)
        self.addCleanup(responses.reset)

        if kwargs:
            self.set_http_response(**kwargs)

    @staticmethod
    def compress(data, compression='gzip'):
        if compression == 'gzip':
            return gzip.compress(data)
        elif compression == 'xz':
            return lzma.compress(data)
        else:
            raise NotImplementedError(
                'compress() does not support {} as '
                'compression method'.format(compression))

    def set_http_response(self, url=None, method="GET", body=None, headers=None,
                          status_code=200, json_data=None, compress_with=None):
        # Default URL is the catch-all pattern
        if url is None:
            url = re.compile(".*")

        if headers is None:
            headers = {}

        if compress_with:
            if json_data is not None:
                body = self.compress(
                    json.dumps(json_data).encode('utf-8'),
                    compress_with,
                )
                # Don't forward parameter
                json_data = None
            elif body is not None:
                if isinstance(body, str):
                    body = self.compress(body.encode("utf-8"), compress_with)
                else:
                    body = self.compress(body, compress_with)

        if body is None:
            body = ""

        responses.remove(method, url)
        responses.add(
            method=method,
            url=url,
            body=body,
            json=json_data,
            status=status_code,
            headers=headers,
        )

    def import_key_into_keyring(self, filename):
        """
        Imports a key from an ASCII-armored file located in tests-data/keys/
        into Distro Tracker's keyrings/.

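        A minimal, illustrative call (assumes the calling app ships a
        ``tests-data/keys/sample-key.asc`` file; the file name is arbitrary)::

            self.import_key_into_keyring('sample-key.asc')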
        """
        import gpg

        old = os.environ.get('GNUPGHOME', None)
        os.environ['GNUPGHOME'] = settings.DISTRO_TRACKER_KEYRING_DIRECTORY

        file_path = self.get_test_data_path('keys/' + filename)
        keydata = gpg.Data()
        keydata.new_from_file(file_path)

        with gpg.Context() as ctx:
            ctx.op_import(keydata)

        if old:
            os.environ['GNUPGHOME'] = old


class DatabaseMixin(object):
    """
    Database-related assertions injected into distro_tracker's ``*TestCase``
    objects.
    """

    def assertDoesNotExist(self, obj):
        with self.assertRaises(obj.__class__.DoesNotExist):
            obj.__class__.objects.get(pk=obj.id)

    def assertDoesExist(self, obj):
        try:
            self.assertIsNotNone(obj.__class__.objects.get(pk=obj.id))
        except obj.__class__.DoesNotExist as error:
            raise AssertionError(error)

    def create_source_package(self, **kwargs):
        """
        Creates a source package and any related objects requested through
        the keyword arguments. The following arguments are supported:

        - name
        - version
        - directory
        - dsc_file_name
        - maintainer (dict with 'name' and 'email')
        - uploaders (list of emails)
        - architectures (list of architectures)
        - binary_packages (list of package names)
        - repository (shorthand of a repository)
        - repositories (list of repository shorthands)
        - data (dict used to generate associated PackageData)

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.

        :return: the created source package
        :rtype: :class:`~distro_tracker.core.models.SourcePackage`

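        A minimal, illustrative call (all values below are arbitrary test
        data)::

            srcpkg = self.create_source_package(
                name='dummy',
                version='1.0-1',
                maintainer={'name': 'John', 'email': 'john@example.com'},
                architectures=['amd64'],
                repository='default',
            )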
        """
        name = kwargs.get('name', 'test-package')
        version = kwargs.get('version', '1')

        fields = {}
        fields['source_package_name'] = \
            SourcePackageName.objects.get_or_create(name=name)[0]
        fields['version'] = version
        fields['dsc_file_name'] = kwargs.get('dsc_file_name',
                                             '%s_%s.dsc' % (name, version))
        fields['directory'] = kwargs.get(
            'directory', 'pool/main/%s/%s' % (package_hashdir(name), name))

        if 'maintainer' in kwargs:
            maintainer = kwargs['maintainer']
            maintainer_email = UserEmail.objects.get_or_create(
                email=maintainer['email'])[0]
            fields['maintainer'] = ContributorName.objects.get_or_create(
                contributor_email=maintainer_email,
                name=maintainer.get('name', ''))[0]

        srcpkg = SourcePackage.objects.create(**fields)

        for architecture in kwargs.get('architectures', []):
            srcpkg.architectures.add(
                Architecture.objects.get_or_create(name=architecture)[0])

        for uploader in kwargs.get('uploaders', []):
            contributor = ContributorName.objects.get_or_create(
                contributor_email=UserEmail.objects.get_or_create(
                    email=uploader)[0])[0]
            srcpkg.uploaders.add(contributor)

        for binary in kwargs.get('binary_packages', []):
            srcpkg.binary_packages.add(
                BinaryPackageName.objects.get_or_create(name=binary)[0])

        if 'repository' in kwargs:
            kwargs.setdefault('repositories', [kwargs['repository']])
        for repo_shorthand in kwargs.get('repositories', []):
            self.add_to_repository(srcpkg, repo_shorthand)

        if 'data' in kwargs:
            self.add_package_data(srcpkg.source_package_name, **kwargs['data'])

        srcpkg.save()
        return srcpkg

    def add_to_repository(self, srcpkg, shorthand='default'):
        """
        Add a source package to a repository. Creates the repository if it
        doesn't exist.

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.

        :param srcpkg: the source package to add to the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository

        :return: the repository entry that has been created
        :rtype:
            :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`

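        A minimal, illustrative call (assumes ``srcpkg`` was created with
        :meth:`create_source_package`; the shorthand is arbitrary)::

            entry = self.add_to_repository(srcpkg, shorthand='unstable')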
        """
        repository, _ = Repository.objects.get_or_create(
            shorthand=shorthand,
            defaults={
                'name': 'Test repository %s' % shorthand,
                'uri': 'http://localhost/debian',
                'suite': shorthand,
                'codename': shorthand,
                'components': ['main', 'contrib', 'non-free'],
                'default': True if shorthand == 'default' else False,
            }
        )
        return srcpkg.repository_entries.create(repository=repository,
                                                component='main')

    def remove_from_repository(self, srcpkg, shorthand='default'):
        """
        Remove a source package from a repository.

        :param srcpkg: the source package to remove from the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository
        """
        return srcpkg.repository_entries.filter(
            repository__shorthand=shorthand).delete()[0]

    def add_package_data(self, pkgname, **kwargs):
        """
        Creates PackageData objects associated to the package indicated
        in pkgname. Each named parameter results in a PackageData instance
        with the `key` being the name of the parameter and the `value`
        being the value of the named parameter.

        :param pkgname: the name of the package to which we want to associate
            data
        :type pkgname: `str` or :class:`~distro_tracker.core.models.PackageName`

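        A minimal, illustrative call (the key and value are arbitrary)::

            self.add_package_data('dummy', general={'version': '1.0-1'})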
        """
        if not isinstance(pkgname, PackageName):
            pkgname, _ = PackageName.objects.get_or_create(name=str(pkgname))
        for key, value in kwargs.items():
            PackageData.objects.create(package=pkgname, key=key, value=value)

    @staticmethod
    def create_repository(
        codename="sid",
        name=None,
        shorthand=None,
        uri="http://localhost/debian",
        suite=None,
        components="main contrib non-free",
        default=False,
        optional=True,
        binary=False,
        source=True,
        architectures=None,
    ):
        if not name:
            name = "Repository %s" % codename
        if not shorthand:
            shorthand = codename[:10]
        if not suite:
            suite = codename

        repo = Repository.objects.create(
            name=name,
            shorthand=shorthand,
            uri=uri,
            public_uri=uri,
            codename=codename,
            suite=suite,
            components=components,
            default=default,
            optional=optional,
            binary=binary,
            source=source,
        )

        if not architectures:
            architectures = ["amd64", "i386"]
        for archname in architectures:
            arch, _ = Architecture.objects.get_or_create(name=archname)
            repo.architectures.add(arch)

        return repo


class SimpleTestCase(TempDirsMixin, TestCaseHelpersMixin,
                     django.test.SimpleTestCase):
    pass


class TestCase(TempDirsMixin, TestCaseHelpersMixin, DatabaseMixin,
               django.test.TestCase):
    pass


@django.test.tag('transaction')
class TransactionTestCase(TempDirsMixin, TestCaseHelpersMixin,
                          DatabaseMixin, django.test.TransactionTestCase):
    pass


class LiveServerTestCase(TempDirsMixin, TestCaseHelpersMixin,
                         DatabaseMixin, StaticLiveServerTestCase):
    pass


class TemplateTestsMixin(object):
    """Helper methods to test templates."""

    @staticmethod
    def html_contains_link(text, link):
        html = soup(text, 'html.parser')
        for a_tag in html.findAll('a', {'href': True}):
            if a_tag['href'] == link:
                return True
        return False

    def assertLinkIsInResponse(self, response, link):
        self.assertTrue(self.html_contains_link(response.content, link))

    def assertLinkIsNotInResponse(self, response, link):
        self.assertFalse(self.html_contains_link(response.content, link))


class UserAuthMixin(object):
    """
    Helper methods to manage user authentication.
    Additional USERS may be defined before calling self.setup_users()
    in self.setUp().

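    A minimal, illustrative subclass (user names and passwords are
    arbitrary)::

        class MyTests(UserAuthMixin, TestCase):
            USERS = {
                'john': {},
                'jane': {'password': 'secret'},
            }

            def setUp(self):
                self.setup_users(login='jane')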
    """

    USERS = {
        'john': {},
    }

    def setup_users(self, login=False):
        """
        Creates the users defined in self.USERS and uses the 'login' parameter
        as follows:

        * If False: no user is logged in
        * If True: login with the only user defined
        * If a particular username: login with the user who owns the username
        """
        self.users = {}
        for username in self.USERS:
            user_data = self._get_user_data(username)
            self.users[username] = User.objects.create_user(**user_data)
        if login:
            username = None if login is True else login
            self.login(username)

    def login(self, username=None):
        """
        Login with the user that owns the 'username' or with the only available
        user in self.users. If multiple users are available, you must specify
        the username or you will trigger a ValueError exception.
        """
        if not username:
            if len(self.users) > 1:
                raise ValueError("multiple users but username not specified")
            username = list(self.users.keys())[0]
        user_data = self._get_user_data(username)
        self.client.login(
            username=user_data['main_email'],
            password=user_data['password'],
        )
        self.current_user = self.users[username]
        return self.current_user

    def get_user(self, username=None):
        if not username:
            return self.current_user
        return self.users[username]

    def _get_user_data(self, username):
        user_data = self.USERS[username].copy()
        user_data.setdefault('main_email', '{}@example.com'.format(username))
        user_data.setdefault('password', '{}password'.format(username))
        return user_data


class AptRepositoryMixin(object):
    """
    Helper methods to mock an APT repository.

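    A minimal, illustrative setup (assumes the repository object comes from
    :meth:`DatabaseMixin.create_repository`; the metadata content is
    arbitrary)::

        repo = self.create_repository(codename='sid')
        self.mock_apt_repository(repo, content=b"Package: dummy")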
    """

    def mock_apt_repository(self, repo, **kwargs):
        self.mock_http_request()
        global_compression_suffixes = kwargs.pop("compression_suffixes", [""])
        global_content = kwargs.pop("content", None)
        # Mock Sources and Packages files
        for base_filename in self._apt_repo_iter_metadata(repo):
            metadata_options = kwargs.get("metadata_options", {}).get(
                base_filename, {}
            )
            compression_suffixes = metadata_options.get(
                "compression_suffixes", global_compression_suffixes
            )
            metadata_content = metadata_options.get("content", global_content)
            test_content_file = metadata_options.get("test_content_file")
            for suffix in ("", ".bz2", ".gz", ".xz"):
                filename = base_filename + suffix
                content = metadata_content
                if callable(metadata_content):
                    content = metadata_content(repo, filename)
                if suffix in compression_suffixes:
                    self.mock_apt_repository_add_metadata_file(
                        repo,
                        filename,
                        content=content,
                        test_content_file=test_content_file,
                    )
                else:
                    url = self._apt_repo_build_url(repo, filename)
                    self.set_http_response(url, status_code=404)
        # Mock Release/InRelease files
        self.mock_apt_repository_update_release_file(repo, **kwargs)

    @staticmethod
    def _apt_repo_build_url(repo, filename):
        return "{}/dists/{}/{}".format(repo.uri, repo.codename, filename)

    @staticmethod
    def _apt_repo_iter_metadata(repo):
        for component in sorted(repo.components.split()):
            for arch in repo.architectures.all().order_by('name'):
                yield f"{component}/binary-{arch.name}/Packages"
            yield f"{component}/source/Sources"

    def _apt_repo_init_checksums(self, repo):
        if not hasattr(self, '_apt_repo_checksums'):
            self._apt_repo_checksums = {}
        self._apt_repo_checksums.setdefault(repo.shorthand, {})

    def _apt_repo_iter_checksums(self, repo):
        return self._apt_repo_checksums[repo.shorthand].items()

    def _apt_repo_store_checksums(self, repo, filename, checksums):
        self._apt_repo_checksums[repo.shorthand][filename] = checksums

    def mock_apt_repository_add_metadata_file(
        self, repo, filename, content=None, test_content_file=None,
        compression="auto", **kwargs,
    ):
        self._apt_repo_init_checksums(repo)

        # Load test content if required
        if test_content_file:
            data_path = self.get_test_data_path(test_content_file)
            with open(data_path, 'rb') as f:
                content = f.read()

        # Generate content if required, then compress it if required
        if content is None:
            content = b""

        # Detect compression method
        if compression == "auto":
            compression = guess_compression_method(filename)

        if compression:
            stream = io.BytesIO()
            compressor = get_compressor_factory(compression)(stream, mode="wb")
            compressor.write(content)
            compressor.close()
            content = stream.getvalue()

        # Store checksums of metadata
        checksums = {
            "Size": len(content),
            "MD5Sum": hashlib.md5(content).hexdigest(),
            "SHA256": hashlib.sha256(content).hexdigest(),
        }
        self._apt_repo_store_checksums(repo, filename, checksums)

        # Register the metadata in the http mock
        url = self._apt_repo_build_url(repo, filename)
        self.set_http_response(url, body=content)

    def mock_apt_repository_update_release_file(
        self,
        repo,
        enable_inrelease=True,
        acquire_by_hash=True,
        suite=None,
        codename=None,
        architectures=None,
        components=None,
        **kwargs,
    ):
        self._apt_repo_init_checksums(repo)

        release_url = self._apt_repo_build_url(repo, "Release")
        inrelease_url = self._apt_repo_build_url(repo, "InRelease")
        if suite is None:
            suite = repo.suite or repo.codename or ""
        if codename is None:
            codename = repo.codename or repo.suite or ""
        if architectures is None:
            architectures = " ".join([
                a.name for a in repo.architectures.all().order_by('name')
            ])
        if components is None:
            components = repo.components

        # Build the content of the release file
        text = """Origin: Debian
Label: Debian
Suite: {suite}
Codename: {codename}
Architectures: {architectures}
Components: {components}
""".format(
            suite=suite,
            codename=codename,
            architectures=architectures,
            components=components,
        )
        if acquire_by_hash:
            text += "Acquire-By-Hash: yes\n"
        for checksum in ("MD5Sum", "SHA256"):
            text += "{}:\n".format(checksum)
            for path, checksums in self._apt_repo_iter_checksums(repo):
                if "/by-hash/" in path:
                    continue
                text += " {} {} {}\n".format(
                    checksums[checksum], checksums["Size"], path
                )

        self.set_http_response(release_url, body=text)

        if enable_inrelease:
            signed_text = """-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA256

"""
            signed_text += text
            signed_text += """-----BEGIN PGP SIGNATURE-----

iQIzBAEBCAAdFiEEFukLP99l7eOqfzI8BO5yN7fUU+wFAl8/gbEACgkQBO5yN7fU
U+y4Lw/+PDhJJaxEmZWS4dFjBSJYMTgyiEPXG6eMqDpeJNr8iIoBjcBd3bv3Gexq
8rS0ry9bPLy9ZZxImL0E6rB2oFU8OAqoAXXmRf5yt3x0SY/1deTjMHYr5w4kH6CB
ZwZnkm12jMyB9ds/ZAvG7+ou+qEb7bZ2+7IzhBlFuLNYO747sOaDjOM3RdV700qs
FvmSBcysOUWCAhxQNmAk/NZ585AxeKksbvSHUMczdKIRu/XN82zrTRPQhZ51eHDZ
mY444ytopHEA6G+3rkUagKeLGE6JnwS+amhz/A==
=H/pA
-----END PGP SIGNATURE-----"""
            self.set_http_response(inrelease_url, body=signed_text)
        else:
            self.set_http_response(inrelease_url, status_code=404)