1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

# Copyright 2013-2019 The Distro Tracker Developers 

# See the COPYRIGHT file at the top-level directory of this distribution and 

# at https://deb.li/DTAuthors 

# 

# This file is part of Distro Tracker. It is subject to the license terms 

# in the LICENSE file found in the top-level directory of this 

# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

# including this file, may be copied, modified, propagated, or distributed 

# except according to the terms contained in the LICENSE file. 

"""Various utilities for the distro-tracker project.""" 

import datetime 

import json 

import logging 

import os 

 

from django.conf import settings 

from django.core.exceptions import ValidationError 

from django.core.validators import EmailValidator 

from django.db import models 

from django.http import HttpResponse 

from django.template.loader import render_to_string 

 

import gpg 

 

from distro_tracker import vendor 

 

# Re-export some functions 

from .email_messages import extract_email_address_from_header # noqa 

from .email_messages import get_decoded_message_payload # noqa 

from .email_messages import message_from_bytes # noqa 

 

logger_input = logging.getLogger('distro_tracker.input') 

 

 

def get_or_none(model, **kwargs): 

""" 

Gets a Django Model object from the database or returns ``None`` if it 

does not exist. 

""" 

try: 

return model.objects.get(**kwargs) 

except model.DoesNotExist: 

return None 

 

 

def distro_tracker_render_to_string(template_name, context=None): 

""" 

A custom function to render a template to a string which injects extra 

distro-tracker specific information to the context, such as the name 

of the derivative. 

 

This function is necessary since Django's 

:data:`TEMPLATE_CONTEXT_PROCESSORS 

<distro_tracker.project.settings.TEMPLATE_CONTEXT_PROCESSORS> only work when 

using a :class:`RequestContext <django.template.RequestContext>`, whereas 

this function can be called independently from any HTTP request. 

""" 

from distro_tracker.core import context_processors 

if context is None: 

context = {} 

extra_context = context_processors.DISTRO_TRACKER_EXTRAS 

context.update(extra_context) 

 

return render_to_string(template_name, context) 

 

 

def render_to_json_response(response): 

""" 

Helper function creating an :class:`HttpResponse <django.http.HttpResponse>` 

by serializing the given ``response`` object to a JSON string. 

 

The resulting HTTP response has Content-Type set to application/json. 

 

:param response: The object to be serialized in the response. It must be 

serializable by the :mod:`json` module. 

:rtype: :class:`HttpResponse <django.http.HttpResponse>` 

""" 

return HttpResponse( 

json.dumps(response), 

content_type='application/json' 

) 

 

 

class PrettyPrintList(object): 

""" 

A class which wraps the built-in :class:`list` object so that when it is 

converted to a string, its contents are printed using the given 

:attr:`delimiter`. 

 

The default delimiter is a space. 

 

>>> a = PrettyPrintList([1, 2, 3]) 

>>> print(a) 

1 2 3 

>>> print(PrettyPrintList([u'one', u'2', u'3'])) 

one 2 3 

>>> print(PrettyPrintList([1, 2, 3], delimiter=', ')) 

1, 2, 3 

>>> # Still acts as a list 

>>> a == [1, 2, 3] 

True 

>>> a == ['1', '2', '3'] 

False 

""" 

def __init__(self, the_list=None, delimiter=' '): 

if the_list is None: 

self._list = [] 

else: 

self._list = the_list 

self.delimiter = delimiter 

 

def __getattr__(self, name, *args, **kwargs): 

return getattr(self._list, name) 

 

def __len__(self): 

return len(self._list) 

 

def __getitem__(self, pos): 

return self._list[pos] 

 

def __iter__(self): 

return self._list.__iter__() 

 

def __str__(self): 

return self.delimiter.join(map(str, self._list)) 

 

def __repr__(self): 

return str(self) 

 

def __eq__(self, other): 

if isinstance(other, PrettyPrintList): 

return self._list == other._list 

return self._list == other 

 

 

class SpaceDelimitedTextField(models.TextField): 

""" 

A custom Django model field which stores a list of strings. 

 

It stores the list in a :class:`TextField <django.db.models.TextField>` as a 

space delimited list. It is marshalled back to a :class:`PrettyPrintList` in 

the Python domain. 

""" 

 

description = "Stores a space delimited list of strings" 

 

def from_db_value(self, value, expression, connection): 

return self.to_python(value) 

 

def to_python(self, value): 

151 ↛ 152line 151 didn't jump to line 152, because the condition on line 151 was never true if value is None: 

return None 

 

154 ↛ 155line 154 didn't jump to line 155, because the condition on line 154 was never true if isinstance(value, PrettyPrintList): 

return value 

156 ↛ 157line 156 didn't jump to line 157, because the condition on line 156 was never true elif isinstance(value, list): 

return PrettyPrintList(value) 

 

return PrettyPrintList(value.split()) 

 

def get_prep_value(self, value, **kwargs): 

162 ↛ 163line 162 didn't jump to line 163, because the condition on line 162 was never true if value is None: 

return 

# Any iterable value can be converted into this type of field. 

return ' '.join(map(str, value)) 

 

def get_db_prep_value(self, value, **kwargs): 

return self.get_prep_value(value) 

 

def value_to_string(self, obj): 

value = self._get_val_from_obj(obj) 

