Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
def max_date(*dates: Optional[datetime]) -> datetime:
    """Return the most recent of the given dates, skipping ``None`` values.

    Raises:
        ValueError: if no date is provided or every provided date is ``None``.
    """
    candidates = [date for date in dates if date is not None]
    if not candidates:
        raise ValueError("At least one date should be a valid datetime")
    return max(candidates)
def update_next_position_offset(visit_stats: Dict, increment: int) -> None:
    """Shift the ``next_position_offset`` entry of ``visit_stats`` by
    ``increment``, in place.

    The resulting offset is clamped so it never goes below zero: a decrement
    past zero is floored to 0.
    """
    shifted_offset = visit_stats["next_position_offset"] + increment
    visit_stats["next_position_offset"] = max(0, shifted_offset)
def process_journal_objects( | def process_journal_objects( | ||||
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface | ||||
) -> None: | ) -> None: | ||||
"""Read messages from origin_visit_status journal topics, then inserts them in the | """Read messages from origin_visit_status journal topics, then inserts them in the | ||||
scheduler "origin_visit_stats" table. | scheduler "origin_visit_stats" table. | ||||
FIXME: Clarify the existing behavior | |||||
FIXME2: Explicit the new perimeter | |||||
This is also in charge of updating the `next_visit_queue_position` (~ time at which | |||||
some new objects are expected to be added for the origin), next_position_offset | |||||
(duration that we expect to wait between visits of this origin). | |||||
Worker function for `JournalClient.process(worker_fn)`, after | Worker function for `JournalClient.process(worker_fn)`, after | ||||
currification of `scheduler` and `task_names`. | currification of `scheduler` and `task_names`. | ||||
""" | """ | ||||
assert set(messages) <= { | assert set(messages) <= { | ||||
msg_type | msg_type | ||||
}, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" | }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" | ||||
assert msg_type in messages, f"Expected {msg_type} messages" | assert msg_type in messages, f"Expected {msg_type} messages" | ||||
Show All 31 Lines | for msg_dict in interesting_messages: | ||||
} | } | ||||
visit_stats_d = origin_visit_stats[pk] | visit_stats_d = origin_visit_stats[pk] | ||||
if msg_dict["status"] == "not_found": | if msg_dict["status"] == "not_found": | ||||
visit_stats_d["last_notfound"] = max_date( | visit_stats_d["last_notfound"] = max_date( | ||||
msg_dict["date"], visit_stats_d.get("last_notfound") | msg_dict["date"], visit_stats_d.get("last_notfound") | ||||
) | ) | ||||
elif msg_dict["status"] == "failed": | update_next_position_offset(visit_stats_d, 1) # visit less often | ||||
visit_stats_d["last_failed"] = max_date( | elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: | ||||
msg_dict["date"], visit_stats_d.get("last_failed") | |||||
) | |||||
elif msg_dict["snapshot"] is None: | |||||
visit_stats_d["last_failed"] = max_date( | visit_stats_d["last_failed"] = max_date( | ||||
msg_dict["date"], visit_stats_d.get("last_failed") | msg_dict["date"], visit_stats_d.get("last_failed") | ||||
) | ) | ||||
update_next_position_offset(visit_stats_d, 1) # visit less often | |||||
else: # visit with snapshot, something happened | else: # visit with snapshot, something happened | ||||
if visit_stats_d["last_snapshot"] is None: | if visit_stats_d["last_snapshot"] is None: | ||||
# first time visit with snapshot, we keep relevant information | # first time visit with snapshot, we keep relevant information | ||||
visit_stats_d["last_eventful"] = msg_dict["date"] | visit_stats_d["last_eventful"] = msg_dict["date"] | ||||
visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | ||||
else: | else: | ||||
# visit with snapshot already stored, last_eventful should already be | # last_snapshot is set, so an eventful visit should have previously been | ||||
# stored | # recorded | ||||
assert visit_stats_d["last_eventful"] is not None | assert visit_stats_d["last_eventful"] is not None | ||||
latest_recorded_visit_date = max_date( | latest_recorded_visit_date = max_date( | ||||
visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] | visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] | ||||
) | ) | ||||
current_status_date = msg_dict["date"] | current_status_date = msg_dict["date"] | ||||
previous_snapshot = visit_stats_d["last_snapshot"] | previous_snapshot = visit_stats_d["last_snapshot"] | ||||
if msg_dict["snapshot"] != previous_snapshot: | if msg_dict["snapshot"] != previous_snapshot: | ||||
if ( | if ( | ||||
latest_recorded_visit_date | latest_recorded_visit_date | ||||
and current_status_date < latest_recorded_visit_date | and current_status_date < latest_recorded_visit_date | ||||
): | ): | ||||
# out of order message so ignored | # out of order message so ignored | ||||
continue | continue | ||||
# new eventful visit (new snapshot) | # new eventful visit (new snapshot) | ||||
visit_stats_d["last_eventful"] = current_status_date | visit_stats_d["last_eventful"] = current_status_date | ||||
visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | ||||
# Visit this origin more often in the future | |||||
update_next_position_offset(visit_stats_d, -2) | |||||
else: | else: | ||||
# same snapshot as before | # same snapshot as before | ||||
if ( | if ( | ||||
latest_recorded_visit_date | latest_recorded_visit_date | ||||
and current_status_date < latest_recorded_visit_date | and current_status_date < latest_recorded_visit_date | ||||
): | ): | ||||
# we receive an old message which is an earlier "eventful" event | # we receive an old message which is an earlier "eventful" event | ||||
# than what we had, we consider the last_eventful event as | # than what we had, we consider the last_eventful event as | ||||
# actually an uneventful event. | # actually an uneventful event. | ||||
# The last uneventful visit remains the most recent: | # The last uneventful visit remains the most recent: | ||||
# max, previously computed | # max, previously computed | ||||
visit_stats_d["last_uneventful"] = latest_recorded_visit_date | visit_stats_d["last_uneventful"] = latest_recorded_visit_date | ||||
# The eventful visit remains the oldest one: min | # The eventful visit remains the oldest one: min | ||||
visit_stats_d["last_eventful"] = min( | visit_stats_d["last_eventful"] = min( | ||||
visit_stats_d["last_eventful"], current_status_date | visit_stats_d["last_eventful"], current_status_date | ||||
) | ) | ||||
# Visit this origin less often in the future | |||||
update_next_position_offset(visit_stats_d, 1) | |||||
elif ( | elif ( | ||||
latest_recorded_visit_date | latest_recorded_visit_date | ||||
and current_status_date == latest_recorded_visit_date | and current_status_date == latest_recorded_visit_date | ||||
): | ): | ||||
# A duplicated message must be ignored to avoid | # A duplicated message must be ignored to avoid | ||||
# populating the last_uneventful message | # populating the last_uneventful message | ||||
continue | continue | ||||
else: | else: | ||||
# uneventful event | # uneventful event | ||||
visit_stats_d["last_uneventful"] = current_status_date | visit_stats_d["last_uneventful"] = current_status_date | ||||
# Visit this origin less often in the future | |||||
update_next_position_offset(visit_stats_d, 1) | |||||
scheduler.origin_visit_stats_upsert( | scheduler.origin_visit_stats_upsert( | ||||
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | ||||
) | ) | ||||
ardumont: ¯\_(ツ)_/¯ |
¯\_(ツ)_/¯