Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124114
D4852.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
D4852.diff
View Options
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/journal_client.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from swh.scheduler.model import OriginVisitStats
+
+logger = logging.getLogger(__name__)
+
+msg_type = "origin_visit_status"
+
+
+def process_journal_objects(messages, *, scheduler):
+    """Read messages from origin_visit_status journal topics, then insert them in the
+    scheduler "origin_visit_stats" table.
+
+    Worker function for `JournalClient.process(worker_fn)`, after
+    currification of `scheduler`.
+
+    Visit statuses are mapped to origin_visit_stats dates as follows:
+
+    - "created", "ongoing": skipped (the visit is not finished yet)
+    - "full": last_eventful
+    - "partial" with a snapshot: last_eventful
+    - "partial" without a snapshot: last_failed
+    - "not_found": last_failed
+    - any other status: skipped, and a warning is logged
+
+    Args:
+        messages: mapping from message type to an iterable of origin visit
+          status dicts; only the "origin_visit_status" key is read
+        scheduler: scheduler backend; its ``origin_visit_stats_upsert`` is
+          called with one OriginVisitStats per recorded status
+
+    Raises:
+        AssertionError: if ``messages`` has no "origin_visit_status" key
+
+    """
+    assert msg_type in messages, f"Only '{msg_type}' messages expected"
+
+    for ovs_dict in messages[msg_type]:
+        status = ovs_dict["status"]
+        if status in ("created", "ongoing"):
+            # the visit is not finished yet: nothing to record
+            continue
+
+        if status == "full":
+            key_date = "last_eventful"
+        elif status == "partial":
+            # partial, snapshot -> eventful; partial, no snapshot -> failed
+            if ovs_dict["snapshot"] is not None:
+                key_date = "last_eventful"
+            else:
+                key_date = "last_failed"
+        elif status == "not_found":  # TODO? cf. T2961
+            key_date = "last_failed"
+        else:
+            # Without this branch, key_date would be unbound for the first
+            # recorded message, or silently keep a previous message's value.
+            logger.warning(
+                "Skipping unexpected origin visit status %r for origin %s",
+                status,
+                ovs_dict["origin"],
+            )
+            continue
+
+        visit_stats_d = {
+            "url": ovs_dict["origin"],
+            "visit_type": ovs_dict["type"],
+            "last_uneventful": None,
+            "last_eventful": None,
+            "last_failed": None,
+        }
+        visit_stats_d[key_date] = ovs_dict["date"]
+        visit_stats = OriginVisitStats(**visit_stats_d)
+        scheduler.origin_visit_stats_upsert(visit_stats)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+
+import pytest
+
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.journal_client import process_journal_objects
+from swh.scheduler.utils import utcnow
+
+
+def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler):
+    """Messages without an "origin_visit_status" key are rejected."""
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+
+    with pytest.raises(AssertionError, match="Only 'origin_visit_status'"):
+        process_fn({"origin_visit": [{"url": "http://foobar.baz"},]})
+
+
+def test_journal_client_origin_visit_status_from_journal(swh_scheduler):
+    """Finished visit statuses are recorded in origin_visit_stats, unfinished
+    ("created", "ongoing") ones are not."""
+    process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,)
+    visit_statuses = [
+        OriginVisitStatus(  # eventful
+            origin="foo",
+            visit=1,
+            status="full",
+            date=utcnow(),
+            type="git",
+            snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        ),
+        OriginVisitStatus(  # ignored due to status
+            origin="zrb",
+            visit=2,
+            status="created",
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # failed (partial without snapshot)
+            origin="bar",
+            visit=1,
+            status="partial",
+            date=utcnow(),
+            type="svn",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # eventful
+            origin="blue",
+            visit=2,
+            status="partial",
+            date=utcnow(),
+            type="tar",
+            snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+        ),
+        OriginVisitStatus(  # ignored due to status
+            origin="zrb",
+            visit=2,
+            status="ongoing",
+            date=utcnow(),
+            type="hg",
+            snapshot=None,
+        ),
+    ]
+
+    process_fn({"origin_visit_status": map(OriginVisitStatus.to_dict, visit_statuses)})
+
+    counter = 0
+    for ovs in visit_statuses:
+        actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+            ovs.origin, ovs.type
+        )
+        if ovs.status in ("ongoing", "created"):
+            # those are skipped: nothing must have been recorded for them
+            assert actual_origin_visit_stats is None
+            continue
+        assert actual_origin_visit_stats is not None
+
+        if ovs.status == "partial" and ovs.snapshot is not None:
+            assert actual_origin_visit_stats.last_eventful == ovs.date
+            assert actual_origin_visit_stats.last_failed is None
+            assert actual_origin_visit_stats.last_uneventful is None
+        elif ovs.status == "partial" and ovs.snapshot is None:
+            assert actual_origin_visit_stats.last_eventful is None
+            assert actual_origin_visit_stats.last_failed == ovs.date
+            assert actual_origin_visit_stats.last_uneventful is None
+        elif ovs.status == "full":
+            assert actual_origin_visit_stats.last_eventful == ovs.date
+            assert actual_origin_visit_stats.last_failed is None
+            assert actual_origin_visit_stats.last_uneventful is None
+        # status not-found cannot be tested as it's not part of the model yet
+
+        counter += 1
+
+    assert counter == len(
+        [ovs for ovs in visit_statuses if ovs.status not in ("ongoing", "created")]
+    )
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 4:23 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222954
Attached To
D4852: Start journal client to populate the origin_visit_stats model
Event Timeline
Log In to Comment