diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -315,6 +315,8 @@
         visit_type: str,
         count: int,
         policy: str,
+        enabled: bool = True,
+        lister_uuid: Optional[str] = None,
         timestamp: Optional[datetime.datetime] = None,
         scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
@@ -333,7 +335,7 @@
         where_clauses = []
 
         # "NOT enabled" = the lister said the origin no longer exists
-        where_clauses.append("enabled")
+        where_clauses.append("enabled" if enabled else "not enabled")
 
         # Only schedule visits of the given type
         where_clauses.append("visit_type = %s")
@@ -429,6 +431,10 @@
         else:
             table = "listed_origins"
 
+        if lister_uuid:
+            where_clauses.append("lister_id = %s")
+            query_args.append(lister_uuid)
+
         select_query = f"""
             SELECT
                 {origin_select_cols}
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -155,10 +155,28 @@
 @click.option(
     "--tablesample", help="Table sampling percentage", type=float,
 )
+@click.option(
+    "--with-enabled/--without-enabled",
+    "enabled",
+    is_flag=True,
+    default=True,
+    help="Select enabled origins (the default) or only disabled ones",
+)
+@click.option(
+    "--lister-uuid",
+    default=None,
+    help="Limit origins to those listed from such lister",
+)
 @click.argument("type", type=str)
 @click.pass_context
 def send_to_celery(
-    ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+    ctx,
+    policy: str,
+    queue: Optional[str],
+    tablesample: Optional[float],
+    type: str,
+    enabled: bool,
+    lister_uuid: Optional[str] = None,
 ):
     """Send the next origin visits of the TYPE loader to celery, filling the queue."""
     from kombu.utils.uuid import uuid
@@ -176,11 +194,16 @@
     print(num_tasks, "slots available in celery queue")
 
     origins = scheduler.grab_next_visits(
-        type, num_tasks, policy=policy, tablesample=tablesample
+        type,
+        num_tasks,
+        policy=policy,
+        tablesample=tablesample,
+        enabled=enabled,
+        lister_uuid=lister_uuid,
     )
 
     print(len(origins), "visits to send to celery")
     for origin in origins:
         task_dict = origin.as_task_dict()
         app.send_task(
             task_name,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -388,6 +388,8 @@
         visit_type: str,
         count: int,
         policy: str,
+        enabled: bool = True,
+        lister_uuid: Optional[str] = None,
         timestamp: Optional[datetime.datetime] = None,
         scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
@@ -404,6 +406,10 @@
           visit_type: type of visits to schedule
           count: number of visits to schedule
           policy: the scheduling policy used to select which visits to schedule
+          enabled: whether to select enabled origins (True, the default) or
+            disabled ones, i.e. origins the lister no longer reports (False);
+            the latter is only useful for edge cases
+          lister_uuid: if set, only select origins listed by the lister with this uuid
           timestamp: the mocked timestamp at which we're recording that the visits are
             being scheduled (defaults to the current time)
           scheduled_cooldown: the minimal interval before which we can schedule