Source code for distro_tracker.mail.control

# Copyright 2013 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""
Module implementing the processing of email control messages.
"""
import logging
import re
from email.iterators import typed_subpart_iterator

from django.conf import settings
from django.core.mail import EmailMessage
from django.template.loader import render_to_string

from distro_tracker.core.utils import distro_tracker_render_to_string
from distro_tracker.core.utils import extract_email_address_from_header
from distro_tracker.core.utils import get_decoded_message_payload
from distro_tracker.core.utils.email_messages import decode_header
from distro_tracker.core.utils.email_messages import unfold_header
from distro_tracker.mail.control.commands import CommandFactory
from distro_tracker.mail.control.commands import CommandProcessor
from distro_tracker.mail.models import CommandConfirmation


DISTRO_TRACKER_CONTACT_EMAIL = settings.DISTRO_TRACKER_CONTACT_EMAIL
DISTRO_TRACKER_BOUNCES_EMAIL = settings.DISTRO_TRACKER_BOUNCES_EMAIL
DISTRO_TRACKER_CONTROL_EMAIL = settings.DISTRO_TRACKER_CONTROL_EMAIL

logger = logging.getLogger(__name__)


def send_response(original_message, message_text, recipient_email, cc=None):
    """
    Reply by email to a received control message.

    The reply is addressed to the ``From`` header of the original message
    and is threaded under it through the ``References`` and ``In-Reply-To``
    headers.

    :param original_message: The received control message.
    :type original_message: :py:class:`email.message.Message` or an object
        with an equivalent interface
    :param message_text: The text which should be included in the body of
        the response.
    :param recipient_email: The address written to the log entry as the
        recipient. The actual ``To`` of the reply is taken from the ``From``
        header of ``original_message``.
    :param cc: A list of emails which should receive a CC of the response.
    """
    original_subject = unfold_header(
        decode_header(original_message.get('Subject', '')))
    # Fall back to a generic subject when the original one is empty.
    reply_subject = 'Re: ' + (original_subject or 'Your mail')

    message_id = unfold_header(original_message.get('Message-ID', ''))
    previous_references = unfold_header(
        original_message.get('References', ''))
    # Append the original Message-ID to the existing reference chain, if any.
    separator = ' ' if previous_references else ''
    references = previous_references + separator + message_id

    reply_headers = {
        'From': DISTRO_TRACKER_CONTACT_EMAIL,
        'X-Loop': DISTRO_TRACKER_CONTROL_EMAIL,
        'References': references,
        'In-Reply-To': message_id,
    }
    reply = EmailMessage(
        subject=reply_subject,
        to=[unfold_header(original_message['From'])],
        cc=cc,
        from_email=DISTRO_TRACKER_BOUNCES_EMAIL,
        headers=reply_headers,
        body=message_text,
    )

    cc_for_log = " ".join(cc) if cc else ""
    logger.info("control => %(to)s %(cc)s", {
        'to': recipient_email,
        'cc': cc_for_log,
    })
    reply.send()
def send_plain_text_warning(original_message, logdata):
    """
    Warn the sender that their control message could not be decoded because
    it contains no text/plain part.

    :param original_message: The received control message.
    :type original_message: :py:class:`email.message.Message` or an object
        with an equivalent interface
    :param logdata: Mapping with a ``from`` key (passed on as the recipient
        for logging) and a ``msgid`` key (interpolated into the log entry).
    """
    send_response(
        original_message,
        render_to_string('control/email-plaintext-warning.txt'),
        recipient_email=logdata['from'],
    )
    logger.info("control :: no plain text found in %(msgid)s", logdata)
class ConfirmationSet(object):
    """
    A class which keeps track of all confirmations which are required during
    a single control process run.

    This is necessary in order to send the emails asking for confirmation
    only when all commands are processed.
    """

    def __init__(self):
        # Both mappings are keyed by email address; add_command() appends to
        # them together, so entry i of each list describes the same command.
        self.commands = {}
        self.confirmation_messages = {}

    def add_command(self, email, command_text, confirmation_message):
        """
        Adds a command to the list of all commands which need to be
        confirmed.

        :param email: The email of the user the command references.
        :param command_text: The text of the command which needs to be
            confirmed.
        :param confirmation_message: An extra message to be included in the
            email when asking for confirmation of this command. This is
            usually an explanation of what the effect of the command is.
        """
        self.commands.setdefault(email, []).append(command_text)
        self.confirmation_messages.setdefault(email, []).append(
            confirmation_message)

    def _ask_confirmation(self, email, commands, messages):
        """
        Sends a confirmation mail to a single user. Includes all commands
        that the user needs to confirm.

        :param email: The address the confirmation mail is sent to.
        :param commands: The commands which the user needs to confirm.
        :param messages: The extra messages rendered into the mail body
            alongside the commands.
        """
        command_confirmation = CommandConfirmation.objects.create_for_commands(
            commands=commands)
        # Render the messages handed in by the caller instead of reaching
        # back into self.confirmation_messages, so the parameter is honoured.
        message = distro_tracker_render_to_string(
            'control/email-confirmation-required.txt', {
                'command_confirmation': command_confirmation,
                'confirmation_messages': messages,
            }
        )
        subject = 'CONFIRM ' + command_confirmation.confirmation_key

        EmailMessage(
            subject=subject,
            to=[email],
            from_email=DISTRO_TRACKER_BOUNCES_EMAIL,
            headers={
                'From': DISTRO_TRACKER_CONTROL_EMAIL,
            },
            body=message,
        ).send()
        logger.info("control => confirmation token sent to %s", email)

    def ask_confirmation_all(self):
        """
        Sends a confirmation mail to all users which have been registered by
        using :py:meth:`add_command`.
        """
        for email, commands in self.commands.items():
            self._ask_confirmation(
                email, commands, self.confirmation_messages[email])

    def get_emails(self):
        """
        :returns: The keys of the internal command mapping, i.e. each email
            which will receive a confirmation mail since there exists at
            least one command which references it. Every email appears once.
        """
        return self.commands.keys()
def process(msg):
    """
    The function which actually processes a received command email message.

    Messages carrying this service's own ``X-Loop`` marker are discarded.
    Messages without a text/plain part get a warning reply. Otherwise the
    subject and the lines of the first text/plain part are run through the
    command processor, pending confirmation mails are sent, and a response
    is mailed back if the processor reports success.

    :param msg: The received command email message.
    :type msg: ``email.message.Message``
    """
    email = extract_email_address_from_header(msg.get('From', ''))
    logdata = {
        'from': email,
        'msgid': msg.get('Message-ID', 'no-msgid-present@localhost'),
    }
    logger.info("control <= %(from)s %(msgid)s", logdata)

    # Responses sent by this module carry our control address in X-Loop;
    # discard messages bearing it so we never answer our own mail.
    if ('X-Loop' in msg and
            DISTRO_TRACKER_CONTROL_EMAIL in msg.get_all('X-Loop')):
        logger.info("control :: discarded %(msgid)s due to X-Loop", logdata)
        return

    # Get the first plain-text part of the message
    plain_text_part = next(typed_subpart_iterator(msg, 'text', 'plain'), None)
    # Compare against the sentinel explicitly: the truth value of an
    # email.message.Message is its header count, so a valid text/plain part
    # that carries no headers of its own would otherwise look "missing".
    if plain_text_part is None:
        # There is no plain text in the email
        send_plain_text_warning(msg, logdata)
        return

    # Decode the plain text into a unicode string
    text = get_decoded_message_payload(plain_text_part)
    lines = extract_command_from_subject(msg) + text.splitlines()

    # Process the commands
    factory = CommandFactory({'email': email})
    confirmation_set = ConfirmationSet()
    processor = CommandProcessor(factory)
    processor.confirmation_set = confirmation_set
    processor.process(lines)

    # Confirmation requests are sent only once every command was processed.
    confirmation_set.ask_confirmation_all()

    # Send a response only if there were some commands processed
    if processor.is_success():
        send_response(msg, processor.get_output(), recipient_email=email,
                      cc=set(confirmation_set.get_emails()))
    else:
        logger.info("control :: no command processed in %(msgid)s", logdata)
def extract_command_from_subject(message):
    """
    Build the command lines contributed by the subject of an email.

    :param message: An email message.
    :type message: :py:class:`email.message.Message` or an object with an
        equivalent interface
    :returns: An empty list when the decoded subject is empty; otherwise a
        two-item list holding a ``# Message subject`` marker line followed
        by the subject with one leading ``Re:`` prefix (any case) removed.
    """
    decoded_subject = decode_header(message.get('Subject'))
    if decoded_subject:
        prefix_match = re.match(
            r'(?:Re\s*:\s*)?(.*)$', decoded_subject, flags=re.IGNORECASE)
        # Fall back to the untouched subject if the pattern does not match.
        if prefix_match:
            command = prefix_match.group(1)
        else:
            command = decoded_subject
        return ['# Message subject', command]
    return []