Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime | |||||||||||||
from typing import Dict, List, Optional | |||||||||||||
from swh.scheduler.interface import SchedulerInterface | |||||||||||||
from swh.scheduler.model import OriginVisitStats | |||||||||||||
# Only key that process_journal_objects accepts (and requires) in its
# `messages` mapping.
msg_type = "origin_visit_status"
def max_date(d1: Optional[datetime], d2: Optional[datetime]) -> datetime:
    """Return the most recent of two optional dates.

    Args:
        d1: first date, or None when unknown
        d2: second date, or None when unknown

    Returns:
        The only non-None date when exactly one is provided, otherwise the
        greater of the two.

    Raises:
        ValueError: when both dates are None.

    """
    # Dropping the None entries up front narrows the type to datetime, so no
    # extra assertion is needed to satisfy the type checker.
    known_dates = [candidate for candidate in (d1, d2) if candidate is not None]
    if not known_dates:
        raise ValueError("At least one date should be a valid datetime")
    return max(known_dates)
def process_journal_objects(
    messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
    """Read messages from origin_visit_status journal topics, then insert them in the
    scheduler "origin_visit_stats" table.

    Worker function for `JournalClient.process(worker_fn)`, after
    currification of `scheduler`.

    Each message is a dict from which the keys "status", "origin", "type",
    "date" and "snapshot" are read. For every message:

    - statuses "created" and "ongoing" are skipped (the visit is not finished);
    - status "not_found" updates `last_notfound`;
    - any other status without a snapshot updates `last_failed`;
    - a status with a snapshot updates `last_eventful` and `last_snapshot`
      when the snapshot differs from the stored one (or when nothing is
      stored yet), and `last_uneventful` otherwise. Messages older than the
      latest stored eventful/uneventful date are ignored.

    Args:
        messages: mapping from message type to the list of message dicts; it
            must contain the "origin_visit_status" key and no other key
        scheduler: scheduler backend used to read and upsert the visit stats

    Raises:
        AssertionError: when `messages` holds unexpected message types or
            lacks the expected one.

    """
    assert set(messages) <= {
        msg_type
    }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types"
    assert msg_type in messages, f"Expected {msg_type} messages"

    for msg_dict in messages[msg_type]:
        if msg_dict["status"] in ("created", "ongoing"):
            # visit not finished yet, nothing to record
            continue
        origin = msg_dict["origin"]
        visit_type = msg_dict["type"]
        visit_stats_d = {
            "url": origin,
            "visit_type": visit_type,
            "last_uneventful": None,
            "last_eventful": None,
            "last_failed": None,
            "last_notfound": None,
            "last_snapshot": None,
        }
        actual_visit_stats = scheduler.origin_visit_stats_get(origin, visit_type)

        if msg_dict["status"] == "not_found":
            visit_stats_d["last_notfound"] = max_date(
                msg_dict["date"],
                actual_visit_stats.last_notfound if actual_visit_stats else None,
            )
        elif msg_dict["snapshot"] is None:
            visit_stats_d["last_failed"] = max_date(
                msg_dict["date"],
                actual_visit_stats.last_failed if actual_visit_stats else None,
            )
        else:  # visit with snapshot, something happened
            if not actual_visit_stats:
                visit_stats_d["last_eventful"] = msg_dict["date"]
                visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
            else:
                last_eventful = actual_visit_stats.last_eventful
                last_uneventful = actual_visit_stats.last_uneventful
                # Stored stats may only hold failed/not-found dates; max_date
                # raises ValueError when both of its arguments are None, so
                # the ordering check only runs when there is a date to
                # compare against.
                if last_eventful is not None or last_uneventful is not None:
                    latest_known = max_date(last_eventful, last_uneventful)
                    if msg_dict["date"] < latest_known:
                        # ignore out of order message
                        continue
                previous_snapshot = actual_visit_stats.last_snapshot
                if msg_dict["snapshot"] != previous_snapshot:
                    visit_stats_d["last_eventful"] = msg_dict["date"]
                    visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
                else:
                    visit_stats_d["last_uneventful"] = msg_dict["date"]

        visit_stats = OriginVisitStats(**visit_stats_d)
        scheduler.origin_visit_stats_upsert(visit_stats)