1# -*- coding: utf-8 -*- 

2 

3# Copyright 2014-2021 The Distro Tracker Developers 

4# See the COPYRIGHT file at the top-level directory of this distribution and 

5# at https://deb.li/DTAuthors 

6# 

7# This file is part of Distro Tracker. It is subject to the license terms 

8# in the LICENSE file found in the top-level directory of this 

9# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

10# including this file, may be copied, modified, propagated, or distributed 

11# except according to the terms contained in the LICENSE file. 

12 

13""" 

14Distro Tracker test infrastructure. 

15""" 

16 

17import gzip 

18import hashlib 

19import inspect 

20import io 

21import json 

22import lzma 

23import os 

24import os.path 

25import re 

26import shutil 

27import tempfile 

28 

29from bs4 import BeautifulSoup as soup 

30 

31import django.test 

32from django.conf import settings 

33from django.contrib.staticfiles.testing import StaticLiveServerTestCase 

34from django.test.signals import setting_changed 

35 

36import responses 

37 

38from distro_tracker.accounts.models import UserEmail 

39from distro_tracker.core.models import ( 

40 Architecture, 

41 BinaryPackageName, 

42 ContributorName, 

43 PackageData, 

44 PackageName, 

45 Repository, 

46 SourcePackage, 

47 SourcePackageName, 

48) 

49from distro_tracker.core.utils.compression import ( 

50 get_compressor_factory, 

51 guess_compression_method, 

52) 

53from distro_tracker.core.utils.packages import package_hashdir 

54 

55from django_email_accounts.models import User 

56 

57 

58class TempDirsMixin(object): 

59 """ 

60 Diverts all distro-tracker path settings to make them point 

61 to temporary directories while testing. 

62 """ 

63 

64 DISTRO_TRACKER_PATH_SETTINGS = { 

65 'STATIC_ROOT': 'static', 

66 'MEDIA_ROOT': 'media', 

67 'DISTRO_TRACKER_CACHE_DIRECTORY': 'cache', 

68 'DISTRO_TRACKER_KEYRING_DIRECTORY': 'keyring', 

69 'DISTRO_TRACKER_TEMPLATE_DIRECTORY': 'templates', 

70 'DISTRO_TRACKER_LOG_DIRECTORY': 'logs', 

71 'DISTRO_TRACKER_MAILDIR_DIRECTORY': 'maildir', 

72 } 

73 

74 def _backup_settings(self, name): 

75 self._settings_copy[name] = getattr(settings, name) 

76 

77 def _restore_settings(self): 

78 for key, value in self._settings_copy.items(): 

79 setattr(settings, key, value) 

80 setting_changed.send(sender=self.__class__, setting=key, 

81 value=value, enter=False) 

82 

83 def __call__(self, result=None): 

84 """ 

85 Wrapper around __call__ to perform temporary directories setup. 

86 This means that user-defined Test Cases aren't required to 

87 include a call to super().setUp(). 

88 """ 

89 self._settings_copy = {} 

90 self.addCleanup(self._restore_settings) 

91 self._backup_settings('DISTRO_TRACKER_DATA_PATH') 

92 tempdir = tempfile.mkdtemp(prefix='distro-tracker-tests-') 

93 self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True) 

94 setattr(settings, 'DISTRO_TRACKER_DATA_PATH', tempdir) 

95 for name, dirname in self.DISTRO_TRACKER_PATH_SETTINGS.items(): 

96 self._backup_settings(name) 

97 dirname = os.path.join(tempdir, dirname) 

98 setattr(settings, name, dirname) 

99 os.mkdir(dirname) 

100 setting_changed.send(sender=self.__class__, setting=name, 

101 value=dirname, enter=True) 

102 

103 return super(TempDirsMixin, self).__call__(result) 

104 

105 

106class TestCaseHelpersMixin(object): 

107 """ 

108 Helpers method injected into distro_tracker's ``*TestCase`` objects. 

109 """ 

110 

111 def get_test_data_path(self, name): 

112 """ 

113 Returns an absolute path name of file within the tests-data 

114 subdirectory in the calling TestCase. 

115 """ 

116 return os.path.join(os.path.dirname(inspect.getabsfile(self.__class__)), 

117 'tests-data', name) 

118 

