diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -974,6 +974,24 @@
         )
         return [OriginVisitStats(**row) for row in rows]
 
+    @db_transaction()
+    def visit_scheduler_queue_position_get(
+        self, db=None, cur=None,
+    ) -> Dict[str, datetime.datetime]:
+        cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position")
+        return {row["visit_type"]: row["position"] for row in cur}
+
+    @db_transaction()
+    def visit_scheduler_queue_position_set(
+        self, visit_type: str, position: datetime.datetime, db=None, cur=None,
+    ) -> None:
+        query = """
+            INSERT INTO visit_scheduler_queue_position(visit_type, position)
+            VALUES(%s, %s)
+            ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position
+        """
+        cur.execute(query, (visit_type, position))
+
     @db_transaction()
     def update_metrics(
         self,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -440,6 +440,30 @@
         """
         ...
 
+    @remote_api_endpoint("visit_scheduler/get")
+    def visit_scheduler_queue_position_get(self) -> Dict[str, datetime.datetime]:
+        """Retrieve all current queue positions for the recurrent visit scheduler.
+
+        Returns:
+            Mapping of visit type to their current queue position
+
+        """
+        ...
+
+    @remote_api_endpoint("visit_scheduler/set")
+    def visit_scheduler_queue_position_set(
+        self, visit_type: str, position: datetime.datetime
+    ) -> None:
+        """Set the current queue position of the recurrent visit scheduler for
+        `visit_type`.
+
+        Args:
+            visit_type: visit type whose queue position is set
+            position: new queue position for that visit type
+
+        """
+        ...
+
     @remote_api_endpoint("scheduler_metrics/update")
     def update_metrics(
         self,
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -177,9 +177,11 @@
   next_visit_queue_position timestamptz,
   -- duration that we expect to wait between visits of this origin
   next_position_offset int not null default 4,
+
   primary key (url, visit_type)
 );
 
+comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive';
 comment on column origin_visit_stats.url is 'Origin URL';
 comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url';
 comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
@@ -192,6 +194,17 @@
 comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
 comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
 
+create table visit_scheduler_queue_position (
+  visit_type text not null,
+  position timestamptz not null,
+
+  primary key (visit_type)
+);
+
+comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler';
+comment on column visit_scheduler_queue_position.visit_type is 'Visit type';
+comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type';
+
 create table scheduler_metrics (
   lister_id uuid not null references listers(id),
   visit_type text not null,
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -70,6 +70,8 @@
             "task_type/create",
             "task_type/get",
             "task_type/get_all",
+            "visit_scheduler/get",
+            "visit_scheduler/set",
             "visit_stats/get",
             "visit_stats/upsert",
         )