diff --git a/sql/updates/29.sql b/sql/updates/29.sql
--- a/sql/updates/29.sql
+++ b/sql/updates/29.sql
@@ -12,8 +12,12 @@
 alter table origin_visit_stats
 add column next_position_offset int not null default 4;
 
+alter table origin_visit_stats
+add column successive_visits int not null default 0;
+
 comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
 comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
+comment on column origin_visit_stats.successive_visits is 'Number of successive visits with the same status';
 
 create table visit_scheduler_queue_position (
   visit_type text not null,
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -16,6 +16,9 @@
 msg_type = "origin_visit_status"
 
+INTERESTING_EVENTS = ("eventful", "uneventful", "failed", "notfound")  # last_* field suffixes
+
+
 def max_date(*dates: Optional[datetime]) -> datetime:
     """Return the max date of given (possibly None) dates
 
@@ -102,6 +105,29 @@
     return current_position + visit_interval
 
 
+def compute_last_event(visit_stats_d: Dict) -> Optional[str]:
+    """Determine the most recent event recorded in a visit_stats dict.
+
+    Args:
+        visit_stats_d: Dict representing visit stats
+
+    Returns:
+        the most recent event key if any (e.g. "last_eventful", ...)
+
+    """
+    date_keys = [f"last_{k}" for k in INTERESTING_EVENTS]
+    event_dates = {
+        v: k for k, v in visit_stats_d.items() if k in date_keys and v is not None
+    }
+    if not event_dates:
+        last_event = None
+    else:
+        maxdate = max_date(*event_dates.keys())
+        last_event = event_dates[maxdate]
+
+    return last_event
+
+
 def process_journal_objects(
     messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
 ) -> None:
@@ -144,7 +170,7 @@
     interesting_messages = [
         msg
         for msg in messages[msg_type]
-        if "type" in msg and msg["status"] not in ("created", "ongoing")
+        if "type" in msg and msg["status"] in ("full", "partial", "failed", "not_found")
     ]
 
     if not interesting_messages:
@@ -177,17 +203,24 @@
         }
         visit_stats_d = origin_visit_stats[pk]
+        last_event = compute_last_event(visit_stats_d)
+        increment_successive_visits = False
         if msg_dict["status"] == "not_found":
             visit_stats_d["last_notfound"] = max_date(
                 msg_dict["date"], visit_stats_d.get("last_notfound")
             )
             update_next_position_offset(visit_stats_d, 1)  # visit less often
+            if last_event == "last_notfound":
+                increment_successive_visits = True
         elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None:
             visit_stats_d["last_failed"] = max_date(
                 msg_dict["date"], visit_stats_d.get("last_failed")
             )
             update_next_position_offset(visit_stats_d, 1)  # visit less often
+            if last_event == "last_failed":
+                increment_successive_visits = True
+
         else:  # visit with snapshot, something happened
             if visit_stats_d["last_snapshot"] is None:
                 # first time visit with snapshot, we keep relevant information
@@ -214,6 +247,9 @@
                 visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
                 # Visit this origin more often in the future
                 update_next_position_offset(visit_stats_d, -2)
+                if last_event == "last_eventful":
+                    increment_successive_visits = True
+
             else:
                 # same snapshot as before
                 if (
@@ -232,6 +268,8 @@
                     )
                     # Visit this origin less often in the future
                     update_next_position_offset(visit_stats_d, 1)
+                    # we cannot tell which event this message repeats, so
+                    # successive_visits gets reset below
                 elif (
                     latest_recorded_visit_date
                     and current_status_date == latest_recorded_visit_date
@@ -244,6 +282,8 @@
                     visit_stats_d["last_uneventful"] = current_status_date
                     # Visit this origin less often in the future
                     update_next_position_offset(visit_stats_d, 1)
+                    if last_event == "last_uneventful":
+                        increment_successive_visits = True
 
         # Update the next visit queue position (which will be used solely for origin
         # without any last_update, cf. the dedicated scheduling policy
@@ -252,6 +292,11 @@
             queue_position_per_visit_type, visit_stats_d
         )
 
+        if increment_successive_visits:
+            visit_stats_d["successive_visits"] += 1
+        else:
+            visit_stats_d["successive_visits"] = 1
+
     scheduler.origin_visit_stats_upsert(
         OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
     )
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -209,6 +209,8 @@
     )
     next_position_offset = attr.ib(type=int, validator=type_validator(), default=4)
 
+    successive_visits = attr.ib(type=int, validator=type_validator(), default=0)
+
     @last_eventful.validator
     def check_last_eventful(self, attribute, value):
         check_timestamptz(value)
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -177,6 +177,7 @@
   next_visit_queue_position timestamptz,
   -- duration that we expect to wait between visits of this origin
   next_position_offset int not null default 4,
+  successive_visits int not null default 0,
 
   primary key (url, visit_type)
 );
@@ -193,6 +194,7 @@
 comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
 comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
+comment on column origin_visit_stats.successive_visits is 'Number of successive visits with the same status';
 
 create table visit_scheduler_queue_position (
   visit_type text not null,
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -7,6 +7,7 @@
 from datetime import timedelta
 import functools
 from itertools import permutations
+import random
 from unittest.mock import Mock
 
 import attr
@@ -14,6 +15,8 @@
 from swh.model.hashutil import hash_to_bytes
 from swh.scheduler.journal_client import (
+    INTERESTING_EVENTS,
+    compute_last_event,
     from_position_offset_to_days,
     max_date,
     next_visit_queue_position,
@@ -130,7 +133,9 @@
     swh_scheduler.origin_visit_stats_upsert.assert_not_called()
 
 
-def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats):
+def assert_visit_stats_ok(
+    actual_visit_stats, expected_visit_stats, compare_successive_visits=True
+):
     """Utility test function to ensure visits stats read from the backend are in the right
     shape. The comparison on the next_visit_queue_position will be dealt with in
     dedicated tests so it's not tested in tests that are calling this function.
@@ -140,6 +145,8 @@
     for visit_stats in actual_visit_stats:
         visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None)
+        if not compare_successive_visits:
+            visit_stats = attr.evolve(visit_stats, successive_visits=0)
         assert visit_stats in expected_visit_stats
 
@@ -171,6 +178,7 @@
                 last_notfound=visit_status["date"],
                 last_snapshot=None,
                 next_position_offset=5,
+                successive_visits=1,
             )
         ],
     )
@@ -211,6 +219,7 @@
                 last_notfound=DATE3,
                 last_snapshot=None,
                 next_position_offset=7,
+                successive_visits=3,
            )
         ],
     )
@@ -269,6 +278,7 @@
                 last_notfound=None,
                 last_snapshot=None,
                 next_position_offset=7,
+                successive_visits=3,
             )
         ],
     )
@@ -311,6 +321,7 @@
                 last_notfound=None,
                 last_snapshot=None,
                 next_position_offset=6,
+                successive_visits=2,
             )
         ],
     )
@@ -369,6 +380,7 @@
                 last_notfound=None,
                 last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
                 next_position_offset=0,
+                successive_visits=3,
             )
         ],
     )
@@ -397,6 +409,7 @@
                 last_snapshot=visit_status["snapshot"],
                 next_visit_queue_position=None,
                 next_position_offset=4,
+                successive_visits=1,
             )
         ]
     )
@@ -408,6 +421,7 @@
     actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
         [(visit_status["origin"], visit_status["type"])]
     )
+
     assert_visit_stats_ok(
         actual_origin_visit_stats,
         [
@@ -420,6 +434,7 @@
                 last_notfound=DATE1,
                 last_snapshot=visit_status["snapshot"],
                 next_position_offset=5,  # uneventful so visit less often
+                successive_visits=2,
             )
         ],
     )
@@ -486,6 +501,7 @@
                 last_notfound=None,
                 last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
                 next_position_offset=5,  # uneventful, visit origin less often in future
+                successive_visits=1,
             )
         ],
     )
@@ -550,6 +566,7 @@
     assert actual_visit_stats.last_snapshot == hash_to_bytes(
         "aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"
     )
+    assert actual_visit_stats.successive_visits == 1
 
 
 VISIT_STATUSES_2 = [
@@ -693,6 +710,8 @@
                 last_failed=None,
                 last_notfound=None,
                 last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+                next_position_offset=4,
+                successive_visits=1,
             )
         ],
     )
@@ -741,6 +760,7 @@
                 last_notfound=None,
                 last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
                 next_position_offset=5,
+                successive_visits=1,
             )
         ],
    )
@@ -807,6 +827,7 @@
                 next_position_offset=6,  # 2 uneventful visits, whatever the permutation
             )
         ],
+        compare_successive_visits=False,  # the permutation order changes the result
     )
@@ -907,3 +928,44 @@
     )
 
     assert mock_random.called
+
+
+@pytest.mark.parametrize(
+    "last_event", INTERESTING_EVENTS,
+)
+def test_compute_last_event(last_event):
+    """Compute last event of visit stats should always return the most recent event, if any
+
+    """
+
+    maxdate = utcnow()
+
+    event_key = f"last_{last_event}"
+    visit_stats_d = {
+        event_key: maxdate,
+    }
+
+    other_events = set(INTERESTING_EVENTS) - {last_event}
+
+    # For the remaining events, use either an empty date or a date in the past
+    for i, event in enumerate(other_events):
+        empty_date = random.choice([True, False])
+        visit_stats_d[f"last_{event}"] = (
+            None if empty_date else (maxdate - timedelta(days=1 + i))
+        )
+
+    # Ensure our visit_stats_d is configured properly
+    assert max_date(*visit_stats_d.values()) == maxdate
+
+    actual_last_event = compute_last_event(visit_stats_d)
+    assert actual_last_event == event_key
+
+
+def test_compute_last_event_none():
+    """Compute last event out of visit stats without any date should return None
+
+    """
+
+    visit_stats_d = {f"last_{event}": None for event in INTERESTING_EVENTS}
+
+    assert compute_last_event(visit_stats_d) is None
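
Note (illustration, not part of the patch): the snippet below sketches the
compute_last_event semantics the diff relies on. The helper name and
INTERESTING_EVENTS are the ones added in swh/scheduler/journal_client.py; the
body is inlined here (using plain max() in place of max_date) so the example
runs standalone, and the dates are made up.

    from datetime import datetime, timedelta, timezone
    from typing import Dict, Optional

    # Mirrors the constant added in swh/scheduler/journal_client.py
    INTERESTING_EVENTS = ("eventful", "uneventful", "failed", "notfound")

    def compute_last_event(visit_stats_d: Dict) -> Optional[str]:
        # Same logic as the patched helper: keep the non-None last_* dates,
        # then return the key holding the most recent one.
        date_keys = [f"last_{k}" for k in INTERESTING_EVENTS]
        event_dates = {
            v: k for k, v in visit_stats_d.items() if k in date_keys and v is not None
        }
        return event_dates[max(event_dates)] if event_dates else None

    now = datetime.now(timezone.utc)
    stats = {
        "last_eventful": now - timedelta(days=3),
        "last_uneventful": now,                   # most recent date: this key wins
        "last_failed": None,                      # None dates are ignored
        "last_notfound": now - timedelta(days=1),
    }
    assert compute_last_event(stats) == "last_uneventful"
    assert compute_last_event({k: None for k in stats}) is None  # no event at all

In process_journal_objects, this result is what decides whether the incoming
status repeats the most recent recorded event (successive_visits is
incremented) or breaks the streak (successive_visits is reset to 1).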