119 def add_test_template_dir(self, name='tests-templates'): 

120 template_dir = self.get_test_data_path(name) 

121 settings.TEMPLATES[0]['DIRS'].append(template_dir) 

122 setting_changed.send(sender=self.__class__, setting='TEMPLATES', 

123 value=settings.TEMPLATES, enter=True) 

124 

125 def cleanup_test_template_dir(): 

126 settings.TEMPLATES[0]['DIRS'].remove(template_dir) 

127 setting_changed.send(sender=self.__class__, setting='TEMPLATES', 

128 value=settings.TEMPLATES, enter=False) 

129 

130 self.addCleanup(cleanup_test_template_dir) 

131 

132 def get_temporary_directory(self, prefix=None, suffix=None): 

133 tempdir = tempfile.mkdtemp(prefix=prefix, suffix=suffix) 

134 self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True) 

135 

136 return tempdir 

137 

138 def mock_http_request(self, **kwargs): 

139 responses.start() 

140 self.addCleanup(responses.stop) 

141 self.addCleanup(responses.reset) 

142 

143 if kwargs: 

144 self.set_http_response(**kwargs) 

145 

146 @staticmethod 

147 def compress(data, compression='gzip'): 

148 if compression == 'gzip': 

149 return gzip.compress(data) 

150 elif compression == 'xz': 

151 return lzma.compress(data) 

152 else: 

153 raise NotImplementedError( 

154 'compress() does not support {} as ' 

155 'compression method'.format(compression)) 

156 

157 def set_http_response(self, url=None, method="GET", body=None, headers=None, 

158 status_code=200, json_data=None, compress_with=None): 

159 # Default URL is the catch-all pattern 

160 if url is None: 

161 url = re.compile(".*") 

162 

163 if headers is None: 

164 headers = {} 

165 

166 if compress_with: 

167 if json_data is not None: 

168 body = self.compress( 

169 json.dumps(json_data).encode('utf-8'), 

170 compress_with, 

171 ) 

172 # Don't forward parameter 

173 json_data = None 

174 elif body is not None: 174 ↛ 180line 174 didn't jump to line 180, because the condition on line 174 was never false

175 if isinstance(body, str): 

176 body = self.compress(body.encode("utf-8"), compress_with) 

177 else: 

178 body = self.compress(body, compress_with) 

179 

180 if body is None: 

181 body = "" 

182 

183 responses.remove(method, url) 

184 responses.add( 

185 method=method, 

186 url=url, 

187 body=body, 

188 json=json_data, 

189 status=status_code, 

190 headers=headers, 

191 ) 

192 

193 def import_key_into_keyring(self, filename): 

194 """ 

195 Imports a key from an ascii armored file located in tests-data/keys/ 

196 into Distro Tracker's keyrings/. 

197 """ 

198 import gpg 

199 

200 old = os.environ.get('GNUPGHOME', None) 

201 os.environ['GNUPGHOME'] = settings.DISTRO_TRACKER_KEYRING_DIRECTORY 

202 

203 file_path = self.get_test_data_path('keys/' + filename) 

204 keydata = gpg.Data() 

205 keydata.new_from_file(file_path) 

206 

207 with gpg.Context() as ctx: 

208 ctx.op_import(keydata) 

209 

210 if old: 210 ↛ exitline 210 didn't return from function 'import_key_into_keyring', because the condition on line 210 was never false

211 os.environ['GNUPGHOME'] = old 

212 

213 

214class DatabaseMixin(object): 

215 """ 

216 Database-related assertions injected into distro_tracker's ``*TestCase`` 

217 objects. 

218 """ 

219 

220 def assertDoesNotExist(self, obj): 

221 with self.assertRaises(obj.__class__.DoesNotExist): 

222 obj.__class__.objects.get(pk=obj.id) 

223 

224 def assertDoesExist(self, obj): 

225 try: 

226 self.assertIsNotNone(obj.__class__.objects.get(pk=obj.id)) 

227 except obj.__class__.DoesNotExist as error: 

228 raise AssertionError(error) 

229 

230 def create_source_package(self, **kwargs): 

