diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/journal_client.py
@@ -0,0 +1,65 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, List
+
+from swh.scheduler.model import OriginVisitStats
+
+msg_type = "origin_visit_status"
+
+# Visit statuses skipped by process_journal_objects, as they have no associated
+# OriginVisitStats date field.
+IGNORED_STATUSES = ("created", "ongoing")
+
+# Maps each non-ignored visit status to the OriginVisitStats date field it sets.
+STATUS_TO_KEY_DATE: Dict[str, str] = {
+    "full": "last_eventful",
+    "partial": "last_uneventful",  # or last_failed ?
+    "not_found": "last_failed",  # TODO? cf. https://forge.softwareheritage.org/T2961
+    "failed": "last_failed",
+}
+
+
+def process_journal_objects(
+    messages: Dict[str, List[Dict[str, Any]]], *, scheduler
+) -> None:
+    """Read messages from origin_visit_status journal topics, then upsert them in the
+    scheduler "origin_visit_stats" table.
+
+    Worker function for `JournalClient.process(worker_fn)`, after
+    currification of `scheduler`.
+
+    Args:
+        messages: journal messages (as dicts) grouped by object type; only the
+            "origin_visit_status" object type is accepted
+        scheduler: scheduler backend; its ``origin_visit_stats_upsert`` method is
+            called once per message whose status is not ignored
+
+    Raises:
+        AssertionError: if ``messages`` contains any object type other than
+            "origin_visit_status"
+        KeyError: if a message has a status found neither in IGNORED_STATUSES
+            nor in STATUS_TO_KEY_DATE
+
+    """
+    assert set(messages) == {msg_type}, f"Only '{msg_type}' messages expected"
+
+    for ovs_dict in messages[msg_type]:
+        status = ovs_dict["status"]
+        if status in IGNORED_STATUSES:
+            continue
+
+        visit_stats_d: Dict[str, Any] = {
+            "url": ovs_dict["origin"],
+            "visit_type": ovs_dict["type"],
+            "last_uneventful": None,
+            "last_eventful": None,
+            "last_failed": None,
+        }
+        # Only the date field matching the visit status is set; the other date
+        # fields are left to None.
+        visit_stats_d[STATUS_TO_KEY_DATE[status]] = ovs_dict["date"]
+
+        scheduler.origin_visit_stats_upsert(OriginVisitStats(**visit_stats_d))
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+import pytest
+
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.journal_client import process_journal_objects
+from swh.scheduler.utils import utcnow
+
+
+def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+
+    with pytest.raises(AssertionError, match="Only 'origin_visit_status'"):
+        process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
+
+
+def test_journal_client_origin_visit_status_from_journal(swh_scheduler):
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+    visit_statuses = [
+        OriginVisitStatus(
+            origin="foo",
+            visit=1,
+            status="full",
+            date=utcnow(),
+            type="git",
+            snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        ),
+        OriginVisitStatus(
+            origin="zrb",
+            visit=2,
+            status="created",  # ignored
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="bar",
+            visit=1,
+            status="partial",
+            date=utcnow(),
+            type="svn",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="zrb",
+            visit=2,
+            status="ongoing",  # ignored
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="baz",
+            visit=1,
+            status="not_found",
+            date=utcnow(),
+            type="git",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="qux",
+            visit=1,
+            status="failed",
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+    ]
+
+    process_fn({"origin_visit_status": [ovs.to_dict() for ovs in visit_statuses]})
+
+    expected_date_field = {
+        "full": "last_eventful",
+        "partial": "last_uneventful",
+        "not_found": "last_failed",
+        "failed": "last_failed",
+    }
+    for ovs in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            ovs.origin, ovs.type
+        )
+        if ovs.status in ("created", "ongoing"):
+            # skipped statuses: nothing must have been recorded for them
+            assert actual_origin_visit_stats is None
+            continue
+
+        assert actual_origin_visit_stats is not None
+        date_field = expected_date_field[ovs.status]
+        assert getattr(actual_origin_visit_stats, date_field) == ovs.date