@origin.command("send-to-celery")
@click.option(
    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
)
@click.option(
    "--queue", "-q", help="Target celery queue", type=str,
)
@click.option(
    "--tablesample", help="Table sampling percentage", type=float,
)
@click.argument("type", type=str)
@click.pass_context
def send_to_celery(
    ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
):
    """Send the next origin visits of the TYPE loader to celery, filling the queue."""
    # NOTE: the docstring above is what click prints for `--help`; keep
    # implementation notes in comments so the CLI output stays unchanged.
    #
    # Parameters (all supplied by click):
    #   ctx         -- click context; ctx.obj["scheduler"] is the scheduler used.
    #   policy      -- forwarded as-is to scheduler.grab_next_visits().
    #   queue       -- celery queue to fill; falls back to the task's backend name.
    #   tablesample -- forwarded as-is to scheduler.grab_next_visits().
    #   type        -- visit type; the task type looked up is "load-<type>".
    #
    # Raises click.ClickException when no "load-<type>" task type is found.

    # Imported lazily, as in the rest of this command, so that merely loading
    # the CLI module does not pull in the celery machinery.
    from kombu.utils.uuid import uuid

    from swh.scheduler.celery_backend.config import app, get_available_slots

    scheduler = ctx.obj["scheduler"]

    task_type_name = f"load-{type}"
    task_type = scheduler.get_task_type(task_type_name)
    if task_type is None:
        # Presumably get_task_type() yields None for an unregistered task
        # type (TODO confirm against the scheduler interface). Without this
        # guard the subscript below would fail with an opaque TypeError.
        raise click.ClickException(f"Unknown task type {task_type_name}")

    task_name = task_type["backend_name"]
    queue_name = queue or task_name

    num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])

    click.echo(f"{num_tasks} slots available in celery queue")
    origins = scheduler.grab_next_visits(
        type, num_tasks, policy=policy, tablesample=tablesample
    )

    click.echo(f"{len(origins)} visits to send to celery")
    # The loop variable is deliberately not called `origin`: that name is the
    # module-level click group this command is registered on.
    for visit in origins:
        arguments = visit.as_task_dict()["arguments"]
        app.send_task(
            task_name,
            task_id=uuid(),
            args=arguments["args"],
            kwargs=arguments["kwargs"],
            queue=queue_name,
        )
def test_send_to_celery(
    mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type,
):
    """Check that `send-to-celery` sends one task per listed origin.

    Every known task type and every listed origin is registered in the
    scheduler, then the command is invoked for the first visit type of
    ``listed_origins_by_type``. The (task name, origin url) pairs passed to
    the patched ``send_task`` must equal those derived from the listed
    origins of that visit type.
    """
    # Populate the scheduler: task types first, then the listed origins.
    for definition in TASK_TYPES.values():
        swh_scheduler.create_task_type(definition)

    for batch in listed_origins_by_type.values():
        swh_scheduler.record_listed_origins(batch)

    visit_type = next(iter(listed_origins_by_type))

    # Patch the queue length lookup to return None.
    mocker.patch(
        "swh.scheduler.celery_backend.config.get_queue_length", return_value=None,
    )

    # Capture outgoing celery tasks instead of sending them.
    send_task = mocker.patch.object(
        swh_scheduler_celery_app, "send_task", return_value=None
    )

    result = invoke(swh_scheduler, args=("send-to-celery", visit_type))
    assert result.exit_code == 0

    # Each recorded call unpacks to (positional args, keyword args).
    scheduled_tasks = set()
    for positional, keywords in send_task.call_args_list:
        scheduled_tasks.add((positional[0], keywords["kwargs"]["url"]))

    expected_tasks = set()
    for listed in listed_origins_by_type[visit_type]:
        backend_name = TASK_TYPES[listed.visit_type]["backend_name"]
        expected_tasks.add((backend_name, listed.url))

    assert expected_tasks == scheduled_tasks