Page MenuHomeSoftware Heritage

D4852.id17178.diff
No OneTemporary

D4852.id17178.diff

diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/journal_client.py
@@ -0,0 +1,47 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict
+
+from swh.scheduler.model import OriginVisitStats
+
+msg_type = "origin_visit_status"
+
+
+STATUS_TO_KEY_DATE: Dict[str, str] = {
+ "full": "last_eventful",
+ "partial": "last_uneventful", # or last_failed ?
+ "not_found": "last_failed", # TODO? cf. https://forge.softwareheritage.org/T2961
+}
+
+
+def process_journal_objects(messages, *, scheduler):
+ """Read messages from origin_visit_status journal topics, then insert them in the
+ scheduler "origin_visit_stats" table.
+
+ Worker function for `JournalClient.process(worker_fn)`, after binding the
+ `scheduler` keyword argument (e.g. with `functools.partial`).
+
+ """
+ assert msg_type in messages, f"Only '{msg_type}' messages expected"
+
+ for ovs_dict in messages[msg_type]:
+ if ovs_dict["status"] in ("created", "ongoing"):
+ continue
+ visit_stats_d = {
+ "url": ovs_dict["origin"],
+ "visit_type": ovs_dict["type"],
+ "last_uneventful": None,
+ "last_eventful": None,
+ "last_failed": None,
+ }
+
+ # TODO: statuses missing from STATUS_TO_KEY_DATE (e.g. "failed"?) raise KeyError
+
+ key_date = STATUS_TO_KEY_DATE[ovs_dict["status"]]
+ visit_stats_d[key_date] = ovs_dict["date"]
+
+ visit_stats = OriginVisitStats(**visit_stats_d)
+ scheduler.origin_visit_stats_upsert(visit_stats)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+import pytest
+
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.journal_client import process_journal_objects
+from swh.scheduler.utils import utcnow
+
+
+def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
+ process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+
+ with pytest.raises(AssertionError, match="Only 'origin_visit_status'"):
+ process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
+
+
+def test_journal_client_origin_visit_status_from_journal(swh_scheduler):
+ process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+ visit_statuses = [
+ OriginVisitStatus(
+ origin="foo",
+ visit=1,
+ status="full",
+ date=utcnow(),
+ type="git",
+ snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ ),
+ OriginVisitStatus(
+ origin="zrb",
+ visit=2,
+ status="created", # ignored
+ date=utcnow(),
+ type="hg",
+ snapshot=None,
+ ),
+ OriginVisitStatus(
+ origin="bar",
+ visit=1,
+ status="partial",
+ date=utcnow(),
+ type="svn",
+ snapshot=None,
+ ),
+ OriginVisitStatus(
+ origin="zrb",
+ visit=2,
+ status="ongoing", # ignore
+ date=utcnow(),
+ type="hg",
+ snapshot=None,
+ ),
+ ]
+
+ process_fn({"origin_visit_status": map(OriginVisitStatus.to_dict, visit_statuses)})
+
+ counter = 0
+ for ovs in visit_statuses:
+ if ovs.status in ("ongoing", "created"):
+ continue # those are skipped
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+ ovs.origin, ovs.type
+ )
+ assert actual_origin_visit_stats is not None
+ counter += 1
+
+ assert counter == len(
+ [ovs for ovs in visit_statuses if ovs.status not in ("ongoing", "created")]
+ )

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 7:27 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3230153

Event Timeline