231 """ 

232 Creates a source package and any related object requested through the 

233 keyword arguments. The following arguments are supported: 

234 - name 

235 - version 

236 - directory 

237 - dsc_file_name 

238 - maintainer (dict with 'name' and 'email') 

239 - uploaders (list of emails) 

240 - architectures (list of architectures) 

241 - binary_packages (list of package names) 

242 - repository (shorthand of a repository) 

243 - repositories (list of repositories' shorthand) 

244 - data (dict used to generate associated PackageData) 

245 

246 If the shorthand of the requested repository is 'default', then 

247 its default field will be set to True. 

248 

249 :return: the created source package 

250 :rtype: :class:`~distro_tracker.core.models.SourcePackage` 

251 """ 

252 name = kwargs.get('name', 'test-package') 

253 version = kwargs.get('version', '1') 

254 

255 fields = {} 

256 fields['source_package_name'] = \ 

257 SourcePackageName.objects.get_or_create(name=name)[0] 

258 fields['version'] = version 

259 fields['dsc_file_name'] = kwargs.get('dsc_file_name', 

260 '%s_%s.dsc' % (name, version)) 

261 fields['directory'] = kwargs.get( 

262 'directory', 'pool/main/%s/%s' % (package_hashdir(name), name)) 

263 

264 if 'maintainer' in kwargs: 

265 maintainer = kwargs['maintainer'] 

266 maintainer_email = UserEmail.objects.get_or_create( 

267 email=maintainer['email'])[0] 

268 fields['maintainer'] = ContributorName.objects.get_or_create( 

269 contributor_email=maintainer_email, 

270 name=maintainer.get('name', ''))[0] 

271 

272 srcpkg = SourcePackage.objects.create(**fields) 

273 

274 for architecture in kwargs.get('architectures', []): 

275 srcpkg.architectures.add( 

276 Architecture.objects.get_or_create(name=architecture)[0]) 

277 

278 for uploader in kwargs.get('uploaders', []): 

279 contributor = ContributorName.objects.get_or_create( 

280 contributor_email=UserEmail.objects.get_or_create( 

281 email=uploader)[0])[0] 

282 srcpkg.uploaders.add(contributor) 

283 

284 for binary in kwargs.get('binary_packages', []): 

285 srcpkg.binary_packages.add( 

286 BinaryPackageName.objects.get_or_create(name=binary)[0]) 

287 

288 if 'repository' in kwargs: 

289 kwargs.setdefault('repositories', [kwargs['repository']]) 

290 for repo_shorthand in kwargs.get('repositories', []): 

291 self.add_to_repository(srcpkg, repo_shorthand) 

292 

293 if 'data' in kwargs: 

294 self.add_package_data(srcpkg.source_package_name, **kwargs['data']) 

295 

296 srcpkg.save() 

297 return srcpkg 

298 

299 def add_to_repository(self, srcpkg, shorthand='default'): 

300 """ 

301 Add a source package to a repository. Creates the repository if it 

302 doesn't exist. 

303 

304 If the shorthand of the requested repository is 'default', then 

305 its default field will be set to True. 

306 

307 :param srcpkg: the source package to add to the repository 

308 :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage` 

309 :param str shorthand: the shorthand of the repository 

310 

311 :return: the repository entry that has been created 

312 :rtype: 

313 :class:`~distro_tracker.core.models.SourcePackageRepositoryEntry` 

314 """ 

315 repository, _ = Repository.objects.get_or_create( 

316 shorthand=shorthand, 

317 defaults={ 

318 'name': 'Test repository %s' % shorthand, 

319 'uri': 'http://localhost/debian', 

320 'suite': shorthand, 

321 'codename': shorthand, 

322 'components': ['main', 'contrib', 'non-free'], 

323 'default': True if shorthand == 'default' else False, 

324 } 

325 ) 

326 return srcpkg.repository_entries.create(repository=repository, 

327 component='main') 

328 

329 def remove_from_repository(self, srcpkg, shorthand='default'): 

330 """ 

331 Remove a source package from a repository. 

332 

333 :param srcpkg: the source package to add to the repository 

334 :type srcpkg: :class:`~distro_tracker.core.models.SourcePackage` 

335 :param str shorthand: the shorthand of the repository 

336 """ 

337 return srcpkg.repository_entries.filter( 

338 repository__shorthand=shorthand).delete()[0] 

339 

