Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/journal_client.py
Show First 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | for msg_dict in messages[msg_type]: | ||||
empty_object = { | empty_object = { | ||||
"url": origin, | "url": origin, | ||||
"visit_type": visit_type, | "visit_type": visit_type, | ||||
"last_uneventful": None, | "last_uneventful": None, | ||||
"last_eventful": None, | "last_eventful": None, | ||||
"last_failed": None, | "last_failed": None, | ||||
"last_notfound": None, | "last_notfound": None, | ||||
"last_snapshot": None, | "last_snapshot": None, | ||||
"successive_visits": 0, | |||||
} | } | ||||
pk = origin, visit_type | pk = origin, visit_type | ||||
if pk not in origin_visit_stats: | if pk not in origin_visit_stats: | ||||
visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) | visit_stats = scheduler.origin_visit_stats_get(origin, visit_type) | ||||
origin_visit_stats[pk] = ( | origin_visit_stats[pk] = ( | ||||
attr.asdict(visit_stats) if visit_stats else empty_object | attr.asdict(visit_stats) if visit_stats else empty_object | ||||
) | ) | ||||
visit_stats_d = origin_visit_stats[pk] | visit_stats_d = origin_visit_stats[pk] | ||||
# get the name of the most recent event we got | |||||
date_keys = [ | |||||
f"last_{k}" for k in ("eventful", "uneventful", "failed", "notfound") | |||||
] | |||||
event_dates = dict( | |||||
(v, k) for k, v in visit_stats_d.items() if k in date_keys and v is not None | |||||
) | |||||
if not event_dates: | |||||
maxdate = None | |||||
last_event = None | |||||
else: | |||||
maxdate = max_date(*event_dates.keys()) | |||||
last_event = event_dates[maxdate] | |||||
increment_successive_visits = False | |||||
if msg_dict["status"] == "not_found": | if msg_dict["status"] == "not_found": | ||||
visit_stats_d["last_notfound"] = max_date( | visit_stats_d["last_notfound"] = max_date( | ||||
msg_dict["date"], visit_stats_d.get("last_notfound") | msg_dict["date"], visit_stats_d.get("last_notfound") | ||||
) | ) | ||||
if last_event == "last_notfound": | |||||
increment_successive_visits = True | |||||
elif msg_dict["snapshot"] is None: | elif msg_dict["snapshot"] is None: | ||||
visit_stats_d["last_failed"] = max_date( | visit_stats_d["last_failed"] = max_date( | ||||
msg_dict["date"], visit_stats_d.get("last_failed") | msg_dict["date"], visit_stats_d.get("last_failed") | ||||
) | ) | ||||
if last_event == "last_failed": | |||||
increment_successive_visits = True | |||||
else: # visit with snapshot, something happened | else: # visit with snapshot, something happened | ||||
if visit_stats_d["last_snapshot"] is None: | if visit_stats_d["last_snapshot"] is None: | ||||
# first time visit with snapshot, we keep relevant information | # first time visit with snapshot, we keep relevant information | ||||
visit_stats_d["last_eventful"] = msg_dict["date"] | visit_stats_d["last_eventful"] = msg_dict["date"] | ||||
visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | ||||
else: | else: | ||||
# visit with snapshot already stored, last_eventful should already be | # visit with snapshot already stored, last_eventful should already be | ||||
# stored | # stored | ||||
assert visit_stats_d["last_eventful"] is not None | assert visit_stats_d["last_eventful"] is not None | ||||
latest_recorded_visit_date = max_date( | latest_recorded_visit_date = max_date( | ||||
visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] | visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] | ||||
) | ) | ||||
current_status_date = msg_dict["date"] | current_status_date = msg_dict["date"] | ||||
previous_snapshot = visit_stats_d["last_snapshot"] | previous_snapshot = visit_stats_d["last_snapshot"] | ||||
if msg_dict["snapshot"] != previous_snapshot: | if msg_dict["snapshot"] != previous_snapshot: | ||||
if ( | if ( | ||||
latest_recorded_visit_date | latest_recorded_visit_date | ||||
and current_status_date < latest_recorded_visit_date | and current_status_date < latest_recorded_visit_date | ||||
): | ): | ||||
# out of order message so ignored | # out of order message so ignored | ||||
continue | continue | ||||
# new eventful visit (new snapshot) | # new eventful visit (new snapshot) | ||||
visit_stats_d["last_eventful"] = current_status_date | visit_stats_d["last_eventful"] = current_status_date | ||||
visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | visit_stats_d["last_snapshot"] = msg_dict["snapshot"] | ||||
if last_event == "last_eventful": | |||||
increment_successive_visits = True | |||||
else: | else: | ||||
# same snapshot as before | # same snapshot as before | ||||
if ( | if ( | ||||
latest_recorded_visit_date | latest_recorded_visit_date | ||||
and current_status_date < latest_recorded_visit_date | and current_status_date < latest_recorded_visit_date | ||||
): | ): | ||||
# we receive an old message which is an earlier "eventful" event | # we receive an old message which is an earlier "eventful" event | ||||
# than what we had, we consider the last_eventful event as | # than what we had, we consider the last_eventful event as | ||||
# actually an uneventful event. The true eventful message is the | # actually an uneventful event. The true eventful message is the | ||||
# current one | # current one | ||||
visit_stats_d["last_uneventful"] = visit_stats_d[ | visit_stats_d["last_uneventful"] = visit_stats_d[ | ||||
"last_eventful" | "last_eventful" | ||||
] | ] | ||||
visit_stats_d["last_eventful"] = current_status_date | visit_stats_d["last_eventful"] = current_status_date | ||||
# there is no way we can do anything but reset the | |||||
# successive_visits here... | |||||
else: | else: | ||||
# uneventful event | # uneventful event | ||||
visit_stats_d["last_uneventful"] = current_status_date | visit_stats_d["last_uneventful"] = current_status_date | ||||
if last_event == "last_uneventful": | |||||
increment_successive_visits = True | |||||
if increment_successive_visits: | |||||
visit_stats_d["successive_visits"] += 1 | |||||
else: | |||||
visit_stats_d["successive_visits"] = 1 | |||||
scheduler.origin_visit_stats_upsert( | scheduler.origin_visit_stats_upsert( | ||||
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() | ||||
) | ) |