diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -76,22 +76,46 @@ msg_dict["date"], visit_stats_d.get("last_failed") ) else: # visit with snapshot, something happened - if visit_stats_d == empty_object: + if visit_stats_d["last_snapshot"] is None: + # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: - date = max_date( + # visit with snapshot already stored, last_eventful should already be + # stored + assert visit_stats_d["last_eventful"] is not None + latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) - if date and msg_dict["date"] < date: - # ignore out of order message - continue + current_status_date = msg_dict["date"] previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: - visit_stats_d["last_eventful"] = msg_dict["date"] + if ( + latest_recorded_visit_date + and current_status_date < latest_recorded_visit_date + ): + # out-of-order message with another snapshot: ignored + continue + # new eventful visit (new snapshot) + visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: - visit_stats_d["last_uneventful"] = msg_dict["date"] + # same snapshot as before + if ( + latest_recorded_visit_date + and current_status_date < latest_recorded_visit_date + ): + # out-of-order message: same snapshot, dated before the latest + # recorded visit, so the visit stored as last_eventful was + # actually an uneventful event. 
The true eventful message is the + # current one. NOTE(review): clobbers any later last_uneventful + visit_stats_d["last_uneventful"] = visit_stats_d[ + "last_eventful" + ] + visit_stats_d["last_eventful"] = current_status_date + else: + # same snapshot, not out of order: uneventful visit + visit_stats_d["last_uneventful"] = current_status_date scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -5,6 +5,7 @@ import datetime import functools +from itertools import permutations import pytest @@ -204,7 +205,7 @@ }, { "origin": "bar", - "visit": 3, + "visit": 4, "status": "full", "date": DATE2, "type": "git", @@ -271,16 +272,14 @@ last_snapshot=visit_status["snapshot"], ) - most_recent_date = DATE3 - most_recent_snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", - "visit": 3, + "visit": 4, "status": "full", - "date": most_recent_date, + "date": DATE3, "type": "git", - "snapshot": most_recent_snapshot, + "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, { "origin": "foo", @@ -304,11 +303,11 @@ assert actual_origin_visit_stats == OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], - last_eventful=most_recent_date, + last_eventful=DATE3, last_uneventful=None, last_failed=None, last_notfound=None, - last_snapshot=most_recent_snapshot, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), ) @@ -354,3 +353,127 @@ last_notfound=DATE1, last_snapshot=visit_status["snapshot"], ) + + +VISIT_STATUSES = [ + {**ovs, "date": DATE1 + n * ONE_DAY} + for n, ovs in enumerate( + [ + { + "origin": "foo", + "type": "git", + "visit": 1, + "status": "created", + "snapshot": None, + }, + { + "origin": "foo", + "type": "git", + "visit": 1, + "status": "full", + "snapshot": 
hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "foo", + "type": "git", + "visit": 2, + "status": "created", + "snapshot": None, + }, + { + "origin": "foo", + "type": "git", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + ] + ) +] + + +@pytest.mark.parametrize( + "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) +) +def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): + """Ensure every ordering of the visit statuses yields the same final state + + """ + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + expected_visit_stats = OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1 + ONE_DAY, + last_uneventful=DATE1 + 3 * ONE_DAY, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ) + + assert swh_scheduler.origin_visit_stats_get("foo", "git") == expected_visit_stats + + +VISIT_STATUSES1 = [ + {**ovs, "date": DATE1 + n * ONE_DAY} + for n, ovs in enumerate( + [ + { + "origin": "cavabarder", + "type": "hg", + "visit": 1, + "status": "partial", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 3, + "status": "full", + "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 4, + "status": "full", + "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + }, + ] + ) +] + + +@pytest.mark.parametrize( + "visit_statuses", permutations(VISIT_STATUSES1, len(VISIT_STATUSES1)) +) +def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): 
+ """Ensure every ordering of the visit statuses yields the same final state + + """ + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + expected_visit_stats = OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1 + 2 * ONE_DAY, + last_uneventful=DATE1 + 3 * ONE_DAY, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ) + + assert ( + swh_scheduler.origin_visit_stats_get("cavabarder", "hg") == expected_visit_stats + )