340 def add_package_data(self, pkgname, **kwargs): 

341 """ 

342 Creates PackageData objects associated to the package indicated 

343 in pkgname. Each named parameter results in PackageData instance 

344 with the `key` being the name of the parameter and the `value` 

345 being the value of the named parameter. 

346 

347 :param pkgname: the name of the package to which we want to associate 

348 data 

349 :type pkgname: `str` or :class:`~distro_tracker.core.models.PackageName` 

350 """ 

351 if not isinstance(pkgname, PackageName): 

352 pkgname, _ = PackageName.objects.get_or_create(name=str(pkgname)) 

353 for key, value in kwargs.items(): 

354 PackageData.objects.create(package=pkgname, key=key, value=value) 

355 

356 @staticmethod 

357 def create_repository( 

358 codename="sid", 

359 name=None, 

360 shorthand=None, 

361 uri="http://localhost/debian", 

362 suite=None, 

363 components="main contrib non-free", 

364 default=False, 

365 optional=True, 

366 binary=False, 

367 source=True, 

368 architectures=None, 

369 ): 

370 if not name: 

371 name = "Repository %s" % codename 

372 if not shorthand: 

373 shorthand = codename[:10] 

374 if not suite: 

375 suite = codename 

376 

377 repo = Repository.objects.create( 

378 name=name, 

379 shorthand=shorthand, 

380 uri=uri, 

381 public_uri=uri, 

382 codename=codename, 

383 suite=suite, 

384 components=components, 

385 default=default, 

386 optional=optional, 

387 binary=binary, 

388 source=source, 

389 ) 

390 

391 if not architectures: 

392 architectures = ["amd64", "i386"] 

393 for archname in architectures: 

394 arch, _ = Architecture.objects.get_or_create(name=archname) 

395 repo.architectures.add(arch) 

396 

397 return repo 

398 

399 

400class SimpleTestCase(TempDirsMixin, TestCaseHelpersMixin, 

401 django.test.SimpleTestCase): 

402 pass 

403 

404 

405class TestCase(TempDirsMixin, TestCaseHelpersMixin, DatabaseMixin, 

406 django.test.TestCase): 

407 pass 

408 

409 

410@django.test.tag('transaction') 

411class TransactionTestCase(TempDirsMixin, TestCaseHelpersMixin, 

412 DatabaseMixin, django.test.TransactionTestCase): 

413 pass 

414 

415 

416class LiveServerTestCase(TempDirsMixin, TestCaseHelpersMixin, 

417 DatabaseMixin, StaticLiveServerTestCase): 

418 pass 

419 

420 

421class TemplateTestsMixin(object): 

422 """Helper methods to tests templates""" 

423 

424 @staticmethod 

425 def html_contains_link(text, link): 

426 html = soup(text, 'html.parser') 

427 for a_tag in html.findAll('a', {'href': True}): 

428 if a_tag['href'] == link: 

429 return True 

430 return False 

431 

432 def assertLinkIsInResponse(self, response, link): 

433 self.assertTrue(self.html_contains_link(response.content, link)) 

434 

435 def assertLinkIsNotInResponse(self, response, link): 

436 self.assertFalse(self.html_contains_link(response.content, link)) 

437 

438 

439class UserAuthMixin(object): 

440 """ 

441 Helpers methods to manage user authentication. 

442 One may define additional USERS before call self.setup_users() 

443 in self.setUp() 

444 """ 

445 USERS = { 

446 'john': {}, 

447 } 

448 

449 def setup_users(self, login=False): 

450 """ 

451 Creates users defined in self.USERS and use the 'login' parameter as 

452 follows: 

453 * If False: no user login 

454 * If True: login with the only user defined 

455 * If a particular username: login with the user who owns the username 

456 """ 

457 self.users = {} 

458 for username in self.USERS: 

459 user_data = self._get_user_data(username) 

460 self.users[username] = User.objects.create_user(**user_data) 

461 if login: 461 ↛ exitline 461 didn't return from function 'setup_users', because the condition on line 461 was never false

462 username = None if login is True else login 

463 self.login(username) 

464 

465 def login(self, username=None): 

466 """ 

467 Login with the user that owns the 'username' or with the only available 

468 user in self.users. If multiple users are available, you must specify 

469 the username or you will trigger a ValueError exception. 

470 """ 

