# Source code for distro_tracker.core.utils.packages

# Copyright 2013-2018 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""Utilities for processing Debian package information."""
import os
import re
import shutil
import subprocess
import tarfile

import apt

import apt_pkg

from debian import deb822

from django.conf import settings
from django.urls import reverse
from django.utils.encoding import force_bytes

from distro_tracker.core.utils.email_messages import \
    name_and_address_from_string as parse_address
from distro_tracker.core.utils.email_messages import \
    names_and_addresses_from_string as parse_addresses


def package_hashdir(package_name):
    """
    Compute the hash directory under which a package is filed.

    Hash directories keep any single directory from holding too many
    entries. The hash is the first letter of the package name, except for
    names starting with ``lib``, for which the first four letters are used.

    :param package_name: The package name.
    :type package_name: str

    :returns: Name of the hash directory, or ``None`` when ``package_name``
        is ``None``.
    :rtype: str
    """
    if package_name is None:
        return None
    # "lib*" packages get a longer prefix.
    prefix_length = 4 if package_name.startswith('lib') else 1
    return package_name[:prefix_length]
def package_url(package_name):
    """
    Build the URL of the page dedicated to the given package.

    :param package_name: The package name.
    :type package_name: str or PackageName model

    :returns: The result of reversing the ``dtracker-package-page`` URL
        pattern for this package, or ``None`` when ``package_name`` is
        ``None``.
    :rtype: str
    """
    if package_name is None:
        return None
    # str() lets callers pass a model instance instead of a plain name.
    url_kwargs = {'package_name': str(package_name)}
    return reverse('dtracker-package-page', kwargs=url_kwargs)
def extract_vcs_information(stanza):
    """
    Extracts the VCS information from a package's Sources entry.

    Field names are matched case-insensitively. ``Vcs-Browser`` fills the
    ``browser`` key; any other ``Vcs-<type>`` field fills ``type`` and
    ``url``. For git, a ``-b <branch>`` suffix in the field value is split
    off into the ``branch`` key. If several ``Vcs-<type>`` fields are
    present, the last one iterated wins.

    :param stanza: The ``Sources`` entry from which to extract the VCS info.
        Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: VCS information regarding the package. Contains the following
        keys: type[, browser, url, branch]. Empty when the stanza has no
        ``Vcs-*`` field.
    :rtype: dict
    """
    vcs = {}
    for key, value in stanza.items():
        key = key.lower()
        if key == 'vcs-browser':
            vcs['browser'] = value
            continue
        if not key.startswith('vcs-'):
            continue

        vcs['type'] = key[4:]
        vcs['url'] = value
        # A branch recorded by a previously seen Vcs-* field does not belong
        # to the repository described by this one; drop it so the result is
        # always self-consistent.
        vcs.pop('branch', None)
        if vcs['type'] == 'git':
            match = re.match(r'(?P<url>.*?)\s+-b\s*(?P<branch>\S+)', value)
            if match:
                vcs['url'] = match.group('url')
                vcs['branch'] = match.group('branch')
    return vcs
def extract_dsc_file_name(stanza):
    """
    Extracts the name of the .dsc file from a package's Sources entry.

    The ``checksums-sha256``, ``checksums-sha1`` and ``files`` fields are
    searched in that order; the first listed file whose name ends with
    ``.dsc`` is returned.

    :param stanza: The ``Sources`` entry from which to extract the .dsc file
        name. Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: The .dsc file name, or ``None`` if no such file is listed.
    """
    file_list_fields = ('checksums-sha256', 'checksums-sha1', 'files')
    dsc_names = (
        file_entry['name']
        for field_name in file_list_fields
        for file_entry in stanza.get(field_name, [])
        if file_entry.get('name', '').endswith('.dsc')
    )
    # The generator is lazy, so the search stops at the first hit.
    return next(dsc_names, None)
