# Copyright 2015-2018 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""
Module implementing the processing of incoming email messages.
"""
import asyncore
import email
import logging
import os
from datetime import timedelta
from itertools import chain
from multiprocessing import Pool
from django.conf import settings
import pyinotify
import distro_tracker.mail.control
import distro_tracker.mail.dispatch
from distro_tracker.core.utils import message_from_bytes
# Module-level logger, used below to report mail processing failures.
logger = logging.getLogger(__name__)
class MailProcessorException(Exception):
    """
    Base class of the errors raised by :class:`MailProcessor` when an
    incoming message cannot be routed to a Distro Tracker service.
    """
class ConflictingDeliveryAddresses(MailProcessorException):
    """
    Raised when several headers of the message carry distinct delivery
    addresses within the domain defined in settings.DISTRO_TRACKER_FQDN,
    so that the target service cannot be decided.
    """
class MissingDeliveryAddress(MailProcessorException):
    """
    Raised when none of the headers of the message carries a delivery
    address within the domain defined in settings.DISTRO_TRACKER_FQDN.
    """
class InvalidDeliveryAddress(MailProcessorException):
    """
    Raised when the message was delivered to an address within the domain
    defined in settings.DISTRO_TRACKER_FQDN that does not correspond to
    any known Distro Tracker service.
    """
class MailProcessor(object):
    """
    Takes an incoming email and does something useful with it.

    To this end, it must find out where the email was sent
    and adjust the processing depending on the role of
    the target address.
    """

    def __init__(self, message_or_filename):
        """
        :param message_or_filename: either an already parsed
            :class:`email.message.Message`, or the path of a file holding
            the raw mail (parsed with :meth:`load_mail_from_file`).
        """
        if isinstance(message_or_filename, email.message.Message):
            self.message = message_or_filename
        else:
            self.load_mail_from_file(message_or_filename)

    def load_mail_from_file(self, filename):
        """
        Load the mail to process from a file.

        :param str filename: Path of the file to parse as mail.
        """
        with open(filename, 'rb') as f:
            self.message = message_from_bytes(f.read())

    @staticmethod
    def find_delivery_address(message):
        """
        Identify the email address the message was delivered to.

        The message headers Delivered-To, Envelope-To, X-Original-To, and
        X-Envelope-To are scanned to find out an email that matches the FQDN of
        the Distro Tracker setup.

        :param message: the message whose headers are inspected.
        :return: the single matching address, or ``None`` when no header
            holds an address ending with ``@<DISTRO_TRACKER_FQDN>``.
        :raises ConflictingDeliveryAddresses: when several distinct matching
            addresses are found.
        """
        suffix = '@' + settings.DISTRO_TRACKER_FQDN
        addresses = []
        for field in chain(message.get_all('Delivered-To', []),
                           message.get_all('Envelope-To', []),
                           message.get_all('X-Original-To', []),
                           message.get_all('X-Envelope-To', [])):
            # An address may be repeated across headers; only distinct
            # values count towards a conflict.
            if field.endswith(suffix) and field not in addresses:
                addresses.append(field)
        if len(addresses) > 1:
            raise ConflictingDeliveryAddresses()
        if addresses:
            return addresses[0]
        return None

    @staticmethod
    def identify_service(address):
        """
        Identify service associated to target email and extract optional args.

        The address has the generic form <service>+<details>@<fqdn>.

        :param str address: the delivery address.
        :return: a ``(service, details)`` tuple. Only the first ``+`` of the
            local part separates the service from the details; ``details``
            is ``None`` when the local part contains no ``+``.
        """
        local_part = address.split('@', 1)[0]
        service, separator, details = local_part.partition('+')
        # Always return a tuple (the former '+' branch returned a list).
        return (service, details if separator else None)

    # NOTE(review): although declared as a staticmethod, this takes a
    # ``self`` argument. Looking it up on the class yields a plain function,
    # so assigning ``MailProcessor.do_nothing`` over ``process`` makes it a
    # regular method again; presumably that is how unit tests use it. Keep
    # the signature unchanged.
    @staticmethod
    def do_nothing(self):
        """Just used by unit tests to disable process()"""

    def process(self):
        """
        Process the message stored in self.message.

        Find out the delivery address and identify the associated service.
        Then defer to handle_*() for service-specific processing.

        :raises MissingDeliveryAddress: when no delivery address within
            settings.DISTRO_TRACKER_FQDN is found.
        :raises ConflictingDeliveryAddresses: when several distinct delivery
            addresses are found.
        :raises InvalidDeliveryAddress: when the address matches no known
            service and settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS is
            not set.
        """
        addr = self.find_delivery_address(self.message)
        if addr is None:
            raise MissingDeliveryAddress()
        service, details = self.identify_service(addr)
        if service == 'dispatch':
            package, keyword = self._split_package_keyword(details)
            self.handle_dispatch(package, keyword)
        elif service == 'bounces':
            self.handle_bounces(details)
        elif service == 'control':
            self.handle_control()
        elif service == 'team':
            self.handle_team(details)
        elif settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS:
            # The whole local part (including any '+details') is taken as
            # <package>[_<keyword>].
            package, keyword = self._split_package_keyword(
                addr.split('@', 1)[0])
            self.handle_dispatch(package, keyword)
        else:
            raise InvalidDeliveryAddress(
                '{} is not a valid Distro Tracker address'.format(addr))

    @staticmethod
    def _split_package_keyword(value):
        """Split '<package>_<keyword>' at the first '_'; keyword may be None."""
        if value and '_' in value:
            package, keyword = value.split('_', 1)
            return package, keyword
        return value, None

    @staticmethod
    def build_delivery_address(service, details):
        """
        Build the ``<service>[+<details>]@<DISTRO_TRACKER_FQDN>`` address.

        The ``+<details>`` part is omitted when ``details`` is empty or None.
        """
        local_part = service
        if details:
            local_part += '+' + details
        return '{}@{}'.format(local_part, settings.DISTRO_TRACKER_FQDN)

    def handle_control(self):
        """Hand the message over to distro_tracker.mail.control.process()."""
        distro_tracker.mail.control.process(self.message)

    def handle_bounces(self, details):
        """
        Hand the message over to distro_tracker.mail.dispatch.handle_bounces()
        together with the bounces address it was sent to.
        """
        sent_to_addr = self.build_delivery_address('bounces', details)
        distro_tracker.mail.dispatch.handle_bounces(sent_to_addr, self.message)

    def handle_dispatch(self, package=None, keyword=None):
        """Hand the message over to distro_tracker.mail.dispatch.process()."""
        distro_tracker.mail.dispatch.process(self.message, package=package,
                                             keyword=keyword)

    def handle_team(self, team):
        """
        Hand the message over to
        distro_tracker.mail.dispatch.process_for_team().
        """
        distro_tracker.mail.dispatch.process_for_team(self.message, team)
def run_mail_processor(mail_path, log_failure=False):
    """
    Parse and process a mail stored on disk with a :class:`MailProcessor`.

    This is the function scheduled in the worker pool of the mail queue.

    :param str mail_path: filesystem path of the stored email
    :param bool log_failure: when true, any exception is logged (with its
        traceback) before being propagated to the caller
    """
    try:
        MailProcessor(mail_path).process()
    except Exception:
        if not log_failure:
            raise
        logger.exception("Failed to process incoming mail %s", mail_path)
        raise
class MailQueue(object):
    """
    A queue of mails to process. The mails are identified by their filename
    within `DISTRO_TRACKER_MAILDIR_DIRECTORY`.
    """

    #: The maximum number of sub-process used to process the mail queue
    MAX_WORKERS = 4
    #: Delays (in seconds) used by :meth:`sleep_timeout` depending on the
    #: state of the queue and of its entries.
    SLEEP_TIMEOUT_EMPTY = 30.0
    SLEEP_TIMEOUT_TASK_RUNNING = 0.010
    SLEEP_TIMEOUT_TASK_FINISHED = 0.0
    SLEEP_TIMEOUT_TASK_RUNNABLE = 0.0

    def __init__(self):
        # Entries in insertion order; this is what gets iterated.
        self.queue = []
        # The same entries indexed by identifier.
        self.entries = {}
        # Number of entries removed from the queue so far.
        self.processed_count = 0

    def add(self, identifier):
        """
        Add a new mail in the queue.

        :param str identifier: Filename identifying the mail.
        :return: the new :class:`MailQueueEntry`, or ``None`` when the mail
            is already queued.
        """
        if identifier in self.entries:
            return None
        entry = MailQueueEntry(self, identifier)
        self.queue.append(entry)
        self.entries[identifier] = entry
        return entry

    def remove(self, identifier):
        """
        Remove a mail from the queue. This does not unlink the file.

        Unknown identifiers are ignored; each effective removal increments
        ``processed_count``.

        :param str identifier: Filename identifying the mail.
        """
        entry = self.entries.pop(identifier, None)
        if entry is None:
            return
        self.queue.remove(entry)
        self.processed_count += 1

    @staticmethod
    def _get_maildir(subfolder=None):
        """
        Return the ``new`` directory of the Maildir, or of the given
        ``subfolder`` of the Maildir when provided.
        """
        if subfolder:
            return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY,
                                subfolder, 'new')
        return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY, 'new')

    @classmethod
    def _get_mail_path(cls, entry, subfolder=None):
        """Return the path of the mail file ``entry`` in the given Maildir."""
        return os.path.join(cls._get_maildir(subfolder), entry)

    def initialize(self):
        """Scan the Maildir and fill the queue with the mails in it."""
        for mail in os.listdir(self._get_maildir()):
            self.add(mail)

    @property
    def pool(self):
        """Worker pool, created lazily on first access."""
        if getattr(self, '_pool', None) is None:
            self._pool = Pool(self.MAX_WORKERS, maxtasksperchild=100)
        return self._pool

    def close_pool(self):
        """Wait until all worker processes are finished and destroy the pool"""
        if getattr(self, '_pool', None) is None:
            return
        self._pool.close()
        self._pool.join()
        self._pool = None

    def process_queue(self):
        """
        Iterate over messages in the queue and do whatever is appropriate.

        Entries not yet handed to a worker are started, and entries whose
        worker has finished get their result handled.
        """
        # Work on a snapshot of the queue as it will be modified each time
        # a task is finished
        for entry in list(self.queue):
            if not entry.processing_task_started():
                entry.start_processing_task()
            if entry.processing_task_finished():
                entry.handle_processing_task_result()

    def sleep_timeout(self):
        """
        Return the maximum delay we can sleep before we process the queue
        again.

        The returned value is never negative because it is used as the
        timeout of the event loop.
        """
        if not self.queue:
            return self.SLEEP_TIMEOUT_EMPTY
        timeout = 86400.0
        for entry in self.queue:
            if entry.processing_task_finished():
                delay = self.SLEEP_TIMEOUT_TASK_FINISHED
            elif entry.processing_task_started():
                delay = self.SLEEP_TIMEOUT_TASK_RUNNING
            else:
                next_try_time = entry.get_data('next_try_time')
                if next_try_time is None:
                    delay = self.SLEEP_TIMEOUT_TASK_RUNNABLE
                else:
                    wait_time = next_try_time - distro_tracker.core.utils.now()
                    # The retry time may already be in the past; select()
                    # and time.sleep() (used by asyncore) reject negative
                    # timeouts, so clamp to zero.
                    delay = max(0.0, wait_time.total_seconds())
            timeout = min(timeout, delay)
        return timeout

    def process_loop(self, stop_after=None, ready_cb=None):
        """
        Process all messages as they are delivered. Also processes pre-existing
        messages. Unless ``stop_after`` is given, this method never returns.

        :param int stop_after: Stop the loop after having processed the given
            number of messages. Used mainly by unit tests.
        :param ready_cb: a callback executed after setup of filesystem
            monitoring and initial scan of the mail queue, but before the
            start of the loop.
        """
        watcher = MailQueueWatcher(self)
        watcher.start()
        self.initialize()
        if ready_cb:
            ready_cb()
        while True:
            watcher.process_events(timeout=self.sleep_timeout())
            self.process_queue()
            if stop_after is not None and self.processed_count >= stop_after:
                break
class MailQueueEntry(object):
    """
    An entry in a :py:class:MailQueue.

    Contains the following public attributes:

    .. :py:attr: queue

        The parent :py:class:MailQueue.

    .. :py:attr: identifier

        The entry identifier, it's the name of the file within the directory
        of the MailQueue. Used to uniquely identify the entry in the MailQueue.

    .. :py:attr: path

        The full path to the mail file.
    """

    #: Delays between successive retries of a failed processing task; the
    #: number of retries is the length of this sequence.
    RETRY_DELAYS = (
        timedelta(seconds=150),
        timedelta(seconds=300),
        timedelta(seconds=600),
        timedelta(seconds=1800),
        timedelta(seconds=3600),
        timedelta(seconds=7200),
    )

    def __init__(self, queue, identifier):
        self.queue = queue
        self.identifier = identifier
        self.path = os.path.join(self.queue._get_maildir(), self.identifier)
        # Free-form state; keys used by this class: 'creation_time',
        # 'task_result', 'next_try_time', 'tries' and 'log_failure'.
        self.data = {
            'creation_time': distro_tracker.core.utils.now(),
        }

    def set_data(self, key, value):
        """Store ``value`` under ``key`` in the entry data."""
        self.data[key] = value

    def get_data(self, key):
        """Return the value stored under ``key``, or ``None`` if unset."""
        return self.data.get(key)

    def move_to_subfolder(self, folder):
        """
        Move an entry from the mailqueue to the given subfolder.

        The target directory (``self.queue._get_maildir(folder)``) is created
        when missing. The entry is not removed from the queue.
        """
        new_maildir = self.queue._get_maildir(folder)
        os.makedirs(new_maildir, exist_ok=True)
        os.rename(self.path, os.path.join(new_maildir, self.identifier))

    def _processed_cb(self, _):
        """
        Callback executed when a worker completes successfully.

        Drops the entry from the queue and deletes the mail file; a mail
        file that is already gone is not an error.
        """
        self.queue.remove(self.identifier)
        try:
            os.unlink(self.path)
        except FileNotFoundError:
            pass

    def start_processing_task(self):
        """
        Create a MailProcessor and schedule its execution in the worker pool.

        Does nothing while the scheduled retry time (if any) is still in the
        future. The pending result is stored under the 'task_result' key.
        """
        next_try_time = self.get_data('next_try_time')
        log_failure = self.get_data('log_failure')
        now = distro_tracker.core.utils.now()
        if next_try_time and next_try_time > now:
            return
        result = self.queue.pool.apply_async(run_mail_processor,
                                             (self.path, log_failure),
                                             callback=self._processed_cb)
        self.set_data('task_result', result)

    def processing_task_started(self):
        """
        Returns True when the entry has been fed to workers doing mail
        processing. Returns False otherwise.

        :return: an indication whether the mail processing is on-going.
        :rtype: bool
        """
        return self.get_data('task_result') is not None

    def processing_task_finished(self):
        """
        Returns True when the worker processing the mail has finished its work.
        Returns False otherwise, notably when the entry has not been fed to
        any worker yet.

        :return: an indication whether the mail processing has finished.
        :rtype: bool
        """
        if not self.processing_task_started():
            return False
        return self.get_data('task_result').ready()

    def handle_processing_task_result(self):
        """
        Called with mails that have been pushed to workers but that are
        still in the queue. The task likely failed and we need to handle
        the failure smartly.

        If the task actually succeeded, the entry is handled as processed.

        Mails whose task raised an exception derived from
        :py:class:MailProcessorException are directly moved to a "failed"
        subfolder and the corresponding entry is dropped from the queue
        (no retry).

        Mails whose task raised other exceptions are kept around for
        multiple retries (see :meth:`schedule_next_try`); once the retries
        are exhausted they are moved to a "broken" subfolder and the
        corresponding entry is dropped from the queue.
        """
        task_result = self.get_data('task_result')
        if task_result is None:
            return
        try:
            # Re-raises in this process the exception raised by the worker.
            task_result.get()
            self._processed_cb(task_result)
        except MailProcessorException:
            logger.warning('Failed processing %s', self.identifier)
            self.move_to_subfolder('failed')
            self.queue.remove(self.identifier)
        except Exception:
            if not self.schedule_next_try():
                logger.warning('Failed processing %s (and stop retrying)',
                               self.identifier)
                self.move_to_subfolder('broken')
                self.queue.remove(self.identifier)
            else:
                logger.warning('Failed processing %s (but will retry later)',
                               self.identifier)

    def schedule_next_try(self):
        """
        When the mail processing failed, schedule a new try for later.

        Progressively increase the delay between two tries, following
        :attr:`RETRY_DELAYS` (150 seconds up to 2 hours). Once all of those
        6 retries have been scheduled, refuse to schedule a new one and
        return False. The last retry is flagged with 'log_failure' so that
        its failure gets logged by the worker.

        :return: True if a new try has been scheduled, False otherwise.
        """
        count = self.get_data('tries') or 0
        if count >= len(self.RETRY_DELAYS):
            return False
        delay = self.RETRY_DELAYS[count]
        now = distro_tracker.core.utils.now()
        self.set_data('next_try_time', now + delay)
        self.set_data('tries', count + 1)
        # Clearing the result makes processing_task_started() False again,
        # so the queue will start a new task.
        self.set_data('task_result', None)
        self.set_data('log_failure', count + 1 == len(self.RETRY_DELAYS))
        return True
class MailQueueWatcher(object):
    """Watch a mail queue and add entries as they appear on the filesystem"""

    class EventHandler(pyinotify.ProcessEvent):
        """Add the names of created or moved-in files to the mail queue."""

        def my_init(self, queue=None):
            # pyinotify forwards the constructor keyword arguments here.
            self.queue = queue

        def _enqueue(self, event):
            self.queue.add(event.name)

        def process_IN_CREATE(self, event):
            self._enqueue(event)

        def process_IN_MOVED_TO(self, event):
            self._enqueue(event)

    def __init__(self, queue):
        self.queue = queue

    def start(self):
        """Start watching the directory of the mail queue."""
        watched_dir = self.queue._get_maildir()
        self.wm = pyinotify.WatchManager()
        handler = self.EventHandler(queue=self.queue)
        # NOTE(review): the notifier is not kept in an attribute; presumably
        # it stays alive through asyncore's global map, which
        # process_events() services — confirm.
        pyinotify.AsyncNotifier(self.wm, handler)
        watched_events = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO
        self.wm.add_watch(watched_dir, watched_events, quiet=False)

    def process_events(self, timeout=0, count=1):
        """
        Run the asyncore loop to dispatch pending filesystem events.

        :param float timeout: Maximum time to wait for an event to happen.
        :param int count: Number of processing loops to do.
        """
        asyncore.loop(timeout=timeout, count=count)