diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -974,6 +974,24 @@ ) return [OriginVisitStats(**row) for row in rows] + @db_transaction() + def visit_scheduler_queue_position_get( + self, db=None, cur=None, + ) -> Dict[str, datetime.datetime]: + cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") + return {row["visit_type"]: row["position"] for row in cur} + + @db_transaction() + def visit_scheduler_queue_position_set( + self, visit_type: str, position: datetime.datetime, db=None, cur=None, + ) -> None: + query = """ + INSERT INTO visit_scheduler_queue_position(visit_type, position) + VALUES(%s, %s) + ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position + """ + cur.execute(query, (visit_type, position)) + @db_transaction() def update_metrics( self, diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -440,6 +440,25 @@ """ ... + @remote_api_endpoint("visit_scheduler/get") + def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]: + """Retrieve all current queue positions for the recurrent visit scheduler. + + Returns: + Mapping of each visit type to its current queue position + + """ + ... + + @remote_api_endpoint("visit_scheduler/set") + def visit_scheduler_queue_position_set( + self, visit_type: str, position: datetime.datetime + ) -> None: + """Set the current queue position of the recurrent visit scheduler for `visit_type`. + + """ + ... 
+ @remote_api_endpoint("scheduler_metrics/update") def update_metrics( self, diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -26,12 +26,29 @@ return max(filtered_dates) +def update_next_position_offset(visit_stats: Dict, increment: int) -> None: + """Update the next position offset according to existing value and the increment. The + resulting value is clamped so that it is never negative (0 is the minimum). + + """ + visit_stats["next_position_offset"] = max( + 0, visit_stats["next_position_offset"] + increment + ) + + def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from origin_visit_status journal topics, then inserts them in the scheduler "origin_visit_stats" table. + FIXME: Clarify the existing behavior + FIXME2: Make the new scope explicit + + This is also in charge of updating the `next_visit_queue_position` (~ time at which + some new objects are expected to be added for the origin), `next_position_offset` + (duration that we expect to wait between visits of this origin). + Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. 
@@ -79,22 +96,20 @@ visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) - elif msg_dict["status"] == "failed": - visit_stats_d["last_failed"] = max_date( - msg_dict["date"], visit_stats_d.get("last_failed") - ) - elif msg_dict["snapshot"] is None: + update_next_position_offset(visit_stats_d, 1) # visit less often + elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) + update_next_position_offset(visit_stats_d, 1) # visit less often else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: - # visit with snapshot already stored, last_eventful should already be - # stored + # last_snapshot is set, so an eventful visit should have previously been + # recorded assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] @@ -111,6 +126,8 @@ # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] + # Visit this origin more often in the future + update_next_position_offset(visit_stats_d, -2) else: # same snapshot as before if ( @@ -127,6 +144,8 @@ visit_stats_d["last_eventful"] = min( visit_stats_d["last_eventful"], current_status_date ) + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) elif ( latest_recorded_visit_date and current_status_date == latest_recorded_visit_date @@ -137,6 +156,8 @@ else: # uneventful event visit_stats_d["last_uneventful"] = current_status_date + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) 
scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -204,6 +204,10 @@ last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) + next_visit_queue_position = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None + ) + next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) @last_eventful.validator def check_last_eventful(self, attribute, value): @@ -221,6 +225,10 @@ def check_last_notfound(self, attribute, value): check_timestamptz(value) + @next_visit_queue_position.validator + def check_next_visit_queue_position(self, attribute, value): + check_timestamptz(value) + @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -172,10 +172,16 @@ last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, + -- position in the global queue, the "time" at which we expect the origin to have new + -- objects + next_visit_queue_position timestamptz, + -- duration that we expect to wait between visits of this origin + next_position_offset int not null default 4, primary key (url, visit_type) ); +comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive'; comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; @@ -185,6 +191,19 @@ comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column 
origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; +comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; +comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; + +create table visit_scheduler_queue_position ( + visit_type text not null, + position timestamptz not null, + + primary key (visit_type) +); + +comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler'; +comment on column visit_scheduler_queue_position.visit_type is 'Visit type'; +comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type'; create table scheduler_metrics ( lister_id uuid not null references listers(id), diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -70,6 +70,8 @@ "task_type/create", "task_type/get", "task_type/get_all", + "visit_scheduler/get", + "visit_scheduler/set", "visit_stats/get", "visit_stats/upsert", ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -147,6 +147,8 @@ last_failed=None, last_notfound=visit_status["date"], last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=5, ) ] @@ -183,6 +185,8 @@ last_failed=None, last_notfound=DATE3, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] @@ -237,6 +241,8 @@ last_failed=DATE3, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] @@ -275,6 +281,8 @@ last_failed=DATE2, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=6, ) ] 
@@ -329,6 +337,8 @@ last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_visit_queue_position=None, + next_position_offset=0, ) ] @@ -354,6 +364,8 @@ last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=4, ) ] ) @@ -374,6 +386,8 @@ last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=5, # uneventful so visit less often ) ] @@ -434,6 +448,8 @@ last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=5, # uneventful, visit origin less often in the future ) assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ @@ -647,7 +663,7 @@ def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): - """A duplicated message must be ignored + """An old message updates old information """ visit_status1 = { @@ -676,18 +692,18 @@ {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=DATE2, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=5, + ) ] @@ -735,16 +751,16 @@ {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - 
last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [ - expected_visit_stats + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1, + last_uneventful=DATE1 + 2 * ONE_YEAR, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=6, # 2 uneventful visits, whatever the permutation + ) ]