diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -1,11 +1,11 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Iterable, List, Optional
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional
 
 import click
 
@@ -146,6 +146,71 @@
     click.echo_via_pager("\n".join(output))
 
 
+def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]:
+    "Given a visit type, return its associated task type."
+    return scheduler.get_task_type(f"load-{visit_type}")
+
+
+def send_to_celery(
+    scheduler: SchedulerInterface,
+    visit_type_to_queue: Dict[str, str],
+    enabled: bool = True,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+    policy: str = "oldest_scheduled_first",
+    tablesample: Optional[float] = None,
+):
+    """Utility function to read tasks from the scheduler and send those directly to
+    celery.
+
+    Args:
+        visit_type_to_queue: Mapping of visit/loader type (e.g. git, svn, ...)
+          to queue to send task to.
+        enabled: Determine whether we want to list enabled or disabled origins. As
+          default, we want reasonably enabled origins. For some edge case, we might
+          want the others.
+        lister_name: Determine the list of origins listed from the lister with name
+        lister_instance_name: Determine the list of origins listed from the lister
+          with instance name
+        policy: the scheduling policy used to select which visits to schedule
+        tablesample: the percentage of the table on which we run the query
+          (None: no sampling)
+
+    """
+
+    from kombu.utils.uuid import uuid
+
+    from swh.scheduler.celery_backend.config import app, get_available_slots
+
+    for visit_type_name, queue_name in visit_type_to_queue.items():
+        task_type = get_task_type(scheduler, visit_type_name)
+        assert task_type is not None
+        task_name = task_type["backend_name"]
+        num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+        click.echo(f"{num_tasks} slots available in celery queue")
+
+        origins = scheduler.grab_next_visits(
+            visit_type_name,
+            num_tasks,
+            policy=policy,
+            tablesample=tablesample,
+            enabled=enabled,
+            lister_name=lister_name,
+            lister_instance_name=lister_instance_name,
+        )
+
+        click.echo(f"{len(origins)} visits to send to celery")
+        for task_dict in create_origin_task_dicts(origins, scheduler):
+            app.send_task(
+                task_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
+
+
 @origin.command("send-to-celery")
 @click.option(
     "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
@@ -180,58 +245,100 @@
     default=None,
     help="Limit origins to those listed from lister with instance name",
 )
-@click.argument("type", type=str)
+@click.argument("visit_type_name", type=str)
 @click.pass_context
-def send_to_celery(
+def send_to_celery_cli(
     ctx,
     policy: str,
     queue: Optional[str],
     tablesample: Optional[float],
-    type: str,
+    visit_type_name: str,
     enabled: bool,
     lister_name: Optional[str] = None,
     lister_instance_name: Optional[str] = None,
 ):
-    """Send the next origin visits of the TYPE loader to celery, filling the queue."""
-    from kombu.utils.uuid import uuid
-
-    from swh.scheduler.celery_backend.config import app, get_available_slots
-
+    """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue."""
     scheduler = ctx.obj["scheduler"]
 
-    task_type = scheduler.get_task_type(f"load-{type}")
+    task_type = get_task_type(scheduler, visit_type_name)
+    if not task_type:
+        raise ValueError(f"Unknown task type {visit_type_name}.")
 
-    task_name = task_type["backend_name"]
-    queue_name = queue or task_name
+    queue_name = queue or task_type["backend_name"]
 
-    num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
-
-    click.echo(f"{num_tasks} slots available in celery queue")
-
-    lister_uuid: Optional[str] = None
-    if lister_name and lister_instance_name:
-        lister = scheduler.get_lister(lister_name, lister_instance_name)
-        if lister:
-            lister_uuid = lister.id
-
-    origins = scheduler.grab_next_visits(
-        type,
-        num_tasks,
+    send_to_celery(
+        scheduler,
+        visit_type_to_queue={visit_type_name: queue_name},
         policy=policy,
         tablesample=tablesample,
         enabled=enabled,
-        lister_uuid=lister_uuid,
+        lister_name=lister_name,
+        lister_instance_name=lister_instance_name,
     )
 
-    click.echo(f"{len(origins)} visits to send to celery")
-    for task_dict in create_origin_task_dicts(origins, scheduler):
-        app.send_task(
-            task_name,
-            task_id=uuid(),
-            args=task_dict["arguments"]["args"],
-            kwargs=task_dict["arguments"]["kwargs"],
-            queue=queue_name,
-        )
+
+@origin.command("add-forge-now")
+@click.option(
+    "--type-name",
+    "-t",
+    "visit_type_names",
+    help="Visit/loader type (can be provided multiple times)",
+    type=str,
+    multiple=True,
+)
+@click.option(
+    "--only-enabled/--only-disabled",
+    "enabled",
+    is_flag=True,
+    default=True,
+    help="""Determine whether we want to scheduled enabled or disabled origins. As
+    default, we want to reasonably deal with enabled origins. For some edge case
+    though, we might want the disabled ones.""",
+)
+@click.option(
+    "--lister-name",
+    default=None,
+    help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+    "--lister-instance-name",
+    default=None,
+    help="Limit origins to those listed from lister with instance name",
+)
+@click.pass_context
+def addforgenow_cli(
+    ctx,
+    visit_type_names: List[str],
+    enabled: bool,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+):
+    """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the
+    associated add_forge_now queue(s).
+
+    """
+    scheduler = ctx.obj["scheduler"]
+
+    visit_type_to_queue: Dict[str, str] = {}
+    unknown_task_types = []
+    for visit_type_name in visit_type_names:
+        task_type = get_task_type(scheduler, visit_type_name)
+        if not task_type:
+            unknown_task_types.append(visit_type_name)
+            continue
+        queue_name = task_type["backend_name"]
+        visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}"
+
+    if unknown_task_types:
+        raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.")
+
+    send_to_celery(
+        scheduler,
+        visit_type_to_queue=visit_type_to_queue,
+        enabled=enabled,
+        lister_name=lister_name,
+        lister_instance_name=lister_instance_name,
+    )
 
 
 @origin.command("update-metrics")
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -114,6 +114,14 @@
     assert scheduled_tasks <= all_possible_tasks
 
 
+def test_send_to_celery_unknown_visit_type(
+    swh_scheduler,
+):
+    "Calling cli without a known visit type should raise"
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type"))
+
+
 @pytest.mark.parametrize(
     "extra_cmd_args",
     [[], ["--lister-name", "github", "--lister-instance-name", "github"]],
@@ -158,6 +166,67 @@
     assert expected_tasks == scheduled_tasks
 
 
+def test_add_forge_now_unknown_visit_type(
+    swh_scheduler,
+):
+    "Calling cli without a known visit type should raise"
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(
+            swh_scheduler,
+            args=(
+                "add-forge-now",
+                "-t",
+                "unknown-vt0",
+                "--type-name",
+                "unknown-visit-type1",
+            ),
+        )
+
+
+@pytest.mark.parametrize(
+    "extra_cmd_args",
+    [[], ["--lister-name", "github", "--lister-instance-name", "github"]],
+)
+def test_add_forge_now_cli(
+    mocker,
+    swh_scheduler,
+    swh_scheduler_celery_app,
+    listed_origins_by_type,
+    extra_cmd_args,
+):
+    for task_type in TASK_TYPES.values():
+        swh_scheduler.create_task_type(task_type)
+
+    visit_type = next(iter(listed_origins_by_type))
+
+    for origins in listed_origins_by_type.values():
+        swh_scheduler.record_listed_origins(origins)
+
+    get_queue_length = mocker.patch(
+        "swh.scheduler.celery_backend.config.get_queue_length"
+    )
+    get_queue_length.return_value = None
+
+    send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
+    send_task.return_value = None
+
+    cmd_args = ["add-forge-now", "--type-name", visit_type] + extra_cmd_args
+
+    result = invoke(swh_scheduler, args=tuple(cmd_args))
+    assert result.exit_code == 0
+
+    scheduled_tasks = {
+        (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list
+    }
+
+    expected_tasks = {
+        (TASK_TYPES[origin.visit_type]["backend_name"], origin.url)
+        for origin in listed_origins_by_type[visit_type]
+    }
+
+    assert expected_tasks == scheduled_tasks
+
+
 def test_update_metrics(swh_scheduler, listed_origins):
     swh_scheduler.record_listed_origins(listed_origins)
 