diff --git a/swh/web/inbound_email/__init__.py b/swh/web/inbound_email/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/swh/web/inbound_email/apps.py b/swh/web/inbound_email/apps.py new file mode 100644 index 00000000..a022295c --- /dev/null +++ b/swh/web/inbound_email/apps.py @@ -0,0 +1,11 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.apps import AppConfig + + +class InboundEmailConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "inbound_email" diff --git a/swh/web/inbound_email/management/commands/process_inbound_email.py b/swh/web/inbound_email/management/commands/process_inbound_email.py new file mode 100644 index 00000000..8d49670a --- /dev/null +++ b/swh/web/inbound_email/management/commands/process_inbound_email.py @@ -0,0 +1,73 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import email +import email.message +import email.policy +import logging +import sys +from typing import Callable + +import sentry_sdk + +from django.core.management.base import BaseCommand + +from swh.web.inbound_email import signals + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Process a new inbound email" + + def handle(self, *args, **options): + raw_message = sys.stdin.buffer.read() + try: + message = email.message_from_bytes(raw_message, policy=email.policy.default) + except Exception as exc: + sentry_sdk.capture_exception(exc) + self.handle_failed_message(raw_message) + # XXX make sure having logging doesn't make postfix unhappy + logger.exception("Could not convert email from bytes") + return + + responses = signals.email_received.send_robust( + sender=self.__class__, message=message + ) + + handled = False + for receiver, response in responses: + if isinstance(response, Exception): + sentry_sdk.capture_exception(response) + self.handle_failing_receiver(message, receiver) + logger.error( + "Receiver produced the following exception", exc_info=response + ) + elif response is signals.EmailProcessingStatus.FAILED: + self.handle_failing_receiver(message, receiver) + elif response is signals.EmailProcessingStatus.PROCESSED: + handled = True + + if not handled: + self.handle_unhandled_message(message) + + def handle_failed_message(self, raw_message: bytes): + # TODO: forward email as attachment for inspection + logger.error("Failed message: %s", raw_message.decode("ascii", "replace")) + + def handle_failing_receiver( + self, message: email.message.EmailMessage, receiver: Callable + ): + # TODO: forward email for inspection + logger.error( + "Failed receiver %s:%s; message: %s", + receiver.__module__, + receiver.__qualname__, + str(message), + ) + + def handle_unhandled_message(self, message: email.message.EmailMessage): + # TODO: pass email through to a fallback alias? + logger.error("Unhandled message: %s", str(message)) diff --git a/swh/web/inbound_email/signals.py b/swh/web/inbound_email/signals.py new file mode 100644 index 00000000..ffe3bb80 --- /dev/null +++ b/swh/web/inbound_email/signals.py @@ -0,0 +1,36 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from enum import Enum, auto + +import django.dispatch + +email_received = django.dispatch.Signal(providing_args=["message"]) +"""This signal is sent by the `process_inbound_email` management command. + +Arguments: + message (:class:`email.message.EmailMessage`): the inbound email message + +Signal receivers must return an :class:`EmailProcessingStatus` value so that the +management command knows if the email has been processed. + +Signal receivers will be called for all received emails and are expected to do their own +filtering (e.g. using the original destination address). + +Receivers ignoring a message must return `EmailProcessingStatus.IGNORED` to let the +management command know that the message hasn't been processed. + +""" + + +class EmailProcessingStatus(Enum): + """Return values for the email processing signal listeners""" + + PROCESSED = auto() + """The email has been successfully processed""" + FAILED = auto() + """The email has been processed, but the processing failed""" + IGNORED = auto() + """The email has been ignored (e.g. unknown recipient)""" diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index 4f2d1f96..48c98dc9 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,293 +1,294 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ import os import sys from typing import Any, Dict from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config["secret_key"] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config["debug"] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config["debug"] ALLOWED_HOSTS = ["127.0.0.1", "localhost"] + swh_web_config["allowed_hosts"] # Application definition INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "swh.web.common", + "swh.web.inbound_email", "swh.web.api", "swh.web.auth", "swh.web.browse", "swh.web.add_forge_now", "webpack_loader", "django_js_reverse", "corsheaders", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "corsheaders.middleware.CorsMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", "swh.web.common.middlewares.ExceptionMiddleware", ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. if swh_web_config["serve_assets"]: MIDDLEWARE.insert(0, "django.middleware.gzip.GZipMiddleware") ROOT_URLCONF = "swh.web.urls" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [os.path.join(PROJECT_DIR, "../templates")], "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", "swh.web.common.utils.context_processor", ], "libraries": {"swh_templatetags": "swh.web.common.swh_templatetags",}, }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config.get("development_db", ""), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", # noqa }, {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",}, {"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",}, {"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",}, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = "en-us" TIME_ZONE = "UTC" USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = "/static/" # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, "../../../static") STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ["127.0.0.1"] throttle_rates = {} http_requests = ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"] throttling = swh_web_config["throttling"] for limiter_scope, limiter_conf in throttling["scopes"].items(): if "default" in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"]["default"] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope + "_" + http_request.lower()] = limiter_conf[ "limiter_rate" ][http_request] REST_FRAMEWORK: Dict[str, Any] = { "DEFAULT_RENDERER_CLASSES": ( "rest_framework.renderers.JSONRenderer", "swh.web.api.renderers.YAMLRenderer", "rest_framework.renderers.TemplateHTMLRenderer", ), "DEFAULT_THROTTLE_CLASSES": ( "swh.web.api.throttling.SwhWebRateThrottle", "swh.web.api.throttling.SwhWebUserRateThrottle", ), "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } LOGGING = { "version": 1, "disable_existing_loggers": False, "filters": { "require_debug_false": {"()": "django.utils.log.RequireDebugFalse",}, "require_debug_true": {"()": "django.utils.log.RequireDebugTrue",}, }, "formatters": { "request": { "format": "[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "simple": { "format": "[%(asctime)s] [%(levelname)s] %(message)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "verbose": { "format": ( "[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s " "- %(message)s" ), "datefmt": "%d/%b/%Y %H:%M:%S", }, }, "handlers": { "console": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "simple", }, "file": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "simple", }, "file_request": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "request", }, "console_verbose": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "verbose", }, "file_verbose": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "verbose", }, "null": {"class": "logging.NullHandler",}, }, "loggers": { "": { "handlers": ["console_verbose", "file_verbose"], "level": "DEBUG" if DEBUG else "WARNING", }, "django": { "handlers": ["console"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.request": { "handlers": ["file_request"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.db.backends": {"handlers": ["null"], "propagate": False}, "django.utils.autoreload": {"level": "INFO",}, "swh.core.statsd": {"level": "INFO",}, }, } WEBPACK_LOADER = { "DEFAULT": { "CACHE": False, "BUNDLE_DIR_NAME": "./", "STATS_FILE": os.path.join(STATIC_DIR, "webpack-stats.json"), "POLL_INTERVAL": 0.1, "TIMEOUT": None, "IGNORE": [".+\\.hot-update.js", ".+\\.map"], } } LOGIN_URL = "/admin/login/" LOGIN_REDIRECT_URL = "admin" SESSION_ENGINE = "django.contrib.sessions.backends.cache" CACHES = { "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}, } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r"^/(badge|api)/.*$" AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"] SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"] SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" diff --git a/swh/web/tests/inbound_email/test_management_command.py b/swh/web/tests/inbound_email/test_management_command.py new file mode 100644 index 00000000..075ff040 --- /dev/null +++ b/swh/web/tests/inbound_email/test_management_command.py @@ -0,0 +1,161 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import contextmanager +from dataclasses import dataclass +from email.message import EmailMessage +from io import BytesIO, StringIO +import re +import sys +from typing import Callable, Iterator +from unittest.mock import MagicMock + +import pytest + +from django.core.management import call_command +from django.dispatch import Signal + +from swh.web.inbound_email.signals import EmailProcessingStatus, email_received + + +class MockedStdin: + def __init__(self): + self.buffer = BytesIO() + + +@dataclass +class CommandReturn: + out: str + err: str + + +@contextmanager +def signal_receiver(signal: Signal, name: str = "receiver_name") -> Iterator[Callable]: + receiver = MagicMock() + receiver.configure_mock(__name__=name, __qualname__=name) + + try: + signal.connect(receiver) + yield receiver + finally: + signal.disconnect(receiver) + + +def call_process_inbound_email(stdin_data: bytes) -> CommandReturn: + orig_stdin = sys.stdin + try: + sys.stdin = MockedStdin() # type: ignore + sys.stdin.buffer.write(stdin_data) + sys.stdin.buffer.seek(0) + + out = StringIO() + err = StringIO() + + call_command("process_inbound_email", stdout=out, stderr=err) + + out.seek(0) + err.seek(0) + return CommandReturn(out=out.read(), err=err.read()) + finally: + sys.stdin = orig_stdin + + +def test_empty_stdin(caplog): + ret = call_process_inbound_email(b"") + assert ret.out == "" + assert ret.err == "" + + assert len(caplog.records) == 1 + log = caplog.records[0] + assert log.levelname == "ERROR" + assert "Unhandled message" in log.getMessage() + + +@pytest.mark.parametrize( + "return_value,err_contents", + [ + # When the email gets processed by one of the receivers, the management command + # should not emit any output. + (EmailProcessingStatus.PROCESSED, ""), + # When a receiver fails, the management command outputs a message to this effect + (EmailProcessingStatus.FAILED, "Failed receiver.*receiver_name"), + # When all receivers ignore a message, this fact is printed too + (EmailProcessingStatus.IGNORED, "Unhandled message"), + ], +) +def test_signal_receiver(return_value, err_contents, caplog): + """Check that signal receivers are properly called when running the management command. + + Check for output depending on its return value""" + message = EmailMessage() + message["to"] = "test@example.com" + message["subject"] = "Test Subject" + message.set_content("This is a test message.\n") + + with signal_receiver(email_received) as receiver: + receiver.return_value = return_value + + ret = call_process_inbound_email(bytes(message)) + assert ret.out == "" + assert ret.err == "" + output = "\n".join(record.getMessage() for record in caplog.records) + if err_contents: + assert re.match(err_contents, output) + else: + assert output == "" + + calls = receiver.call_args_list + + assert len(calls) == 1 + assert bytes(calls[0][1]["message"]) == bytes(message) + + +def test_multiple_receivers(caplog): + message = EmailMessage() + message["to"] = "test@example.com" + message["subject"] = "Test Subject" + message.set_content("This is a test message.\n") + + with signal_receiver(email_received, name="ignored") as ignored, signal_receiver( + email_received, name="processed" + ) as processed: + ignored.return_value = EmailProcessingStatus.IGNORED + processed.return_value = EmailProcessingStatus.PROCESSED + + ret = call_process_inbound_email(bytes(message)) + assert ret.out == "" + assert ret.err == "" + + assert not caplog.records + + for receiver in [ignored, processed]: + calls = receiver.call_args_list + + assert len(calls) == 1 + assert bytes(calls[0][1]["message"]) == bytes(message) + + +def test_signal_receiver_exception(caplog): + message = EmailMessage() + message["to"] = "test@example.com" + message["subject"] = "Test Subject" + message.set_content("This is a test message.\n") + + with signal_receiver(email_received, name="exception_raised") as receiver: + receiver.side_effect = ValueError("I'm broken!") + + ret = call_process_inbound_email(bytes(message)) + assert ret.out == "" + assert ret.err == "" + + output = "\n".join( + record.getMessage() + ("\n" + record.exc_text if record.exc_text else "") + for record in caplog.records + ) + + assert re.match("Failed receiver.*exception_raised", output) + assert "following exception" in output + assert "ValueError" in output + assert "I'm broken" in output