# -*- coding: utf-8 -*-

# Copyright 2014-2021 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.

"""
Distro Tracker test infrastructure.
"""

import gzip
import hashlib
import inspect
import io
import json
import lzma
import os
import os.path
import re
import shutil
import tempfile

from bs4 import BeautifulSoup as soup

import django.test
from django.conf import settings
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from django.test.signals import setting_changed

import responses

from distro_tracker.accounts.models import UserEmail
from distro_tracker.core.models import (
    Architecture,
    BinaryPackageName,
    ContributorName,
    PackageData,
    PackageName,
    Repository,
    SourcePackage,
    SourcePackageName,
)
from distro_tracker.core.utils.compression import (
    get_compressor_factory,
    guess_compression_method,
)
from distro_tracker.core.utils.packages import package_hashdir

from django_email_accounts.models import User


class TempDirsMixin(object):
    """
    Diverts all distro-tracker path settings to make them point
    to temporary directories while testing.
    """

    DISTRO_TRACKER_PATH_SETTINGS = {
        'STATIC_ROOT': 'static',
        'MEDIA_ROOT': 'media',
        'DISTRO_TRACKER_CACHE_DIRECTORY': 'cache',
        'DISTRO_TRACKER_KEYRING_DIRECTORY': 'keyring',
        'DISTRO_TRACKER_TEMPLATE_DIRECTORY': 'templates',
        'DISTRO_TRACKER_LOG_DIRECTORY': 'logs',
        'DISTRO_TRACKER_MAILDIR_DIRECTORY': 'maildir',
    }

    def _backup_settings(self, name):
        self._settings_copy[name] = getattr(settings, name)

    def _restore_settings(self):
        for key, value in self._settings_copy.items():
            setattr(settings, key, value)

    def __call__(self, result=None):
        """
        Wrapper around __call__ that sets up the temporary directories.
        This means that user-defined test cases aren't required to
        include a call to super().setUp().
        """
        self._settings_copy = {}
        self.addCleanup(self._restore_settings)
        self._backup_settings('DISTRO_TRACKER_DATA_PATH')
        tempdir = tempfile.mkdtemp(prefix='distro-tracker-tests-')
        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
        setattr(settings, 'DISTRO_TRACKER_DATA_PATH', tempdir)
        for name, dirname in self.DISTRO_TRACKER_PATH_SETTINGS.items():
            self._backup_settings(name)
            dirname = os.path.join(tempdir, dirname)
            setattr(settings, name, dirname)
            os.mkdir(dirname)
        return super(TempDirsMixin, self).__call__(result)


class TestCaseHelpersMixin(object):
    """
    Helper methods injected into distro_tracker's ``*TestCase`` objects.
    """

    def get_test_data_path(self, name):
        """
        Returns the absolute path of a file named ``name`` within the
        tests-data subdirectory located next to the calling TestCase's
        source file.
        """
        return os.path.join(
            os.path.dirname(inspect.getabsfile(self.__class__)),
            'tests-data', name)

    def add_test_template_dir(self, name='tests-templates'):
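        """
        Appends the given tests-data subdirectory (``tests-templates`` by
        default) to the template directories and schedules its removal at
        the end of the test.
        """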

        template_dir = self.get_test_data_path(name)
        settings.TEMPLATES[0]['DIRS'].append(template_dir)
        setting_changed.send(sender=self.__class__, setting='TEMPLATES',
                             value=settings.TEMPLATES, enter=True)

        def cleanup_test_template_dir():
            settings.TEMPLATES[0]['DIRS'].remove(template_dir)
            setting_changed.send(sender=self.__class__, setting='TEMPLATES',
                                 value=settings.TEMPLATES, enter=False)

        self.addCleanup(cleanup_test_template_dir)

    def get_temporary_directory(self, prefix=None, suffix=None):
        tempdir = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)

        return tempdir

    def mock_http_request(self, **kwargs):
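        """
        Starts the ``responses`` HTTP mocking (intercepting requests made
        through the ``requests`` library) for the duration of the test and
        registers cleanups to stop and reset it. Any keyword arguments are
        forwarded to :meth:`set_http_response` to register an initial
        mocked response.
        """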

        responses.start()
        self.addCleanup(responses.stop)
        self.addCleanup(responses.reset)

        if kwargs:
            self.set_http_response(**kwargs)

    @staticmethod
    def compress(data, compression='gzip'):
        if compression == 'gzip':
            return gzip.compress(data)
        elif compression == 'xz':
            return lzma.compress(data)
        else:
            raise NotImplementedError(
                'compress() does not support {} as '
                'compression method'.format(compression))

    def set_http_response(self, url=None, method="GET", body=None,
                          headers=None, status_code=200, json_data=None,
                          compress_with=None):
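        """
        Registers (or replaces) a mocked response for the given URL and
        method. When no ``url`` is given, a catch-all pattern is used. The
        response body may be passed directly, or as ``json_data``, and can
        optionally be compressed with gzip or xz through ``compress_with``.

        A minimal sketch (the URL is only an example)::

            self.set_http_response('https://example.net/data.json',
                                   json_data={'key': 'value'})
        """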

        # Default URL is the catch-all pattern
        if url is None:
            url = re.compile(".*")

        if headers is None:
            headers = {}

        if compress_with:
            if json_data is not None:
                body = self.compress(
                    json.dumps(json_data).encode('utf-8'),
                    compress_with,
                )
                # Don't forward parameter
                json_data = None
            elif body is not None:
                if isinstance(body, str):
                    body = self.compress(body.encode("utf-8"), compress_with)
                else:
                    body = self.compress(body, compress_with)

        if body is None:
            body = ""

        responses.remove(method, url)
        responses.add(
            method=method,
            url=url,
            body=body,
            json=json_data,
            status=status_code,
            headers=headers,
        )

    def import_key_into_keyring(self, filename):
        """
        Imports a key from an ASCII-armored file located in tests-data/keys/
        into Distro Tracker's keyrings/.
        """
        import gpg

        old = os.environ.get('GNUPGHOME', None)
        os.environ['GNUPGHOME'] = settings.DISTRO_TRACKER_KEYRING_DIRECTORY

        file_path = self.get_test_data_path('keys/' + filename)
        keydata = gpg.Data()
        keydata.new_from_file(file_path)

        with gpg.Context() as ctx:
            ctx.op_import(keydata)

        # Restore the previous GNUPGHOME (and drop it if it was not set,
        # to avoid leaking the temporary keyring directory to other tests)
        if old:
            os.environ['GNUPGHOME'] = old
        else:
            del os.environ['GNUPGHOME']


class DatabaseMixin(object):
    """
    Database-related assertions injected into distro_tracker's ``*TestCase``
    objects.
    """

    def assertDoesNotExist(self, obj):
        with self.assertRaises(obj.__class__.DoesNotExist):
            obj.__class__.objects.get(pk=obj.id)

    def assertDoesExist(self, obj):
        try:
            self.assertIsNotNone(obj.__class__.objects.get(pk=obj.id))
        except obj.__class__.DoesNotExist as error:
            raise AssertionError(error)

    def create_source_package(self, **kwargs):
        """
        Creates a source package and any related object requested through the
        keyword arguments. The following arguments are supported:
        - name
        - version
        - directory
        - dsc_file_name
        - maintainer (dict with 'name' and 'email')
        - uploaders (list of emails)
        - architectures (list of architectures)
        - binary_packages (list of package names)
        - repository (shorthand of a repository)
        - repositories (list of repositories' shorthand)
        - data (dict used to generate associated PackageData)

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.
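        A minimal sketch of a typical call (the name, version and maintainer
        below are arbitrary example values)::

            srcpkg = self.create_source_package(
                name='dummy-package',
                version='1.0-1',
                maintainer={'name': 'John', 'email': 'john@example.com'},
                architectures=['amd64'],
                repository='default',
            )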

        :return: the created source package
        :rtype: :class:`~distro_tracker.core.models.SourcePackage`
        """
        name = kwargs.get('name', 'test-package')
        version = kwargs.get('version', '1')

        fields = {}
        fields['source_package_name'] = \
            SourcePackageName.objects.get_or_create(name=name)[0]
        fields['version'] = version
        fields['dsc_file_name'] = kwargs.get('dsc_file_name',
                                             '%s_%s.dsc' % (name, version))
        fields['directory'] = kwargs.get(
            'directory', 'pool/main/%s/%s' % (package_hashdir(name), name))

        if 'maintainer' in kwargs:
            maintainer = kwargs['maintainer']
            maintainer_email = UserEmail.objects.get_or_create(
                email=maintainer['email'])[0]
            fields['maintainer'] = ContributorName.objects.get_or_create(
                contributor_email=maintainer_email,
                name=maintainer.get('name', ''))[0]

        srcpkg = SourcePackage.objects.create(**fields)

        for architecture in kwargs.get('architectures', []):
            srcpkg.architectures.add(
                Architecture.objects.get_or_create(name=architecture)[0])

        for uploader in kwargs.get('uploaders', []):
            contributor = ContributorName.objects.get_or_create(
                contributor_email=UserEmail.objects.get_or_create(
                    email=uploader)[0])[0]
            srcpkg.uploaders.add(contributor)

        for binary in kwargs.get('binary_packages', []):
            srcpkg.binary_packages.add(
                BinaryPackageName.objects.get_or_create(name=binary)[0])

        if 'repository' in kwargs:
            kwargs.setdefault('repositories', [kwargs['repository']])
        for repo_shorthand in kwargs.get('repositories', []):
            self.add_to_repository(srcpkg, repo_shorthand)

        if 'data' in kwargs:
            self.add_package_data(srcpkg.source_package_name, **kwargs['data'])

        srcpkg.save()
        return srcpkg

    def add_to_repository(self, srcpkg, shorthand='default'):
        """
        Add a source package to a repository. Creates the repository if it
        doesn't exist.

        If the shorthand of the requested repository is 'default', then
        its default field will be set to True.

        :param srcpkg: the source package to add to the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository

        :return: the repository entry that has been created
        :rtype:
            :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry`
        """
        repository, _ = Repository.objects.get_or_create(
            shorthand=shorthand,
            defaults={
                'name': 'Test repository %s' % shorthand,
                'uri': 'http://localhost/debian',
                'suite': shorthand,
                'codename': shorthand,
                'components': ['main', 'contrib', 'non-free'],
                'default': True if shorthand == 'default' else False,
            }
        )
        return srcpkg.repository_entries.create(repository=repository,
                                                component='main')

    def remove_from_repository(self, srcpkg, shorthand='default'):
        """
        Remove a source package from a repository.

        :param srcpkg: the source package to remove from the repository
        :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage`
        :param str shorthand: the shorthand of the repository
        """
        return srcpkg.repository_entries.filter(
            repository__shorthand=shorthand).delete()[0]

    def add_package_data(self, pkgname, **kwargs):
        """
        Creates PackageData objects associated with the package indicated
        in pkgname. Each named parameter results in a PackageData instance
        with the `key` being the name of the parameter and the `value`
        being the value of the named parameter.
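        A minimal illustration (the key and value are arbitrary)::

            self.add_package_data('dummy-package',
                                  general={'name': 'dummy-package'})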

        :param pkgname: the name of the package to which we want to associate
            data
        :type pkgname: `str` or :class:`~distro_tracker.core.models.PackageName`
        """
        if not isinstance(pkgname, PackageName):
            pkgname, _ = PackageName.objects.get_or_create(name=str(pkgname))
        for key, value in kwargs.items():
            PackageData.objects.create(package=pkgname, key=key, value=value)

    @staticmethod
    def create_repository(
        codename="sid",
        name=None,
        shorthand=None,
        uri="http://localhost/debian",
        suite=None,
        components="main contrib non-free",
        default=False,
        optional=True,
        binary=False,
        source=True,
        architectures=None,
    ):
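        """
        Creates a :class:`~distro_tracker.core.models.Repository` whose
        name, shorthand and suite default to values derived from the
        codename, and attaches the given architectures (amd64 and i386
        when none are specified).
        """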

        if not name:
            name = "Repository %s" % codename
        if not shorthand:
            shorthand = codename[:10]
        if not suite:
            suite = codename

        repo = Repository.objects.create(
            name=name,
            shorthand=shorthand,
            uri=uri,
            public_uri=uri,
            codename=codename,
            suite=suite,
            components=components,
            default=default,
            optional=optional,
            binary=binary,
            source=source,
        )

        if not architectures:
            architectures = ["amd64", "i386"]
        for archname in architectures:
            arch, _ = Architecture.objects.get_or_create(name=archname)
            repo.architectures.add(arch)

        return repo


class SimpleTestCase(TempDirsMixin, TestCaseHelpersMixin,
                     django.test.SimpleTestCase):
    pass


class TestCase(TempDirsMixin, TestCaseHelpersMixin, DatabaseMixin,
               django.test.TestCase):
    pass


@django.test.tag('transaction')
class TransactionTestCase(TempDirsMixin, TestCaseHelpersMixin,
                          DatabaseMixin, django.test.TransactionTestCase):
    pass


class LiveServerTestCase(TempDirsMixin, TestCaseHelpersMixin,
                         DatabaseMixin, StaticLiveServerTestCase):
    pass


class TemplateTestsMixin(object):
    """Helper methods to test templates."""

    @staticmethod
    def html_contains_link(text, link):
        html = soup(text, 'html.parser')
        for a_tag in html.findAll('a', {'href': True}):
            if a_tag['href'] == link:
                return True
        return False

    def assertLinkIsInResponse(self, response, link):
        self.assertTrue(self.html_contains_link(response.content, link))

    def assertLinkIsNotInResponse(self, response, link):
        self.assertFalse(self.html_contains_link(response.content, link))


class UserAuthMixin(object):
    """
    Helper methods to manage user authentication.
    Additional USERS may be defined before calling self.setup_users()
    in self.setUp().
    """
    USERS = {
        'john': {},
    }

    def setup_users(self, login=False):
        """
        Creates the users defined in self.USERS and uses the 'login'
        parameter as follows:
        * If False: no user is logged in
        * If True: login with the only user defined
        * If a particular username: login with the user who owns the username
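        For instance, a test case could declare two users and log in as one
        of them (the usernames and the extra email below are arbitrary)::

            class MyTests(UserAuthMixin, TestCase):
                USERS = {
                    'john': {},
                    'jane': {'main_email': 'jane@example.net'},
                }

                def setUp(self):
                    self.setup_users(login='jane')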

        """
        self.users = {}
        for username in self.USERS:
            user_data = self._get_user_data(username)
            self.users[username] = User.objects.create_user(**user_data)
        if login:
            username = None if login is True else login
            self.login(username)

    def login(self, username=None):
        """
        Logs in with the user that owns the 'username' or, when no username
        is given, with the only available user in self.users. If multiple
        users are available, you must specify the username or a ValueError
        will be raised.
        """
        if not username:
            if len(self.users) > 1:
                raise ValueError("multiple users but username not specified")
            username = list(self.users.keys())[0]
        user_data = self._get_user_data(username)
        self.client.login(
            username=user_data['main_email'],
            password=user_data['password'],
        )
        self.current_user = self.users[username]
        return self.current_user

    def get_user(self, username=None):
        if not username:
            return self.current_user
        return self.users[username]

    def _get_user_data(self, username):
        user_data = self.USERS[username].copy()
        user_data.setdefault('main_email', '{}@example.com'.format(username))
        user_data.setdefault('password', '{}password'.format(username))
        return user_data


class AptRepositoryMixin(object):
    """
    Helper methods to mock an APT repository.
    """

    def mock_apt_repository(self, repo, **kwargs):
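        """
        Registers mocked HTTP responses for the whole repository: the
        Sources and Packages indices of every component and architecture
        (only the compression variants listed in ``compression_suffixes``
        are served, the others return 404) as well as the Release and
        InRelease files. Per-file behaviour can be tuned through the
        ``metadata_options`` keyword; remaining keyword arguments are
        forwarded to :meth:`mock_apt_repository_update_release_file`.
        """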

        self.mock_http_request()
        global_compression_suffixes = kwargs.pop("compression_suffixes", [""])
        global_content = kwargs.pop("content", None)
        # Mock Sources and Packages files
        for base_filename in self._apt_repo_iter_metadata(repo):
            metadata_options = kwargs.get("metadata_options", {}).get(
                base_filename, {}
            )
            compression_suffixes = metadata_options.get(
                "compression_suffixes", global_compression_suffixes
            )
            metadata_content = metadata_options.get("content", global_content)
            test_content_file = metadata_options.get("test_content_file")
            for suffix in ("", ".bz2", ".gz", ".xz"):
                filename = base_filename + suffix
                content = metadata_content
                if callable(metadata_content):
                    content = metadata_content(repo, filename)
                if suffix in compression_suffixes:
                    self.mock_apt_repository_add_metadata_file(
                        repo,
                        filename,
                        content=content,
                        test_content_file=test_content_file,
                    )
                else:
                    url = self._apt_repo_build_url(repo, filename)
                    self.set_http_response(url, status_code=404)
        # Mock Release/InRelease files
        self.mock_apt_repository_update_release_file(repo, **kwargs)

    @staticmethod
    def _apt_repo_build_url(repo, filename):
        return "{}/dists/{}/{}".format(repo.uri, repo.codename, filename)

    @staticmethod
    def _apt_repo_iter_metadata(repo):
        for component in sorted(repo.components.split()):
            for arch in repo.architectures.all().order_by('name'):
                yield f"{component}/binary-{arch.name}/Packages"
            yield f"{component}/source/Sources"

    def _apt_repo_init_checksums(self, repo):
        if not hasattr(self, '_apt_repo_checksums'):
            self._apt_repo_checksums = {}
        self._apt_repo_checksums.setdefault(repo.shorthand, {})

    def _apt_repo_iter_checksums(self, repo):
        return self._apt_repo_checksums[repo.shorthand].items()

    def _apt_repo_store_checksums(self, repo, filename, checksums):
        self._apt_repo_checksums[repo.shorthand][filename] = checksums

    def mock_apt_repository_add_metadata_file(
        self, repo, filename, content=None, test_content_file=None,
        compression="auto", **kwargs,
    ):
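        """
        Mocks a single metadata file: the content (read from
        ``test_content_file`` under tests-data/ when given) is compressed
        according to the file name unless an explicit ``compression`` is
        passed, its size and MD5/SHA256 checksums are recorded for the
        Release file, and the resulting body is registered in the HTTP
        mock.
        """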

        self._apt_repo_init_checksums(repo)

        # Load test content if required
        if test_content_file:
            data_path = self.get_test_data_path(test_content_file)
            with open(data_path, 'rb') as f:
                content = f.read()

        # Generate content if required, then compress it if required
        if content is None:
            content = b""

        # Detect compression method
        if compression == "auto":
            compression = guess_compression_method(filename)

        if compression:
            stream = io.BytesIO()
            compressor = get_compressor_factory(compression)(stream, mode="wb")
            compressor.write(content)
            compressor.close()
            content = stream.getvalue()

        # Store checksums of metadata
        checksums = {
            "Size": len(content),
            "MD5Sum": hashlib.md5(content).hexdigest(),
            "SHA256": hashlib.sha256(content).hexdigest(),
        }
        self._apt_repo_store_checksums(repo, filename, checksums)

        # Register the metadata in the http mock
        url = self._apt_repo_build_url(repo, filename)
        self.set_http_response(url, body=content)

    def mock_apt_repository_update_release_file(
        self,
        repo,
        enable_inrelease=True,
        acquire_by_hash=True,
        suite=None,
        codename=None,
        architectures=None,
        components=None,
        **kwargs,
    ):
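        """
        (Re)generates the Release file from the checksums recorded so far
        and registers it in the HTTP mock, together with an InRelease
        variant wrapped in a dummy PGP signature (or a 404 response for
        InRelease when ``enable_inrelease`` is False).
        """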

        self._apt_repo_init_checksums(repo)

        release_url = self._apt_repo_build_url(repo, "Release")
        inrelease_url = self._apt_repo_build_url(repo, "InRelease")
        if suite is None:
            suite = repo.suite or repo.codename or ""
        if codename is None:
            codename = repo.codename or repo.suite or ""
        if architectures is None:
            architectures = " ".join([
                a.name for a in repo.architectures.all().order_by('name')
            ])
        if components is None:
            components = repo.components

        # Build the content of the release file
        text = """Origin: Debian
Label: Debian
Suite: {suite}
Codename: {codename}
Architectures: {architectures}
Components: {components}
""".format(
            suite=suite,
            codename=codename,
            architectures=architectures,
            components=components,
        )
        if acquire_by_hash:
            text += "Acquire-By-Hash: yes\n"
        for checksum in ("MD5Sum", "SHA256"):
            text += "{}:\n".format(checksum)
            for path, checksums in self._apt_repo_iter_checksums(repo):
                if "/by-hash/" in path:
                    continue
                text += " {} {} {}\n".format(
                    checksums[checksum], checksums["Size"], path
                )

        self.set_http_response(release_url, body=text)

        if enable_inrelease:
            signed_text = """-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA256

"""
            signed_text += text
            signed_text += """-----BEGIN PGP SIGNATURE-----

iQIzBAEBCAAdFiEEFukLP99l7eOqfzI8BO5yN7fUU+wFAl8/gbEACgkQBO5yN7fU
U+y4Lw/+PDhJJaxEmZWS4dFjBSJYMTgyiEPXG6eMqDpeJNr8iIoBjcBd3bv3Gexq
8rS0ry9bPLy9ZZxImL0E6rB2oFU8OAqoAXXmRf5yt3x0SY/1deTjMHYr5w4kH6CB
ZwZnkm12jMyB9ds/ZAvG7+ou+qEb7bZ2+7IzhBlFuLNYO747sOaDjOM3RdV700qs
FvmSBcysOUWCAhxQNmAk/NZ585AxeKksbvSHUMczdKIRu/XN82zrTRPQhZ51eHDZ
mY444ytopHEA6G+3rkUagKeLGE6JnwS+amhz/A==
=H/pA
-----END PGP SIGNATURE-----"""
            self.set_http_response(inrelease_url, body=signed_text)
        else:
            self.set_http_response(inrelease_url, status_code=404)