diff --git a/sql/updates/29.sql b/sql/updates/29.sql --- a/sql/updates/29.sql +++ b/sql/updates/29.sql @@ -12,8 +12,12 @@ alter table origin_visit_stats add column next_position_offset int not null default 4; +alter table origin_visit_stats +add column successive_visits int not null default 0; + comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; +comment on column origin_visit_stats.successive_visits is 'number of successive visits with the same status'; create table visit_scheduler_queue_position ( visit_type text not null, diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -178,16 +178,36 @@ visit_stats_d = origin_visit_stats[pk] + # get the name of the most recent event we got + date_keys = [ + f"last_{k}" for k in ("eventful", "uneventful", "failed", "notfound") + ] + event_dates = dict( + (v, k) for k, v in visit_stats_d.items() if k in date_keys and v is not None + ) + if not event_dates: + maxdate = None + last_event = None + else: + maxdate = max_date(*event_dates.keys()) + last_event = event_dates[maxdate] + increment_successive_visits = False + if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) update_next_position_offset(visit_stats_d, 1) # visit less often + if last_event == "last_notfound": + increment_successive_visits = True elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) update_next_position_offset(visit_stats_d, 1) # visit less often + if last_event == "last_failed": + increment_successive_visits = True + else: # visit with snapshot, something 
happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information @@ -214,6 +234,9 @@ visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Visit this origin more often in the future update_next_position_offset(visit_stats_d, -2) + if last_event == "last_eventful": + increment_successive_visits = True + else: # same snapshot as before if ( @@ -232,6 +255,8 @@ ) # Visit this origin less often in the future update_next_position_offset(visit_stats_d, 1) + # there is no way we can do anything but reset the + # successive_visits here... elif ( latest_recorded_visit_date and current_status_date == latest_recorded_visit_date @@ -244,6 +269,8 @@ visit_stats_d["last_uneventful"] = current_status_date # Visit this origin less often in the future update_next_position_offset(visit_stats_d, 1) + if last_event == "last_uneventful": + increment_successive_visits = True # Update the next visit queue position (which will be used solely for origin # without any last_update, cf. 
the dedicated scheduling policy @@ -252,6 +279,11 @@ queue_position_per_visit_type, visit_stats_d ) + if increment_successive_visits: + visit_stats_d["successive_visits"] += 1 + else: + visit_stats_d["successive_visits"] = 1 + scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -209,6 +209,8 @@ ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) + successive_visits = attr.ib(type=int, validator=type_validator(), default=0) + @last_eventful.validator def check_last_eventful(self, attribute, value): check_timestamptz(value) diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -177,6 +177,7 @@ next_visit_queue_position timestamptz, -- duration that we expect to wait between visits of this origin next_position_offset int not null default 4, + successive_visits int not null default 0, primary key (url, visit_type) ); @@ -193,6 +194,7 @@ comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; +comment on column origin_visit_stats.successive_visits is 'number of successive visits with the same status'; create table visit_scheduler_queue_position ( visit_type text not null, diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -130,7 +130,9 @@ swh_scheduler.origin_visit_stats_upsert.assert_not_called() -def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats): +def assert_visit_stats_ok( + actual_visit_stats, 
expected_visit_stats, compare_successive_visits=True +): """Utility test function to ensure visits stats read from the backend are in the right shape. The comparison on the next_visit_queue_position will be dealt with in dedicated tests so it's not tested in tests that are calling this function. @@ -140,6 +142,8 @@ for visit_stats in actual_visit_stats: visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None) + if not compare_successive_visits: + visit_stats = attr.evolve(visit_stats, successive_visits=0) assert visit_stats in expected_visit_stats @@ -171,6 +175,7 @@ last_notfound=visit_status["date"], last_snapshot=None, next_position_offset=5, + successive_visits=1, ) ], ) @@ -211,6 +216,7 @@ last_notfound=DATE3, last_snapshot=None, next_position_offset=7, + successive_visits=3, ) ], ) @@ -269,6 +275,7 @@ last_notfound=None, last_snapshot=None, next_position_offset=7, + successive_visits=3, ) ], ) @@ -311,6 +318,7 @@ last_notfound=None, last_snapshot=None, next_position_offset=6, + successive_visits=2, ) ], ) @@ -369,6 +377,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), next_position_offset=0, + successive_visits=3, ) ], ) @@ -397,6 +406,7 @@ last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=4, + successive_visits=1, ) ] ) @@ -408,6 +418,7 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) + assert_visit_stats_ok( actual_origin_visit_stats, [ @@ -420,6 +431,7 @@ last_notfound=DATE1, last_snapshot=visit_status["snapshot"], next_position_offset=5, # uneventful so visit less often + successive_visits=2, ) ], ) @@ -486,6 +498,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), next_position_offset=5, # uneventful, visit origin less often in future + successive_visits=1, ) ], ) @@ -550,6 +563,7 @@ assert actual_visit_stats.last_snapshot == 
hash_to_bytes( "aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd" ) + assert actual_visit_stats.successive_visits == 1 VISIT_STATUSES_2 = [ @@ -693,6 +707,8 @@ last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=4, + successive_visits=1, ) ], ) @@ -741,6 +757,7 @@ last_notfound=None, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), next_position_offset=5, + successive_visits=1, ) ], ) @@ -807,6 +824,7 @@ next_position_offset=6, # 2 uneventful visits, whatever the permutation ) ], + compare_successive_visits=False, # depending on permutations, the result changes )