Coverage for distro_tracker/core/utils/packages.py: 64%
281 statements
« prev ^ index » next coverage.py v6.5.0, created at 2026-04-24 08:28 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2026-04-24 08:28 +0000
# Copyright 2013-2018 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""Utilities for processing Debian package information."""
import os
import re
import shutil
import subprocess
import tarfile

import apt

import apt_pkg

from debian import deb822

from django.conf import settings
from django.core.validators import URLValidator
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.encoding import force_bytes

from distro_tracker.core.utils.email_messages import \
    names_and_addresses_from_string as parse_addresses
def package_hashdir(package_name):
    """
    Returns the name of the hash directory used to avoid having too
    many entries in a single directory. It's usually the first letter
    of the package except for lib* packages where it's the first 4
    letters.

    :param package_name: The package name.
    :type package_name: str

    :returns: Name of the hash directory, or ``None`` when
        ``package_name`` is ``None``.
    :rtype: str
    """
    if package_name is None:
        return None
    # lib* packages are numerous, so they are spread over more directories
    # by using a longer prefix.
    prefix_length = 4 if package_name.startswith('lib') else 1
    return package_name[:prefix_length]
def package_url(package_name):
    """
    Returns the URL of the page dedicated to this package name.

    :param package_name: The package name.
    :type package_name: str or PackageName model

    :returns: URL of the package page, or ``None`` when ``package_name``
        is ``None``.
    :rtype: str
    """
    if package_name is None:
        return None
    # str() lets callers pass a model instance instead of a plain string.
    return reverse('dtracker-package-page',
                   kwargs={'package_name': str(package_name)})
def extract_vcs_information(stanza):
    """
    Extracts the VCS information from a package's Sources entry.

    :param stanza: The ``Sources`` entry from which to extract the VCS info.
        Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: VCS information regarding the package. Contains the following
        keys: type[, browser, url, branch]. Empty when the entry has no
        ``Vcs-*`` field.
    :rtype: dict
    """
    vcs = {}
    for key, value in stanza.items():
        key = key.lower()
        if key == 'vcs-browser':
            vcs['browser'] = value
        elif key.startswith('vcs-'):
            # The part after "vcs-" names the VCS type (git, svn, ...)
            vcs['type'] = key[4:]
            vcs['url'] = value
            if vcs['type'] == 'git':
                # A git URL may be followed by "-b <branch>"; split it so
                # that the URL and the branch are stored separately.
                match = re.match(r'(?P<url>.*?)\s+-b\s*(?P<branch>\S+)', value)
                if match:
                    vcs['url'] = match.group('url')
                    vcs['branch'] = match.group('branch')
    return vcs
def extract_dsc_file_name(stanza):
    """
    Extracts the name of the .dsc file from a package's Sources entry.

    :param stanza: The ``Sources`` entry from which to extract the .dsc file
        name. Maps ``Sources`` key names to values.
    :type stanza: dict

    :returns: The name of the first listed file ending in ``.dsc``, or
        ``None`` when no such file is listed.
    :rtype: str
    """
    # Any of these fields may list the package files; use whichever is there
    for field in ('checksums-sha256', 'checksums-sha1', 'files'):
        for entry in stanza.get(field, []):
            if entry.get('name', '').endswith('.dsc'):
                return entry['name']

    return None
def extract_information_from_sources_entry(stanza):
    """
    Extracts information from a ``Sources`` file entry and returns it in the
    form of a dictionary.

    The ``binary``, ``version``, ``architecture`` and ``maintainer`` fields
    are required; all the other fields default to an empty value.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: The extracted information.
    :rtype: dict
    """
    binaries = [
        binary.strip()
        for binary in stanza['binary'].split(',')
    ]
    entry = {
        'version': stanza['version'],
        'homepage': stanza.get('homepage', ''),
        'priority': stanza.get('priority', ''),
        'section': stanza.get('section', ''),
        'architectures': stanza['architecture'].split(),
        'binary_packages': binaries,
        'maintainer': parse_addresses(stanza['maintainer'])[0],
        'uploaders': parse_addresses(stanza.get('uploaders', '')),
        'standards_version': stanza.get('standards-version', ''),
        'vcs': extract_vcs_information(stanza),
        'dsc_file_name': extract_dsc_file_name(stanza),
        'directory': stanza.get('directory', ''),
    }

    # Validate incoming data a little bit: a homepage which is not a valid
    # URL (including a missing one) is stored as an empty string.
    try:
        URLValidator()(entry['homepage'])
    except ValidationError:
        entry['homepage'] = ''

    return entry