def extract_information_from_sources_entry(stanza):
    """
    Extracts information from a ``Sources`` file entry and returns it in the
    form of a dictionary.

    The ``binary``, ``version``, ``architecture`` and ``maintainer`` fields
    are required (a missing one raises :class:`KeyError`); every other field
    falls back to an empty string.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: The extracted information.
    :rtype: dict
    """
    # The binary field is a comma-separated list of binary package names.
    binary_names = [name.strip() for name in stanza['binary'].split(',')]

    entry = {'version': stanza['version']}
    for optional_field in ('homepage', 'priority', 'section'):
        entry[optional_field] = stanza.get(optional_field, '')
    entry['architectures'] = stanza['architecture'].split()
    entry['binary_packages'] = binary_names
    entry['maintainer'] = parse_address(stanza['maintainer'])
    entry['uploaders'] = parse_addresses(stanza.get('uploaders', ''))
    entry['standards_version'] = stanza.get('standards-version', '')
    entry['vcs'] = extract_vcs_information(stanza)
    entry['dsc_file_name'] = extract_dsc_file_name(stanza)
    entry['directory'] = stanza.get('directory', '')
    return entry
def extract_information_from_packages_entry(stanza):
    """
    Extracts information from a ``Packages`` file entry and returns it in the
    form of a dictionary.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: A dict with the ``version`` and a ``short_description`` made
        of at most the first 300 characters of the ``description`` field
        (empty string when the field is absent).
    :rtype: dict
    """
    version = stanza['version']
    description = stanza.get('description', '')
    return {
        'version': version,
        'short_description': description[:300],
    }
class SourcePackageRetrieveError(Exception):
    """
    Raised when a source package version cannot be found in the APT cache
    or when one of its files cannot be retrieved.
    """
