1# Copyright 2015-2018 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Module implementing the processing of incoming email messages. 

12""" 

13import asyncore 

14import email 

15import logging 

16import os 

17from datetime import timedelta 

18from itertools import chain 

19from multiprocessing import Pool 

20 

21from django.conf import settings 

22 

23import pyinotify 

24 

25import distro_tracker.mail.control 

26import distro_tracker.mail.dispatch 

27from distro_tracker.core.utils import message_from_bytes 

28 

# Module-level logger; used below to report mails whose processing failed
# (see run_mail_processor() and MailQueueEntry).
logger = logging.getLogger(__name__)

30 

31 

class MailProcessorException(Exception):
    """
    Common base class of the errors raised while routing an incoming mail.

    A mail whose processing fails with one of these errors is not retried:
    it is moved aside immediately by the mail queue.
    """

34 

35 

class ConflictingDeliveryAddresses(MailProcessorException):
    """
    Raised when the message headers hold several distinct delivery
    addresses belonging to the domain configured in
    settings.DISTRO_TRACKER_FQDN, so that the target cannot be decided.
    """

42 

43 

class MissingDeliveryAddress(MailProcessorException):
    """
    Raised when none of the message headers holds a delivery address
    belonging to the domain configured in settings.DISTRO_TRACKER_FQDN.
    """

50 

51 

class InvalidDeliveryAddress(MailProcessorException):
    """
    Raised when the delivery address belongs to the domain configured in
    settings.DISTRO_TRACKER_FQDN but does not correspond to any known
    Distro Tracker service.
    """

59 

60 

class MailProcessor(object):
    """
    Take an incoming email and do something useful out of it.

    To this end, it must find out where the email was sent
    and adjust the processing depending on the role of
    the target address.
    """

    def __init__(self, message_or_filename):
        """
        :param message_or_filename: Either an already parsed
            :py:class:`email.message.Message`, or the path of a file holding
            the raw mail (loaded with :py:meth:`load_mail_from_file`).
        """
        if isinstance(message_or_filename, email.message.Message):
            self.message = message_or_filename
        else:
            self.load_mail_from_file(message_or_filename)

    def load_mail_from_file(self, filename):
        """
        Load the mail to process from a file.

        :param str filename: Path of the file to parse as mail.
        """
        with open(filename, 'rb') as f:
            self.message = message_from_bytes(f.read())

    @staticmethod
    def find_delivery_address(message):
        """
        Identify the email address the message was delivered to.

        The message headers Delivered-To, Envelope-To, X-Original-To, and
        X-Envelope-To are scanned to find out an email that matches the FQDN of
        the Distro Tracker setup.

        :param message: The parsed message whose headers are inspected.
        :return: The single matching address, or None if no header matches.
        :raises ConflictingDeliveryAddresses: if several distinct addresses
            match.
        """
        suffix = '@' + settings.DISTRO_TRACKER_FQDN
        addresses = []
        for field in chain(message.get_all('Delivered-To', []),
                           message.get_all('Envelope-To', []),
                           message.get_all('X-Original-To', []),
                           message.get_all('X-Envelope-To', [])):
            # The same address usually appears in several headers; only
            # distinct addresses count as a conflict.
            if field.endswith(suffix) and field not in addresses:
                addresses.append(field)
        if len(addresses) > 1:
            raise ConflictingDeliveryAddresses()
        return addresses[0] if addresses else None

    @staticmethod
    def identify_service(address):
        """
        Identify service associated to target email and extract optional args.

        The address has the generic form <service>+<details>@<fqdn>.

        :param str address: The delivery address.
        :return: A 2-item ``(service, details)`` sequence where ``details``
            is None when the local part contains no ``+``.
        """
        local_part = address.split('@', 1)[0]
        if '+' in local_part:
            return local_part.split('+', 1)
        else:
            return (local_part, None)

    @staticmethod
    def _split_package_keyword(value):
        """Split ``<package>[_<keyword>]`` into a (package, keyword) pair."""
        if value and '_' in value:
            package, keyword = value.split('_', 1)
            return package, keyword
        return value, None

    # NOTE(review): declared static yet takes ``self``; presumably so that
    # tests can fetch the plain function and install it in place of
    # ``process`` (where it then binds ``self``) — confirm before changing.
    @staticmethod
    def do_nothing(self):
        """Just used by unit tests to disable process()"""

    def process(self):
        """
        Process the message stored in self.message.

        Find out the delivery address and identify the associated service.
        Then defer to handle_*() for service-specific processing.

        :raises MissingDeliveryAddress: if no delivery address for the
            Distro Tracker domain is found.
        :raises ConflictingDeliveryAddresses: if several distinct delivery
            addresses for the Distro Tracker domain are found.
        :raises InvalidDeliveryAddress: if the address matches no known
            service and unqualified addresses are not accepted.
        """
        addr = self.find_delivery_address(self.message)
        if addr is None:
            raise MissingDeliveryAddress()
        service, details = self.identify_service(addr)
        if service == 'dispatch':
            self.handle_dispatch(*self._split_package_keyword(details))
        elif service == 'bounces':
            self.handle_bounces(details)
        elif service == 'control':
            self.handle_control()
        elif service == 'team':
            self.handle_team(details)
        elif settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS:
            # The whole local part is then read as <package>[_<keyword>].
            local_part = addr.split('@', 1)[0]
            self.handle_dispatch(*self._split_package_keyword(local_part))
        else:
            raise InvalidDeliveryAddress(
                '{} is not a valid Distro Tracker address'.format(addr))

    @staticmethod
    def build_delivery_address(service, details):
        """
        Build the address ``<service>[+<details>]@<fqdn>``.

        This is the inverse of :py:meth:`identify_service`.

        :param str service: The service part of the local part.
        :param details: Optional details; omitted when empty or None.
        :rtype: str
        """
        local_part = service
        if details:
            local_part += '+' + details
        return '{}@{}'.format(local_part, settings.DISTRO_TRACKER_FQDN)

    def handle_control(self):
        """Hand the message over to the control message processing."""
        distro_tracker.mail.control.process(self.message)

    def handle_bounces(self, details):
        """Hand a bounce over to the dispatch module's bounce handling."""
        sent_to_addr = self.build_delivery_address('bounces', details)
        distro_tracker.mail.dispatch.handle_bounces(sent_to_addr, self.message)

    def handle_dispatch(self, package=None, keyword=None):
        """Dispatch the message for ``package`` (and optional ``keyword``)."""
        distro_tracker.mail.dispatch.process(self.message, package=package,
                                             keyword=keyword)

    def handle_team(self, team):
        """Dispatch the message to the members of ``team``."""
        distro_tracker.mail.dispatch.process_for_team(self.message, team)