return self.get_prep_value(value) 

 

 

#: A map of currently available VCS systems' shorthands to their names. 

VCS_SHORTHAND_TO_NAME = { 

'svn': 'Subversion', 

'git': 'Git', 

'bzr': 'Bazaar', 

'cvs': 'CVS', 

'darcs': 'Darcs', 

'hg': 'Mercurial', 

'mtn': 'Monotone', 

} 

 

 

def get_vcs_name(shorthand): 

""" 

Returns a full name for the VCS given its shorthand. 

 

If the given shorthand is unknown an empty string is returned. 

 

:param shorthand: The shorthand of a VCS for which a name is required. 

 

:rtype: string 

""" 

return VCS_SHORTHAND_TO_NAME.get(shorthand, shorthand) 

 

 

def verify_signature(content): 

""" 

The function extracts any possible signature information found in the given 

content. 

 

Uses the ``DISTRO_TRACKER_KEYRING_DIRECTORY`` setting to access the keyring. 

If this setting does not exist, no signatures can be validated. 

 

:type content: :class:`bytes` or :class:`string` 

 

:returns: Information about the signers of the content as a list or 

``None`` if there is no (valid) signature. 

:rtype: list of ``(name, email)`` pairs or ``None`` 

:type content: :class:`bytes` 

""" 

keyring_directory = getattr(settings, 'DISTRO_TRACKER_KEYRING_DIRECTORY', 

None) 

217 ↛ 219line 217 didn't jump to line 219, because the condition on line 217 was never true if not keyring_directory: 

# The vendor has not provided a keyring 

return None 

 

if content is None: 

return None 

 

if isinstance(content, str): 

content = content.encode('utf-8') 

 

os.environ['GNUPGHOME'] = keyring_directory 

signers = [] 

 

with gpg.Context() as ctx: 

 

# Try to verify the given content 

signed_data = gpg.Data() 

signed_data.new_from_mem(content) 

 

try: 

_, result = ctx.verify(signed_data) 

except gpg.errors.BadSignatures: 

return [] 

except gpg.errors.GpgError: 

return None 

 

# Extract signer information 

for signature in result.signatures: 

key_missing = bool(signature.summary & 

gpg.constants.SIGSUM_KEY_MISSING) 

 

248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true if key_missing: 

continue 

 

key = ctx.get_key(signature.fpr) 

preferred_domain = "".join( 

settings.DISTRO_TRACKER_FQDN.split(".", 1)[1:2]) 

 

selected_uid = _select_uid_in_key(key, domain=preferred_domain) 

if not selected_uid: 

selected_uid = _select_uid_in_key(key) 

 

if selected_uid: 

signers.append((selected_uid.name, selected_uid.email)) 

else: 

logger_input.warning( 

'Key %s has no UID with a valid email (name=%s email=%s)', 

signature.fpr, key.uids[0].name, key.uids[0].email) 

 

return signers 

 

 

def _select_uid_in_key(key, domain=None): 

""" 

Select the desired UID among all the available UIDs. 

""" 

selected_uid = None 

validate_email = EmailValidator() 

 

for uid in key.uids: 

277 ↛ 278line 277 didn't jump to line 278, because the condition on line 277 was never true if uid.revoked or uid.invalid: 

continue 

try: 

validate_email(uid.email) 

if domain: 

if uid.email.endswith('@' + domain): 

selected_uid = uid 

break 

else: 

selected_uid = uid 

break 

except ValidationError: 

continue 

 

return selected_uid 

 

 

def now(tz=datetime.timezone.utc): 

""" 

Returns the current timestamp in the requested timezone (UTC by default) 

and can be easily mocked out for tests. 

""" 

return datetime.datetime.now(tz) 

 

 

def get_developer_information_url(email): 

""" 

Returns developer's information url based on his/her email 

through vendor-specific function 

""" 

info_url, implemented = vendor.call( 

'get_developer_information_url', **{'developer_email': email, }) 

309 ↛ 310line 309 didn't jump to line 310, because the condition on line 309 was never true if implemented and info_url: 

return info_url 

 

 

def add_developer_extras(general, url_only=False): 

""" 

Receives a general dict with package data and add to it more data 

regarding that package's developers 

""" 

318 ↛ 328line 318 didn't jump to line 328, because the condition on line 318 was never false if 'maintainer' in general: 

maintainer_email = general['maintainer']['email'] 

url = get_developer_information_url(maintainer_email) 

321 ↛ 322line 321 didn't jump to line 322, because the condition on line 321 was never true if url: 

general['maintainer']['developer_info_url'] = url 

if not url_only: 

extra, implemented = vendor.call( 

'get_maintainer_extra', maintainer_email, general['name']) 

general['maintainer']['extra'] = extra 

 

uploaders = general.get('uploaders', None) 

329 ↛ 330line 329 didn't jump to line 330, because the condition on line 329 was never true if uploaders: 

for uploader in uploaders: 

url = get_developer_information_url(uploader['email']) 

if url: 

uploader['developer_info_url'] = url 

if url_only: 

continue 

# Vendor specific extras. 

extra, implemented = vendor.call( 

'get_uploader_extra', uploader['email'], general['name']) 

if implemented and extra: 

uploader['extra'] = extra 

 

return general