class AptCache(object):
    """
    A class for handling cached package information.

    The cache lives in the ``apt-cache`` subdirectory of
    ``settings.DISTRO_TRACKER_CACHE_DIRECTORY``. It holds a private APT
    configuration (``etc/apt.conf`` and ``etc/sources.list``) and the
    downloaded source package files (``packages/``).
    """
    DEFAULT_MAX_SIZE = 1 * 1024 ** 3  # 1 GiB
    QUILT_FORMAT = '3.0 (quilt)'

    class AcquireProgress(apt.progress.base.AcquireProgress):
        """
        Instances of this class can be passed to
        :meth:`apt.cache.Cache.update` calls.
        It provides a way to track which files were changed and which were not
        by an update operation.
        """
        def __init__(self, *args, **kwargs):
            super(AptCache.AcquireProgress, self).__init__(*args, **kwargs)
            #: Base names of the files reported through :meth:`done`
            self.fetched = []
            #: Base names of the files reported through :meth:`ims_hit`
            self.hit = []

        def done(self, item):
            """Records the base name of the item's destination file."""
            self.fetched.append(os.path.split(item.owner.destfile)[1])

        def ims_hit(self, item):
            """Records the base name of the item's destination file."""
            self.hit.append(os.path.split(item.owner.destfile)[1])

        def pulse(self, owner):
            """Always returns ``True``."""
            return True

    def __init__(self):
        # The root cache directory is a subdirectory in the
        # DISTRO_TRACKER_CACHE_DIRECTORY
        self.cache_root_dir = os.path.join(
            settings.DISTRO_TRACKER_CACHE_DIRECTORY,
            'apt-cache'
        )
        self.sources_list_path = os.path.join(
            self.cache_root_dir, 'etc', 'sources.list')
        self.conf_file_path = os.path.join(self.cache_root_dir,
                                           'etc', 'apt.conf')
        # NOTE: this is a process-wide side effect.
        os.environ['APT_CONFIG'] = self.conf_file_path

        self.sources = []
        self.packages = []
        self.cache_max_size = getattr(
            settings, 'DISTRO_TRACKER_APT_CACHE_MAX_SIZE',
            self.DEFAULT_MAX_SIZE)
        #: The directory where source package files are cached
        self.source_cache_directory = os.path.join(self.cache_root_dir,
                                                   'packages')
        self._cache_size = None  # Evaluate the cache size lazily

        self.configure_cache()

    @property
    def cache_size(self):
        """
        The size in bytes of the source package cache directory.

        Computed on first access and memoized in ``_cache_size``.
        """
        if self._cache_size is None:
            self._cache_size = \
                self.get_directory_size(self.source_cache_directory)
        return self._cache_size

    def get_directory_size(self, directory_path):
        """
        Returns the total space taken by the given directory in bytes.

        :param directory_path: The path to the directory
        :type directory_path: string

        :rtype: int
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(directory_path):
            for file_name in filenames:
                file_path = os.path.join(dirpath, file_name)
                # lstat: symbolic links are sized, not followed
                stat = os.lstat(file_path)
                total_size += stat.st_size

        return total_size

    def clear_cache(self):
        """
        Removes all cache information. This causes the next update to retrieve
        fresh repository files.
        """
        self._remove_dir(self.cache_root_dir)
        # Removing the root also removed the APT configuration; recreate it.
        self.configure_cache()

    def update_sources_list(self):
        """
        Updates the ``sources.list`` file used to list repositories for which
        package information should be cached.
        """
        from distro_tracker.core.models import Repository

        directory = os.path.dirname(self.sources_list_path)
        if not os.path.exists(directory):
            os.makedirs(directory)

        with open(self.sources_list_path, 'w') as sources_list:
            for repository in Repository.objects.all():
                sources_list.write(repository.sources_list_entry + '\n')

    def update_apt_conf(self):
        """
        Updates the ``apt.conf`` file which gives general settings for the
        :class:`apt.cache.Cache`.

        In particular, this updates the list of all architectures which should
        be considered in package updates based on architectures that the
        repositories support.
        """
        from distro_tracker.core.models import Architecture

        with open(self.conf_file_path, 'w') as conf_file:
            conf_file.write('APT::Architectures { ')
            for architecture in Architecture.objects.all():
                conf_file.write('"{arch}"; '.format(arch=architecture))
            conf_file.write('};\n')
            conf_file.write('Acquire::CompressionTypes::Order:: "xz";\n')
            conf_file.write('Dir "{}/";\n'.format(self.cache_root_dir))
            conf_file.write('Dir::State "state/";\n')
            conf_file.write('Dir::State::status "dpkg-status";\n')
            conf_file.write('Dir::Etc "etc/";\n')
            conf_file.write('Dir::Etc::sourcelist "{src}";\n'.format(
                src=self.sources_list_path))
            conf_file.write('Dir::Etc::Trusted "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_MAIN_FILE))
            conf_file.write('Dir::Etc::TrustedParts "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_PARTS_DIR))

    def configure_cache(self):
        """
        Configures the cache based on the most current repository information.
        """
        # sources.list must be written first: it creates the "etc" directory
        # in which apt.conf is stored as well.
        self.update_sources_list()
        self.update_apt_conf()
        # Clean up the configuration we might have read during "import apt"
        for root_key in apt_pkg.config.list():
            apt_pkg.config.clear(root_key)
        # Load the proper configuration
        apt_pkg.init()
        # Ensure we have the required directories
        for apt_dir in [apt_pkg.config.find_dir('Dir::State::lists'),
                        apt_pkg.config.find_dir('Dir::Etc::sourceparts'),
                        apt_pkg.config.find_dir('Dir::Cache::archives')]:
            if not os.path.exists(apt_dir):
                os.makedirs(apt_dir)

    def _index_file_full_path(self, file_name):
        """
        Returns the absolute path for the given cached index file.

        :param file_name: The name of the cached index file.
        :type file_name: string

        :rtype: string
        """
        return os.path.join(
            apt_pkg.config.find_dir('Dir::State::lists'),
            file_name
        )

    def _match_index_file_to_repository(self, sources_file):
        """
        Returns a two-tuple ``(class:`Repository <distro_tracker.core.
        models.Repository>`, component)``. The class:`Repository
        <distro_tracker.core.models.Repository>` instance which matches the
        given cached ``Sources`` file and the ``component`` of the ``Source``.

        When the file matches no index file of the current ``sources.list``
        or no known repository, ``(None, None)`` is returned, so that callers
        can always unpack or index the result.

        :rtype: (:class:`Repository <distro_tracker.core.models.Repository>`,
            string)
        """
        from distro_tracker.core.models import Repository

        sources_list = apt_pkg.SourceList()
        sources_list.read_main_list()
        index_name = os.path.basename(sources_file)
        component_url = None
        component = None
        for entry in sources_list.list:
            for index_file in entry.index_files:
                if index_name in index_file.describe:
                    # The first two whitespace-separated tokens of the
                    # description are taken as the base URL and the
                    # component.
                    base_url, component, _ = index_file.describe.split(None, 2)
                    base_url = base_url.rstrip('/')
                    component_url = base_url + '/' + component
                    break

        if component is None:
            # The file is not described by any configured index file (for
            # instance a stale file of a repository that was removed).
            return None, None

        components = component.split('/')
        if len(components) >= 2:
            component = components[1].strip()

        for repository in Repository.objects.all():
            if component_url in repository.component_urls:
                return repository, component

        return None, None

    def _get_all_cached_files(self):
        """
        Returns a list of all cached files.
        """
        lists_directory = apt_pkg.config.find_dir('Dir::State::lists')
        try:
            return [
                os.path.join(lists_directory, file_name)
                for file_name in os.listdir(lists_directory)
                if os.path.isfile(os.path.join(lists_directory, file_name))
            ]
        except OSError:
            # The directory structure does not exist => nothing is cached
            return []

    def get_cached_files(self, filter_function=None):
        """
        Returns cached files, optionally filtered by the given
        ``filter_function``

        :param filter_function: Takes a file name as the only parameter and
            returns a :class:`bool` indicating whether it should be included
            in the result.
        :type filter_function: callable

        :returns: A list of cached file names
        :rtype: list
        """
        cached_files = self._get_all_cached_files()
        if filter_function is None:
            # Include all files if the filter function is not provided
            return cached_files

        return [
            file_name
            for file_name in cached_files
            if filter_function(file_name)
        ]

    def get_sources_files_for_repository(self, repository):
        """
        Returns all ``Sources`` files which are cached for the given
        repository.

        For instance, ``Sources`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Sources`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Sources') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def get_packages_files_for_repository(self, repository):
        """
        Returns all ``Packages`` files which are cached for the given
        repository.

        For instance, ``Packages`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Packages`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Packages') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def update_repositories(self, force_download=False):
        """
        Initiates a cache update.

        :param force_download: If set to ``True`` causes the cache to be
            cleared before starting the update, thus making sure all index
            files are downloaded again.

        :returns: A two-tuple ``(updated_sources, updated_packages)``. Each of
            the tuple's members is a list of
            (:class:`Repository <distro_tracker.core.models.Repository>`,
            ``component``, ``file_name``) tuple representing the repository
            which was updated, component, and the file which contains the fresh
            information. The file is either a ``Sources`` or a ``Packages``
            file respectively. Fetched files which cannot be matched to a
            repository are left out.
        """
        if force_download:
            self.clear_cache()

        self.configure_cache()

        cache = apt.Cache(rootdir=self.cache_root_dir)
        progress = AptCache.AcquireProgress()
        cache.update(progress)

        updated_sources = []
        updated_packages = []
        for fetched_file in progress.fetched:
            if fetched_file.endswith('Sources'):
                dest = updated_sources
            elif fetched_file.endswith('Packages'):
                dest = updated_packages
            else:
                continue
            repository, component = self._match_index_file_to_repository(
                fetched_file)
            if repository is None:
                # Nothing to report for a file that belongs to no known
                # repository.
                continue
            dest.append((
                repository, component, self._index_file_full_path(fetched_file)
            ))

        return updated_sources, updated_packages

    def _get_format(self, record):
        """
        Returns the Format field value of the given source package record.
        """
        record = deb822.Deb822(record)
        return record['format']

    def _extract_quilt_package_debian_tar(self, debian_tar_path, outdir):
        """
        Extracts the given tarball to the given output directory.

        Every member is first checked to resolve to a path inside ``outdir``;
        nothing is extracted if any member fails the check.

        :raises SourcePackageRetrieveError: if a member of the archive would
            be extracted outside of ``outdir``.
        """
        base_directory = os.path.abspath(outdir)
        with tarfile.open(debian_tar_path) as archive_file:
            for member in archive_file.getmembers():
                member_path = os.path.abspath(
                    os.path.join(base_directory, member.name))
                # commonpath() compares whole path components. The
                # character-based commonprefix() would accept a sibling such
                # as "<outdir>evil/file" as being inside "<outdir>".
                common = os.path.commonpath([base_directory, member_path])
                if common != base_directory:
                    raise SourcePackageRetrieveError(
                        "Attempted Path Traversal in Tar File")
            # NOTE(review): only member names are validated; targets of
            # symbolic/hard link members are not checked here.
            archive_file.extractall(outdir)

    def get_package_source_cache_directory(self, package_name):
        """
        Returns the path to the directory where a particular source package is
        cached.

        :param package_name: The name of the source package
        :type package_name: string

        :rtype: string
        """
        package_hash = (
            package_name[0]
            if not package_name.startswith('lib') else
            package_name[:4]
        )
        return os.path.join(
            self.source_cache_directory,
            package_hash,
            package_name)

    def get_source_version_cache_directory(self, package_name, version):
        """
        Returns the path to the directory where a particular source package
        version files are extracted.

        :param package_name: The name of the source package
        :type package_name: string

        :param version: The version of the source package
        :type version: string

        :rtype: string
        """
        package_dir = self.get_package_source_cache_directory(package_name)
        return os.path.join(package_dir, package_name + '-' + version)

    def _remove_dir(self, directory_path):
        """
        Removes the given directory, including any subdirectories and files.
        The method makes sure to correctly handle the situation where the
        directory contains files with names which are invalid utf-8.
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        if os.path.exists(directory_path):
            shutil.rmtree(directory_path)

    def clear_cached_sources(self):
        """
        Clears all cached package source files.
        """
        self._remove_dir(self.source_cache_directory)
        self._cache_size = self.get_directory_size(self.source_cache_directory)

    def _get_apt_source_records(self, source_name, version):
        """
        Returns a :class:`apt_pkg.SourceRecords` instance where the given
        source package is the current working record.

        :raises SourcePackageRetrieveError: if the cache has no record of the
            given version of the source package.
        """
        apt.Cache(rootdir=self.cache_root_dir)  # must be pre-created
        source_records = apt_pkg.SourceRecords()
        source_records.restart()
        # Find the cached record matching this source package and version
        found = False
        while source_records.lookup(source_name):
            if source_records.version == version:
                found = True
                break

        if not found:
            # Package version does not exist in the cache
            raise SourcePackageRetrieveError(
                "Could not retrieve package {pkg} version {ver}:"
                " No such version found in the cache".format(
                    pkg=source_name, ver=version))

        return source_records

    def _extract_dpkg_source(self, retrieved_files, outdir):
        """
        Uses dpkg-source to extract the source package.

        :raises SourcePackageRetrieveError: if none of the retrieved files is
            a ``.dsc`` file.
        """
        dsc_file_path = next(
            (file_path
             for file_path in retrieved_files
             if file_path.endswith('.dsc')),
            None)
        if dsc_file_path is None:
            # Do not let a bare StopIteration escape to the caller.
            raise SourcePackageRetrieveError(
                'Could not extract source package: no .dsc file retrieved')
        dsc_file_path = os.path.abspath(dsc_file_path)
        outdir = os.path.abspath(outdir)
        subprocess.check_output(["dpkg-source", "-x", dsc_file_path, outdir],
                                stderr=subprocess.STDOUT)

    def _apt_acquire_package(self,
                             source_records,
                             dest_dir_path,
                             debian_directory_only):
        """
        Using :class:`apt_pkg.Acquire`, retrieves the source files for the
        source package described by the current source_records record.

        :param source_records: The record describing the source package whose
            files should be retrieved.
        :type source_records: :class:`apt_pkg.SourceRecords`

        :param dest_dir_path: The path to the directory where the downloaded
            files should be saved.
        :type dest_dir_path: string

        :param debian_directory_only: A flag indicating whether only the debian
            directory should be downloaded.

        :returns: A list of absolute paths of all retrieved source files.
        :rtype: list of strings

        :raises SourcePackageRetrieveError: if any file could not be
            retrieved.
        """
        package_format = self._get_format(source_records.record)
        # A reference to each AcquireFile instance must be kept
        files = []
        acquire = apt_pkg.Acquire(apt.progress.base.AcquireProgress())
        for srcfile in source_records.files:
            base = os.path.basename(srcfile.path)
            dest_file_path = os.path.join(dest_dir_path, base)
            if debian_directory_only and package_format == self.QUILT_FORMAT:
                if srcfile.type != 'diff':
                    # Only retrieve the .debian.tar.* file for quilt packages
                    # when only the debian directory is wanted
                    continue
            files.append(apt_pkg.AcquireFile(
                acquire,
                source_records.index.archive_uri(srcfile.path),
                srcfile.hashes,
                srcfile.size,
                base,
                destfile=dest_file_path
            ))

        acquire.run()

        # Check if all items are correctly retrieved and build the list of file
        # paths.
        retrieved_paths = []
        for item in acquire.items:
            if item.status != item.STAT_DONE:
                error_text = item.error_text
                # Accept both bytes and text so that reporting the failure
                # cannot itself fail with an AttributeError.
                if isinstance(error_text, bytes):
                    error_text = error_text.decode('utf-8')
                raise SourcePackageRetrieveError(
                    'Could not retrieve file {file}: {error}'.format(
                        file=item.destfile,
                        error=error_text))
            retrieved_paths.append(item.destfile)

        return retrieved_paths

    def retrieve_source(self, source_name, version,
                        debian_directory_only=False):
        """
        Retrieve the source package files for the given source package version.

        :param source_name: The name of the source package
        :type source_name: string
        :param version: The version of the source package
        :type version: string
        :param debian_directory_only: Flag indicating if the method should try
            to retrieve only the debian directory of the source package. This
            is usually only possible when the package format is 3.0 (quilt).
        :type debian_directory_only: Boolean

        :returns: The path to the directory containing the extracted source
            package files.
        :rtype: string

        :raises SourcePackageRetrieveError: if the version is not in the cache
            or its files cannot be retrieved.
        """
        if self.cache_size > self.cache_max_size:
            # If the maximum allowed cache size has been exceeded,
            # clear the cache
            self.clear_cached_sources()

        source_records = self._get_apt_source_records(source_name, version)

        dest_dir_path = self.get_package_source_cache_directory(source_name)
        if not os.path.exists(dest_dir_path):
            os.makedirs(dest_dir_path)
        # Remember the size of the directory in the beginning
        old_size = self.get_directory_size(dest_dir_path)

        # Download the source files
        retrieved_files = self._apt_acquire_package(
            source_records, dest_dir_path, debian_directory_only)

        # Extract the retrieved source files
        outdir = self.get_source_version_cache_directory(source_name, version)
        # dpkg-source expects this directory not to exist
        self._remove_dir(outdir)

        package_format = self._get_format(source_records.record)
        if debian_directory_only and package_format == self.QUILT_FORMAT:
            if not retrieved_files:
                # The record listed no 'diff' type file, so there is no
                # debian tarball to extract.
                raise SourcePackageRetrieveError(
                    'Could not retrieve package {pkg} version {ver}:'
                    ' No debian tarball found'.format(
                        pkg=source_name, ver=version))
            # dpkg-source cannot extract an incomplete package
            self._extract_quilt_package_debian_tar(retrieved_files[0], outdir)
        else:
            # Let dpkg-source handle the extraction in all other cases
            self._extract_dpkg_source(retrieved_files, outdir)

        # Update the current cache size based on the changes made by getting
        # this source package.
        new_size = self.get_directory_size(dest_dir_path)
        size_delta = new_size - old_size
        self._cache_size += size_delta

        return outdir
def html_package_list(packages):
    """
    Return a HTML-formatted list of packages.

    Each item is rendered as a link to the package page of the part before
    the first ``/``; the rest of the item (slash included) is appended
    after the link. Items are joined with ``', '``.
    """
    def render(package):
        # partition() yields an empty separator and suffix when the item has
        # no "/", so the same expression covers both cases.
        source_package_name, separator, suffix = package.partition("/")
        return '<a href="{}">{}</a>{}'.format(
            package_url(source_package_name),
            source_package_name,
            separator + suffix)

    return ', '.join(render(package) for package in packages)