176 

177 

def run_mail_processor(mail_path, log_failure=False):
    """
    Process the mail stored at ``mail_path`` with a :class:`MailProcessor`.

    This is the function that the mail queue submits to its worker pool.

    :param str mail_path: Path of the stored email.
    :param bool log_failure: When true, an exception raised during the
        processing is logged together with its traceback before being
        propagated.
    """
    try:
        MailProcessor(mail_path).process()
    except Exception:
        if not log_failure:
            raise
        logger.exception("Failed to process incoming mail %s", mail_path)
        raise

192 

193 

class MailQueue(object):
    """
    A queue of mails to process. The mails are identified by their filename
    within `DISTRO_TRACKER_MAILDIR_DIRECTORY`.
    """

    #: The maximum number of sub-process used to process the mail queue
    MAX_WORKERS = 4

    #: Delays (in seconds) returned by :py:meth:`sleep_timeout` depending on
    #: the state of the entries of the queue.
    SLEEP_TIMEOUT_EMPTY = 30.0
    SLEEP_TIMEOUT_TASK_RUNNING = 0.010
    SLEEP_TIMEOUT_TASK_FINISHED = 0.0
    SLEEP_TIMEOUT_TASK_RUNNABLE = 0.0

    def __init__(self):
        # Entries in insertion order.
        self.queue = []
        # The same entries, indexed by their identifier.
        self.entries = {}
        # Number of entries removed from the queue so far.
        self.processed_count = 0

    def add(self, identifier):
        """
        Add a new mail in the queue.

        :param str identifier: Filename identifying the mail.
        :return: The new :py:class:`MailQueueEntry`, or None when the mail
            was already queued.
        """
        if identifier in self.entries:
            return None

        entry = MailQueueEntry(self, identifier)
        self.queue.append(entry)
        self.entries[identifier] = entry
        return entry

    def remove(self, identifier):
        """
        Remove a mail from the queue. This does not unlink the file.

        Unknown identifiers are ignored; every actual removal increments
        ``processed_count``.

        :param str identifier: Filename identifying the mail.
        """
        entry = self.entries.pop(identifier, None)
        if entry is None:
            return
        self.queue.remove(entry)
        self.processed_count += 1

    @staticmethod
    def _get_maildir(subfolder=None):
        """Return the ``new`` directory of the maildir (or of a subfolder)."""
        if subfolder:
            return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY,
                                subfolder, 'new')
        return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY, 'new')

    @classmethod
    def _get_mail_path(cls, entry, subfolder=None):
        """Return the full path of the mail ``entry`` in the maildir."""
        return os.path.join(cls._get_maildir(subfolder), entry)

    def initialize(self):
        """Scan the Maildir and fill the queue with the mails in it."""
        for mail in os.listdir(self._get_maildir()):
            self.add(mail)

    @property
    def pool(self):
        """The worker pool, created lazily on first access."""
        if getattr(self, '_pool', None) is not None:
            return self._pool
        self._pool = Pool(self.MAX_WORKERS, maxtasksperchild=100)
        return self._pool

    def close_pool(self):
        """Wait until all worker processes are finished and destroy the pool"""
        if getattr(self, '_pool', None) is None:
            return
        self._pool.close()
        self._pool.join()
        self._pool = None

    def process_queue(self):
        """
        Iterate over messages in the queue and do whatever is appropriate.

        Entries not yet handed to a worker are started, and entries whose
        worker has finished get their result handled.
        """
        # Work on a snapshot of the queue as it will be modified each time
        # a task is finished
        for entry in list(self.queue):
            if not entry.processing_task_started():
                entry.start_processing_task()
            if entry.processing_task_finished():
                entry.handle_processing_task_result()

    def sleep_timeout(self):
        """
        Return the maximum delay we can sleep before we process the queue
        again.

        :return: A non-negative number of seconds.
        :rtype: float
        """
        timeout = 86400.0
        for entry in self.queue:
            if entry.processing_task_finished():
                timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_FINISHED)
            elif entry.processing_task_started():
                timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNING)
            else:
                next_try_time = entry.get_data('next_try_time')
                if next_try_time is None:
                    timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNABLE)
                else:
                    wait_time = next_try_time - distro_tracker.core.utils.now()
                    # The retry may already be due: never return a negative
                    # delay, select()/sleep() reject negative timeouts.
                    timeout = min(timeout,
                                  max(wait_time.total_seconds(), 0.0))
        # Checked after the loop on purpose: the queue may be emptied
        # concurrently (by pool callbacks) while we iterate over it.
        if not self.queue:
            return self.SLEEP_TIMEOUT_EMPTY
        return timeout

    def process_loop(self, stop_after=None, ready_cb=None):
        """
        Process all messages as they are delivered. Also processes pre-existing
        messages. Unless ``stop_after`` is given, this method never returns.

        :param int stop_after: Stop the loop after having processed the given
            number of messages. Used mainly by unit tests.
        :param ready_cb: a callback executed after setup of filesystem
            monitoring and initial scan of the mail queue, but before the
            start of the loop.
        """
        watcher = MailQueueWatcher(self)
        watcher.start()
        self.initialize()
        if ready_cb:
            ready_cb()
        while True:
            watcher.process_events(timeout=self.sleep_timeout())
            self.process_queue()
            if stop_after is not None and self.processed_count >= stop_after:
                break

