Coverage for distro_tracker/core/tasks/mixins.py: 99%

252 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-01-12 09:15 +0000

1# Copyright 2018 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Mixins to combine to create powerful tasks. 

12 

13""" 

14import hashlib 

15import json 

16import logging 

17 

18from debian.debian_support import version_compare 

19 

20from django.db import transaction 

21 

22from distro_tracker.core.models import ( 

23 ActionItem, 

24 ActionItemType, 

25 PackageData, 

26 PackageName, 

27 Repository, 

28 SourcePackage, 

29 SourcePackageRepositoryEntry, 

30) 

31from distro_tracker.core.utils import now 

32from distro_tracker.core.utils.http import get_resource_text 

33 

# Logger shared by the task mixins defined in this module.
logger = logging.getLogger('distro_tracker.tasks')

35 

36 

class ProcessItems(object):
    """
    Common foundation of all the Process* mixins. Each of those mixins
    defines the collection of items that the task has to process.

    The mixin relies on the host task to provide ``data`` (a persistent
    dictionary), ``data_mark_modified()``, ``register_event_handler()`` and
    the ``force_update`` / ``fake_update`` parameters.
    """

    def __init__(self):
        handlers = (
            ('execute-started', self.handle_fake_update_parameter),
            ('execute-finished', self.items_cleanup_processed_list),
        )
        for event_name, handler in handlers:
            self.register_event_handler(event_name, handler)
        super().__init__()

    def item_to_key(self, item):
        """
        Build the unique string that identifies an item in the list of
        processed items.

        :param object item: Any kind of object.
        :return: A unique string representation of the object.
        :rtype: str
        """
        return str(item)

    def item_describe(self, item):
        """
        Build a dictionary holding the most important data of the item,
        i.e. what we want to remember once the item itself has vanished.

        :param object item: Any kind of object.
        :return: A dictionary describing the object (empty by default).
        :rtype: dict
        """
        return dict()

    def item_mark_processed(self, *args):
        """
        Record the given items as processed. For each item, its key is
        stored (together with its description) in the ``processed``
        dictionary kept in the persistent data of the task.

        :param args: list of items to mark as having been processed
        """
        done = self.data.setdefault('processed', {})
        for item in args:
            description = self.item_describe(item)
            done[self.item_to_key(item)] = description
        self.data_mark_modified()

    def item_needs_processing(self, item):
        """
        Tell whether the item still has to be processed.

        :param object item: the item to check
        :return: True if the object is not recorded as having already been
            processed, False otherwise.
        :rtype: bool
        """
        done = self.data.setdefault('processed', {})
        return not (self.item_to_key(item) in done)

    def items_all(self):
        """
        Return an iterable over all the existing items, both those already
        processed and those that remain to be processed.

        :return: All the existing items.
        :rtype: An iterable, can be an iterator or a list, set, tuple.
        """
        raise NotImplementedError("ProcessItems.items_all() must be overriden.")

    def items_to_process(self):
        """
        Yield the items that the task has to process.

        The default implementation walks over the items returned by
        :meth:`items_all` and yields those for which
        :meth:`item_needs_processing` returns True.

        When the `force_update` parameter is True, every item is yielded
        and :meth:`item_needs_processing` is not consulted.
        """
        for candidate in self.items_all():
            if not self.force_update and \
                    not self.item_needs_processing(candidate):
                continue
            yield candidate

    def items_all_keys(self):
        """
        Return the keys of all the items that currently exist.

        This is mainly used to find out which keys of the 'processed'
        list are no longer relevant and can be dropped.

        The default implementation calls :meth:`item_to_key` on every item
        returned by :meth:`items_all`. Sub-classes can override it when a
        more efficient implementation is available.

        :return: the set of keys of the valid objects
        :rtype: set
        """
        return {self.item_to_key(entry) for entry in self.items_all()}

    def items_to_cleanup(self):
        """
        Yield a (key, description) tuple for each item that was processed
        in the past but that is no longer returned by :meth:`items_all`.

        The key and the description are the values that were returned by
        :meth:`item_to_key` and :meth:`item_describe` at the time when the
        item was processed.

        :return: (key, description)
        :rtype: tuple
        """
        done = self.data.setdefault('processed', {})
        stale_keys = set(done).difference(self.items_all_keys())
        for stale_key in stale_keys:
            yield (stale_key, done[stale_key])

    def items_cleanup_processed_list(self):
        """
        Drop from the list of processed items the keys that are no longer
        used, as reported by :meth:`items_to_cleanup`. The task data is
        marked as modified only when at least one key has been dropped.
        """
        done = self.data.setdefault('processed', {})
        dropped = 0
        for stale_key, _ in self.items_to_cleanup():
            del done[stale_key]
            dropped += 1
        if dropped:
            self.data_mark_modified()

    def items_fake_processed_list(self):
        """
        Mark every item to process as processed, without doing any work.
        This bootstraps an iterative process where the initial run should
        not have to deal with all the existing entries.
        """
        # Items are marked one at a time while the generator is consumed
        for pending in self.items_to_process():
            self.item_mark_processed(pending)

    def handle_fake_update_parameter(self):
        """
        Handler of the execute-started event: when the `fake_update`
        parameter is set, all the items are marked as processed before the
        task gets a chance to process them.
        """
        if not self.fake_update:
            return
        self.items_fake_processed_list()

197 

198 

class ProcessModel(ProcessItems):
    """
    With this mixin, the list of items to be processed is a list of objects
    retrieved through the database model specified in the :attr:`model`
    attribute. Sub-classes should thus at least override this attribute.

    Sub-classes may also define a ``fields_to_save`` attribute listing the
    names of the attributes to record in :meth:`item_describe`.
    """

    #: The database model defining the list of items to process
    model = None

    def items_all(self):
        """
        Return the queryset of all the objects of :attr:`model`, after it
        has been customized by :meth:`items_extend_queryset`.

        :return: the queryset of all the existing items
        :rtype: QuerySet
        """
        return self.items_extend_queryset(self.model.objects.all())

    def items_to_process(self):
        """
        Return the queryset of the objects that still have to be processed.

        Objects whose primary key is recorded in the ``processed``
        dictionary are excluded, unless the `force_update` parameter is set,
        in which case the full queryset is returned.

        :return: the queryset of the items to process
        :rtype: QuerySet
        """
        items = self.items_all()
        if self.force_update:
            # --force-update tells us to reprocess all entries
            return items
        processed = self.data.setdefault('processed', {})
        # Keys are stored as strings (see item_to_key): let the primary key
        # field convert them back so that non-integer primary keys work too.
        pk_field = self.model._meta.pk
        processed_keys = [pk_field.to_python(key) for key in processed]
        return items.exclude(pk__in=processed_keys)

    def items_extend_queryset(self, queryset):
        """
        This method can be overriden by sub-classes to customize the queryset
        returned by :meth:`items_all`. The normal queryset is passed as
        parameter and the method should return the modified queryset.

        :param QuerySet queryset: the original queryset
        :return: the modified queryset
        :rtype: QuerySet
        """
        return queryset

    def item_to_key(self, item):
        """
        For database objects, we use the primary key as the key for the
        processed list.

        :param item: an instance of the associated model
        :return: the value of its primary key, as a string
        :rtype: str
        """
        return str(item.pk)

    def items_all_keys(self):
        """
        Return the set of keys of all the existing items, computed with a
        query that only fetches the primary keys.

        :return: the primary keys converted to strings
        :rtype: set
        """
        primary_keys = self.items_all().values_list('pk', flat=True)
        return {str(pk) for pk in primary_keys}

    def item_describe(self, item):
        """
        Describe the item with the attributes listed in the optional
        ``fields_to_save`` attribute. Callable attributes are called and
        their return value is stored.

        :param item: an instance of the associated model
        :return: a dictionary mapping each field name to its value
        :rtype: dict
        """
        data = super().item_describe(item)
        for field_name in getattr(self, 'fields_to_save', []):
            value = getattr(item, field_name)
            data[field_name] = value() if callable(value) else value
        return data

258 

259 

class ProcessSourcePackage(ProcessModel):
    """
    Mixin whose items are all the
    :class:`~distro_tracker.core.models.SourcePackage` objects. The name
    and the version of each processed object are saved in its description.
    """

    model = SourcePackage
    fields_to_save = ('name', 'version')

266 

267 

class ProcessSrcRepoEntry(ProcessModel):
    """
    Mixin whose items are all the
    :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
    objects.
    """

    model = SourcePackageRepositoryEntry

    def items_extend_queryset(self, queryset):
        """
        Fetch the related source package (with its name) and the related
        repository together with each entry.

        :param QuerySet queryset: the original queryset
        :return: the queryset with the related objects selected
        :rtype: QuerySet
        """
        related = ('source_package__source_package_name', 'repository')
        return queryset.select_related(*related)

    def item_describe(self, item):
        """
        Describe the entry with the name and version of its source package
        and with the shorthand and id of its repository.

        :param item: a repository entry
        :return: a dictionary describing the entry
        :rtype: dict
        """
        data = super().item_describe(item)
        source_package = item.source_package
        data['name'] = source_package.name
        data['version'] = source_package.version
        repository = item.repository
        data['repository'] = repository.shorthand
        data['repository_id'] = repository.id
        return data

287 

288 

class ProcessSrcRepoEntryInDefaultRepository(ProcessSrcRepoEntry):
    """
    Mixin whose items are the
    :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
    objects that belong to a repository flagged as default.
    """

    def items_extend_queryset(self, queryset):
        """
        Restrict the queryset built by the parent class to the entries
        whose repository has ``default=True``.

        :param QuerySet queryset: the original queryset
        :return: the filtered queryset
        :rtype: QuerySet
        """
        base_queryset = super().items_extend_queryset(queryset)
        return base_queryset.filter(repository__default=True)

299 

300 

class ProcessMainRepoEntry(ProcessItems):
    """
    Process the main
    :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
    for each package. The main entry is the one found in the default
    repository. When a package has no entry there, the entry with the biggest
    version is selected. When several entries share that version, the one in
    the repository with the biggest "position" wins.
    """

    def __init__(self):
        super().__init__()
        self.main_entries = None
        # The cache of main entries is only valid for a single execution
        for event_name in ('execute-started', 'execute-finished',
                           'execute-failed'):
            self.register_event_handler(event_name,
                                        self.clear_main_entries_cache)

    def clear_main_entries_cache(self):
        """Forget the main entries computed by :meth:`items_all`."""
        self.main_entries = None

    def items_all(self):
        """
        Return the main repository entry of each package. The result is
        computed once and cached in ``main_entries`` until
        :meth:`clear_main_entries_cache` is called.

        :return: the selected entries, one per package name
        :rtype: a view over the values of a dict
        """
        if self.main_entries is not None:
            return self.main_entries.values()

        main_entries = {}

        def register_entry(entry):
            # Keep ``entry`` if it is better than the one currently selected
            # for the same package name.
            name = entry.source_package.name
            version = entry.source_package.version
            if name not in main_entries:
                main_entries[name] = entry
                return
            selected = main_entries[name]
            order = version_compare(selected.source_package.version, version)
            if order < 0:
                main_entries[name] = entry
                return
            if order != 0:
                return
            # If both versions are equal, we use the repository with the
            # biggest position
            if entry.repository.position > selected.repository.position:
                main_entries[name] = entry

        related = ('source_package__source_package_name', 'repository')

        # First pass: entries coming from the default repository
        default_entries = SourcePackageRepositoryEntry.objects.filter(
            repository__default=True).select_related(*related)
        for entry in default_entries:
            register_entry(entry)

        # Second pass: entries of the packages not selected so far
        other_entries = SourcePackageRepositoryEntry.objects.exclude(
            source_package__source_package_name__name__in=main_entries.keys()
        ).select_related(*related)
        for entry in other_entries:
            register_entry(entry)

        self.main_entries = main_entries
        return self.main_entries.values()

    def item_to_key(self, item):
        """
        Use the id of the repository entry as key.

        :param item: a repository entry
        :return: the id of the entry, as a string
        :rtype: str
        """
        return str(item.id)

    def item_describe(self, item):
        """
        Describe the entry with the name and version of its source package
        and with the shorthand of its repository.

        :param item: a repository entry
        :return: a dictionary describing the entry
        :rtype: dict
        """
        source_package = item.source_package
        return {
            'name': source_package.name,
            'version': source_package.version,
            'repository': item.repository.shorthand,
        }

378 

379 

class ProcessRepositoryUpdates(ProcessSrcRepoEntry):
    """
    Watch the repositories and generate the update operations to process.

    :meth:`items_to_process` returns repository entries, and
    :meth:`is_new_source_package` can be queried with the associated source
    package to know whether that source package was already recorded (in
    any repository) during the previous run.

    :meth:`iter_removals_by_repository` finds out the packages which have
    been dropped from a repository.
    """

    def __init__(self):
        super().__init__()
        self.register_event_handler('execute-started',
                                    self.compute_known_packages)

    def compute_known_packages(self):
        """
        Walk over the descriptions of the formerly processed items and build
        the lookup tables ``pkglist`` (package names) and ``srcpkglist``
        (``name_version`` keys). Both are indexed by repository id, plus the
        special ``'all'`` index that aggregates every repository.
        """
        self.pkglist = {'all': {}}
        self.srcpkglist = {'all': {}}
        for description in self.data.get('processed', {}).values():
            name = description['name']
            key = '%s_%s' % (name, description['version'])
            self.pkglist['all'][name] = True
            self.srcpkglist['all'][key] = True
            repository_id = description['repository_id']
            names_in_repo = self.pkglist.setdefault(repository_id, {})
            keys_in_repo = self.srcpkglist.setdefault(repository_id, {})
            names_in_repo[name] = True
            keys_in_repo[key] = True

    def is_new_source_package(self, srcpkg):
        """
        Tell whether the source package was absent from the former run.

        The answer is deduced from the list of already processed entries,
        with the help of the tables built by :meth:`compute_known_packages`
        (which is registered as handler of the execute-started event).

        :param srcpkg: the source package
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :returns: True if never seen, False otherwise
        :rtype: bool
        """
        known_keys = self.srcpkglist['all']
        return '%s_%s' % (srcpkg.name, srcpkg.version) not in known_keys

    def iter_removals_by_repository(self):
        """
        Iterate over all the package removals that happened in all the
        repositories. Each yielded tuple holds the package name (as a
        string) and the repository object. Repositories without any
        formerly processed entry are skipped.
        """
        for repository in Repository.objects.all():
            if repository.id not in self.pkglist:
                continue
            current_names = set(
                repository.source_packages.all().values_list(
                    'source_package_name__name', flat=True))
            for former_name in self.pkglist[repository.id]:
                if former_name in current_names:
                    continue
                yield (former_name, repository)

451 

452 

class PackageTagging(object):
    """
    A task mixin maintaining a set of package tags: packages that should no
    longer be tagged get untagged, packages that should be tagged get
    tagged.

    Subclasses must define:
    - `TAG_NAME`: defines the key for PackageData to be updated. One must define
    keys matching `tag:.*`
    - `TAG_DISPLAY_NAME`: defines the display name for the tag
    - `TAG_COLOR_TYPE`: defines the color type to be used while rendering
    content related to the tag. It must be defined based on the tag severity.
    One may use one of the following options: success, danger, warning, or info.
    - `TAG_DESCRIPTION`: defines a help text to be displayed with a 'title'
    attribute
    - `TAG_TABLE_TITLE`: the title of the table showing all the packages
    with this tag

    Subclasses must also implement :func:`packages_to_tag` to provide the
    list of packages that must be tagged.
    """
    TAG_NAME = None
    TAG_DISPLAY_NAME = ''
    TAG_COLOR_TYPE = ''
    TAG_DESCRIPTION = ''
    TAG_TABLE_TITLE = ''

    def packages_to_tag(self):
        """
        Return the list of packages that must carry the tag defined by
        `TAG_NAME`. Subclasses must override this method; the default
        implementation returns an empty list.
        """
        return []

    def execute_package_tagging(self):
        """
        Within a single transaction, delete every PackageData stored under
        the `TAG_NAME` key and create a new one for each package returned
        by :meth:`packages_to_tag`.
        """
        with transaction.atomic():
            # Start from a clean slate: drop the tags of the former run
            PackageData.objects.filter(key=self.TAG_NAME).delete()

            value = {
                'display_name': self.TAG_DISPLAY_NAME,
                'color_type': self.TAG_COLOR_TYPE,
                'description': self.TAG_DESCRIPTION,
                'table_title': self.TAG_TABLE_TITLE
            }
            items = [
                PackageData(package=package, key=self.TAG_NAME, value=value)
                for package in self.packages_to_tag()
            ]
            PackageData.objects.bulk_create(items)

504 

505 

class ImportExternalData:
    """
    This mixin downloads an external file and process it to generate various
    database elements like PackageData or ActionItem. You need to configure it
    by setting many attributes.

    You need to set ``data_url`` to indicate the URL where the data will be
    downloaded.

    If you want to manage ActionItem out of the generated data, you need to
    override the ``generate_action_items`` method, it returns a list of tuples
    where each tuple is the name of the ActionItemType and a dict mapping
    package names to attributes of ActionItem (short_description, severity,
    extra_data). You also need to set the ``action_item_types``
    attribute, it is a list of dict describing the ActionItemType that have
    to be registered. The key names are the names of the attributes of the
    model.

    If you want to manage PackageData, override ``generate_package_data``.
    """
    data_url = None  # The URL where data is downloaded
    action_item_types = []

    def generate_action_items(self):
        """
        Return a list of ``(type_name, pkglist)`` tuples where ``pkglist``
        is a dict mapping package names to a dict of ActionItem attributes.
        The default implementation returns an empty list.
        """
        return []

    @transaction.atomic
    def _process_action_items(self):
        """
        Register the action item types listed in ``action_item_types``, then
        synchronize the ActionItem objects with the data returned by
        :meth:`generate_action_items`: existing items are updated when one
        of their attributes differs, missing items are created for known
        package names, and ``delete_obsolete_items`` is called with the
        type and the package names that have been generated.
        """
        # Register the needed ActionItemType
        for type_description in self.action_item_types:
            ActionItemType.objects.create_or_update(
                type_description['type_name'],
                type_description['full_description_template']
            )

        # Create/update/delete the action items
        now_ts = now()
        for type_name, pkglist in self.generate_action_items():
            ait = ActionItemType.objects.get(type_name=type_name)
            package_name_objects = {
                p.name: p
                for p in PackageName.objects.filter(name__in=pkglist.keys())
            }
            # select_related() avoids one extra query per action item when
            # reading ai.package.name below
            qs = ActionItem.objects.filter(
                item_type=ait, package__name__in=pkglist.keys()
            ).select_related('package')
            action_item_objects = {ai.package.name: ai for ai in qs}
            to_add = []
            to_update = []
            for pkgname, action_item_data in pkglist.items():
                action_item = action_item_objects.get(pkgname)
                if action_item:
                    updated = False
                    for field, value in action_item_data.items():
                        if value != getattr(action_item, field):
                            setattr(action_item, field, value)
                            updated = True
                    if updated:
                        action_item.last_updated_timestamp = now_ts
                        to_update.append(action_item)
                else:
                    # Data about unknown package names is ignored
                    package_name = package_name_objects.get(pkgname)
                    if package_name:
                        to_add.append(
                            ActionItem(item_type=ait, package=package_name,
                                       **action_item_data)
                        )
            fields = [
                'short_description', 'severity', 'extra_data',
                'last_updated_timestamp'
            ]
            ActionItem.objects.bulk_update(to_update, fields)
            ActionItem.objects.bulk_create(to_add)
            ActionItem.objects.delete_obsolete_items([ait], pkglist.keys())

    def generate_package_data(self):
        """
        Return a list of ``(key, pkglist)`` tuples where ``key`` is the
        PackageData key and ``pkglist`` is a dict mapping package names to
        the value to store. The default implementation returns an empty
        list.
        """
        return []

    @transaction.atomic
    def _process_package_data(self):
        """
        Synchronize the PackageData objects with the data returned by
        :meth:`generate_package_data`: for each key, existing entries are
        updated when their value differs, missing entries are created for
        known package names, and the entries of packages that are absent
        from the generated data are deleted.
        """
        for key, pkglist in self.generate_package_data():
            package_name_objects = {
                p.name: p
                for p in PackageName.objects.filter(name__in=pkglist.keys())
            }
            # select_related() avoids one extra query per entry when
            # reading pd.package.name below
            qs = PackageData.objects.filter(
                key=key, package__name__in=pkglist.keys()
            ).select_related('package')
            package_data_objects = {pd.package.name: pd for pd in qs}
            to_add = []
            to_update = []
            for pkgname, package_data_value in pkglist.items():
                package_data = package_data_objects.get(pkgname)
                if package_data:
                    if package_data.value != package_data_value:
                        package_data.value = package_data_value
                        to_update.append(package_data)
                else:
                    # Data about unknown package names is ignored
                    package_name = package_name_objects.get(pkgname)
                    if package_name:
                        to_add.append(
                            PackageData(key=key, value=package_data_value,
                                        package=package_name)
                        )
            PackageData.objects.bulk_update(to_update, ['value'])
            PackageData.objects.bulk_create(to_add)
            PackageData.objects.filter(key=key).exclude(
                package__name__in=pkglist.keys()).delete()

    def execute_import_external_data(self):
        """
        Download the resource at ``data_url``, parse it as JSON into
        ``self.external_data`` and process it.

        The MD5 checksum of the downloaded text is compared with the one
        stored in the task data under ``input_checksum``: when they are
        equal and `force_update` is not set, nothing is processed.
        """
        # Download the data and make it available to the various
        # generate methods
        url_content = get_resource_text(self.data_url,
                                        force_update=self.force_update)
        self.external_data = json.loads(url_content)

        # If the data is unchanged since last run, do nothing
        old_checksum = self.data.get('input_checksum')
        checksum = hashlib.md5(
            url_content.encode('utf-8', 'ignore')).hexdigest()
        if checksum == old_checksum and not self.force_update:
            return
        self.data['input_checksum'] = checksum

        # Process the various kind of objects that we can create
        self._process_action_items()
        self._process_package_data()

629 self._process_package_data()