Coverage for distro_tracker/mail/processor.py: 95%
253 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2015-2018 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""
11Module implementing the processing of incoming email messages.
12"""
13import asyncore
14import email
15import logging
16import os
17from datetime import timedelta
18from itertools import chain
19from multiprocessing import Pool
21from django.conf import settings
23import pyinotify
25import distro_tracker.mail.control
26import distro_tracker.mail.dispatch
27from distro_tracker.core.utils import message_from_bytes
# Module-level logger shared by the functions and classes of this module.
logger = logging.getLogger(
    __name__
)
class MailProcessorException(Exception):
    """Common base class of the delivery-address errors defined below."""
class ConflictingDeliveryAddresses(MailProcessorException):
    """
    Raised when several distinct delivery addresses for the domain defined
    in settings.DISTRO_TRACKER_FQDN are found in the headers of a message.
    """
class MissingDeliveryAddress(MailProcessorException):
    """
    Raised when no header of the message holds a delivery address for the
    domain defined in settings.DISTRO_TRACKER_FQDN.
    """
class InvalidDeliveryAddress(MailProcessorException):
    """
    Raised when the delivery address belongs to the domain defined in
    settings.DISTRO_TRACKER_FQDN but does not match any known Distro
    Tracker service.
    """
class MailProcessor(object):
    """
    Take an incoming email and hand it over to the matching service.

    To this end, it must find out where the email was sent
    and adjust the processing depending on the role of
    the target address.
    """

    def __init__(self, message_or_filename):
        """
        :param message_or_filename: either an
            :class:`email.message.Message` instance, used as is, or any
            other value, which is then passed to
            :meth:`load_mail_from_file` as the path of the mail to load.
        """
        if isinstance(message_or_filename, email.message.Message):
            self.message = message_or_filename
        else:
            self.load_mail_from_file(message_or_filename)

    def load_mail_from_file(self, filename):
        """
        Load the mail to process from a file.

        :param str filename: Path of the file to parse as mail.
        """
        with open(filename, 'rb') as f:
            self.message = message_from_bytes(f.read())

    @staticmethod
    def find_delivery_address(message):
        """
        Identify the email address the message was delivered to.

        The message headers Delivered-To, Envelope-To, X-Original-To, and
        X-Envelope-To are scanned to find out an email that matches the FQDN
        of the Distro Tracker setup.

        :param message: the message whose headers are inspected.
        :return: the single matching address, or None when no header
            matches.
        :raises ConflictingDeliveryAddresses: when the headers hold more
            than one distinct matching address.
        """
        suffix = '@' + settings.DISTRO_TRACKER_FQDN
        addresses = []
        for field in chain(message.get_all('Delivered-To', []),
                           message.get_all('Envelope-To', []),
                           message.get_all('X-Original-To', []),
                           message.get_all('X-Envelope-To', [])):
            # The same address repeated in several headers is not a conflict.
            if field.endswith(suffix) and field not in addresses:
                addresses.append(field)
        if len(addresses) > 1:
            raise ConflictingDeliveryAddresses()
        if addresses:
            return addresses[0]
        return None

    @staticmethod
    def identify_service(address):
        """
        Identify service associated to target email and extract optional args.

        The address has the generic form <service>+<details>@<fqdn>.

        :param str address: the delivery address to analyze.
        :return: a two-item sequence ``(service, details)``. It is a list
            when the local part contains a ``+`` and a tuple with
            ``details`` set to None otherwise; callers are expected to
            unpack it.
        """
        local_part = address.split('@', 1)[0]
        if '+' in local_part:
            return local_part.split('+', 1)
        else:
            return (local_part, None)

    @staticmethod
    def _split_package_keyword(details):
        """Split ``<package>_<keyword>`` into a (package, keyword) pair."""
        if details and '_' in details:
            package, keyword = details.split('_', 1)
            return package, keyword
        return details, None

    @staticmethod
    def do_nothing(self):
        """Just used by unit tests to disable process()"""

    def process(self):
        """
        Process the message stored in self.message.

        Find out the delivery address and identify the associated service.
        Then defer to handle_*() for service-specific processing.

        :raises MissingDeliveryAddress: when no delivery address is found.
        :raises ConflictingDeliveryAddresses: propagated from
            :meth:`find_delivery_address`.
        :raises InvalidDeliveryAddress: when the address matches no known
            service and settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS
            is false.
        """
        addr = self.find_delivery_address(self.message)
        if addr is None:
            raise MissingDeliveryAddress()
        service, details = self.identify_service(addr)
        if service == 'dispatch':
            package, keyword = self._split_package_keyword(details)
            self.handle_dispatch(package, keyword)
        elif service == 'bounces':
            self.handle_bounces(details)
        elif service == 'control':
            self.handle_control()
        elif service == 'team':
            self.handle_team(details)
        elif settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS:
            # Unqualified address: the whole local part stands for the
            # package (and optional keyword) of a dispatch request.
            local_part = addr.split('@', 1)[0]
            package, keyword = self._split_package_keyword(local_part)
            self.handle_dispatch(package, keyword)
        else:
            raise InvalidDeliveryAddress(
                '{} is not a valid Distro Tracker address'.format(addr))

    @staticmethod
    def build_delivery_address(service, details):
        """
        Build ``<service>+<details>@<fqdn>``, or ``<service>@<fqdn>`` when
        details is empty, using settings.DISTRO_TRACKER_FQDN as domain.
        """
        local_part = service
        if details:
            local_part += '+' + details
        return '{}@{}'.format(local_part, settings.DISTRO_TRACKER_FQDN)

    def handle_control(self):
        """Pass the message to distro_tracker.mail.control.process()."""
        distro_tracker.mail.control.process(self.message)

    def handle_bounces(self, details):
        """
        Rebuild the bounces address from details and pass it with the
        message to distro_tracker.mail.dispatch.handle_bounces().
        """
        sent_to_addr = self.build_delivery_address('bounces', details)
        distro_tracker.mail.dispatch.handle_bounces(sent_to_addr, self.message)

    def handle_dispatch(self, package=None, keyword=None):
        """Pass the message to distro_tracker.mail.dispatch.process()."""
        distro_tracker.mail.dispatch.process(self.message, package=package,
                                             keyword=keyword)

    def handle_team(self, team):
        """
        Pass the message to distro_tracker.mail.dispatch.process_for_team().
        """
        distro_tracker.mail.dispatch.process_for_team(self.message, team)
def run_mail_processor(mail_path, log_failure=False):
    """
    Build a :class:`MailProcessor` for a stored email and process it.

    Any exception is propagated to the caller; it is additionally logged
    beforehand when ``log_failure`` is true.

    :param str mail_path: path of the email
    :param bool log_failure: indicates whether to log any failure
    """
    try:
        MailProcessor(mail_path).process()
    except Exception:
        if not log_failure:
            raise
        logger.exception("Failed to process incoming mail %s", mail_path)
        raise
class MailQueue(object):
    """
    A queue of mails to process. The mails are identified by their filename
    within `DISTRO_TRACKER_MAILDIR_DIRECTORY`.
    """

    #: The maximum number of sub-process used to process the mail queue
    MAX_WORKERS = 4

    #: Values (in seconds) that :meth:`sleep_timeout` picks from, depending
    #: on the state of the queue and of its entries.
    SLEEP_TIMEOUT_EMPTY = 30.0
    SLEEP_TIMEOUT_TASK_RUNNING = 0.010
    SLEEP_TIMEOUT_TASK_FINISHED = 0.0
    SLEEP_TIMEOUT_TASK_RUNNABLE = 0.0

    def __init__(self):
        # Entries in insertion order.
        self.queue = []
        # Same entries, indexed by identifier.
        self.entries = {}
        # Number of entries dropped through remove().
        self.processed_count = 0

    def add(self, identifier):
        """
        Add a new mail in the queue.

        :param str identifier: Filename identifying the mail.
        :return: the new :class:`MailQueueEntry`, or None when the
            identifier is already queued.
        """
        if identifier in self.entries:
            return

        entry = MailQueueEntry(self, identifier)
        self.queue.append(entry)
        self.entries[identifier] = entry
        return entry

    def remove(self, identifier):
        """
        Remove a mail from the queue. This does not unlink the file.

        Unknown identifiers are ignored. Each effective removal increments
        ``processed_count``.

        :param str identifier: Filename identifying the mail.
        """
        if identifier not in self.entries:
            return
        self.queue.remove(self.entries[identifier])
        self.entries.pop(identifier)
        self.processed_count += 1

    @staticmethod
    def _get_maildir(subfolder=None):
        """
        Return the ``new`` directory of the Maildir, or the one of the given
        subfolder of `DISTRO_TRACKER_MAILDIR_DIRECTORY`.
        """
        if subfolder:
            return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY,
                                subfolder, 'new')
        return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY, 'new')

    @classmethod
    def _get_mail_path(cls, entry, subfolder=None):
        """Return the path of the file named ``entry`` in the Maildir."""
        return os.path.join(cls._get_maildir(subfolder), entry)

    def initialize(self):
        """Scan the Maildir and fill the queue with the mails in it."""
        for mail in os.listdir(self._get_maildir()):
            self.add(mail)

    @property
    def pool(self):
        """The worker pool, created on first access."""
        if getattr(self, '_pool', None):
            return self._pool
        self._pool = Pool(self.MAX_WORKERS, maxtasksperchild=100)
        return self._pool

    def close_pool(self):
        """Wait until all worker processes are finished and destroy the pool"""
        if getattr(self, '_pool', None) is None:
            return
        self._pool.close()
        self._pool.join()
        self._pool = None

    def process_queue(self):
        """
        Iterate over messages in the queue and do whatever is appropriate.
        """
        # Work on a snapshot of the queue as it will be modified each time
        # a task is finished
        for entry in list(self.queue):
            if not entry.processing_task_started():
                entry.start_processing_task()
            if entry.processing_task_finished():
                entry.handle_processing_task_result()

    def sleep_timeout(self):
        """
        Return the maximum delay we can sleep before we process the queue
        again.

        :return: a non-negative number of seconds.
        :rtype: float
        """
        if not self.queue:
            return self.SLEEP_TIMEOUT_EMPTY

        timeout = 86400.0
        for entry in self.queue:
            next_try_time = entry.get_data('next_try_time')
            if entry.processing_task_finished():
                timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_FINISHED)
            elif entry.processing_task_started():
                timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNING)
            elif next_try_time is not None:
                wait_time = next_try_time - distro_tracker.core.utils.now()
                # The retry time may already be in the past; never hand a
                # negative delay to the caller, which uses it as a timeout.
                timeout = min(timeout, max(0.0, wait_time.total_seconds()))
            else:
                timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNABLE)
        return timeout

    def process_loop(self, stop_after=None, ready_cb=None):
        """
        Process all messages as they are delivered. Also processes pre-existing
        messages. This method only returns when ``stop_after`` is set and
        reached.

        :param int stop_after: Stop the loop after having processed the given
            number of messages. Used mainly by unit tests.
        :param ready_cb: a callback executed after setup of filesystem
            monitoring and initial scan of the mail queue, but before the
            start of the loop.
        """
        watcher = MailQueueWatcher(self)
        watcher.start()
        self.initialize()
        if ready_cb:
            ready_cb()
        while True:
            watcher.process_events(timeout=self.sleep_timeout())
            self.process_queue()
            if stop_after is not None and self.processed_count >= stop_after:
                break
class MailQueueEntry(object):
    """
    An entry in a :py:class:`MailQueue`.

    Contains the following public attributes:

    .. py:attribute:: queue

       The parent :py:class:`MailQueue`.

    .. py:attribute:: identifier

       The entry identifier, it's the name of the file within the directory
       of the MailQueue. Used to uniquely identify the entry in the MailQueue.

    .. py:attribute:: path

       The full path to the mail file.
    """

    def __init__(self, queue, identifier):
        self.queue = queue
        self.identifier = identifier
        self.path = os.path.join(self.queue._get_maildir(), self.identifier)
        # Free-form state of the entry; the keys used in this class are
        # 'creation_time', 'task_result', 'next_try_time', 'tries' and
        # 'log_failure'.
        self.data = {
            'creation_time': distro_tracker.core.utils.now(),
        }

    def set_data(self, key, value):
        """Store ``value`` under ``key`` in the entry's data."""
        self.data[key] = value

    def get_data(self, key):
        """Return the data stored under ``key``, or None when unset."""
        return self.data.get(key)

    def move_to_subfolder(self, folder):
        """
        Move an entry from the mailqueue to the given subfolder.

        The target directory is created when it does not exist yet.

        :param str folder: name of the subfolder receiving the mail file.
        """
        new_maildir = self.queue._get_maildir(folder)
        os.makedirs(new_maildir, exist_ok=True)
        os.rename(self.path, os.path.join(new_maildir, self.identifier))

    def _processed_cb(self, _):
        """Callback executed when a worker completes successfully"""
        self.queue.remove(self.identifier)
        if os.path.exists(self.path):
            os.unlink(self.path)

    def start_processing_task(self):
        """
        Schedule the execution of :func:`run_mail_processor` on the mail in
        the worker pool, unless a retry time set in the future has not been
        reached yet.
        """
        next_try_time = self.get_data('next_try_time')
        log_failure = self.get_data('log_failure')
        now = distro_tracker.core.utils.now()
        if next_try_time and next_try_time > now:
            return

        result = self.queue.pool.apply_async(run_mail_processor,
                                             (self.path, log_failure),
                                             callback=self._processed_cb)
        self.set_data('task_result', result)

    def processing_task_started(self):
        """
        Returns True when the entry has been fed to workers doing mail
        processing. Returns False otherwise.

        :return: an indication whether the mail processing is on-going.
        :rtype: bool
        """
        return self.get_data('task_result') is not None

    def processing_task_finished(self):
        """
        Returns True when the worker processing the mail has finished its work.
        Returns False otherwise, notably when the entry has not been fed to
        any worker yet.

        :return: an indication whether the mail processing has finished.
        :rtype: bool
        """
        if not self.processing_task_started():
            return False
        return self.get_data('task_result').ready()

    def handle_processing_task_result(self):
        """
        Called with mails that have been pushed to workers but that are
        still in the queue. The task likely failed and we need to handle
        the failure smartly.

        Mails whose task raised an exception derived from
        :py:class:`MailProcessorException` are directly moved to the
        "failed" subfolder and the corresponding entry is dropped from the
        queue.

        Mails whose task raised other exceptions are kept around for
        multiple retries and once :meth:`schedule_next_try` refuses a new
        try they are moved to the "broken" subfolder and the corresponding
        entry is dropped from the queue.

        Does nothing when no task result is recorded for the entry.
        """
        task_result = self.get_data('task_result')
        if task_result is None:
            return
        try:
            # get() re-raises in this process the exception raised by the
            # worker, if any.
            task_result.get()
            self._processed_cb(task_result)
        except MailProcessorException:
            logger.warning('Failed processing %s', self.identifier)
            self.move_to_subfolder('failed')
            self.queue.remove(self.identifier)
        except Exception:
            if not self.schedule_next_try():
                logger.warning('Failed processing %s (and stop retrying)',
                               self.identifier)
                self.move_to_subfolder('broken')
                self.queue.remove(self.identifier)
            else:
                logger.warning('Failed processing %s (but will retry later)',
                               self.identifier)

    def schedule_next_try(self):
        """
        When the mail processing failed, schedule a new try for later.
        Progressively increase the delay between two tries. Once the six
        delays listed below have all been used, refuse to schedule a new try
        and return False.

        :return: True if a new try has been scheduled, False otherwise.
        """
        count = self.get_data('tries') or 0
        delays = [
            timedelta(seconds=150),
            timedelta(seconds=300),
            timedelta(seconds=600),
            timedelta(seconds=1800),
            timedelta(seconds=3600),
            timedelta(seconds=7200),
        ]

        try:
            delay = delays[count]
        except IndexError:
            return False

        now = distro_tracker.core.utils.now()
        self.set_data('next_try_time', now + delay)
        self.set_data('tries', count + 1)
        # Forget the finished task so that the entry is runnable again.
        self.set_data('task_result', None)
        # Only the last scheduled try asks the worker to log its failure.
        self.set_data('log_failure', count + 1 == len(delays))

        return True
