diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -342,6 +342,8 @@
         visit_type: str,
         count: int,
         policy: str,
+        enabled: bool = True,
+        lister_uuid: Optional[str] = None,
         timestamp: Optional[datetime.datetime] = None,
         scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
@@ -363,7 +365,7 @@
         common_table_expressions: List[Tuple[str, str]] = []
 
         # "NOT enabled" = the lister said the origin no longer exists
-        where_clauses.append("enabled")
+        where_clauses.append("enabled" if enabled else "not enabled")
 
         # Only schedule visits of the given type
         where_clauses.append("visit_type = %s")
@@ -466,6 +468,10 @@
         else:
             table = "listed_origins"
 
+        if lister_uuid:
+            where_clauses.append("lister_id = %s")
+            query_args.append(lister_uuid)
+
         # fmt: off
         common_table_expressions.insert(0, ("selected_origins", f"""
             SELECT
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -20,7 +20,7 @@
 def run_ready_tasks(
     backend: SchedulerInterface,
     app,
-    task_types: List[Dict] = [],
+    task_types_list: List[Dict] = [],
     with_priority: bool = False,
 ) -> List[Dict]:
     """Schedule tasks ready to be scheduled.
@@ -59,11 +59,11 @@
     """
     all_backend_tasks: List[Dict] = []
     while True:
-        if not task_types:
-            task_types = backend.get_task_types()
+        if not task_types_list:
+            task_types_list = backend.get_task_types()
         task_types_d = {}
         pending_tasks = []
-        for task_type in task_types:
+        for task_type in task_types_list:
             task_type_name = task_type["type"]
             task_types_d[task_type_name] = task_type
             max_queue_length = task_type["max_queue_length"] or 0
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -28,10 +28,13 @@
     "task_type_names",
     multiple=True,
     default=[],
-    help="Task type names (e.g load-git, load-hg, list-github-full, ...) to schedule.",
+    help=(
+        "Task types to schedule. If not provided, this iterates over every "
+        "task types referenced in the scheduler backend."
+    ),
 )
 @click.option(
-    "--with-priority/--without-priority",
+    "--with-priority / --without-priority",
     is_flag=True,
     default=False,
     help=(
@@ -56,14 +59,21 @@
     scheduler = ctx.obj["scheduler"]
     logger.debug("Scheduler %s" % scheduler)
     task_types = []
-    for task_type_name in task_type_names:
-        task_type = scheduler.get_task_type(task_type_name)
-        if not task_type:
-            raise ValueError(f"Unknown {task_type_name}")
-        task_types.append(task_type)
+
+    # Resolve the explicitly requested task types; an unknown name is an error
+    if task_type_names:
+        for task_type_name in task_type_names:
+            task_type = scheduler.get_task_type(task_type_name)
+            if not task_type:
+                raise ValueError(f"Unknown {task_type_name}")
+            task_types.append(task_type)
 
     try:
         while True:
+            # Nothing requested explicitly: use every task type of the scheduler
+            if not task_types:
+                task_types = scheduler.get_task_types()
+
             logger.debug("Run ready tasks")
             try:
                 ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority))
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -155,10 +155,24 @@
 @click.option(
     "--tablesample", help="Table sampling percentage", type=float,
 )
+@click.option(
+    "--with-enabled/--without-enabled", "enabled", is_flag=True, default=True,
+)
+@click.option(
+    "--lister-uuid",
+    default=None,
+    help="Limit origins to those listed from such lister",
+)
 @click.argument("type", type=str)
 @click.pass_context
 def send_to_celery(
-    ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+    ctx,
+    policy: str,
+    queue: Optional[str],
+    tablesample: Optional[float],
+    type: str,
+    enabled: bool,
+    lister_uuid: Optional[str] = None,
 ):
     """Send the next origin visits of the TYPE loader to celery, filling the queue."""
     from kombu.utils.uuid import uuid
@@ -176,7 +190,12 @@
     print(num_tasks, "slots available in celery queue")
 
     origins = scheduler.grab_next_visits(
-        type, num_tasks, policy=policy, tablesample=tablesample
+        type,
+        num_tasks,
+        policy=policy,
+        tablesample=tablesample,
+        enabled=enabled,
+        lister_uuid=lister_uuid,
     )
 
     print(len(origins), "visits to send to celery")
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -394,6 +394,8 @@
         visit_type: str,
         count: int,
         policy: str,
+        enabled: bool = True,
+        lister_uuid: Optional[str] = None,
         timestamp: Optional[datetime.datetime] = None,
         scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
@@ -410,6 +412,10 @@
             visit_type: type of visits to schedule
             count: number of visits to schedule
             policy: the scheduling policy used to select which visits to schedule
+            enabled: whether to select enabled origins (the default) or disabled
+              ones, i.e. origins the lister reported as no longer existing; only
+              needed for some edge cases
+            lister_uuid: if set, only select origins listed by the lister with this uuid
             timestamp: the mocked timestamp at which we're recording that the visits are
               being scheduled (defaults to the current time)
             scheduled_cooldown: the minimal interval before which we can schedule
@@ -420,6 +426,7 @@
               not_found origin
             tablesample: the percentage of the table on which we run the query
               (None: no sampling)
+
         """
         ...
 
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -100,7 +100,8 @@
 
     swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    task_types = swh_scheduler.get_task_types()
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types)
     assert backend_tasks
     for task in backend_tasks:
         # Make sure the task completed
@@ -135,7 +136,8 @@
         assert task["priority"] is None
         task_ids.add(task["id"])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    task_types = swh_scheduler.get_task_types()
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types)
     assert len(backend_tasks) == len(tasks)
 
     scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name)
@@ -179,8 +181,9 @@
         assert task["priority"] is not None
         task_ids.add(task["id"])
 
+    task_types = swh_scheduler.get_task_types()
     backend_tasks = run_ready_tasks(
-        swh_scheduler, swh_scheduler_celery_app, task_types=[], with_priority=True
+        swh_scheduler, swh_scheduler_celery_app, task_types, with_priority=True
     )
     assert len(backend_tasks) == len(tasks)
 
@@ -206,7 +209,8 @@
 
     swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])
 
-    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app)
+    task_types = swh_scheduler.get_task_types()
+    backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app, task_types)
     assert len(backend_tasks) == 1
 
     task = backend_tasks[0]