Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from swh.scheduler.model import OriginVisitStats | |||||
msg_type = "origin_visit_status" | |||||
def process_journal_objects(messages, *, scheduler): | |||||
"""Read messages from origin_visit_status journal topics, then inserts them in the | |||||
scheduler "origin_visit_stats" table. | |||||
Worker function for `JournalClient.process(worker_fn)`, after | |||||
currification of `scheduler` and `task_names`. | |||||
""" | |||||
assert msg_type in messages, f"Only '{msg_type}' messages expected" | |||||
for ovs_dict in messages[msg_type]: | |||||
if ovs_dict["status"] in ("created", "ongoing"): | |||||
continue | |||||
visit_stats_d = { | |||||
"url": ovs_dict["origin"], | |||||
"visit_type": ovs_dict["type"], | |||||
"last_uneventful": None, | |||||
"last_eventful": None, | |||||
"last_failed": None, | |||||
} | |||||
# partial, snapshot -> eventful | |||||
if ovs_dict["status"] == "partial" and ovs_dict["snapshot"] is not None: | |||||
key_date = "last_eventful" | |||||
# partial, no snapshot -> failed | |||||
elif ovs_dict["status"] == "partial" and ovs_dict["snapshot"] is None: | |||||
key_date = "last_failed" | |||||
elif ovs_dict["status"] == "full": | |||||
key_date = "last_eventful" | |||||
# not-found -> failed | |||||
elif ovs_dict["status"] == "not_found": # TODO? cf. T2961 | |||||
key_date = "last_failed" | |||||
visit_stats_d[key_date] = ovs_dict["date"] | |||||
visit_stats = OriginVisitStats(**visit_stats_d) | |||||
scheduler.origin_visit_stats_upsert(visit_stats) |