Page Menu · Home · Software Heritage

D6146.id.diff
No One · Temporary

D6146.id.diff

diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -145,6 +145,52 @@
click.echo_via_pager("\n".join(output))
+@origin.command("send-to-celery")
+@click.option(
+    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+    "--queue", "-q", help="Target celery queue", type=str,
+)
+@click.option(
+    "--tablesample", help="Table sampling percentage", type=float,
+)
+@click.argument("type", type=str)
+@click.pass_context
+def send_to_celery(
+    ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+):
+    """Send the next origin visits of the TYPE loader to celery, filling the queue."""
+    # NOTE: the docstring above is also the click help text for this command.
+    # celery/kombu imports are deferred to call time (presumably so that other
+    # subcommands do not pay for importing the celery backend).
+    from kombu.utils.uuid import uuid
+
+    from swh.scheduler.celery_backend.config import app, get_available_slots
+
+    scheduler = ctx.obj["scheduler"]
+
+    # Loader task types are looked up as "load-<TYPE>".
+    task_type_name = f"load-{type}"
+    task_type = scheduler.get_task_type(task_type_name)
+    if not task_type:
+        # get_task_type presumably returns None for an unknown name; without
+        # this check the lookup below fails with an opaque TypeError traceback.
+        raise click.UsageError(f"Unknown task type: {task_type_name}", ctx=ctx)
+
+    task_name = task_type["backend_name"]
+    # Without an explicit --queue, tasks go to the queue named after the task.
+    queue_name = queue or task_name
+
+    # Only grab as many visits as there are free slots in the target queue.
+    num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+    click.echo(f"{num_tasks} slots available in celery queue")
+    visits = scheduler.grab_next_visits(
+        type, num_tasks, policy=policy, tablesample=tablesample
+    )
+
+    click.echo(f"{len(visits)} visits to send to celery")
+    # `visit` (not `origin`) avoids shadowing the `origin` click group.
+    for visit in visits:
+        task_dict = visit.as_task_dict()
+        app.send_task(
+            task_name,
+            task_id=uuid(),
+            args=task_dict["arguments"]["args"],
+            kwargs=task_dict["arguments"]["kwargs"],
+            queue=queue_name,
+        )
+
+
@origin.command("update-metrics")
@click.option("--lister", default=None, help="Only update metrics for this lister")
@click.option(
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -112,6 +112,40 @@
assert scheduled_tasks <= all_possible_tasks
+def test_send_to_celery(
+    mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type,
+):
+    """send-to-celery sends one task per listed origin of the requested visit
+    type, routed by default to the queue named after the task."""
+    for task_type in TASK_TYPES.values():
+        swh_scheduler.create_task_type(task_type)
+
+    visit_type = next(iter(listed_origins_by_type))
+
+    for origins in listed_origins_by_type.values():
+        swh_scheduler.record_listed_origins(origins)
+
+    # Report no queue length; presumably get_available_slots then falls back to
+    # the task type's max_queue_length, leaving room for every origin.
+    get_queue_length = mocker.patch(
+        "swh.scheduler.celery_backend.config.get_queue_length"
+    )
+    get_queue_length.return_value = None
+
+    # Intercept task submission so nothing reaches a real broker.
+    send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
+    send_task.return_value = None
+
+    result = invoke(swh_scheduler, args=("send-to-celery", visit_type))
+    assert result.exit_code == 0, result.output
+
+    scheduled_tasks = {
+        (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list
+    }
+
+    expected_tasks = {
+        (TASK_TYPES[origin.visit_type]["backend_name"], origin.url)
+        for origin in listed_origins_by_type[visit_type]
+    }
+
+    assert expected_tasks == scheduled_tasks
+
+    # Without --queue, every task is sent to the queue named after the task.
+    for call in send_task.call_args_list:
+        assert call[1]["queue"] == call[0][0]
+
+
def test_update_metrics(swh_scheduler, listed_origins):
swh_scheduler.record_listed_origins(listed_origins)

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 6:53 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219695

Event Timeline