471 if not username: 

472 if len(self.users) > 1: 472 ↛ 473line 472 didn't jump to line 473, because the condition on line 472 was never true

473 raise ValueError("multiple users but username not specified") 

474 username = list(self.users.keys())[0] 

475 user_data = self._get_user_data(username) 

476 self.client.login( 

477 username=user_data['main_email'], 

478 password=user_data['password'], 

479 ) 

480 self.current_user = self.users[username] 

481 return self.current_user 

482 

483 def get_user(self, username=None): 

484 if not username: 484 ↛ 485line 484 didn't jump to line 485, because the condition on line 484 was never true

485 return self.current_user 

486 return self.users[username] 

487 

488 def _get_user_data(self, username): 

489 user_data = self.USERS[username].copy() 

490 user_data.setdefault('main_email', '{}@example.com'.format(username)) 

491 user_data.setdefault('password', '{}password'.format(username)) 

492 return user_data 

493 

494 

495class AptRepositoryMixin(object): 

496 """ 

497 Helper method to mock an APT repository. 

498 """ 

499 

500 def mock_apt_repository(self, repo, **kwargs): 

501 self.mock_http_request() 

502 global_compression_suffixes = kwargs.pop("compression_suffixes", [""]) 

503 global_content = kwargs.pop("content", None) 

504 # Mock Sources and Packages files 

505 for base_filename in self._apt_repo_iter_metadata(repo): 

506 metadata_options = kwargs.get("metadata_options", {}).get( 

507 base_filename, {} 

508 ) 

509 compression_suffixes = metadata_options.get( 

510 "compression_suffixes", global_compression_suffixes 

511 ) 

512 metadata_content = metadata_options.get("content", global_content) 

513 test_content_file = metadata_options.get("test_content_file") 

514 for suffix in ("", ".bz2", ".gz", ".xz"): 

515 filename = base_filename + suffix 

516 content = metadata_content 

517 if callable(metadata_content): 

518 content = metadata_content(repo, filename) 

519 if suffix in compression_suffixes: 

520 self.mock_apt_repository_add_metadata_file( 

521 repo, 

522 filename, 

523 content=content, 

524 test_content_file=test_content_file, 

525 ) 

526 else: 

527 url = self._apt_repo_build_url(repo, filename) 

528 self.set_http_response(url, status_code=404) 

529 # Mock Release/InRelease files 

530 self.mock_apt_repository_update_release_file(repo, **kwargs) 

531 

532 @staticmethod 

533 def _apt_repo_build_url(repo, filename): 

534 return "{}/dists/{}/{}".format(repo.uri, repo.codename, filename) 

535 

536 @staticmethod 

537 def _apt_repo_iter_metadata(repo): 

538 for component in sorted(repo.components.split()): 

539 for arch in repo.architectures.all().order_by('name'): 

540 yield f"{component}/binary-{arch.name}/Packages" 

541 yield f"{component}/source/Sources" 

542 

543 def _apt_repo_init_checksums(self, repo): 

544 if not hasattr(self, '_apt_repo_checksums'): 

545 self._apt_repo_checksums = {} 

546 self._apt_repo_checksums.setdefault(repo.shorthand, {}) 

547 

548 def _apt_repo_iter_checksums(self, repo): 

549 return self._apt_repo_checksums[repo.shorthand].items() 

550 

551 def _apt_repo_store_checksums(self, repo, filename, checksums): 

552 self._apt_repo_checksums[repo.shorthand][filename] = checksums 

553 

554 def mock_apt_repository_add_metadata_file( 

555 self, repo, filename, content=None, test_content_file=None, 

556 compression="auto", **kwargs, 

557 ): 

558 self._apt_repo_init_checksums(repo) 

559 

560 # Load test content if required 

561 if test_content_file: 

562 data_path = self.get_test_data_path(test_content_file) 

563 with open(data_path, 'rb') as f: 

564 content = f.read() 

565 

566 # Generate content if required, then compress it if required 

567 if content is None: 

568 content = b"" 

569 

570 # Detect compression method 

571 if compression == "auto": 571 ↛ 574line 571 didn't jump to line 574, because the condition on line 571 was never false

572 compression = guess_compression_method(filename) 

