diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -311,14 +311,30 @@ @db_transaction() def grab_next_visits( - self, visit_type: str, count: int, policy: str, db=None, cur=None, + self, + visit_type: str, + count: int, + policy: str, + timestamp: Optional[datetime.datetime] = None, + db=None, + cur=None, ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. - This will mark the origins as "being visited" in the listed_origins + This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin. + + Arguments: + visit_type: type of visits to schedule + count: number of visits to schedule + policy: the scheduling policy used to select which visits to schedule + timestamp: the mocked timestamp at which we're recording that the visits are + being scheduled (defaults to the current time) """ + if timestamp is None: + timestamp = utcnow() + origin_select_cols = ", ".join(ListedOrigin.select_columns()) query_args: List[Any] = [] @@ -399,7 +415,7 @@ url, visit_type, last_scheduled ) SELECT - url, visit_type, now() + url, visit_type, %s FROM selected_origins ON CONFLICT (url, visit_type) DO UPDATE @@ -413,6 +429,7 @@ FROM selected_origins """ + query_args.append(timestamp) cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -336,13 +336,24 @@ @remote_api_endpoint("origins/grab_next") def grab_next_visits( - self, visit_type: str, count: int, policy: str + self, + visit_type: str, + count: int, + policy: str, + timestamp: Optional[datetime.datetime] = None, ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. - This will mark the origins as "being visited" in the listed_origins + This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin. + + Arguments: + visit_type: type of visits to schedule + count: number of visits to schedule + policy: the scheduling policy used to select which visits to schedule + timestamp: the mocked timestamp at which we're recording that the visits are + being scheduled (defaults to the current time) """ ... diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py --- a/swh/scheduler/simulator/origin_scheduler.py +++ b/swh/scheduler/simulator/origin_scheduler.py @@ -29,7 +29,7 @@ if remaining < min_batch_size: continue next_origins = env.scheduler.grab_next_visits( - visit_type, remaining, policy=policy + visit_type, remaining, policy=policy, timestamp=env.time ) logger.debug( "%s runner: running %s %s tasks",