Coverage for distro_tracker/core/utils/packages.py: 63%
275 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2013-2018 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""Utilities for processing Debian package information."""
11import os
12import re
13import shutil
14import subprocess
15import tarfile
17import apt
19import apt_pkg
21from debian import deb822
23from django.conf import settings
24from django.urls import reverse
25from django.utils.encoding import force_bytes
27from distro_tracker.core.utils.email_messages import \
28 names_and_addresses_from_string as parse_addresses
def package_hashdir(package_name):
    """
    Compute the hash directory under which a package is filed.

    Packages are spread over several directories so that no single
    directory ends up with too many entries. The directory name is the
    first letter of the package name, except for ``lib*`` packages where
    the first four letters are used.

    :param package_name: The package name.
    :type package_name: str

    :returns: Name of the hash directory, or ``None`` when no package name
        was given.
    :rtype: str
    """
    if package_name is None:
        return None
    prefix_length = 4 if package_name.startswith('lib') else 1
    return package_name[:prefix_length]
def package_url(package_name):
    """
    Build the URL of the page dedicated to the given package.

    :param package_name: The package name.
    :type package_name: str or PackageName model

    :returns: URL of the package page (the ``dtracker-package-page`` route),
        or ``None`` when no package name was given.
    :rtype: str
    """
    if package_name is not None:
        url_kwargs = {'package_name': str(package_name)}
        return reverse('dtracker-package-page', kwargs=url_kwargs)
    return None
def extract_vcs_information(stanza):
    """
    Collect the VCS related fields of a package's ``Sources`` entry.

    Field names are matched case-insensitively. ``Vcs-Browser`` is stored
    under ``browser``; any other ``Vcs-<type>`` field provides ``type`` and
    ``url``. For git, a trailing ``-b <branch>`` is split off the URL and
    stored under ``branch``.

    :param stanza: The ``Sources`` entry from which to extract the VCS info.
        Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: VCS information regarding the package. Contains the following
        keys: type[, browser, url, branch]
    :rtype: dict
    """
    git_branch_pattern = r'(?P<url>.*?)\s+-b\s*(?P<branch>\S+)'
    info = {}
    for raw_key, value in stanza.items():
        field = raw_key.lower()
        if field == 'vcs-browser':
            info['browser'] = value
            continue
        if not field.startswith('vcs-'):
            continue

        # Everything after the "vcs-" prefix names the VCS type.
        vcs_type = field[4:]
        info['type'] = vcs_type
        info['url'] = value
        if vcs_type != 'git':
            continue

        branch_match = re.match(git_branch_pattern, value)
        if branch_match is not None:
            info['url'] = branch_match.group('url')
            info['branch'] = branch_match.group('branch')
    return info
def extract_dsc_file_name(stanza):
    """
    Find the name of the .dsc file listed in a package's Sources entry.

    The ``checksums-sha256``, ``checksums-sha1`` and ``files`` fields are
    searched in that order and the first entry whose name ends in ``.dsc``
    wins.

    :param stanza: The ``Sources`` entry from which to extract the .dsc file
        name. Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: The .dsc file name, or ``None`` if no entry lists one.
    """
    file_list_fields = ('checksums-sha256', 'checksums-sha1', 'files')
    dsc_names = (
        file_entry['name']
        for field_name in file_list_fields
        for file_entry in stanza.get(field_name, [])
        if file_entry.get('name', '').endswith('.dsc')
    )
    return next(dsc_names, None)
def extract_information_from_sources_entry(stanza):
    """
    Convert a ``Sources`` file entry into a plain dictionary.

    The ``binary``, ``version``, ``architecture`` and ``maintainer`` fields
    are required; every other field falls back to an empty string when it
    is absent.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: The extracted information.
    :rtype: dict
    """
    binary_names = list(map(str.strip, stanza['binary'].split(',')))

    def optional(field_name):
        # Optional fields default to the empty string.
        return stanza.get(field_name, '')

    return {
        'version': stanza['version'],
        'homepage': optional('homepage'),
        'priority': optional('priority'),
        'section': optional('section'),
        'architectures': stanza['architecture'].split(),
        'binary_packages': binary_names,
        # Only the first parsed address is kept as the maintainer.
        'maintainer': parse_addresses(stanza['maintainer'])[0],
        'uploaders': parse_addresses(optional('uploaders')),
        'standards_version': optional('standards-version'),
        'vcs': extract_vcs_information(stanza),
        'dsc_file_name': extract_dsc_file_name(stanza),
        'directory': optional('directory'),
    }
def extract_information_from_packages_entry(stanza):
    """
    Convert a ``Packages`` file entry into a plain dictionary.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: A dictionary with the ``version`` and a ``short_description``
        holding at most the first 300 characters of the description.
    :rtype: dict
    """
    version = stanza['version']
    description = stanza.get('description', '')
    return {
        'version': version,
        'short_description': description[:300],
    }
class SourcePackageRetrieveError(Exception):
    """Raised when the files of a source package cannot be retrieved."""
class AptCache(object):
    """
    A class for handling cached package information.

    All cache data lives below the ``apt-cache`` subdirectory of
    ``settings.DISTRO_TRACKER_CACHE_DIRECTORY``. Creating an instance
    points the ``APT_CONFIG`` environment variable at the cache's own
    ``apt.conf`` and (re)writes the APT configuration files.
    """
    DEFAULT_MAX_SIZE = 1 * 1024 ** 3  # 1 GiB
    QUILT_FORMAT = '3.0 (quilt)'

    class AcquireProgress(apt.progress.base.AcquireProgress):
        """
        Instances of this class can be passed to :meth:`apt.cache.Cache.update`
        calls.
        It provides a way to track which files were changed and which were not
        by an update operation.

        File names (without directory) are collected in ``fetched`` for
        items reported through :meth:`done` and in ``hit`` for items
        reported through :meth:`ims_hit`.
        """
        def __init__(self, *args, **kwargs):
            super(AptCache.AcquireProgress, self).__init__(*args, **kwargs)
            self.fetched = []
            self.hit = []

        def done(self, item):
            self.fetched.append(os.path.split(item.owner.destfile)[1])

        def ims_hit(self, item):
            self.hit.append(os.path.split(item.owner.destfile)[1])

        def pulse(self, owner):
            return True

    def __init__(self):
        # The root cache directory is a subdirectory in the
        # DISTRO_TRACKER_CACHE_DIRECTORY
        self.cache_root_dir = os.path.join(
            settings.DISTRO_TRACKER_CACHE_DIRECTORY,
            'apt-cache'
        )
        self.sources_list_path = os.path.join(
            self.cache_root_dir, 'etc', 'sources.list')
        self.conf_file_path = os.path.join(self.cache_root_dir,
                                           'etc', 'apt.conf')
        # Process-wide side effect: APT reads its configuration from here.
        os.environ['APT_CONFIG'] = self.conf_file_path

        self.sources = []
        self.packages = []
        self.cache_max_size = getattr(
            settings, 'DISTRO_TRACKER_APT_CACHE_MAX_SIZE',
            self.DEFAULT_MAX_SIZE)
        #: The directory where source package files are cached
        self.source_cache_directory = os.path.join(self.cache_root_dir,
                                                   'packages')
        self._cache_size = None  # Evaluate the cache size lazily

        self.configure_cache()

    @property
    def cache_size(self):
        """
        The size in bytes of the source package cache directory.

        The value is computed on first access and then kept in
        ``_cache_size``.
        """
        if self._cache_size is None:
            self._cache_size = \
                self.get_directory_size(self.source_cache_directory)
        return self._cache_size

    def get_directory_size(self, directory_path):
        """
        Returns the total space taken by the given directory in bytes.

        :param directory_path: The path to the directory
        :type directory_path: string

        :rtype: int
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(directory_path):
            for file_name in filenames:
                file_path = os.path.join(dirpath, file_name)
                # lstat: a symlink counts for itself, not for its target.
                stat = os.lstat(file_path)
                total_size += stat.st_size

        return total_size

    def clear_cache(self):
        """
        Removes all cache information. This causes the next update to retrieve
        fresh repository files.
        """
        self._remove_dir(self.cache_root_dir)
        self.configure_cache()

    def update_sources_list(self):
        """
        Updates the ``sources.list`` file used to list repositories for which
        package information should be cached.
        """
        from distro_tracker.core.models import Repository

        directory = os.path.dirname(self.sources_list_path)
        if not os.path.exists(directory):
            os.makedirs(directory)

        with open(self.sources_list_path, 'w') as sources_list:
            for repository in Repository.objects.all():
                sources_list.write(repository.sources_list_entry + '\n')

    def update_apt_conf(self):
        """
        Updates the ``apt.conf`` file which gives general settings for the
        :class:`apt.cache.Cache`.

        In particular, this updates the list of all architectures which should
        be considered in package updates based on architectures that the
        repositories support.
        """
        from distro_tracker.core.models import Architecture

        with open(self.conf_file_path, 'w') as conf_file:
            conf_file.write('APT::Architectures { ')
            for architecture in Architecture.objects.all():
                conf_file.write('"{arch}"; '.format(arch=architecture))
            conf_file.write('};\n')
            conf_file.write('Acquire::CompressionTypes::Order:: "xz";\n')
            conf_file.write('Dir "{}/";\n'.format(self.cache_root_dir))
            conf_file.write('Dir::State "state/";\n')
            conf_file.write('Dir::State::status "dpkg-status";\n')
            conf_file.write('Dir::Etc "etc/";\n')
            conf_file.write('Dir::Etc::sourcelist "{src}";\n'.format(
                src=self.sources_list_path))
            conf_file.write('Dir::Etc::Trusted "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_MAIN_FILE))
            conf_file.write('Dir::Etc::TrustedParts "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_PARTS_DIR))

    def configure_cache(self):
        """
        Configures the cache based on the most current repository information.
        """
        self.update_sources_list()
        self.update_apt_conf()
        # Clean up the configuration we might have read during "import apt"
        for root_key in apt_pkg.config.list():
            apt_pkg.config.clear(root_key)
        # Load the proper configuration
        apt_pkg.init()
        # Ensure we have the required directories
        for apt_dir in [apt_pkg.config.find_dir('Dir::State::lists'),
                        apt_pkg.config.find_dir('Dir::Etc::sourceparts'),
                        apt_pkg.config.find_dir('Dir::Cache::archives')]:
            if not os.path.exists(apt_dir):
                os.makedirs(apt_dir)

    def _index_file_full_path(self, file_name):
        """
        Returns the absolute path for the given cached index file.

        :param file_name: The name of the cached index file.
        :type file_name: string

        :rtype: string
        """
        return os.path.join(
            apt_pkg.config.find_dir('Dir::State::lists'),
            file_name
        )

    def _match_index_file_to_repository(self, sources_file):
        """
        Returns a two-tuple ``(repository, component)``: the
        :class:`Repository <distro_tracker.core.models.Repository>` instance
        which matches the given cached index file and the ``component`` the
        file belongs to.

        A two-tuple is always returned so that callers can index or unpack
        the result. ``repository`` is ``None`` when the file cannot be
        attributed to a repository, and ``component`` is ``None`` as well
        when no configured index file corresponds to the given file.

        :rtype: (:class:`Repository <distro_tracker.core.models.Repository>`,
            string)
        """
        from distro_tracker.core.models import Repository

        sources_list = apt_pkg.SourceList()
        sources_list.read_main_list()
        index_file_name = os.path.basename(sources_file)
        component_url = None
        component = None
        for entry in sources_list.list:
            for index_file in entry.index_files:
                if index_file_name in index_file.describe:
                    base_url, component, _ = index_file.describe.split(None, 2)
                    base_url = base_url.rstrip('/')
                    component_url = base_url + '/' + component
                    # NOTE(review): this only leaves the inner loop, so a
                    # later entry which also matches overrides this one;
                    # confirm whether "first match wins" was intended.
                    break

        if component is None:
            # No configured index file describes this cached file (e.g. a
            # leftover file of a repository which is no longer listed).
            return None, None

        components = component.split('/')
        if len(components) >= 2:
            component = components[1].strip()

        for repository in Repository.objects.all():
            if component_url in repository.component_urls:
                return repository, component

        return None, component

    def _get_all_cached_files(self):
        """
        Returns a list of all cached files.
        """
        lists_directory = apt_pkg.config.find_dir('Dir::State::lists')
        try:
            return [
                os.path.join(lists_directory, file_name)
                for file_name in os.listdir(lists_directory)
                if os.path.isfile(os.path.join(lists_directory, file_name))
            ]
        except OSError:
            # The directory structure does not exist => nothing is cached
            return []

    def get_cached_files(self, filter_function=None):
        """
        Returns cached files, optionally filtered by the given
        ``filter_function``

        :param filter_function: Takes a file name as the only parameter and
            returns a :class:`bool` indicating whether it should be included
            in the result.
        :type filter_function: callable

        :returns: A list of cached file names
        :rtype: list
        """
        if filter_function is None:
            # Include all files if the filter function is not provided
            def filter_function(x):
                return True

        return [
            file_name
            for file_name in self._get_all_cached_files()
            if filter_function(file_name)
        ]

    def get_sources_files_for_repository(self, repository):
        """
        Returns all ``Sources`` files which are cached for the given
        repository.

        For instance, ``Sources`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Sources`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Sources') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def get_packages_files_for_repository(self, repository):
        """
        Returns all ``Packages`` files which are cached for the given
        repository.

        For instance, ``Packages`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Packages`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Packages') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def update_repositories(self, force_download=False):
        """
        Initiates a cache update.

        :param force_download: If set to ``True`` causes the cache to be
            cleared before starting the update, thus making sure all index
            files are downloaded again.

        :returns: A two-tuple ``(updated_sources, updated_packages)``. Each of
            the tuple's members is a list of
            (:class:`Repository <distro_tracker.core.models.Repository>`,
            ``component``, ``file_name``) tuple representing the repository
            which was updated, component, and the file which contains the fresh
            information. The file is either a ``Sources`` or a ``Packages``
            file respectively. Fetched files which cannot be attributed to a
            repository are left out.
        """
        if force_download:
            self.clear_cache()

        self.configure_cache()

        cache = apt.Cache(rootdir=self.cache_root_dir)
        progress = AptCache.AcquireProgress()
        cache.update(progress)

        updated_sources = []
        updated_packages = []
        for fetched_file in progress.fetched:
            if fetched_file.endswith('Sources'):
                dest = updated_sources
            elif fetched_file.endswith('Packages'):
                dest = updated_packages
            else:
                continue
            repository, component = self._match_index_file_to_repository(
                fetched_file)
            if repository is None:
                # Never report an update without a repository to attach
                # it to.
                continue
            dest.append((
                repository, component, self._index_file_full_path(fetched_file)
            ))

        return updated_sources, updated_packages

    def _get_format(self, record):
        """
        Returns the Format field value of the given source package record.
        """
        record = deb822.Deb822(record)
        return record['format']

    def _extract_quilt_package_debian_tar(self, debian_tar_path, outdir):
        """
        Extracts the given tarball to the given output directory.

        Every member is checked before anything is extracted: the archive
        is rejected if a member's path would resolve outside of ``outdir``.

        :raises Exception: If a member of the archive would be written
            outside of ``outdir``.
        """
        base_directory = os.path.abspath(outdir)
        with tarfile.open(debian_tar_path) as archive_file:
            for member in archive_file.getmembers():
                member_path = os.path.abspath(
                    os.path.join(base_directory, member.name))
                try:
                    # commonpath compares whole path components, whereas
                    # commonprefix compares characters and would accept
                    # "/x/outside" as being below "/x/out".
                    common = os.path.commonpath([base_directory, member_path])
                except ValueError:
                    # The paths cannot be compared at all: not inside.
                    common = None
                if common != base_directory:
                    raise Exception("Attempted Path Traversal in Tar File")

            archive_file.extractall(outdir)

    def get_package_source_cache_directory(self, package_name):
        """
        Returns the path to the directory where a particular source package is
        cached.

        :param package_name: The name of the source package
        :type package_name: string

        :rtype: string
        """
        package_hash = (
            package_name[0]
            if not package_name.startswith('lib') else
            package_name[:4]
        )
        return os.path.join(
            self.source_cache_directory,
            package_hash,
            package_name)

    def get_source_version_cache_directory(self, package_name, version):
        """
        Returns the path to the directory where a particular source package
        version files are extracted.

        :param package_name: The name of the source package
        :type package_name: string

        :param version: The version of the source package
        :type version: string

        :rtype: string
        """
        package_dir = self.get_package_source_cache_directory(package_name)
        return os.path.join(package_dir, package_name + '-' + version)

    def _remove_dir(self, directory_path):
        """
        Removes the given directory, including any subdirectories and files.
        The method makes sure to correctly handle the situation where the
        directory contains files with names which are invalid utf-8.
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        if os.path.exists(directory_path):
            shutil.rmtree(directory_path)

    def clear_cached_sources(self):
        """
        Clears all cached package source files.
        """
        self._remove_dir(self.source_cache_directory)
        self._cache_size = self.get_directory_size(self.source_cache_directory)

    def _get_apt_source_records(self, source_name, version):
        """
        Returns a :class:`apt_pkg.SourceRecords` instance where the given
        source package is the current working record.

        :raises SourcePackageRetrieveError: If the cache holds no record
            for the given version of the source package.
        """
        apt.Cache(rootdir=self.cache_root_dir)  # must be pre-created
        source_records = apt_pkg.SourceRecords()
        source_records.restart()
        # Find the cached record matching this source package and version
        found = False
        while source_records.lookup(source_name):
            if source_records.version == version:
                found = True
                break

        if not found:
            # Package version does not exist in the cache
            raise SourcePackageRetrieveError(
                "Could not retrieve package {pkg} version {ver}:"
                " No such version found in the cache".format(
                    pkg=source_name, ver=version))

        return source_records

    def _extract_dpkg_source(self, retrieved_files, outdir):
        """
        Uses dpkg-source to extract the source package.

        :raises SourcePackageRetrieveError: If none of the retrieved files
            is a ``.dsc`` file.
        """
        dsc_file_path = next(
            (
                file_path
                for file_path in retrieved_files
                if file_path.endswith('.dsc')
            ),
            None)
        if dsc_file_path is None:
            # Do not let a bare StopIteration escape to the caller.
            raise SourcePackageRetrieveError(
                "Could not extract source package:"
                " no .dsc file among the retrieved files")
        dsc_file_path = os.path.abspath(dsc_file_path)
        outdir = os.path.abspath(outdir)
        subprocess.check_output(["dpkg-source", "-x", dsc_file_path, outdir],
                                stderr=subprocess.STDOUT)

    def _apt_acquire_package(self,
                             source_records,
                             dest_dir_path,
                             debian_directory_only):
        """
        Using :class:`apt_pkg.Acquire`, retrieves the source files for the
        source package described by the current source_records record.

        :param source_records: The record describing the source package whose
            files should be retrieved.
        :type source_records: :class:`apt_pkg.Acquire`

        :param dest_dir_path: The path to the directory where the downloaded
            files should be saved.
        :type dest_dir_path: string

        :param debian_directory_only: A flag indicating whether only the debian
            directory should be downloaded.

        :returns: A list of absolute paths of all retrieved source files.
        :rtype: list of strings

        :raises SourcePackageRetrieveError: If any of the files could not be
            retrieved.
        """
        package_format = self._get_format(source_records.record)
        # A reference to each AcquireFile instance must be kept
        files = []
        acquire = apt_pkg.Acquire(apt.progress.base.AcquireProgress())
        for srcfile in source_records.files:
            base = os.path.basename(srcfile.path)
            dest_file_path = os.path.join(dest_dir_path, base)
            if debian_directory_only and package_format == self.QUILT_FORMAT:
                if srcfile.type != 'diff':
                    # Only retrieve the .debian.tar.* file for quilt packages
                    # when only the debian directory is wanted
                    continue
            files.append(apt_pkg.AcquireFile(
                acquire,
                source_records.index.archive_uri(srcfile.path),
                srcfile.hashes,
                srcfile.size,
                base,
                destfile=dest_file_path
            ))

        acquire.run()

        # Check if all items are correctly retrieved and build the list of file
        # paths.
        retrieved_paths = []
        for item in acquire.items:
            if item.status != item.STAT_DONE:
                error_text = item.error_text
                if isinstance(error_text, bytes):
                    # Only decode when needed: calling decode() on text
                    # would hide the real error behind an AttributeError.
                    error_text = error_text.decode('utf-8')
                raise SourcePackageRetrieveError(
                    'Could not retrieve file {file}: {error}'.format(
                        file=item.destfile,
                        error=error_text))
            retrieved_paths.append(item.destfile)

        return retrieved_paths

    def retrieve_source(self, source_name, version,
                        debian_directory_only=False):
        """
        Retrieve the source package files for the given source package version.

        :param source_name: The name of the source package
        :type source_name: string
        :param version: The version of the source package
        :type version: string
        :param debian_directory_only: Flag indicating if the method should try
            to retrieve only the debian directory of the source package. This
            is usually only possible when the package format is 3.0 (quilt).
        :type debian_directory_only: Boolean

        :returns: The path to the directory containing the extracted source
            package files.
        :rtype: string

        :raises SourcePackageRetrieveError: If the version is not in the
            cache or its files cannot be retrieved.
        """
        if self.cache_size > self.cache_max_size:
            # If the maximum allowed cache size has been exceeded,
            # clear the cache
            self.clear_cached_sources()

        source_records = self._get_apt_source_records(source_name, version)

        dest_dir_path = self.get_package_source_cache_directory(source_name)
        if not os.path.exists(dest_dir_path):
            os.makedirs(dest_dir_path)
        # Remember the size of the directory in the beginning
        old_size = self.get_directory_size(dest_dir_path)

        # Download the source files
        retrieved_files = self._apt_acquire_package(
            source_records, dest_dir_path, debian_directory_only)

        # Extract the retrieved source files
        outdir = self.get_source_version_cache_directory(source_name, version)
        # dpkg-source expects this directory not to exist
        self._remove_dir(outdir)

        package_format = self._get_format(source_records.record)
        if debian_directory_only and package_format == self.QUILT_FORMAT:
            if not retrieved_files:
                raise SourcePackageRetrieveError(
                    "Could not retrieve package {pkg} version {ver}:"
                    " No debian tarball found among the source files".format(
                        pkg=source_name, ver=version))
            # dpkg-source cannot extract an incomplete package
            self._extract_quilt_package_debian_tar(retrieved_files[0], outdir)
        else:
            # Let dpkg-source handle the extraction in all other cases
            self._extract_dpkg_source(retrieved_files, outdir)

        # Update the current cache size based on the changes made by getting
        # this source package.
        new_size = self.get_directory_size(dest_dir_path)
        size_delta = new_size - old_size
        self._cache_size += size_delta

        return outdir
def html_package_list(packages):
    """
    Return a HTML-formatted list of packages.

    Each item is rendered as a link to the package page. When an item
    contains a ``/``, only the part before the first slash is linked and
    the rest (slash included) is appended after the link. The rendered
    items are joined with ``', '``.
    """
    def render(package):
        # The separator is "/" when present and "" otherwise, so the
        # suffix is either "/<rest>" or empty.
        source_package_name, separator, rest = package.partition("/")
        return '<a href="{}">{}</a>{}'.format(
            package_url(source_package_name),
            source_package_name,
            separator + rest)

    return ', '.join([render(package) for package in packages])