Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_backend.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2017-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
from unittest.mock import MagicMock, patch | from unittest.mock import MagicMock, patch | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import pytest | import pytest | ||||
from swh.model import hashutil | from swh.model.identifiers import CoreSWHID | ||||
from swh.model.model import Content | |||||
from swh.vault.exc import NotFoundExc | from swh.vault.exc import NotFoundExc | ||||
from swh.vault.tests.vault_testing import hash_content | from swh.vault.tests.vault_testing import hash_content | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def mock_cooking(vault_backend): | def mock_cooking(vault_backend): | ||||
with patch.object(vault_backend, "_send_task") as mt: | with patch.object(vault_backend, "_send_task") as mt: | ||||
mt.return_value = 42 | mt.return_value = 42 | ||||
Show All 14 Lines | |||||
def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa | def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa | ||||
now = datetime.datetime.now(datetime.timezone.utc) | now = datetime.datetime.now(datetime.timezone.utc) | ||||
creation_delta_secs = (ts - now).total_seconds() | creation_delta_secs = (ts - now).total_seconds() | ||||
assert creation_delta_secs < tolerance_secs | assert creation_delta_secs < tolerance_secs | ||||
def fake_cook(backend, bundle_type, result_content, sticky=False): | def fake_cook(backend, bundle_type, result_content, sticky=False): | ||||
swhid = Content.from_data(result_content).swhid() | |||||
content, obj_id = hash_content(result_content) | content, obj_id = hash_content(result_content) | ||||
with mock_cooking(backend): | with mock_cooking(backend): | ||||
backend.create_task(bundle_type, obj_id, sticky) | backend.create_task(bundle_type, swhid, sticky) | ||||
backend.cache.add(bundle_type, obj_id, b"content") | backend.cache.add(bundle_type, swhid, b"content") | ||||
backend.set_status(bundle_type, obj_id, "done") | backend.set_status(bundle_type, swhid, "done") | ||||
return obj_id, content | return swhid, content | ||||
def fail_cook(backend, bundle_type, obj_id, failure_reason): | def fail_cook(backend, bundle_type, swhid, failure_reason): | ||||
with mock_cooking(backend): | with mock_cooking(backend): | ||||
backend.create_task(bundle_type, obj_id) | backend.create_task(bundle_type, swhid) | ||||
backend.set_status(bundle_type, obj_id, "failed") | backend.set_status(bundle_type, swhid, "failed") | ||||
backend.set_progress(bundle_type, obj_id, failure_reason) | backend.set_progress(bundle_type, swhid, failure_reason) | ||||
TEST_TYPE = "revision_gitfast" | TEST_TYPE = "gitfast" | ||||
TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" | TEST_SWHID = CoreSWHID.from_string("swh:1:rev:4a4b9771542143cf070386f86b4b92d42966bdbc") | ||||
TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) | |||||
TEST_PROGRESS = ( | TEST_PROGRESS = ( | ||||
"Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} " | "Mr. White, You're telling me you're cooking again? \N{ASTONISHED FACE} " | ||||
) | ) | ||||
TEST_EMAIL = "ouiche@lorraine.fr" | TEST_EMAIL = "ouiche@lorraine.fr" | ||||
@pytest.fixture | @pytest.fixture | ||||
def swh_vault(swh_vault, sample_data): | def swh_vault(swh_vault, sample_data): | ||||
# make the vault's storage consistent with test data | # make the vault's storage consistent with test data | ||||
revision = attr.evolve(sample_data.revision, id=TEST_OBJ_ID) | revision = attr.evolve(sample_data.revision, id=TEST_SWHID.object_id) | ||||
swh_vault.storage.revision_add([revision]) | swh_vault.storage.revision_add([revision]) | ||||
return swh_vault | return swh_vault | ||||
def test_create_task_simple(swh_vault): | def test_create_task_simple(swh_vault): | ||||
with mock_cooking(swh_vault) as m: | with mock_cooking(swh_vault) as m: | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) | m["get_cooker_cls"].assert_called_once_with(TEST_TYPE, TEST_SWHID.object_type) | ||||
args = m["cooker_cls"].call_args[0] | args = m["cooker_cls"].call_args[0] | ||||
assert args[0] == TEST_TYPE | assert args[0] == TEST_SWHID | ||||
assert args[1] == TEST_HEX_ID | |||||
assert m["cooker"].check_exists.call_count == 1 | assert m["cooker"].check_exists.call_count == 1 | ||||
assert m["_send_task"].call_count == 1 | assert m["_send_task"].call_count == 1 | ||||
args = m["_send_task"].call_args[0] | args = m["_send_task"].call_args[0] | ||||
assert args[0] == TEST_TYPE | assert args[0] == TEST_TYPE | ||||
assert args[1] == TEST_HEX_ID | assert args[1] == TEST_SWHID | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["object_id"] == TEST_HEX_ID | assert info["swhid"] == TEST_SWHID | ||||
assert info["type"] == TEST_TYPE | assert info["type"] == TEST_TYPE | ||||
assert info["task_status"] == "new" | assert info["task_status"] == "new" | ||||
assert info["task_id"] == 42 | assert info["task_id"] == 42 | ||||
assertTimestampAlmostNow(info["ts_created"]) | assertTimestampAlmostNow(info["ts_created"]) | ||||
assert info["ts_done"] is None | assert info["ts_done"] is None | ||||
assert info["progress_msg"] is None | assert info["progress_msg"] is None | ||||
def test_create_fail_duplicate_task(swh_vault): | def test_create_fail_duplicate_task(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
with pytest.raises(psycopg2.IntegrityError): | with pytest.raises(psycopg2.IntegrityError): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
def test_create_fail_nonexisting_object(swh_vault): | def test_create_fail_nonexisting_object(swh_vault): | ||||
with mock_cooking(swh_vault) as m: | with mock_cooking(swh_vault) as m: | ||||
m["cooker"].check_exists.side_effect = ValueError("Nothing here.") | m["cooker"].check_exists.side_effect = ValueError("Nothing here.") | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
def test_create_set_progress(swh_vault): | def test_create_set_progress(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["progress_msg"] is None | assert info["progress_msg"] is None | ||||
swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, TEST_PROGRESS) | swh_vault.set_progress(TEST_TYPE, TEST_SWHID, TEST_PROGRESS) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["progress_msg"] == TEST_PROGRESS | assert info["progress_msg"] == TEST_PROGRESS | ||||
def test_create_set_status(swh_vault): | def test_create_set_status(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["task_status"] == "new" | assert info["task_status"] == "new" | ||||
assert info["ts_done"] is None | assert info["ts_done"] is None | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "pending") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "pending") | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["task_status"] == "pending" | assert info["task_status"] == "pending" | ||||
assert info["ts_done"] is None | assert info["ts_done"] is None | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["task_status"] == "done" | assert info["task_status"] == "done" | ||||
assertTimestampAlmostNow(info["ts_done"]) | assertTimestampAlmostNow(info["ts_done"]) | ||||
def test_create_update_access_ts(swh_vault): | def test_create_update_access_ts(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
access_ts_1 = info["ts_last_access"] | access_ts_1 = info["ts_last_access"] | ||||
assertTimestampAlmostNow(access_ts_1) | assertTimestampAlmostNow(access_ts_1) | ||||
swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
access_ts_2 = info["ts_last_access"] | access_ts_2 = info["ts_last_access"] | ||||
assertTimestampAlmostNow(access_ts_2) | assertTimestampAlmostNow(access_ts_2) | ||||
swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
access_ts_3 = info["ts_last_access"] | access_ts_3 = info["ts_last_access"] | ||||
assertTimestampAlmostNow(access_ts_3) | assertTimestampAlmostNow(access_ts_3) | ||||
assert access_ts_1 < access_ts_2 | assert access_ts_1 < access_ts_2 | ||||
assert access_ts_2 < access_ts_3 | assert access_ts_2 < access_ts_3 | ||||
def test_cook_idempotent(swh_vault, sample_data): | def test_cook_idempotent(swh_vault, sample_data): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
info1 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) | info1 = swh_vault.cook(TEST_TYPE, TEST_SWHID) | ||||
info2 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) | info2 = swh_vault.cook(TEST_TYPE, TEST_SWHID) | ||||
info3 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) | info3 = swh_vault.cook(TEST_TYPE, TEST_SWHID) | ||||
assert info1 == info2 | assert info1 == info2 | ||||
assert info1 == info3 | assert info1 == info3 | ||||
def test_cook_email_pending_done(swh_vault): | def test_cook_email_pending_done(swh_vault): | ||||
with mock_cooking(swh_vault), patch.object( | with mock_cooking(swh_vault), patch.object( | ||||
swh_vault, "add_notif_email" | swh_vault, "add_notif_email" | ||||
) as madd, patch.object(swh_vault, "send_notification") as msend: | ) as madd, patch.object(swh_vault, "send_notification") as msend: | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID) | swh_vault.cook(TEST_TYPE, TEST_SWHID) | ||||
madd.assert_not_called() | madd.assert_not_called() | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) | swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) | ||||
madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | madd.assert_called_once_with(TEST_TYPE, TEST_SWHID, TEST_EMAIL) | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) | swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) | ||||
msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_HEX_ID, "done") | msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_SWHID, "done") | ||||
madd.assert_not_called() | madd.assert_not_called() | ||||
def test_send_all_emails(swh_vault): | def test_send_all_emails(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
emails = ("a@example.com", "billg@example.com", "test+42@example.org") | emails = ("a@example.com", "billg@example.com", "test+42@example.org") | ||||
for email in emails: | for email in emails: | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=email) | swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | ||||
with patch.object(swh_vault, "smtp_server") as m: | with patch.object(swh_vault, "smtp_server") as m: | ||||
swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
sent_emails = {k[0][0] for k in m.send_message.call_args_list} | sent_emails = {k[0][0] for k in m.send_message.call_args_list} | ||||
assert {k["To"] for k in sent_emails} == set(emails) | assert {k["To"] for k in sent_emails} == set(emails) | ||||
for e in sent_emails: | for e in sent_emails: | ||||
assert "bot@softwareheritage.org" in e["From"] | assert "bot@softwareheritage.org" in e["From"] | ||||
assert TEST_TYPE in e["Subject"] | assert TEST_TYPE in e["Subject"] | ||||
assert TEST_HEX_ID[:5] in e["Subject"] | assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | ||||
assert TEST_TYPE in str(e) | assert TEST_TYPE in str(e) | ||||
assert "https://archive.softwareheritage.org/" in str(e) | assert "https://archive.softwareheritage.org/" in str(e) | ||||
assert TEST_HEX_ID[:5] in str(e) | assert TEST_SWHID.object_id.hex()[:5] in str(e) | ||||
assert "--\x20\n" in str(e) # Well-formated signature!!! | assert "--\x20\n" in str(e) # Well-formated signature!!! | ||||
# Check that the entries have been deleted and recalling the | # Check that the entries have been deleted and recalling the | ||||
# function does not re-send the e-mails | # function does not re-send the e-mails | ||||
m.reset_mock() | m.reset_mock() | ||||
swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
m.assert_not_called() | m.assert_not_called() | ||||
def test_available(swh_vault): | def test_available(swh_vault): | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) | swh_vault.create_task(TEST_TYPE, TEST_SWHID) | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
swh_vault.cache.add(TEST_TYPE, TEST_HEX_ID, b"content") | swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content") | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") | ||||
assert swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) | assert swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
def test_fetch(swh_vault): | def test_fetch(swh_vault): | ||||
assert swh_vault.fetch(TEST_TYPE, TEST_HEX_ID, raise_notfound=False) is None | assert swh_vault.fetch(TEST_TYPE, TEST_SWHID, raise_notfound=False) is None | ||||
with pytest.raises( | with pytest.raises( | ||||
NotFoundExc, match=f"{TEST_TYPE} {TEST_HEX_ID} is not available." | NotFoundExc, match=f"{TEST_TYPE} {TEST_SWHID} is not available." | ||||
): | ): | ||||
swh_vault.fetch(TEST_TYPE, TEST_HEX_ID) | swh_vault.fetch(TEST_TYPE, TEST_SWHID) | ||||
obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") | swhid, content = fake_cook(swh_vault, TEST_TYPE, b"content") | ||||
info = swh_vault.progress(TEST_TYPE, obj_id) | info = swh_vault.progress(TEST_TYPE, swhid) | ||||
access_ts_before = info["ts_last_access"] | access_ts_before = info["ts_last_access"] | ||||
assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" | assert swh_vault.fetch(TEST_TYPE, swhid) == b"content" | ||||
info = swh_vault.progress(TEST_TYPE, obj_id) | info = swh_vault.progress(TEST_TYPE, swhid) | ||||
access_ts_after = info["ts_last_access"] | access_ts_after = info["ts_last_access"] | ||||
assertTimestampAlmostNow(access_ts_after) | assertTimestampAlmostNow(access_ts_after) | ||||
assert access_ts_before < access_ts_after | assert access_ts_before < access_ts_after | ||||
def test_cache_expire_oldest(swh_vault): | def test_cache_expire_oldest(swh_vault): | ||||
r = range(1, 10) | r = range(1, 10) | ||||
inserted = {} | inserted = {} | ||||
for i in r: | for i in r: | ||||
sticky = i == 5 | sticky = i == 5 | ||||
content = b"content%s" % str(i).encode() | content = b"content%s" % str(i).encode() | ||||
obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | ||||
inserted[i] = (obj_id, content) | inserted[i] = (swhid, content) | ||||
swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | ||||
swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | ||||
swh_vault.cache_expire_oldest(n=4) | swh_vault.cache_expire_oldest(n=4) | ||||
should_be_still_here = {2, 3, 5, 8, 9} | should_be_still_here = {2, 3, 5, 8, 9} | ||||
for i in r: | for i in r: | ||||
assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( | assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( | ||||
i in should_be_still_here | i in should_be_still_here | ||||
) | ) | ||||
def test_cache_expire_until(swh_vault): | def test_cache_expire_until(swh_vault): | ||||
r = range(1, 10) | r = range(1, 10) | ||||
inserted = {} | inserted = {} | ||||
for i in r: | for i in r: | ||||
sticky = i == 5 | sticky = i == 5 | ||||
content = b"content%s" % str(i).encode() | content = b"content%s" % str(i).encode() | ||||
obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | ||||
inserted[i] = (obj_id, content) | inserted[i] = (swhid, content) | ||||
if i == 7: | if i == 7: | ||||
cutoff_date = datetime.datetime.now() | cutoff_date = datetime.datetime.now() | ||||
swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | ||||
swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | ||||
swh_vault.cache_expire_until(date=cutoff_date) | swh_vault.cache_expire_until(date=cutoff_date) | ||||
should_be_still_here = {2, 3, 5, 8, 9} | should_be_still_here = {2, 3, 5, 8, 9} | ||||
for i in r: | for i in r: | ||||
assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( | assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( | ||||
i in should_be_still_here | i in should_be_still_here | ||||
) | ) | ||||
def test_fail_cook_simple(swh_vault): | def test_fail_cook_simple(swh_vault): | ||||
fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") | fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) | assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["progress_msg"] == "error42" | assert info["progress_msg"] == "error42" | ||||
def test_send_failure_email(swh_vault): | def test_send_failure_email(swh_vault): | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email="a@example.com") | swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com") | ||||
swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "failed") | swh_vault.set_status(TEST_TYPE, TEST_SWHID, "failed") | ||||
swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, "test error") | swh_vault.set_progress(TEST_TYPE, TEST_SWHID, "test error") | ||||
with patch.object(swh_vault, "smtp_server") as m: | with patch.object(swh_vault, "smtp_server") as m: | ||||
swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) | swh_vault.send_notif(TEST_TYPE, TEST_SWHID) | ||||
e = [k[0][0] for k in m.send_message.call_args_list][0] | e = [k[0][0] for k in m.send_message.call_args_list][0] | ||||
assert e["To"] == "a@example.com" | assert e["To"] == "a@example.com" | ||||
assert "bot@softwareheritage.org" in e["From"] | assert "bot@softwareheritage.org" in e["From"] | ||||
assert TEST_TYPE in e["Subject"] | assert TEST_TYPE in e["Subject"] | ||||
assert TEST_HEX_ID[:5] in e["Subject"] | assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] | ||||
assert "fail" in e["Subject"] | assert "fail" in e["Subject"] | ||||
assert TEST_TYPE in str(e) | assert TEST_TYPE in str(e) | ||||
assert TEST_HEX_ID[:5] in str(e) | assert TEST_SWHID.object_id.hex()[:5] in str(e) | ||||
assert "test error" in str(e) | assert "test error" in str(e) | ||||
assert "--\x20\n" in str(e) # Well-formated signature | assert "--\x20\n" in str(e) # Well-formated signature | ||||
def test_retry_failed_bundle(swh_vault): | def test_retry_failed_bundle(swh_vault): | ||||
fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") | fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["task_status"] == "failed" | assert info["task_status"] == "failed" | ||||
with mock_cooking(swh_vault): | with mock_cooking(swh_vault): | ||||
swh_vault.cook(TEST_TYPE, TEST_HEX_ID) | swh_vault.cook(TEST_TYPE, TEST_SWHID) | ||||
info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) | info = swh_vault.progress(TEST_TYPE, TEST_SWHID) | ||||
assert info["task_status"] == "new" | assert info["task_status"] == "new" |