def extract_information_from_packages_entry(stanza):
    """
    Extracts information from a ``Packages`` file entry and returns it in the
    form of a dictionary.

    :param stanza: The raw entry's key-value pairs.
    :type stanza: Case-insensitive dict

    :returns: A dict with the ``version`` and the ``short_description``
        (the description truncated to 300 characters) of the package.
    :rtype: dict
    """
    entry = {
        'version': stanza['version'],
        'short_description': stanza.get('description', '')[:300],
    }

    return entry
class SourcePackageRetrieveError(Exception):
    """
    Raised when the files of a source package cannot be retrieved.
    """
    pass
class AptCache(object):
    """
    A class for handling cached package information.
    """
    DEFAULT_MAX_SIZE = 1 * 1024 ** 3  # 1 GiB
    QUILT_FORMAT = '3.0 (quilt)'

    class AcquireProgress(apt.progress.base.AcquireProgress):
        """
        Instances of this class can be passed to :meth:`apt.cache.Cache.update`
        calls.
        It provides a way to track which files were changed and which were not
        by an update operation.
        """
        def __init__(self, *args, **kwargs):
            super(AptCache.AcquireProgress, self).__init__(*args, **kwargs)
            #: Base names of the files reported through :meth:`done`
            self.fetched = []
            #: Base names of the files reported through :meth:`ims_hit`
            self.hit = []

        def done(self, item):
            self.fetched.append(os.path.split(item.owner.destfile)[1])

        def ims_hit(self, item):
            self.hit.append(os.path.split(item.owner.destfile)[1])

        def pulse(self, owner):
            return True

    def __init__(self):
        # The root cache directory is a subdirectory in the
        # DISTRO_TRACKER_CACHE_DIRECTORY
        self.cache_root_dir = os.path.join(
            settings.DISTRO_TRACKER_CACHE_DIRECTORY,
            'apt-cache'
        )
        self.sources_list_path = os.path.join(
            self.cache_root_dir, 'etc', 'sources.list')
        self.conf_file_path = os.path.join(self.cache_root_dir,
                                           'etc', 'apt.conf')
        # NOTE: this changes the environment of the whole process
        os.environ['APT_CONFIG'] = self.conf_file_path

        self.sources = []
        self.packages = []
        self.cache_max_size = getattr(
            settings, 'DISTRO_TRACKER_APT_CACHE_MAX_SIZE',
            self.DEFAULT_MAX_SIZE)
        #: The directory where source package files are cached
        self.source_cache_directory = os.path.join(self.cache_root_dir,
                                                   'packages')
        self._cache_size = None  # Evaluate the cache size lazily

        self.configure_cache()

    @property
    def cache_size(self):
        """
        The size in bytes of the source package cache directory. It is
        computed on first access and then kept up to date by the instance.
        """
        if self._cache_size is None:
            self._cache_size = \
                self.get_directory_size(self.source_cache_directory)
        return self._cache_size

    def get_directory_size(self, directory_path):
        """
        Returns the total space taken by the given directory in bytes.

        :param directory_path: The path to the directory
        :type directory_path: string

        :rtype: int
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(directory_path):
            for file_name in filenames:
                file_path = os.path.join(dirpath, file_name)
                # lstat: count the size of a symlink itself, not its target
                stat = os.lstat(file_path)
                total_size += stat.st_size

        return total_size

    def clear_cache(self):
        """
        Removes all cache information. This causes the next update to retrieve
        fresh repository files.
        """
        self._remove_dir(self.cache_root_dir)
        self.configure_cache()

    def update_sources_list(self):
        """
        Updates the ``sources.list`` file used to list repositories for which
        package information should be cached.
        """
        from distro_tracker.core.models import Repository

        directory = os.path.dirname(self.sources_list_path)
        os.makedirs(directory, exist_ok=True)

        with open(self.sources_list_path, 'w') as sources_list:
            for repository in Repository.objects.all():
                sources_list.write(repository.sources_list_entry + '\n')

    def update_apt_conf(self):
        """
        Updates the ``apt.conf`` file which gives general settings for the
        :class:`apt.cache.Cache`.

        In particular, this updates the list of all architectures which should
        be considered in package updates based on architectures that the
        repositories support.
        """
        from distro_tracker.core.models import Architecture

        # Do not rely on update_sources_list() having created the directory
        os.makedirs(os.path.dirname(self.conf_file_path), exist_ok=True)

        with open(self.conf_file_path, 'w') as conf_file:
            conf_file.write('APT::Architectures { ')
            for architecture in Architecture.objects.all():
                conf_file.write('"{arch}"; '.format(arch=architecture))
            conf_file.write('};\n')
            conf_file.write('Acquire::CompressionTypes::Order:: "xz";\n')
            conf_file.write('Dir "{}/";\n'.format(self.cache_root_dir))
            conf_file.write('Dir::State "state/";\n')
            conf_file.write('Dir::State::status "dpkg-status";\n')
            conf_file.write('Dir::Etc "etc/";\n')
            conf_file.write('Dir::Etc::sourcelist "{src}";\n'.format(
                src=self.sources_list_path))
            conf_file.write('Dir::Etc::Trusted "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_MAIN_FILE))
            conf_file.write('Dir::Etc::TrustedParts "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_PARTS_DIR))

    def configure_cache(self):
        """
        Configures the cache based on the most current repository information.
        """
        self.update_sources_list()
        self.update_apt_conf()
        # Clean up the configuration we might have read during "import apt"
        for root_key in apt_pkg.config.list():
            apt_pkg.config.clear(root_key)
        # Load the proper configuration
        apt_pkg.init()
        # Ensure we have the required directories
        for apt_dir in [apt_pkg.config.find_dir('Dir::State::lists'),
                        apt_pkg.config.find_dir('Dir::Etc::sourceparts'),
                        apt_pkg.config.find_dir('Dir::Cache::archives')]:
            os.makedirs(apt_dir, exist_ok=True)

    def _index_file_full_path(self, file_name):
        """
        Returns the absolute path for the given cached index file.

        :param file_name: The name of the cached index file.
        :type file_name: string

        :rtype: string
        """
        return os.path.join(
            apt_pkg.config.find_dir('Dir::State::lists'),
            file_name
        )

    def _match_index_file_to_repository(self, sources_file):
        """
        Returns a two-tuple ``(repository, component)``: the
        :class:`Repository <distro_tracker.core.models.Repository>` instance
        which matches the given cached index file and the ``component`` that
        the file belongs to.

        When the file cannot be matched to any configured index file the
        result is ``(None, None)``; when the index file is known but no
        repository provides it, the result is ``(None, component)``.

        :rtype: (:class:`Repository <distro_tracker.core.models.Repository>`,
            string)
        """
        from distro_tracker.core.models import Repository

        sources_list = apt_pkg.SourceList()
        sources_list.read_main_list()
        component_url = None
        component = None
        for entry in sources_list.list:
            for index_file in entry.index_files:
                if os.path.basename(sources_file) in index_file.describe:
                    base_url, component, _ = index_file.describe.split(None, 2)
                    base_url = base_url.rstrip('/')
                    component_url = base_url + '/' + component
                    break

        if component is None:
            # The file is not described by any entry of the sources list
            return None, None

        # The description gives "<suite>/<component>"; keep the component
        components = component.split('/')
        if len(components) >= 2:
            component = components[1].strip()

        for repository in Repository.objects.all():
            if component_url in repository.component_urls:
                return repository, component

        return None, component

    def _get_all_cached_files(self):
        """
        Returns a list of all cached files.
        """
        lists_directory = apt_pkg.config.find_dir('Dir::State::lists')
        try:
            return [
                os.path.join(lists_directory, file_name)
                for file_name in os.listdir(lists_directory)
                if os.path.isfile(os.path.join(lists_directory, file_name))
            ]
        except OSError:
            # The directory structure does not exist => nothing is cached
            return []

    def get_cached_files(self, filter_function=None):
        """
        Returns cached files, optionally filtered by the given
        ``filter_function``

        :param filter_function: Takes a file name as the only parameter and
            returns a :class:`bool` indicating whether it should be included
            in the result.
        :type filter_function: callable

        :returns: A list of cached file names
        :rtype: list
        """
        if filter_function is None:
            # Include all files if the filter function is not provided
            def filter_function(x):
                return True

        return [
            file_name
            for file_name in self._get_all_cached_files()
            if filter_function(file_name)
        ]

    def get_sources_files_for_repository(self, repository):
        """
        Returns all ``Sources`` files which are cached for the given
        repository.

        For instance, ``Sources`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Sources`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Sources') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def get_packages_files_for_repository(self, repository):
        """
        Returns all ``Packages`` files which are cached for the given
        repository.

        For instance, ``Packages`` files for different suites are cached
        separately.

        :param repository: The repository for which to return all cached
            ``Packages`` files
        :type repository: :class:`Repository
            <distro_tracker.core.models.Repository>`

        :rtype: ``iterable`` of strings
        """
        return self.get_cached_files(
            lambda file_name: (
                file_name.endswith('Packages') and
                self._match_index_file_to_repository(
                    file_name)[0] == repository))

    def update_repositories(self, force_download=False):
        """
        Initiates a cache update.

        :param force_download: If set to ``True`` causes the cache to be
            cleared before starting the update, thus making sure all index
            files are downloaded again.

        :returns: A two-tuple ``(updated_sources, updated_packages)``. Each of
            the tuple's members is a list of
            (:class:`Repository <distro_tracker.core.models.Repository>`,
            ``component``, ``file_name``) tuple representing the repository
            which was updated, component, and the file which contains the fresh
            information. The file is either a ``Sources`` or a ``Packages``
            file respectively. Fetched files which cannot be matched to a
            repository are left out.
        """
        if force_download:
            self.clear_cache()

        self.configure_cache()

        cache = apt.Cache(rootdir=self.cache_root_dir)
        progress = AptCache.AcquireProgress()
        cache.update(progress)

        updated_sources = []
        updated_packages = []
        for fetched_file in progress.fetched:
            if fetched_file.endswith('Sources'):
                dest = updated_sources
            elif fetched_file.endswith('Packages'):
                dest = updated_packages
            else:
                continue
            repository, component = self._match_index_file_to_repository(
                fetched_file)
            if repository is None:
                # Nothing can be updated from a file of an unknown repository
                continue
            dest.append((
                repository, component, self._index_file_full_path(fetched_file)
            ))

        return updated_sources, updated_packages

    def _get_format(self, record):
        """
        Returns the Format field value of the given source package record.
        """
        record = deb822.Deb822(record)
        return record['format']

    def _extract_quilt_package_debian_tar(self, debian_tar_path, outdir):
        """
        Extracts the given tarball to the given output directory.

        :raises Exception: If a member of the tarball has a name which would
            place it outside of the output directory.
        """
        def is_within_directory(directory, target):
            abs_directory = os.path.abspath(directory)
            abs_target = os.path.abspath(target)
            # commonpath compares whole path components. A character-wise
            # common prefix would accept "/x/out-evil" as being inside
            # "/x/out".
            common = os.path.commonpath([abs_directory, abs_target])
            return common == abs_directory

        with tarfile.open(debian_tar_path) as archive_file:
            for member in archive_file.getmembers():
                member_path = os.path.join(outdir, member.name)
                if not is_within_directory(outdir, member_path):
                    raise Exception("Attempted Path Traversal in Tar File")

            # NOTE(review): only member names are validated here; targets of
            # symbolic/hard links stored in the archive are not checked.
            archive_file.extractall(outdir)

    def get_package_source_cache_directory(self, package_name):
        """
        Returns the path to the directory where a particular source package is
        cached.

        :param package_name: The name of the source package
        :type package_name: string

        :rtype: string
        """
        return os.path.join(
            self.source_cache_directory,
            package_hashdir(package_name),
            package_name)

    def get_source_version_cache_directory(self, package_name, version):
        """
        Returns the path to the directory where a particular source package
        version files are extracted.

        :param package_name: The name of the source package
        :type package_name: string

        :param version: The version of the source package
        :type version: string

        :rtype: string
        """
        package_dir = self.get_package_source_cache_directory(package_name)
        return os.path.join(package_dir, package_name + '-' + version)

    def _remove_dir(self, directory_path):
        """
        Removes the given directory, including any subdirectories and files.
        The method makes sure to correctly handle the situation where the
        directory contains files with names which are invalid utf-8.
        """
        # Convert the directory path to bytes to make sure all os calls deal
        # with bytes, not unicode objects.
        # This way any file names with invalid utf-8 names, are correctly
        # handled, without causing an error.
        directory_path = force_bytes(directory_path)
        if os.path.exists(directory_path):
            shutil.rmtree(directory_path)

    def clear_cached_sources(self):
        """
        Clears all cached package source files.
        """
        self._remove_dir(self.source_cache_directory)
        self._cache_size = self.get_directory_size(self.source_cache_directory)

    def _get_apt_source_records(self, source_name, version):
        """
        Returns a :class:`apt_pkg.SourceRecords` instance where the given
        source package is the current working record.

        :raises SourcePackageRetrieveError: If the given version of the source
            package is not found in the cache.
        """
        apt.Cache(rootdir=self.cache_root_dir)  # must be pre-created
        source_records = apt_pkg.SourceRecords()
        source_records.restart()
        # Find the cached record matching this source package and version
        found = False
        while source_records.lookup(source_name):
            if source_records.version == version:
                found = True
                break

        if not found:
            # Package version does not exist in the cache
            raise SourcePackageRetrieveError(
                "Could not retrieve package {pkg} version {ver}:"
                " No such version found in the cache".format(
                    pkg=source_name, ver=version))

        return source_records

    def _extract_dpkg_source(self, retrieved_files, outdir):
        """
        Uses dpkg-source to extract the source package.

        :raises SourcePackageRetrieveError: If there is no ``.dsc`` file among
            the retrieved files.
        """
        dsc_file_path = next(
            (
                file_path
                for file_path in retrieved_files
                if file_path.endswith('.dsc')
            ),
            None)
        if dsc_file_path is None:
            raise SourcePackageRetrieveError(
                'Could not extract the source package:'
                ' no .dsc file among the retrieved files')
        dsc_file_path = os.path.abspath(dsc_file_path)
        outdir = os.path.abspath(outdir)
        subprocess.check_output(["dpkg-source", "-x", dsc_file_path, outdir],
                                stderr=subprocess.STDOUT)

    def _apt_acquire_package(self,
                             source_records,
                             dest_dir_path,
                             debian_directory_only):
        """
        Using :class:`apt_pkg.Acquire`, retrieves the source files for the
        source package described by the current source_records record.

        :param source_records: The record describing the source package whose
            files should be retrieved.
        :type source_records: :class:`apt_pkg.SourceRecords`

        :param dest_dir_path: The path to the directory where the downloaded
            files should be saved.
        :type dest_dir_path: string

        :param debian_directory_only: A flag indicating whether only the debian
            directory should be downloaded.

        :returns: A list of absolute paths of all retrieved source files.
        :rtype: list of strings

        :raises SourcePackageRetrieveError: If any of the files could not be
            retrieved.
        """
        package_format = self._get_format(source_records.record)
        only_debian_tarball = (
            debian_directory_only and package_format == self.QUILT_FORMAT)
        # A reference to each AcquireFile instance must be kept
        files = []
        acquire = apt_pkg.Acquire(apt.progress.base.AcquireProgress())
        for srcfile in source_records.files:
            base = os.path.basename(srcfile.path)
            dest_file_path = os.path.join(dest_dir_path, base)
            if only_debian_tarball and srcfile.type != 'diff':
                # Only retrieve the .debian.tar.* file for quilt packages
                # when only the debian directory is wanted
                continue
            files.append(apt_pkg.AcquireFile(
                acquire,
                source_records.index.archive_uri(srcfile.path),
                srcfile.hashes,
                srcfile.size,
                base,
                destfile=dest_file_path
            ))

        acquire.run()

        # Check if all items are correctly retrieved and build the list of file
        # paths.
        retrieved_paths = []
        for item in acquire.items:
            if item.status != item.STAT_DONE:
                error_text = item.error_text
                if isinstance(error_text, bytes):
                    # Only bytes need decoding; a str is used as it is
                    error_text = error_text.decode('utf-8', 'replace')
                raise SourcePackageRetrieveError(
                    'Could not retrieve file {file}: {error}'.format(
                        file=item.destfile,
                        error=error_text))
            retrieved_paths.append(item.destfile)

        return retrieved_paths

    def retrieve_source(self, source_name, version,
                        debian_directory_only=False):
        """
        Retrieve the source package files for the given source package version.

        :param source_name: The name of the source package
        :type source_name: string
        :param version: The version of the source package
        :type version: string
        :param debian_directory_only: Flag indicating if the method should try
            to retrieve only the debian directory of the source package. This
            is usually only possible when the package format is 3.0 (quilt).
        :type debian_directory_only: Boolean

        :returns: The path to the directory containing the extracted source
            package files.
        :rtype: string

        :raises SourcePackageRetrieveError: If the version is not found in
            the cache or its files could not be retrieved.
        """
        if self.cache_size > self.cache_max_size:
            # If the maximum allowed cache size has been exceeded,
            # clear the cache
            self.clear_cached_sources()

        source_records = self._get_apt_source_records(source_name, version)

        dest_dir_path = self.get_package_source_cache_directory(source_name)
        os.makedirs(dest_dir_path, exist_ok=True)
        # Remember the size of the directory in the beginning
        old_size = self.get_directory_size(dest_dir_path)

        # Download the source files
        retrieved_files = self._apt_acquire_package(
            source_records, dest_dir_path, debian_directory_only)

        # Extract the retrieved source files
        outdir = self.get_source_version_cache_directory(source_name, version)
        # dpkg-source expects this directory not to exist
        self._remove_dir(outdir)

        package_format = self._get_format(source_records.record)
        if debian_directory_only and package_format == self.QUILT_FORMAT:
            if not retrieved_files:
                raise SourcePackageRetrieveError(
                    "Could not retrieve package {pkg} version {ver}:"
                    " No debian tarball found".format(
                        pkg=source_name, ver=version))
            # dpkg-source cannot extract an incomplete package
            self._extract_quilt_package_debian_tar(retrieved_files[0], outdir)
        else:
            # Let dpkg-source handle the extraction in all other cases
            self._extract_dpkg_source(retrieved_files, outdir)

        # Update the current cache size based on the changes made by getting
        # this source package.
        new_size = self.get_directory_size(dest_dir_path)
        size_delta = new_size - old_size
        self._cache_size += size_delta

        return outdir
def html_package_list(packages):
    """
    Return a HTML-formatted list of packages.

    Each item is either a source package name or ``<name>/<suffix>``. The
    name becomes a link to its package page and the suffix, if any, is kept
    as plain text after the link.

    :param packages: The items to format.
    :type packages: iterable of str

    :returns: The comma-separated HTML links.
    :rtype: str
    """
    from html import escape

    packages_html = []
    for package in packages:
        # Only the part before the first "/" is the package name
        source_package_name, separator, remain = package.partition("/")
        # Escape all interpolated values so that unexpected input cannot
        # inject markup; valid package names are left unchanged.
        packages_html.append('<a href="{}">{}</a>{}'.format(
            escape(package_url(source_package_name)),
            escape(source_package_name),
            escape(separator + remain)))

    return ', '.join(packages_html)