324 

325 

class MailQueueEntry(object):
    """
    An entry in a :py:class:`MailQueue`.

    Contains the following public attributes:

    .. py:attribute:: queue

        The parent :py:class:`MailQueue`.

    .. py:attribute:: identifier

        The entry identifier, it's the name of the file within the directory
        of the MailQueue. Used to uniquely identify the entry in the MailQueue.

    .. py:attribute:: path

        The full path to the mail file.
    """

    #: Successive delays between retries of a mail whose processing failed
    #: with an unexpected exception. Once all of them have been used, no
    #: further retry is scheduled (see :py:meth:`schedule_next_try`).
    RETRY_DELAYS = (
        timedelta(seconds=150),
        timedelta(seconds=300),
        timedelta(seconds=600),
        timedelta(seconds=1800),
        timedelta(seconds=3600),
        timedelta(seconds=7200),
    )

    def __init__(self, queue, identifier):
        self.queue = queue
        self.identifier = identifier
        self.path = os.path.join(self.queue._get_maildir(), self.identifier)
        self.data = {
            'creation_time': distro_tracker.core.utils.now(),
        }

    def set_data(self, key, value):
        """Store ``value`` under ``key`` in the entry's data."""
        self.data[key] = value

    def get_data(self, key):
        """Return the data stored under ``key``, or None if unset."""
        return self.data.get(key)

    def move_to_subfolder(self, folder):
        """
        Move an entry from the mailqueue to the given subfolder.

        The subfolder's ``new`` directory is created if needed.

        :param str folder: Name of the subfolder (e.g. "failed", "broken").
        """
        new_maildir = self.queue._get_maildir(folder)
        # exist_ok avoids the race between testing for and creating the dir.
        os.makedirs(new_maildir, exist_ok=True)
        os.rename(self.path, os.path.join(new_maildir, self.identifier))

    def _processed_cb(self, _):
        """
        Drop the entry from the queue and unlink its mail file.

        Registered as the pool callback for successful tasks and also called
        from :py:meth:`handle_processing_task_result`; a second call is a
        no-op since unknown identifiers and missing files are ignored.
        """
        self.queue.remove(self.identifier)
        if os.path.exists(self.path):
            os.unlink(self.path)

    def start_processing_task(self):
        """
        Schedule the processing of the mail in the worker pool.

        Nothing is done while the next retry time (if any) is not reached.
        """
        next_try_time = self.get_data('next_try_time')
        log_failure = self.get_data('log_failure')
        now = distro_tracker.core.utils.now()
        if next_try_time and next_try_time > now:
            return

        result = self.queue.pool.apply_async(run_mail_processor,
                                             (self.path, log_failure),
                                             callback=self._processed_cb)
        self.set_data('task_result', result)

    def processing_task_started(self):
        """
        Returns True when the entry has been fed to workers doing mail
        processing. Returns False otherwise.

        :return: an indication whether the mail processing is on-going.
        :rtype: bool
        """
        return self.get_data('task_result') is not None

    def processing_task_finished(self):
        """
        Returns True when the worker processing the mail has finished its work.
        Returns False otherwise, notably when the entry has not been fed to
        any worker yet.

        :return: an indication whether the mail processing has finished.
        :rtype: bool
        """
        if not self.processing_task_started():
            return False
        return self.get_data('task_result').ready()

    def handle_processing_task_result(self):
        """
        Handle the outcome of a worker task that has finished.

        On success the mail is unlinked and the entry dropped from the queue
        (the pool callback may already have done it; doing it again is
        harmless).

        Mails whose task raised an exception derived from
        :py:class:`MailProcessorException` are directly moved to a "failed"
        subfolder and the corresponding entry is dropped from the queue.

        Mails whose task raised other exceptions are kept around for
        multiple retries and after some time they are moved to a "broken"
        subfolder and the corresponding entry is dropped from the queue.
        """
        task_result = self.get_data('task_result')
        if task_result is None:
            return
        try:
            task_result.get()
            self._processed_cb(task_result)
        except MailProcessorException:
            logger.warning('Failed processing %s', self.identifier)
            self.move_to_subfolder('failed')
            self.queue.remove(self.identifier)
        except Exception:
            if not self.schedule_next_try():
                logger.warning('Failed processing %s (and stop retrying)',
                               self.identifier)
                self.move_to_subfolder('broken')
                self.queue.remove(self.identifier)
            else:
                logger.warning('Failed processing %s (but will retry later)',
                               self.identifier)

    def schedule_next_try(self):
        """
        When the mail processing failed, schedule a new try for later.

        The delay between two tries progressively increases following
        :py:attr:`RETRY_DELAYS`. Once all those delays have been used (6
        retries by default), refuse to schedule a new try and return False.
        Failures are logged by the worker for the last allowed retry.

        :return: True if a new try has been scheduled, False otherwise.
        """
        count = self.get_data('tries') or 0
        if count >= len(self.RETRY_DELAYS):
            return False

        now = distro_tracker.core.utils.now()
        self.set_data('next_try_time', now + self.RETRY_DELAYS[count])
        self.set_data('tries', count + 1)
        self.set_data('task_result', None)
        self.set_data('log_failure', count + 1 == len(self.RETRY_DELAYS))

        return True

