# -*- coding: utf-8 -*-
# Copyright 2014-2021 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""
Distro Tracker test infrastructure.
"""
import gzip
import hashlib
import inspect
import io
import json
import lzma
import os
import os.path
import re
import shutil
import tempfile
from bs4 import BeautifulSoup as soup
import django.test
from django.conf import settings
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from django.test.signals import setting_changed
import responses
from distro_tracker.accounts.models import UserEmail
from distro_tracker.core.models import (
Architecture,
BinaryPackageName,
ContributorName,
PackageData,
PackageName,
Repository,
SourcePackage,
SourcePackageName,
)
from distro_tracker.core.utils.compression import (
get_compressor_factory,
guess_compression_method,
)
from distro_tracker.core.utils.packages import package_hashdir
from django_email_accounts.models import User


class TempDirsMixin(object):
"""
Diverts all distro-tracker path settings to make them point
to temporary directories while testing.
"""
DISTRO_TRACKER_PATH_SETTINGS = {
'STATIC_ROOT': 'static',
'MEDIA_ROOT': 'media',
'DISTRO_TRACKER_CACHE_DIRECTORY': 'cache',
'DISTRO_TRACKER_KEYRING_DIRECTORY': 'keyring',
'DISTRO_TRACKER_TEMPLATE_DIRECTORY': 'templates',
'DISTRO_TRACKER_LOG_DIRECTORY': 'logs',
'DISTRO_TRACKER_MAILDIR_DIRECTORY': 'maildir',
}
def _backup_settings(self, name):
self._settings_copy[name] = getattr(settings, name)
def _restore_settings(self):
for key, value in self._settings_copy.items():
setattr(settings, key, value)
setting_changed.send(sender=self.__class__, setting=key,
value=value, enter=False)
    def __call__(self, result=None):
        """
        Wrapper around __call__ to perform the temporary directory setup.
        This means that user-defined test cases aren't required to
        include a call to super().setUp().
        """
self._settings_copy = {}
self.addCleanup(self._restore_settings)
self._backup_settings('DISTRO_TRACKER_DATA_PATH')
tempdir = tempfile.mkdtemp(prefix='distro-tracker-tests-')
self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
setattr(settings, 'DISTRO_TRACKER_DATA_PATH', tempdir)
for name, dirname in self.DISTRO_TRACKER_PATH_SETTINGS.items():
self._backup_settings(name)
dirname = os.path.join(tempdir, dirname)
setattr(settings, name, dirname)
os.mkdir(dirname)
setting_changed.send(sender=self.__class__, setting=name,
value=dirname, enter=True)
return super(TempDirsMixin, self).__call__(result)


class TestCaseHelpersMixin(object):
    """
    Helper methods injected into distro_tracker's ``*TestCase`` objects.
    """

    def get_test_data_path(self, name):
        """
        Returns the absolute path of a file within the tests-data
        subdirectory next to the calling TestCase's module.
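
        Example (a minimal sketch; ``sample.json`` is a hypothetical file)::

            path = self.get_test_data_path('sample.json')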
"""
return os.path.join(os.path.dirname(inspect.getabsfile(self.__class__)),
'tests-data', name)

    def add_test_template_dir(self, name='tests-templates'):
template_dir = self.get_test_data_path(name)
settings.TEMPLATES[0]['DIRS'].append(template_dir)
setting_changed.send(sender=self.__class__, setting='TEMPLATES',
value=settings.TEMPLATES, enter=True)
def cleanup_test_template_dir():
settings.TEMPLATES[0]['DIRS'].remove(template_dir)
setting_changed.send(sender=self.__class__, setting='TEMPLATES',
value=settings.TEMPLATES, enter=False)
self.addCleanup(cleanup_test_template_dir)

    def get_temporary_directory(self, prefix=None, suffix=None):
tempdir = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
return tempdir

    def mock_http_request(self, **kwargs):
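        """
        Activate the ``responses`` library for the duration of the test
        (with automatic cleanup registered) and, if keyword arguments are
        given, forward them to :meth:`set_http_response` to register an
        initial mocked response.
        """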
responses.start()
self.addCleanup(responses.stop)
self.addCleanup(responses.reset)
if kwargs:
self.set_http_response(**kwargs)

    @staticmethod
def compress(data, compression='gzip'):
if compression == 'gzip':
return gzip.compress(data)
elif compression == 'xz':
return lzma.compress(data)
else:
raise NotImplementedError(
'compress() does not support {} as '
'compression method'.format(compression))

    def set_http_response(self, url=None, method="GET", body=None,
                          headers=None, status_code=200, json_data=None,
                          compress_with=None):
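        """
        Register (or replace) a mocked HTTP response through the
        ``responses`` library. When ``url`` is None, a catch-all regular
        expression is used. When ``compress_with`` is set ('gzip' or 'xz'),
        the body (or the JSON-encoded ``json_data``) is compressed before
        being served.
        """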
# Default URL is the catch-all pattern
if url is None:
url = re.compile(".*")
if headers is None:
headers = {}
if compress_with:
if json_data is not None:
body = self.compress(
json.dumps(json_data).encode('utf-8'),
compress_with,
)
# Don't forward parameter
json_data = None
elif body is not None:
if isinstance(body, str):
body = self.compress(body.encode("utf-8"), compress_with)
else:
body = self.compress(body, compress_with)
if body is None:
body = ""
responses.remove(method, url)
responses.add(
method=method,
url=url,
body=body,
json=json_data,
status=status_code,
headers=headers,
)

    def import_key_into_keyring(self, filename):
        """
        Import a key from an ASCII-armored file located in tests-data/keys/
        into Distro Tracker's keyrings/.
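
        Example (a minimal sketch; ``key1.pub`` is a hypothetical file under
        tests-data/keys/)::

            self.import_key_into_keyring('key1.pub')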
"""
import gpg
old = os.environ.get('GNUPGHOME', None)
os.environ['GNUPGHOME'] = settings.DISTRO_TRACKER_KEYRING_DIRECTORY
file_path = self.get_test_data_path('keys/' + filename)
keydata = gpg.Data()
keydata.new_from_file(file_path)
with gpg.Context() as ctx:
ctx.op_import(keydata)
if old:
os.environ['GNUPGHOME'] = old


class DatabaseMixin(object):
    """
    Database-related assertions and helpers injected into distro_tracker's
    ``*TestCase`` objects.
    """

    def assertDoesNotExist(self, obj):
with self.assertRaises(obj.__class__.DoesNotExist):
obj.__class__.objects.get(pk=obj.id)

    def assertDoesExist(self, obj):
try:
self.assertIsNotNone(obj.__class__.objects.get(pk=obj.id))
except obj.__class__.DoesNotExist as error:
raise AssertionError(error)

    def create_source_package(self, **kwargs):
        """
        Creates a source package and any related object requested through the
        keyword arguments. The following arguments are supported:

        - name
        - version
        - directory
        - dsc_file_name
        - maintainer (dict with 'name' and 'email')
        - uploaders (list of emails)
        - architectures (list of architectures)
        - binary_packages (list of package names)
        - repository (shorthand of a repository)
        - repositories (list of repositories' shorthands)
        - data (dict used to generate associated PackageData)

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.

        :return: the created source package
        :rtype: :class:`~distro_tracker.core.models.SourcePackage`
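
        Example (a minimal sketch; all values below are made up for
        illustration)::

            srcpkg = self.create_source_package(
                name='dummy',
                version='1.0-1',
                maintainer={'name': 'Jane', 'email': 'jane@example.com'},
                architectures=['amd64'],
                repository='default',
            )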
"""
name = kwargs.get('name', 'test-package')
version = kwargs.get('version', '1')
fields = {}
fields['source_package_name'] = \
SourcePackageName.objects.get_or_create(name=name)[0]
fields['version'] = version
fields['dsc_file_name'] = kwargs.get('dsc_file_name',
'%s_%s.dsc' % (name, version))
fields['directory'] = kwargs.get(
'directory', 'pool/main/%s/%s' % (package_hashdir(name), name))
if 'maintainer' in kwargs:
maintainer = kwargs['maintainer']
maintainer_email = UserEmail.objects.get_or_create(
email=maintainer['email'])[0]
fields['maintainer'] = ContributorName.objects.get_or_create(
contributor_email=maintainer_email,
name=maintainer.get('name', ''))[0]
srcpkg = SourcePackage.objects.create(**fields)
for architecture in kwargs.get('architectures', []):
srcpkg.architectures.add(
Architecture.objects.get_or_create(name=architecture)[0])
for uploader in kwargs.get('uploaders', []):
contributor = ContributorName.objects.get_or_create(
contributor_email=UserEmail.objects.get_or_create(
email=uploader)[0])[0]
srcpkg.uploaders.add(contributor)
for binary in kwargs.get('binary_packages', []):
srcpkg.binary_packages.add(
BinaryPackageName.objects.get_or_create(name=binary)[0])
if 'repository' in kwargs:
kwargs.setdefault('repositories', [kwargs['repository']])
for repo_shorthand in kwargs.get('repositories', []):
self.add_to_repository(srcpkg, repo_shorthand)
if 'data' in kwargs:
self.add_package_data(srcpkg.source_package_name, **kwargs['data'])
srcpkg.save()
return srcpkg

    def add_to_repository(self, srcpkg, shorthand='default'):
        """
        Add a source package to a repository. Creates the repository if it
        doesn't exist.

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.

        :param srcpkg: the source package to add to the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository
        :return: the repository entry that has been created
        :rtype:
            :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
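
        Example (a minimal sketch; 'experimental' is a made-up shorthand)::

            entry = self.add_to_repository(srcpkg, 'experimental')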
"""
repository, _ = Repository.objects.get_or_create(
shorthand=shorthand,
defaults={
'name': 'Test repository %s' % shorthand,
'uri': 'http://localhost/debian',
'suite': shorthand,
'codename': shorthand,
'components': ['main', 'contrib', 'non-free'],
'default': True if shorthand == 'default' else False,
}
)
return srcpkg.repository_entries.create(repository=repository,
component='main')

    def remove_from_repository(self, srcpkg, shorthand='default'):
        """
        Remove a source package from a repository.

        :param srcpkg: the source package to remove from the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository
        :return: the number of database objects deleted
"""
return srcpkg.repository_entries.filter(
repository__shorthand=shorthand).delete()[0]

    def add_package_data(self, pkgname, **kwargs):
        """
        Creates PackageData objects associated with the package named by
        pkgname. Each named parameter results in a PackageData instance
        whose `key` is the name of the parameter and whose `value` is the
        value of the named parameter.

        :param pkgname: the name of the package to which we want to associate
            data
        :type pkgname: `str` or :class:`~distro_tracker.core.models.PackageName`
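
        Example (a minimal sketch; the ``general`` key and its value are made
        up for illustration)::

            self.add_package_data('dummy', general={'name': 'dummy'})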
"""
if not isinstance(pkgname, PackageName):
pkgname, _ = PackageName.objects.get_or_create(name=str(pkgname))
for key, value in kwargs.items():
PackageData.objects.create(package=pkgname, key=key, value=value)

    @staticmethod
def create_repository(
codename="sid",
name=None,
shorthand=None,
uri="http://localhost/debian",
suite=None,
components="main contrib non-free",
default=False,
optional=True,
binary=False,
source=True,
architectures=None,
):
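        """
        Create a :class:`~distro_tracker.core.models.Repository` with
        sensible defaults derived from the codename and register the given
        architectures (amd64 and i386 when none are supplied).
        """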
if not name:
name = "Repository %s" % codename
if not shorthand:
shorthand = codename[:10]
if not suite:
suite = codename
repo = Repository.objects.create(
name=name,
shorthand=shorthand,
uri=uri,
public_uri=uri,
codename=codename,
suite=suite,
components=components,
default=default,
optional=optional,
binary=binary,
source=source,
)
if not architectures:
architectures = ["amd64", "i386"]
for archname in architectures:
arch, _ = Architecture.objects.get_or_create(name=archname)
repo.architectures.add(arch)
return repo


class SimpleTestCase(TempDirsMixin, TestCaseHelpersMixin,
                     django.test.SimpleTestCase):
    pass


class TestCase(TempDirsMixin, TestCaseHelpersMixin, DatabaseMixin,
               django.test.TestCase):
    pass


@django.test.tag('transaction')
class TransactionTestCase(TempDirsMixin, TestCaseHelpersMixin,
                          DatabaseMixin, django.test.TransactionTestCase):
    pass


class LiveServerTestCase(TempDirsMixin, TestCaseHelpersMixin,
                         DatabaseMixin, StaticLiveServerTestCase):
    pass


class TemplateTestsMixin(object):
    """Helper methods to test templates."""

    @staticmethod
def html_contains_link(text, link):
html = soup(text, 'html.parser')
for a_tag in html.findAll('a', {'href': True}):
if a_tag['href'] == link:
return True
return False

    def assertLinkIsInResponse(self, response, link):
self.assertTrue(self.html_contains_link(response.content, link))

    def assertLinkIsNotInResponse(self, response, link):
self.assertFalse(self.html_contains_link(response.content, link))


class UserAuthMixin(object):
    """
    Helper methods to manage user authentication.

    Additional USERS may be defined before calling self.setup_users()
    in self.setUp().
    """
USERS = {
'john': {},
}

    def setup_users(self, login=False):
        """
        Creates the users defined in self.USERS and interprets the 'login'
        parameter as follows (see the example below):

        * If False: no user is logged in
        * If True: log in with the only user defined
        * If a particular username: log in with the user who owns the username
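
        Example (a minimal sketch)::

            class MyTests(UserAuthMixin, TestCase):
                USERS = {'john': {}, 'jane': {}}

                def setUp(self):
                    self.setup_users(login='john')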
"""
self.users = {}
for username in self.USERS:
user_data = self._get_user_data(username)
self.users[username] = User.objects.create_user(**user_data)
if login:
username = None if login is True else login
self.login(username)

    def login(self, username=None):
        """
        Log in with the user who owns 'username', or with the only available
        user in self.users. If multiple users are available, the username
        must be specified, otherwise a ValueError is raised.
"""
if not username:
if len(self.users) > 1:
raise ValueError("multiple users but username not specified")
username = list(self.users.keys())[0]
user_data = self._get_user_data(username)
self.client.login(
username=user_data['main_email'],
password=user_data['password'],
)
self.current_user = self.users[username]
return self.current_user

    def get_user(self, username=None):
if not username:
return self.current_user
return self.users[username]
def _get_user_data(self, username):
user_data = self.USERS[username].copy()
user_data.setdefault('main_email', '{}@example.com'.format(username))
user_data.setdefault('password', '{}password'.format(username))
return user_data


class AptRepositoryMixin(object):
    """
    Helper methods to mock an APT repository.
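
    Example (a minimal sketch; assumes the test case also mixes in
    :class:`DatabaseMixin` so that :meth:`create_repository` is available)::

        repo = self.create_repository(codename='sid')
        self.mock_apt_repository(repo)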
"""

    def mock_apt_repository(self, repo, **kwargs):
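        """
        Mock the HTTP responses for an APT repository: a Packages and a
        Sources index for each component/architecture pair (variants whose
        compression suffix is not listed in 'compression_suffixes' return a
        404), plus the Release/InRelease files.
        """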
self.mock_http_request()
global_compression_suffixes = kwargs.pop("compression_suffixes", [""])
global_content = kwargs.pop("content", None)
# Mock Sources and Packages files
for base_filename in self._apt_repo_iter_metadata(repo):
metadata_options = kwargs.get("metadata_options", {}).get(
base_filename, {}
)
compression_suffixes = metadata_options.get(
"compression_suffixes", global_compression_suffixes
)
metadata_content = metadata_options.get("content", global_content)
test_content_file = metadata_options.get("test_content_file")
for suffix in ("", ".bz2", ".gz", ".xz"):
filename = base_filename + suffix
content = metadata_content
if callable(metadata_content):
content = metadata_content(repo, filename)
if suffix in compression_suffixes:
self.mock_apt_repository_add_metadata_file(
repo,
filename,
content=content,
test_content_file=test_content_file,
)
else:
url = self._apt_repo_build_url(repo, filename)
self.set_http_response(url, status_code=404)
# Mock Release/InRelease files
self.mock_apt_repository_update_release_file(repo, **kwargs)
@staticmethod
def _apt_repo_build_url(repo, filename):
return "{}/dists/{}/{}".format(repo.uri, repo.codename, filename)
@staticmethod
def _apt_repo_iter_metadata(repo):
for component in sorted(repo.components.split()):
for arch in repo.architectures.all().order_by('name'):
yield f"{component}/binary-{arch.name}/Packages"
yield f"{component}/source/Sources"
def _apt_repo_init_checksums(self, repo):
if not hasattr(self, '_apt_repo_checksums'):
self._apt_repo_checksums = {}
self._apt_repo_checksums.setdefault(repo.shorthand, {})
def _apt_repo_iter_checksums(self, repo):
return self._apt_repo_checksums[repo.shorthand].items()
def _apt_repo_store_checksums(self, repo, filename, checksums):
self._apt_repo_checksums[repo.shorthand][filename] = checksums

    def mock_apt_repository_update_release_file(
self,
repo,
enable_inrelease=True,
acquire_by_hash=True,
suite=None,
codename=None,
architectures=None,
components=None,
**kwargs,
):
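        """
        Register mocked Release (and, unless disabled, InRelease) responses
        built from the repository attributes and from the checksums of the
        metadata files mocked so far. When 'enable_inrelease' is False, the
        InRelease URL returns a 404.
        """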
self._apt_repo_init_checksums(repo)
release_url = self._apt_repo_build_url(repo, "Release")
inrelease_url = self._apt_repo_build_url(repo, "InRelease")
if suite is None:
suite = repo.suite or repo.codename or ""
if codename is None:
codename = repo.codename or repo.suite or ""
if architectures is None:
architectures = " ".join([
a.name for a in repo.architectures.all().order_by('name')
])
if components is None:
components = repo.components
# Build the content of the release file
text = """Origin: Debian
Label: Debian
Suite: {suite}
Codename: {codename}
Architectures: {architectures}
Components: {components}
""".format(
suite=suite,
codename=codename,
architectures=architectures,
components=components,
)
if acquire_by_hash:
text += "Acquire-By-Hash: yes\n"
for checksum in ("MD5Sum", "SHA256"):
text += "{}:\n".format(checksum)
for path, checksums in self._apt_repo_iter_checksums(repo):
if "/by-hash/" in path:
continue
text += " {} {} {}\n".format(
checksums[checksum], checksums["Size"], path
)
self.set_http_response(release_url, body=text)
if enable_inrelease:
signed_text = """-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA256
"""
signed_text += text
signed_text += """-----BEGIN PGP SIGNATURE-----
iQIzBAEBCAAdFiEEFukLP99l7eOqfzI8BO5yN7fUU+wFAl8/gbEACgkQBO5yN7fU
U+y4Lw/+PDhJJaxEmZWS4dFjBSJYMTgyiEPXG6eMqDpeJNr8iIoBjcBd3bv3Gexq
8rS0ry9bPLy9ZZxImL0E6rB2oFU8OAqoAXXmRf5yt3x0SY/1deTjMHYr5w4kH6CB
ZwZnkm12jMyB9ds/ZAvG7+ou+qEb7bZ2+7IzhBlFuLNYO747sOaDjOM3RdV700qs
FvmSBcysOUWCAhxQNmAk/NZ585AxeKksbvSHUMczdKIRu/XN82zrTRPQhZ51eHDZ
mY444ytopHEA6G+3rkUagKeLGE6JnwS+amhz/A==
=H/pA
-----END PGP SIGNATURE-----"""
self.set_http_response(inrelease_url, body=signed_text)
else:
self.set_http_response(inrelease_url, status_code=404)