diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py index 888281e..dfa97e8 100644 --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -1,142 +1,143 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import OriginVisitStats msg_type = "origin_visit_status" def max_date(*dates: Optional[datetime]) -> datetime: """Return the max date of given (possibly None) dates At least one date must be not None. """ filtered_dates = [d for d in dates if d is not None] if not filtered_dates: raise ValueError("At least one date should be a valid datetime") return max(filtered_dates) def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from origin_visit_status journal topics, then inserts them in the scheduler "origin_visit_stats" table. Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. """ assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } + # Use the default values from the model object + empty_object = { + field.name: field.default if field.default != attr.NOTHING else None + for field in attr.fields(OriginVisitStats) + } for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] - empty_object = { - "url": origin, - "visit_type": visit_type, - "last_uneventful": None, - "last_eventful": None, - "last_failed": None, - "last_notfound": None, - "last_snapshot": None, - } pk = origin, visit_type if pk not in origin_visit_stats: - origin_visit_stats[pk] = empty_object + origin_visit_stats[pk] = { + **empty_object, + "url": origin, + "visit_type": visit_type, + } + visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) elif msg_dict["status"] == "failed": visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) elif msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: # visit with snapshot already stored, last_eventful should already be # stored assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) current_status_date = msg_dict["date"] previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # out of order message so ignored continue # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: # same snapshot as before if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # we receive an old message which is an earlier "eventful" event # than what we had, we consider the last_eventful event as # actually an uneventful event. # The last uneventful visit remains the most recent: # max, previously computed visit_stats_d["last_uneventful"] = latest_recorded_visit_date # The eventful visit remains the oldest one: min visit_stats_d["last_eventful"] = min( visit_stats_d["last_eventful"], current_status_date ) elif ( latest_recorded_visit_date and current_status_date == latest_recorded_visit_date ): # A duplicated message must be ignored to avoid # populating the last_uneventful message continue else: # uneventful event visit_stats_d["last_uneventful"] = current_status_date scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() )