class MailQueueWatcher(object):
    """Feed a mail queue with the files showing up in its directory."""

    class EventHandler(pyinotify.ProcessEvent):
        """Add to the queue the name carried by each handled event."""

        def my_init(self, queue=None):
            self.queue = queue

        def process_IN_CREATE(self, event):
            filename = event.name
            self.queue.add(filename)

        def process_IN_MOVED_TO(self, event):
            filename = event.name
            self.queue.add(filename)

    def __init__(self, queue):
        self.queue = queue

    def start(self):
        """Start watching the directory of the mail queue."""
        maildir = self.queue._get_maildir()
        watched_events = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO
        self.wm = pyinotify.WatchManager()
        # The notifier is deliberately not kept in an attribute.
        # NOTE(review): presumably it registers itself with asyncore, which
        # process_events() drives -- confirm against pyinotify.
        pyinotify.AsyncNotifier(self.wm, self.EventHandler(queue=self.queue))
        self.wm.add_watch(maildir, watched_events, quiet=False)

    def process_events(self, timeout=0, count=1):
        """
        Run the asyncore loop to process the events that are pending since
        the last call of the function.

        :param float timeout: Maximum time to wait for an event to happen.
        :param int count: Number of processing loops to do.
        """
        asyncore.loop(timeout=timeout, count=count)