Coverage for distro_tracker/core/tasks/mixins.py: 99%
252 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2018 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""
11Mixins to combine to create powerful tasks.
13"""
14import hashlib
15import json
16import logging
18from debian.debian_support import version_compare
20from django.db import transaction
22from distro_tracker.core.models import (
23 ActionItem,
24 ActionItemType,
25 PackageData,
26 PackageName,
27 Repository,
28 SourcePackage,
29 SourcePackageRepositoryEntry,
30)
31from distro_tracker.core.utils import now
32from distro_tracker.core.utils.http import get_resource_text
34logger = logging.getLogger('distro_tracker.tasks')
37class ProcessItems(object):
38 """
39 Base class for all Process* mixins. Those mixins defines a list of
40 items that the task should process.
41 """
43 def __init__(self):
44 self.register_event_handler('execute-started',
45 self.handle_fake_update_parameter)
46 self.register_event_handler('execute-finished',
47 self.items_cleanup_processed_list)
48 super().__init__()
50 def item_to_key(self, item):
51 """
52 Converts an item to process into a unique string representation
53 than can be used to record the fact that the item has been processed.
55 :param object item: Any kind of object.
56 :return: A unique string representation of the object.
57 :rtype: str
58 """
59 return str(item)
61 def item_describe(self, item):
62 """
63 Converts an item into a dictionnary with the most important
64 data of the item that we want to save for later when the item
65 will have vanished.
67 :param object item: Any kind of object.
68 :return: A dictionnary describing the object.
69 :rtype: dict
70 """
71 return {}
73 def item_mark_processed(self, *args):
74 """
75 Mark an item as having been processed. This records the key associated
76 to the item in a ``processed`` dictionnary within the persistent
77 data of the task.
79 :param args: list of items to mark as having been processed
80 """
81 processed = self.data.setdefault('processed', {})
82 for item in args:
83 processed[self.item_to_key(item)] = self.item_describe(item)
84 self.data_mark_modified()
86 def item_needs_processing(self, item):
87 """
88 Verifies if the item needs to be processed or not.
90 :param object item: the item to check
91 :return: True if the obect is not recorded as having already been
92 processed, False otherwise.
93 :rtype: bool
94 """
95 processed = self.data.setdefault('processed', {})
96 return self.item_to_key(item) not in processed
98 def items_all(self):
99 """
100 This method returns an iterable of all the existing items, including
101 those that have already been processed and those which are going to be
102 processed.
104 :return: All the existing items.
105 :rtype: An iterable, can be an iterator or a list, set, tuple.
106 """
107 raise NotImplementedError("ProcessItems.items_all() must be overriden.")
109 def items_to_process(self):
110 """
111 This method returns the items that have to be processed by the task.
113 Its default implementation in :class:`ProcessItems` is to iterate over
114 the items returned by :meth:`items_all` and to :func:`yield` those where
115 :meth:`item_needs_processing` returns True.
117 If the `force_update` parameter is set to True, then it returns all the
118 items without calling :meth:`item_needs_processing`.
119 """
120 for item in self.items_all():
121 if self.force_update or self.item_needs_processing(item):
122 yield item
124 def items_all_keys(self):
125 """
126 This method returns all the keys corresponding to valid-existing
127 items.
129 Its main purpose is to be able to compute the list of keys
130 in the 'already-processed' list that are no-longer relevant and can be
131 dropped.
133 Its default implementation is to iterate over items returned by
134 :meth:`items_all` and call :meth:`item_to_key` on them. This method
135 can thus be overrident when there are more efficient ways to implement
136 this logic.
138 :return: the set of keys of the valid objects
139 :rtype: set
140 """
141 return set([self.item_to_key(x) for x in self.items_all()])
143 def items_to_cleanup(self):
144 """
145 This method returns an iterators returning a tuple
146 (key, description) for old items that have been processed
147 in the past but are no longer existing in :meth:`all_items`.
149 The description is the value returned by :meth:`item_describe`
150 at the time when the item has been processed. The key is the value
151 returned by :meth:`item_to_key` at the time when the item has been
152 processed.
154 :return: (key, description)
155 :rtype: tuple
156 """
157 processed = self.data.setdefault('processed', {})
158 processed_set = set(processed.keys())
159 unused_keys = processed_set.difference(self.items_all_keys())
160 for key in unused_keys:
161 yield (key, processed[key])
163 def items_cleanup_processed_list(self):
164 """
165 This method drops unused keys from the list of processed items.
167 To identify unused keys, it computes the difference between the
168 set of keys present in the 'processed' list and the set of keys
169 returned by :meth:`items_all_keys`.
170 """
171 processed = self.data.setdefault('processed', {})
172 modified = False
173 for key, _ in self.items_to_cleanup():
174 del processed[key]
175 modified = True
176 if modified:
177 self.data_mark_modified()
179 def items_fake_processed_list(self):
180 """
181 This method goes over all items to process and marks them as processed.
182 This is useful to fake the whole update process and bootstrap an
183 iterative process where we don't want the initial run to process
184 all existing entries.
185 """
186 for item in self.items_to_process():
187 self.item_mark_processed(item)
189 def handle_fake_update_parameter(self):
190 """
191 This method is registered as an execute-started event handler and
192 marks all items as processed even before the task has a chance to
193 process them.
194 """
195 if self.fake_update:
196 self.items_fake_processed_list()
199class ProcessModel(ProcessItems):
200 """
201 With this mixin, the list of items to be processed is a list of objects
202 retrieved through the database model specified in the :attr:`model`
203 attribute. Sub-classes should thus at least override this attribute.
204 """
206 #: The database model defining the list of items to process
207 model = None
209 def items_all(self):
210 return self.items_extend_queryset(self.model.objects.all())
212 def items_to_process(self):
213 items = self.items_all()
214 # Exclude the items already processed, unless --force-update tells us to
215 # reprocess all entries
216 if not self.force_update:
217 processed = self.data.setdefault('processed', {})
218 # XXX: might not be the right thing when primary key is not the id
219 processed_keys = list(map(lambda x: int(x), processed.keys()))
220 items = items.exclude(pk__in=processed_keys)
221 return items
223 def items_extend_queryset(self, queryset):
224 """
225 This method can be overriden by sub-classes to customize the queryset
226 returned by :meth:`items_all`. The normal queryset is passed as
227 parameter and the method should return the modified queryset.
229 :param QuerySet queryset: the original queryset
230 :return: the modified queryset
231 :rtype: QuerySet
232 """
233 return queryset
235 def item_to_key(self, item):
236 """
237 For database objects, we use the primary key as the key for the
238 processed list.
240 :param item: an instance of the associated model
241 :return: the value of its primary key
242 """
243 return str(item.pk)
245 def items_all_keys(self):
246 # Better implementation with an optimized query
247 return set(map(lambda x: str(x),
248 self.items_all().values_list('pk', flat=True)))
250 def item_describe(self, item):
251 data = super().item_describe(item)
252 for field_name in getattr(self, 'fields_to_save', []):
253 field = getattr(item, field_name)
254 if callable(field):
255 field = field()
256 data[field_name] = field
257 return data
260class ProcessSourcePackage(ProcessModel):
261 """
262 Process all :class:`~distro_tracker.core.models.SourcePackage` objects.
263 """
264 model = SourcePackage
265 fields_to_save = ('name', 'version')
268class ProcessSrcRepoEntry(ProcessModel):
269 """
270 Process all
271 :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`.
272 """
274 model = SourcePackageRepositoryEntry
276 def items_extend_queryset(self, queryset):
277 return queryset.select_related(
278 'source_package__source_package_name', 'repository')
280 def item_describe(self, item):
281 data = super().item_describe(item)
282 data['name'] = item.source_package.name
283 data['version'] = item.source_package.version
284 data['repository'] = item.repository.shorthand
285 data['repository_id'] = item.repository.id
286 return data
289class ProcessSrcRepoEntryInDefaultRepository(ProcessSrcRepoEntry):
290 """
291 Process
292 :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`.
293 from the default repository.
294 """
296 def items_extend_queryset(self, queryset):
297 queryset = super().items_extend_queryset(queryset)
298 return queryset.filter(repository__default=True)
301class ProcessMainRepoEntry(ProcessItems):
302 """
303 Process the main
304 :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
305 for each package. The main entry is defined as being the one existing in the
306 default repository. If there's no default entry for a given package, then
307 it's the entry with the biggest version that is taken. If there are still
308 two entries, then we take the one in the repository with the biggest
309 "position".
310 """
312 def __init__(self):
313 super().__init__()
314 self.main_entries = None
315 self.register_event_handler('execute-started',
316 self.clear_main_entries_cache)
317 self.register_event_handler('execute-finished',
318 self.clear_main_entries_cache)
319 self.register_event_handler('execute-failed',
320 self.clear_main_entries_cache)
322 def clear_main_entries_cache(self):
323 self.main_entries = None
325 def items_all(self):
326 if self.main_entries is not None:
327 return self.main_entries.values()
329 main_entries = {}
331 def register_entry(entry):
332 name = entry.source_package.name
333 version = entry.source_package.version
334 if name not in main_entries:
335 main_entries[name] = entry
336 else:
337 selected_version = main_entries[name].source_package.version
338 if version_compare(selected_version, version) < 0:
339 main_entries[name] = entry
340 elif version_compare(selected_version, version) == 0:
341 # If both versions are equal, we use the repository with the
342 # biggest position
343 if (entry.repository.position >
344 main_entries[name].repository.position):
345 main_entries[name] = entry
347 # First identify entries from the default repository
348 qs = SourcePackageRepositoryEntry.objects.filter(
349 repository__default=True).select_related(
350 'source_package__source_package_name',
351 'repository')
353 for entry in qs:
354 register_entry(entry)
356 # Then again for all the other remaining packages
357 qs = SourcePackageRepositoryEntry.objects.exclude(
358 source_package__source_package_name__name__in=main_entries.keys()
359 ).select_related(
360 'source_package__source_package_name',
361 'repository'
362 )
363 for entry in qs:
364 register_entry(entry)
366 self.main_entries = main_entries
367 return self.main_entries.values()
369 def item_to_key(self, item):
370 return str(item.id)
372 def item_describe(self, item):
373 return {
374 'name': item.source_package.name,
375 'version': item.source_package.version,
376 'repository': item.repository.shorthand,
377 }
380class ProcessRepositoryUpdates(ProcessSrcRepoEntry):
381 """
382 Watch repositories and generates updates operations to be processed.
384 :meth:`items_to_process` returns repository entries but you can query
385 :meth:`is_new_source_package` on the associated source package to know
386 if the source package was already present in another repository in the
387 previous run or not.
389 There's a new :meth:`iter_removals_by_repository` to find out packages
390 which have been dropped from the repository.
391 """
393 def __init__(self):
394 super().__init__()
395 self.register_event_handler('execute-started',
396 self.compute_known_packages)
398 def compute_known_packages(self):
399 """
400 Goes over the list of formerly processed items and builds lists to
401 quickly lookup wether a given package is new or not.
402 """
403 self.pkglist = {
404 'all': {},
405 }
406 self.srcpkglist = {
407 'all': {},
408 }
409 for data in self.data.get('processed', {}).values():
410 key = '%s_%s' % (data['name'], data['version'])
411 self.pkglist['all'][data['name']] = True
412 self.srcpkglist['all'][key] = True
413 repo_pkglist = self.pkglist.setdefault(data['repository_id'], {})
414 repo_srcpkglist = self.srcpkglist.setdefault(data['repository_id'],
415 {})
416 repo_pkglist[data['name']] = True
417 repo_srcpkglist[key] = True
419 def is_new_source_package(self, srcpkg):
420 """
421 Returns True if the source package was not present in the former run,
422 False otherwise.
424 The existence of the source package is deducted from the list of already
425 processed entries (with the help of :meth:`compute_known_packages` which
426 is called at the start of the :meth:`execute` method.
428 :param srcpkg: the source package
429 :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
430 :returns: True if never seen, False otherwise
431 :rtype: bool
432 """
433 key = '%s_%s' % (srcpkg.name, srcpkg.version)
434 return key not in self.srcpkglist['all']
436 def iter_removals_by_repository(self):
437 """
438 Returns an iterator to process all package removals that happened in all
439 the repositories. The iterator yields tuples with the package name (as
440 a string) and the repository object.
441 """
442 for repository in Repository.objects.all():
443 if repository.id not in self.pkglist:
444 continue
445 qs = repository.source_packages.all()
446 new_pkglist = set(
447 qs.values_list('source_package_name__name', flat=True))
448 for package in self.pkglist[repository.id]:
449 if package not in new_pkglist:
450 yield (package, repository)
453class PackageTagging(object):
454 """
455 A task mixin that helps to maintain a set of package tags:
456 by untagging packages that no longer should be tagged and by
457 tagging packages that should.
459 Subclasses must define:
460 - `TAG_NAME`: defines the key for PackageData to be updated. One must define
461 keys matching `tag:.*`
462 - `TAG_DISPLAY_NAME`: defines the display name for the tag
463 - `TAG_COLOR_TYPE`: defines the color type to be used while rendering
464 content related to the tag. It must be defined based on the tag severity.
465 One may use one of the following options: success, danger, warning, or info.
466 - `TAG_DESCRIPTION`: defines a help text to be displayed with a 'title'
467 attribute
468 - `TAG_TABLE_TITLE`: the title of the table showing all the packages
469 with this tag
471 Also, subclasses must implement the :func:`packages_to_tag` function to
472 define the list of packages that must be tagged.
473 """
474 TAG_NAME = None
475 TAG_DISPLAY_NAME = ''
476 TAG_COLOR_TYPE = ''
477 TAG_DESCRIPTION = ''
478 TAG_TABLE_TITLE = ''
480 def packages_to_tag(self):
481 """
482 Subclasses must override this method to return the list of packages
483 that must be tagged with the tag defined by `TAG_NAME`
484 """
485 return []
487 def execute_package_tagging(self):
488 with transaction.atomic():
489 # Clear previous TaggedItems
490 PackageData.objects.filter(key=self.TAG_NAME).delete()
492 items = []
493 value = {
494 'display_name': self.TAG_DISPLAY_NAME,
495 'color_type': self.TAG_COLOR_TYPE,
496 'description': self.TAG_DESCRIPTION,
497 'table_title': self.TAG_TABLE_TITLE
498 }
499 for package in self.packages_to_tag():
500 tag = PackageData(
501 package=package, key=self.TAG_NAME, value=value)
502 items.append(tag)
503 PackageData.objects.bulk_create(items)
506class ImportExternalData:
507 """
508 This mixin downloads an external file and process it to generate various
509 database elements like PackageData or ActionItem. You need to configure it
510 by setting many attributes.
512 You need to set ``data_url`` to indicate the URL where the data will be
513 downloaded.
515 If you want to manage ActionItem out of the generated data, you need to
516 override the ``generate_action_items`` method, it returns a list of tuples
517 where each tuple is the name of the ActionItemType and a dict mapping
518 package names to attributes of ActionItem (short_description, severity,
519 extra_data). You also need to set the ``action_item_types``
520 attribute, it is a list of dict describing the ActionItemType that have
521 to be registered. The key names are the names of the attributes of the
522 model.
523 """
524 data_url = None # The URL where data is downloaded
525 action_item_types = []
527 def generate_action_items(self):
528 return []
530 @transaction.atomic
531 def _process_action_items(self):
532 # Register the needed ActionItemType
533 for ait in self.action_item_types:
534 ActionItemType.objects.create_or_update(
535 ait['type_name'], ait['full_description_template']
536 )
538 # Create/update/delete the action items
539 now_ts = now()
540 for type_name, pkglist in self.generate_action_items():
541 ait = ActionItemType.objects.get(type_name=type_name)
542 package_name_objects = {}
543 action_item_objects = {}
544 for p in PackageName.objects.filter(name__in=pkglist.keys()):
545 package_name_objects[p.name] = p
546 qs = ActionItem.objects.filter(item_type=ait,
547 package__name__in=pkglist.keys())
548 for ai in qs.all():
549 action_item_objects[ai.package.name] = ai
550 to_add = []
551 to_update = []
552 for pkgname, action_item_data in pkglist.items():
553 action_item = action_item_objects.get(pkgname)
554 if action_item:
555 updated = False
556 for field, value in action_item_data.items():
557 if value != getattr(action_item, field): 557 ↛ 556line 557 didn't jump to line 556, because the condition on line 557 was never false
558 setattr(action_item, field, value)
559 updated = True
560 if updated: 560 ↛ 552line 560 didn't jump to line 552, because the condition on line 560 was never false
561 action_item.last_updated_timestamp = now_ts
562 to_update.append(action_item)
563 else:
564 package_name = package_name_objects.get(pkgname)
565 if package_name:
566 to_add.append(
567 ActionItem(item_type=ait, package=package_name,
568 **action_item_data)
569 )
570 fields = [
571 'short_description', 'severity', 'extra_data',
572 'last_updated_timestamp'
573 ]
574 ActionItem.objects.bulk_update(to_update, fields)
575 ActionItem.objects.bulk_create(to_add)
576 ActionItem.objects.delete_obsolete_items([ait], pkglist.keys())
578 def generate_package_data(self):
579 return []
581 @transaction.atomic
582 def _process_package_data(self):
583 for key, pkglist in self.generate_package_data():
584 package_name_objects = {}
585 package_data_objects = {}
586 for p in PackageName.objects.filter(name__in=pkglist.keys()):
587 package_name_objects[p.name] = p
588 qs = PackageData.objects.filter(key=key,
589 package__name__in=pkglist.keys())
590 for pd in qs.all():
591 package_data_objects[pd.package.name] = pd
592 to_add = []
593 to_update = []
594 for pkgname, package_data_value in pkglist.items():
595 package_data = package_data_objects.get(pkgname)
596 if package_data:
597 if package_data.value != package_data_value: 597 ↛ 594line 597 didn't jump to line 594, because the condition on line 597 was never false
598 package_data.value = package_data_value
599 to_update.append(package_data)
600 else:
601 package_name = package_name_objects.get(pkgname)
602 if package_name:
603 to_add.append(
604 PackageData(key=key, value=package_data_value,
605 package=package_name)
606 )
607 PackageData.objects.bulk_update(to_update, ['value'])
608 PackageData.objects.bulk_create(to_add)
609 PackageData.objects.filter(key=key).exclude(
610 package__name__in=pkglist.keys()).delete()
612 def execute_import_external_data(self):
613 # Download the data and make it available to the various
614 # generate methods
615 url_content = get_resource_text(self.data_url,
616 force_update=self.force_update)
617 self.external_data = json.loads(url_content)
619 # If the data is unchanged since last run, do nothing
620 old_checksum = self.data.get('input_checksum')
621 checksum = hashlib.md5(
622 url_content.encode('utf-8', 'ignore')).hexdigest()
623 if checksum == old_checksum and not self.force_update:
624 return
625 self.data['input_checksum'] = checksum
627 # Process the various kind of objects that we can create
628 self._process_action_items()
629 self._process_package_data()