477 

478 

class MailQueueWatcher(object):
    """
    Feed a :py:class:`MailQueue` with the files that appear in its
    directory, relying on inotify.
    """

    class EventHandler(pyinotify.ProcessEvent):
        """Queue every file created in, or moved into, the watched dir."""

        def my_init(self, queue=None):
            # pyinotify.ProcessEvent forwards its constructor keyword
            # arguments to my_init().
            self.queue = queue

        def _enqueue(self, event):
            self.queue.add(event.name)

        # Both kinds of events are handled the same way.
        process_IN_CREATE = _enqueue
        process_IN_MOVED_TO = _enqueue

    def __init__(self, queue):
        self.queue = queue

    def start(self):
        """Begin monitoring the directory of the mail queue."""
        watched_dir = self.queue._get_maildir()
        self.wm = pyinotify.WatchManager()
        handler = self.EventHandler(queue=self.queue)
        # The notifier is not kept here: it is driven through
        # asyncore.loop() in process_events().
        pyinotify.AsyncNotifier(self.wm, handler)
        watched_events = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO
        self.wm.add_watch(watched_dir, watched_events, quiet=False)

    def process_events(self, timeout=0, count=1):
        """
        Handle the filesystem events that happened since the previous call.

        :param float timeout: Longest time to wait for an event to show up.
        :param int count: How many processing loops to run.
        """
        asyncore.loop(count=count, timeout=timeout)