Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_backend.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2017-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
import re | |||||
import smtplib | |||||
from unittest.mock import MagicMock, patch | from unittest.mock import MagicMock, patch | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import pytest | import pytest | ||||
from swh.core.sentry import init_sentry | |||||
from swh.model.model import Content | from swh.model.model import Content | ||||
from swh.model.swhids import CoreSWHID | from swh.model.swhids import CoreSWHID | ||||
from swh.vault.exc import NotFoundExc | from swh.vault.exc import NotFoundExc | ||||
from swh.vault.tests.vault_testing import hash_content | from swh.vault.tests.vault_testing import hash_content | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def mock_cooking(vault_backend): | def mock_cooking(vault_backend): | ||||
▲ Show 20 Lines • Show All 185 Lines • ▼ Show 20 Lines | |||||
def test_send_all_emails(swh_vault): | def test_send_all_emails(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
emails = ("a@example.com", "billg@example.com", "test+42@example.org") | emails = ("a@example.com", "billg@example.com", "test+42@example.org") | ||||
for email in emails: | for email in emails: | ||||
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) | swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) | ||||
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | ||||
with patch.object(swh_vault, "smtp_server") as m: | with patch.object(swh_vault, "_smtp_send") as m: | ||||
swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
sent_emails = {k[0][0] for k in m.send_message.call_args_list} | sent_emails = {k[0][0] for k in m.call_args_list} | ||||
assert {k["To"] for k in sent_emails} == set(emails) | assert {k["To"] for k in sent_emails} == set(emails) | ||||
for e in sent_emails: | for e in sent_emails: | ||||
assert "bot@softwareheritage.org" in e["From"] | assert "bot@softwareheritage.org" in e["From"] | ||||
assert TEST_TYPE in e["Subject"] | assert TEST_TYPE in e["Subject"] | ||||
assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | ||||
assert TEST_TYPE in str(e) | assert TEST_TYPE in str(e) | ||||
assert "https://archive.softwareheritage.org/" in str(e) | assert "https://archive.softwareheritage.org/" in str(e) | ||||
assert TEST_SWHID.object_id.hex()[:5] in str(e) | assert TEST_SWHID.object_id.hex()[:5] in str(e) | ||||
assert "--\x20\n" in str(e) # Well-formated signature!!! | assert "--\x20\n" in str(e) # Well-formated signature!!! | ||||
# Check that the entries have been deleted and recalling the | # Check that the entries have been deleted and recalling the | ||||
# function does not re-send the e-mails | # function does not re-send the e-mails | ||||
m.reset_mock() | m.reset_mock() | ||||
swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
m.assert_not_called() | m.assert_not_called() | ||||
def test_send_email_error_no_smtp(swh_vault): | |||||
reports = [] | |||||
init_sentry("http://example.org", extra_kwargs={"transport": reports.append}) | |||||
emails = ("a@example.com", "billg@example.com", "test+42@example.org") | |||||
with mock_cooking(swh_vault): | |||||
for email in emails: | |||||
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) | |||||
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | |||||
swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | |||||
assert len(reports) == 6 | |||||
for i, email in enumerate(emails): | |||||
# first report is the logger.error | |||||
assert reports[2 * i]["level"] == "error" | |||||
assert reports[2 * i]["logger"] == "swh.vault.backend" | |||||
reg = re.compile( | |||||
"Unable to send SMTP message 'Bundle ready: gitfast [0-9a-f]{7}' " | |||||
f"to {email.replace('+', '[+]')}: cannot connect to server" | |||||
) | |||||
assert reg.match(reports[2 * i]["logentry"]["message"]) | |||||
# second is the sentry_sdk.capture_message | |||||
assert reports[2 * i + 1]["level"] == "error" | |||||
assert reg.match(reports[2 * i + 1]["message"]) | |||||
def test_send_email_error_send_failed(swh_vault): | |||||
reports = [] | |||||
init_sentry("http://example.org", extra_kwargs={"transport": reports.append}) | |||||
emails = ("a@example.com", "billg@example.com", "test+42@example.org") | |||||
with mock_cooking(swh_vault): | |||||
for email in emails: | |||||
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) | |||||
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | |||||
with patch("smtplib.SMTP") as MockSMTP: | |||||
smtp = MockSMTP.return_value | |||||
smtp.noop.return_value = [250] | |||||
smtp.send_message.side_effect = smtplib.SMTPHeloError(404, "HELO Failed") | |||||
swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | |||||
assert len(reports) == 4 | |||||
# first one is the captured exception | |||||
assert reports[0]["level"] == "error" | |||||
assert reports[0]["exception"]["values"][0]["type"] == "SMTPHeloError" | |||||
# the following 3 ones are the sentry_sdk.capture_message() calls | |||||
for i, email in enumerate(emails, start=1): | |||||
assert reports[i]["level"] == "error" | |||||
reg = re.compile( | |||||
"Unable to send SMTP message 'Bundle ready: gitfast [0-9a-f]{7}' " | |||||
f"to {email.replace('+', '[+]')}: [(]404, 'HELO Failed'[)]" | |||||
) | |||||
assert reg.match(reports[i]["message"]) | |||||
def test_available(swh_vault): | def test_available(swh_vault): | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_SWHID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content") | swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content") | ||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | |||||
def test_send_failure_email(swh_vault): | def test_send_failure_email(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com") | swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com") | ||||
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "failed") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "failed") | ||||
swh_vault.set_progress(TEST_TYPE, TEST_SWHID, "test error") | swh_vault.set_progress(TEST_TYPE, TEST_SWHID, "test error") | ||||
with patch.object(swh_vault, "smtp_server") as m: | with patch.object(swh_vault, "_smtp_send") as m: | ||||
swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
e = [k[0][0] for k in m.send_message.call_args_list][0] | e = [k[0][0] for k in m.call_args_list][0] | ||||
assert e["To"] == "a@example.com" | assert e["To"] == "a@example.com" | ||||
assert "bot@softwareheritage.org" in e["From"] | assert "bot@softwareheritage.org" in e["From"] | ||||
assert TEST_TYPE in e["Subject"] | assert TEST_TYPE in e["Subject"] | ||||
assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | ||||
assert "fail" in e["Subject"] | assert "fail" in e["Subject"] | ||||
assert TEST_TYPE in str(e) | assert TEST_TYPE in str(e) | ||||
assert TEST_SWHID.object_id.hex()[:5] in str(e) | assert TEST_SWHID.object_id.hex()[:5] in str(e) | ||||
Show All 12 Lines |