diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/journal_client.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from swh.scheduler.model import OriginVisitStats
+
+logger = logging.getLogger(__name__)
+
+msg_type = "origin_visit_status"
+
+
+def process_journal_objects(messages, *, scheduler):
+    """Record the outcome of origin visits in the scheduler.
+
+    Read messages from the origin_visit_status journal topic and upsert one
+    entry per handled message in the scheduler "origin_visit_stats" table.
+
+    Worker function for `JournalClient.process(worker_fn)`, after
+    currification of `scheduler`.
+
+    Args:
+        messages: mapping from journal object type to an iterable of
+            message dicts; it must contain the "origin_visit_status" key.
+            Each message is read for its "origin", "type", "status",
+            "snapshot" and "date" keys.
+        scheduler: object providing `origin_visit_stats_upsert`.
+
+    Raises:
+        AssertionError: if `messages` holds no "origin_visit_status" entry.
+
+    """
+    assert msg_type in messages, f"Only '{msg_type}' messages expected"
+
+    for ovs_dict in messages[msg_type]:
+        status = ovs_dict["status"]
+        if status in ("created", "ongoing"):
+            # Messages with these statuses are ignored: nothing is recorded.
+            continue
+
+        # The key is computed from scratch for every message, so a status
+        # which is not handled below can never reuse the previous one.
+        if status == "full":
+            key_date = "last_eventful"
+        elif status == "partial":
+            # partial, snapshot -> eventful
+            # partial, no snapshot -> failed
+            if ovs_dict["snapshot"] is not None:
+                key_date = "last_eventful"
+            else:
+                key_date = "last_failed"
+        elif status == "not_found":  # TODO? cf. T2961
+            # not_found -> failed
+            key_date = "last_failed"
+        else:
+            logger.warning(
+                "Skipping visit status of origin %s: unsupported status %r",
+                ovs_dict["origin"],
+                status,
+            )
+            continue
+
+        visit_stats_d = {
+            "url": ovs_dict["origin"],
+            "visit_type": ovs_dict["type"],
+            "last_uneventful": None,
+            "last_eventful": None,
+            "last_failed": None,
+        }
+        visit_stats_d[key_date] = ovs_dict["date"]
+        visit_stats = OriginVisitStats(**visit_stats_d)
+        scheduler.origin_visit_stats_upsert(visit_stats)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -0,0 +1,97 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+import pytest
+
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.journal_client import process_journal_objects
+from swh.scheduler.utils import utcnow
+
+
+def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+
+    with pytest.raises(AssertionError, match="Only 'origin_visit_status'"):
+        process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
+
+
+def test_journal_client_origin_visit_status_from_journal(swh_scheduler):
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+    visit_statuses = [
+        OriginVisitStatus(  # eventful
+            origin="foo",
+            visit=1,
+            status="full",
+            date=utcnow(),
+            type="git",
+            snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        ),
+        OriginVisitStatus(  # ignored due to status
+            origin="zrb",
+            visit=2,
+            status="created",
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # failed: partial without snapshot
+            origin="bar",
+            visit=1,
+            status="partial",
+            date=utcnow(),
+            type="svn",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # eventful
+            origin="blue",
+            visit=2,
+            status="partial",
+            date=utcnow(),
+            type="tar",
+            snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        ),
+        OriginVisitStatus(  # ignored due to status
+            origin="zrb",
+            visit=2,
+            status="ongoing",
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+    ]
+
+    process_fn({"origin_visit_status": map(OriginVisitStatus.to_dict, visit_statuses)})
+
+    counter = 0
+    for ovs in visit_statuses:
+        if ovs.status in ("ongoing", "created"):
+            continue  # those are skipped
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            ovs.origin, ovs.type
+        )
+        assert actual_origin_visit_stats is not None
+
+        if ovs.status == "partial" and ovs.snapshot is not None:
+            assert actual_origin_visit_stats.last_eventful == ovs.date
+            assert actual_origin_visit_stats.last_failed is None
+            assert actual_origin_visit_stats.last_uneventful is None
+        elif ovs.status == "partial" and ovs.snapshot is None:
+            assert actual_origin_visit_stats.last_eventful is None
+            assert actual_origin_visit_stats.last_failed == ovs.date
+            assert actual_origin_visit_stats.last_uneventful is None
+        elif ovs.status == "full":
+            assert actual_origin_visit_stats.last_eventful == ovs.date
+            assert actual_origin_visit_stats.last_failed is None
+            assert actual_origin_visit_stats.last_uneventful is None
+        # status not-found cannot be tested as it's not part of the model yet
+
+        counter += 1
+
+    assert counter == len(
+        [ovs for ovs in visit_statuses if ovs.status not in ("ongoing", "created")]
+    )