diff --git a/swh/web/inbound_email/utils.py b/swh/web/inbound_email/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/inbound_email/utils.py
@@ -0,0 +1,66 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import dataclass
+from email.headerregistry import Address
+from email.message import EmailMessage
+from typing import List, Optional
+
+
+def extract_recipients(message: EmailMessage) -> List[Address]:
+    """Extract a list of recipients of the `message`.
+
+    This uses the ``To`` and ``Cc`` fields.
+    """
+
+    ret = []
+
+    for header_name in ("to", "cc"):
+        for header in message.get_all(header_name, []):
+            ret.extend(header.addresses)
+
+    return ret
+
+
+@dataclass
+class AddressMatch:
+    """Data related to a recipient match"""
+
+    recipient: Address
+    """The original recipient that matched the expected address"""
+    extension: Optional[str]
+    """The parsed +-extension of the matched recipient address"""
+
+
+def recipient_matches(message: EmailMessage, address: str) -> List[AddressMatch]:
+    """Check whether any of the message recipients match the given address.
+
+    The match is case-insensitive, which is not really RFC-compliant but matches what
+    most people would expect.
+
+    This function supports "+-addressing", where the local part of the email address is
+    appended with a `+` followed by an arbitrary extension.
+
+    Returns:
+        one :class:`AddressMatch` per matching recipient (``To`` recipients first,
+        then ``Cc``); ``extension`` is None when there is no +-extension.
+    """
+
+    ret = []
+
+    parsed_address = Address(addr_spec=address.lower())
+
+    for recipient in extract_recipients(message):
+        if recipient.domain.lower() != parsed_address.domain:
+            continue
+
+        base_username, _, extension = recipient.username.partition("+")
+
+        if base_username.lower() != parsed_address.username:
+            continue
+
+        ret.append(AddressMatch(recipient=recipient, extension=extension or None))
+
+    return ret
diff --git a/swh/web/tests/inbound_email/test_utils.py b/swh/web/tests/inbound_email/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/inbound_email/test_utils.py
@@ -0,0 +1,113 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from email.headerregistry import Address
+from email.message import EmailMessage
+
+from swh.web.inbound_email import utils
+
+
+def test_extract_recipients():
+    message = EmailMessage()
+    assert utils.extract_recipients(message) == []
+
+    message["To"] = "Test Recipient <test-recipient@example.com>"
+
+    assert utils.extract_recipients(message) == [
+        Address(display_name="Test Recipient", addr_spec="test-recipient@example.com")
+    ]
+
+    message["Cc"] = (
+        "test-recipient-2@example.com, "
+        "Another Test Recipient <test-recipient-3@example.com>"
+    )
+    assert utils.extract_recipients(message) == [
+        Address(display_name="Test Recipient", addr_spec="test-recipient@example.com"),
+        Address(addr_spec="test-recipient-2@example.com"),
+        Address(
+            display_name="Another Test Recipient",
+            addr_spec="test-recipient-3@example.com",
+        ),
+    ]
+
+    del message["To"]
+    assert utils.extract_recipients(message) == [
+        Address(addr_spec="test-recipient-2@example.com"),
+        Address(
+            display_name="Another Test Recipient",
+            addr_spec="test-recipient-3@example.com",
+        ),
+    ]
+
+
+def test_recipient_matches():
+    message = EmailMessage()
+    assert utils.recipient_matches(message, "match@example.com") == []
+
+    message = EmailMessage()
+    message["to"] = "nomatch@example.com"
+    assert utils.recipient_matches(message, "match@example.com") == []
+
+    message = EmailMessage()
+    message["to"] = "match@example.com"
+    assert utils.recipient_matches(message, "match@example.com") == [
+        utils.AddressMatch(
+            recipient=Address(addr_spec="match@example.com"), extension=None
+        )
+    ]
+
+    message = EmailMessage()
+    message["to"] = "match+extension@example.com"
+    assert utils.recipient_matches(message, "match@example.com") == [
+        utils.AddressMatch(
+            recipient=Address(addr_spec="match+extension@example.com"),
+            extension="extension",
+        )
+    ]
+
+    message = EmailMessage()
+    message["to"] = "match+weird+plussed+extension@example.com"
+    assert utils.recipient_matches(message, "match@example.com") == [
+        utils.AddressMatch(
+            recipient=Address(addr_spec="match+weird+plussed+extension@example.com"),
+            extension="weird+plussed+extension",
+        )
+    ]
+
+    message = EmailMessage()
+    message["to"] = "nomatch@example.com"
+    message["cc"] = ", ".join(
+        (
+            "match@example.com",
+            "match@notamatch.example.com",
+            "Another Match <match+extension@example.com>",
+        )
+    )
+    assert utils.recipient_matches(message, "match@example.com") == [
+        utils.AddressMatch(
+            recipient=Address(addr_spec="match@example.com"), extension=None,
+        ),
+        utils.AddressMatch(
+            recipient=Address(
+                display_name="Another Match", addr_spec="match+extension@example.com"
+            ),
+            extension="extension",
+        ),
+    ]
+
+
+def test_recipient_matches_casemapping():
+    message = EmailMessage()
+    message["to"] = "match@example.com"
+
+    assert utils.recipient_matches(message, "Match@Example.Com")
+    assert utils.recipient_matches(message, "match@example.com")
+
+    message = EmailMessage()
+    message["to"] = "Match+weirdCaseMapping@Example.Com"
+
+    matches = utils.recipient_matches(message, "match@example.com")
+    assert matches
+    assert matches[0].extension == "weirdCaseMapping"