573 

574 if compression: 

575 stream = io.BytesIO() 

576 compressor = get_compressor_factory(compression)(stream, mode="wb") 

577 compressor.write(content) 

578 compressor.close() 

579 content = stream.getvalue() 

580 

581 # Store checksums of metadata 

582 checksums = { 

583 "Size": len(content), 

584 "MD5Sum": hashlib.md5(content).hexdigest(), 

585 "SHA256": hashlib.sha256(content).hexdigest(), 

586 } 

587 self._apt_repo_store_checksums(repo, filename, checksums) 

588 

589 # Register the metadata in the http mock 

590 url = self._apt_repo_build_url(repo, filename) 

591 self.set_http_response(url, body=content) 

592 

593 def mock_apt_repository_update_release_file( 

594 self, 

595 repo, 

596 enable_inrelease=True, 

597 acquire_by_hash=True, 

598 suite=None, 

599 codename=None, 

600 architectures=None, 

601 components=None, 

602 **kwargs, 

603 ): 

604 self._apt_repo_init_checksums(repo) 

605 

606 release_url = self._apt_repo_build_url(repo, "Release") 

607 inrelease_url = self._apt_repo_build_url(repo, "InRelease") 

608 if suite is None: 608 ↛ 610line 608 didn't jump to line 610, because the condition on line 608 was never false

609 suite = repo.suite or repo.codename or "" 

610 if codename is None: 610 ↛ 612line 610 didn't jump to line 612, because the condition on line 610 was never false

611 codename = repo.codename or repo.suite or "" 

612 if architectures is None: 612 ↛ 616line 612 didn't jump to line 616, because the condition on line 612 was never false

613 architectures = " ".join([ 

614 a.name for a in repo.architectures.all().order_by('name') 

615 ]) 

616 if components is None: 616 ↛ 620line 616 didn't jump to line 620, because the condition on line 616 was never false

617 components = repo.components 

618 

619 # Build the content of the release file 

620 text = """Origin: Debian 

621Label: Debian 

622Suite: {suite} 

623Codename: {codename} 

624Architectures: {architectures} 

625Components: {components} 

626""".format( 

627 suite=suite, 

628 codename=codename, 

629 architectures=architectures, 

630 components=components, 

631 ) 

632 if acquire_by_hash: 

633 text += "Acquire-By-Hash: yes\n" 

634 for checksum in ("MD5Sum", "SHA256"): 

635 text += "{}:\n".format(checksum) 

636 for path, checksums in self._apt_repo_iter_checksums(repo): 

637 if "/by-hash/" in path: 637 ↛ 638line 637 didn't jump to line 638, because the condition on line 637 was never true

638 continue 

639 text += " {} {} {}\n".format( 

640 checksums[checksum], checksums["Size"], path 

641 ) 

642 

643 self.set_http_response(release_url, body=text) 

644 

645 if enable_inrelease: 

646 signed_text = """-----BEGIN PGP SIGNED MESSAGE----- 

647Hash: SHA256 

648 

649""" 

650 signed_text += text 

651 signed_text += """-----BEGIN PGP SIGNATURE----- 

652 

653iQIzBAEBCAAdFiEEFukLP99l7eOqfzI8BO5yN7fUU+wFAl8/gbEACgkQBO5yN7fU 

654U+y4Lw/+PDhJJaxEmZWS4dFjBSJYMTgyiEPXG6eMqDpeJNr8iIoBjcBd3bv3Gexq 

6558rS0ry9bPLy9ZZxImL0E6rB2oFU8OAqoAXXmRf5yt3x0SY/1deTjMHYr5w4kH6CB 

656ZwZnkm12jMyB9ds/ZAvG7+ou+qEb7bZ2+7IzhBlFuLNYO747sOaDjOM3RdV700qs 

657FvmSBcysOUWCAhxQNmAk/NZ585AxeKksbvSHUMczdKIRu/XN82zrTRPQhZ51eHDZ 

658mY444ytopHEA6G+3rkUagKeLGE6JnwS+amhz/A== 

659=H/pA 

660-----END PGP SIGNATURE-----""" 

661 self.set_http_response(inrelease_url, body=signed_text) 

662 else: 

663 self.set_http_response(inrelease_url, status_code=404)