diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py new file mode 100644 index 0000000..e55835e --- /dev/null +++ b/swh/scheduler/cli/add_forge_now.py @@ -0,0 +1,91 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from . import cli + +if TYPE_CHECKING: + from typing import Dict, List, Optional + + +@cli.group("add-forge-now") +@click.pass_context +def add_forge_now(ctx): + """Manipulate listed origins.""" + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + +@add_forge_now.command("schedule-first-visits") +@click.option( + "--type-name", + "-t", + "visit_type_names", + help="Visit/loader type (can be provided multiple times)", + type=str, + multiple=True, +) +@click.option( + "--production/--staging", + "enabled", + is_flag=True, + default=True, + help="""Determine whether we want to scheduled enabled origins (on production) or + disabled ones (on staging).""", +) +@click.option( + "--lister-name", + default=None, + help="Limit origins to those listed from lister with provided name", +) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.pass_context +def schedule_first_visits_cli( + ctx, + visit_type_names: List[str], + enabled: bool, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, +): + """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the + associated add_forge_now queue(s). + + """ + from .utils import get_task_type, send_to_celery + + scheduler = ctx.obj["scheduler"] + + visit_type_to_queue: Dict[str, str] = {} + unknown_task_types = [] + for visit_type_name in visit_type_names: + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + unknown_task_types.append(visit_type_name) + continue + queue_name = task_type["backend_name"] + visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}" + + if unknown_task_types: + raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.") + + send_to_celery( + scheduler, + visit_type_to_queue=visit_type_to_queue, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py index 97634af..1a0b4f3 100644 --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -1,271 +1,253 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations -from typing import TYPE_CHECKING, Iterable, List, Optional +from typing import TYPE_CHECKING import click from . import cli from ..utils import create_origin_task_dicts if TYPE_CHECKING: + from typing import Iterable, List, Optional from uuid import UUID from ..interface import SchedulerInterface from ..model import ListedOrigin @cli.group("origin") @click.pass_context def origin(ctx): """Manipulate listed origins.""" if not ctx.obj["scheduler"]: raise ValueError("Scheduler class (local/remote) must be instantiated") def format_origins( origins: List[ListedOrigin], fields: Optional[List[str]] = None, with_header: bool = True, ) -> Iterable[str]: """Format a list of origins as CSV. Arguments: origins: list of origins to output fields: optional list of fields to output (defaults to all fields) with_header: if True, output a CSV header. """ import csv from io import StringIO import attr from ..model import ListedOrigin expected_fields = [field.name for field in attr.fields(ListedOrigin)] if not fields: fields = expected_fields unknown_fields = set(fields) - set(expected_fields) if unknown_fields: raise ValueError( "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields) ) output = StringIO() writer = csv.writer(output) def csv_row(data): """Return a single CSV-formatted row. We clear the output buffer after we're done to keep it reasonably sized.""" writer.writerow(data) output.seek(0) ret = output.read().rstrip() output.seek(0) output.truncate() return ret if with_header: yield csv_row(fields) for origin in origins: yield csv_row(str(getattr(origin, field)) for field in fields) @origin.command("grab-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--fields", "-f", default=None, help="Listed origin fields to print on output" ) @click.option( "--with-header/--without-header", is_flag=True, default=True, help="Print the CSV header?", ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def grab_next( ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int ): """Grab the next COUNT origins to visit using the TYPE loader from the listed origins table.""" if fields: parsed_fields: Optional[List[str]] = fields.split(",") else: parsed_fields = None scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) for line in format_origins(origins, fields=parsed_fields, with_header=with_header): click.echo(line) @origin.command("schedule-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def schedule_next(ctx, policy: str, type: str, count: int): """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow from .task import pretty_print_task scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) created = scheduler.create_tasks( [ { **task_dict, "policy": "oneshot", "next_run": utcnow(), "retries_left": 1, } for task_dict in create_origin_task_dicts(origins, scheduler) ] ) output = ["Created %d tasks\n" % len(created)] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @origin.command("send-to-celery") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--queue", "-q", help="Target celery queue", type=str, ) @click.option( "--tablesample", help="Table sampling percentage", type=float, ) @click.option( "--only-enabled/--only-disabled", "enabled", is_flag=True, default=True, help="""Determine whether we want to scheduled enabled or disabled origins. As default, we want to reasonably deal with enabled origins. For some edge case though, we might want the disabled ones.""", ) @click.option( "--lister-name", default=None, help="Limit origins to those listed from lister with provided name", ) @click.option( "--lister-instance-name", default=None, help="Limit origins to those listed from lister with instance name", ) -@click.argument("type", type=str) +@click.argument("visit_type_name", type=str) @click.pass_context -def send_to_celery( +def send_to_celery_cli( ctx, policy: str, queue: Optional[str], tablesample: Optional[float], - type: str, + visit_type_name: str, enabled: bool, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, ): - """Send the next origin visits of the TYPE loader to celery, filling the queue.""" - from kombu.utils.uuid import uuid + """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue.""" - from swh.scheduler.celery_backend.config import app, get_available_slots + from .utils import get_task_type, send_to_celery scheduler = ctx.obj["scheduler"] - task_type = scheduler.get_task_type(f"load-{type}") - - task_name = task_type["backend_name"] - queue_name = queue or task_name - - num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + raise ValueError(f"Unknown task type {task_type}.") - click.echo(f"{num_tasks} slots available in celery queue") + queue_name = queue or task_type["backend_name"] - lister_uuid: Optional[str] = None - if lister_name and lister_instance_name: - lister = scheduler.get_lister(lister_name, lister_instance_name) - if lister: - lister_uuid = lister.id - - origins = scheduler.grab_next_visits( - type, - num_tasks, + send_to_celery( + scheduler, + visit_type_to_queue={visit_type_name: queue_name}, policy=policy, tablesample=tablesample, enabled=enabled, - lister_uuid=lister_uuid, + lister_name=lister_name, + lister_instance_name=lister_instance_name, ) - click.echo(f"{len(origins)} visits to send to celery") - for task_dict in create_origin_task_dicts(origins, scheduler): - app.send_task( - task_name, - task_id=uuid(), - args=task_dict["arguments"]["args"], - kwargs=task_dict["arguments"]["kwargs"], - queue=queue_name, - ) - @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for this lister") @click.option( "--instance", default=None, help="Only update metrics for this lister instance" ) @click.pass_context def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): """Update the scheduler metrics on listed origins. Examples: swh scheduler origin update-metrics swh scheduler origin update-metrics --lister github swh scheduler origin update-metrics --lister phabricator --instance llvm """ import json import attr scheduler: SchedulerInterface = ctx.obj["scheduler"] lister_id: Optional[UUID] = None if lister is not None: lister_instance = scheduler.get_lister(name=lister, instance_name=instance) if not lister_instance: click.echo(f"Lister not found: {lister} instance={instance}") ctx.exit(2) assert False # for mypy lister_id = lister_instance.id def dictify_metrics(d): return {k: str(v) for (k, v) in attr.asdict(d).items()} ret = scheduler.update_metrics(lister_id=lister_id) click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True)) diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py index 4832212..5d68b82 100644 --- a/swh/scheduler/cli/utils.py +++ b/swh/scheduler/cli/utils.py @@ -1,102 +1,179 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control +from __future__ import annotations + +from typing import TYPE_CHECKING + import click +if TYPE_CHECKING: + from typing import Dict, Optional + + from swh.scheduler.interface import SchedulerInterface + + TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler def schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs): from itertools import islice from swh.scheduler.utils import create_task_dict nb_origins = 0 nb_tasks = 0 while True: task_batch = [] for _ in range(TASK_BATCH_SIZE): # Group origins origin_batch = [] for origin in islice(origins, origin_batch_size): origin_batch.append(origin) nb_origins += len(origin_batch) if not origin_batch: break # Create a task for these origins args = [origin_batch] task_dict = create_task_dict(task_type, "oneshot", *args, **kwargs) task_batch.append(task_dict) # Schedule a batch of tasks if not task_batch: break nb_tasks += len(task_batch) if scheduler: scheduler.create_tasks(task_batch) click.echo("Scheduled %d tasks (%d origins)." % (nb_tasks, nb_origins)) # Print final status. if nb_tasks: click.echo("Done.") else: click.echo("Nothing to do (no origin metadata matched the criteria).") def parse_argument(option): import yaml if option == "": # yaml.safe_load("") returns None return "" try: return yaml.safe_load(option) except Exception: raise click.ClickException("Invalid argument: {}".format(option)) def parse_options(options): """Parses options from a CLI as YAML and turns it into Python args and kwargs. >>> parse_options([]) ([], {}) >>> parse_options(['foo', 'bar']) (['foo', 'bar'], {}) >>> parse_options(['[foo, bar]']) ([['foo', 'bar']], {}) >>> parse_options(['"foo"', '"bar"']) (['foo', 'bar'], {}) >>> parse_options(['foo="bar"']) ([], {'foo': 'bar'}) >>> parse_options(['"foo"', 'bar="baz"']) (['foo'], {'bar': 'baz'}) >>> parse_options(['42', 'bar=False']) ([42], {'bar': False}) >>> parse_options(['42', 'bar=false']) ([42], {'bar': False}) >>> parse_options(['foo', '']) (['foo', ''], {}) >>> parse_options(['foo', 'bar=']) (['foo'], {'bar': ''}) >>> parse_options(['foo', 'null']) (['foo', None], {}) >>> parse_options(['foo', 'bar=null']) (['foo'], {'bar': None}) >>> parse_options(['42', '"foo']) Traceback (most recent call last): ... click.exceptions.ClickException: Invalid argument: "foo """ kw_pairs = [x.split("=", 1) for x in options if "=" in x] args = [parse_argument(x) for x in options if "=" not in x] kw = {k: parse_argument(v) for (k, v) in kw_pairs} return (args, kw) + + +def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]: + "Given a visit type, return its associated task type." + return scheduler.get_task_type(f"load-{visit_type}") + + +def send_to_celery( + scheduler: SchedulerInterface, + visit_type_to_queue: Dict[str, str], + enabled: bool = True, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, + policy: str = "oldest_scheduled_first", + tablesample: Optional[float] = None, +): + """Utility function to read tasks from the scheduler and send those directly to + celery. + + Args: + visit_type_to_queue: Optional mapping of visit/loader type (e.g git, svn, ...) + to queue to send task to. + enabled: Determine whether we want to list enabled or disabled origins. As + default, we want reasonably enabled origins. For some edge case, we might + want the others. + lister_name: Determine the list of origins listed from the lister with name + lister_instance_name: Determine the list of origins listed from the lister + with instance name + policy: the scheduling policy used to select which visits to schedule + tablesample: the percentage of the table on which we run the query + (None: no sampling) + + """ + + from kombu.utils.uuid import uuid + + from swh.scheduler.celery_backend.config import app, get_available_slots + + from ..utils import create_origin_task_dicts + + for visit_type_name, queue_name in visit_type_to_queue.items(): + task_type = get_task_type(scheduler, visit_type_name) + assert task_type is not None + task_name = task_type["backend_name"] + num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + + click.echo(f"{num_tasks} slots available in celery queue") + + origins = scheduler.grab_next_visits( + visit_type_name, + num_tasks, + policy=policy, + tablesample=tablesample, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) + + click.echo(f"{len(origins)} visits to send to celery") + for task_dict in create_origin_task_dicts(origins, scheduler): + app.send_task( + task_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py new file mode 100644 index 0000000..d09640e --- /dev/null +++ b/swh/scheduler/tests/test_cli_add_forge_now.py @@ -0,0 +1,88 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Tuple + +import attr +import pytest + +from swh.scheduler.tests.common import TASK_TYPES +from swh.scheduler.tests.test_cli import invoke as basic_invoke + + +def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): + return basic_invoke( + scheduler, args=["add-forge-now", *args], catch_exceptions=catch_exceptions + ) + + +def test_schedule_first_visits_cli_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + args=( + "schedule-first-visits", + "-t", + "unknown-vt0", + "--type-name", + "unknown-visit-type1", + ), + ) + + +@pytest.mark.parametrize( + "extra_cmd_args", + [ + [], + ["--lister-name", "github", "--lister-instance-name", "github"], + ["--staging"], + ], +) +def test_schedule_first_visits_cli( + mocker, + swh_scheduler, + swh_scheduler_celery_app, + listed_origins_by_type, + extra_cmd_args, +): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + visit_type = next(iter(listed_origins_by_type)) + + # enabled origins by default except when --staging flag is provided + enabled = "--staging" not in extra_cmd_args + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins( + (attr.evolve(o, enabled=enabled) for o in origins) + ) + + get_queue_length = mocker.patch( + "swh.scheduler.celery_backend.config.get_queue_length" + ) + get_queue_length.return_value = None + + send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") + send_task.return_value = None + + cmd_args = ["schedule-first-visits", "--type-name", visit_type] + extra_cmd_args + + result = invoke(swh_scheduler, args=tuple(cmd_args)) + assert result.exit_code == 0 + + scheduled_tasks = { + (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list + } + + expected_tasks = { + (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) + for origin in listed_origins_by_type[visit_type] + } + + assert expected_tasks == scheduled_tasks diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py index 1a4949e..b6bb85c 100644 --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -1,169 +1,177 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Tuple import pytest from swh.scheduler.cli.origin import format_origins from swh.scheduler.tests.common import TASK_TYPES from swh.scheduler.tests.test_cli import invoke as basic_invoke def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): return basic_invoke( scheduler, args=["origin", *args], catch_exceptions=catch_exceptions ) def test_cli_origin(swh_scheduler): """Check that swh scheduler origin returns its help text""" result = invoke(swh_scheduler) assert "Commands:" in result.stdout def test_format_origins_basic(listed_origins): listed_origins = listed_origins[:100] basic_output = list(format_origins(listed_origins)) # 1 header line + all origins assert len(basic_output) == len(listed_origins) + 1 no_header_output = list(format_origins(listed_origins, with_header=False)) assert basic_output[1:] == no_header_output def test_format_origins_fields_unknown(listed_origins): listed_origins = listed_origins[:10] it = format_origins(listed_origins, fields=["unknown_field"]) with pytest.raises(ValueError, match="unknown_field"): next(it) def test_format_origins_fields(listed_origins): listed_origins = listed_origins[:10] fields = ["lister_id", "url", "visit_type"] output = list(format_origins(listed_origins, fields=fields)) assert output[0] == ",".join(fields) for i, origin in enumerate(listed_origins): assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}" def test_grab_next(swh_scheduler, listed_origins_by_type): NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested # XXX: should test all of 'listed_origins_by_type' here... visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 out_lines = result.stdout.splitlines() assert len(out_lines) == NUM_RESULTS + 1 fields = out_lines[0].split(",") returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]] # Check that we've received origins we had listed in the first place assert set(origin["url"] for origin in returned_origins) <= set( origin.url for origin in listed_origins_by_type[visit_type] ) def test_schedule_next(swh_scheduler, listed_origins_by_type): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 # pull all tasks out of the scheduler tasks = swh_scheduler.search_tasks() assert len(tasks) == NUM_RESULTS scheduled_tasks = { (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks } all_possible_tasks = { (f"load-{origin.visit_type}", origin.url) for origin in listed_origins_by_type[visit_type] } assert scheduled_tasks <= all_possible_tasks +def test_send_to_celery_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type")) + + @pytest.mark.parametrize( "extra_cmd_args", [[], ["--lister-name", "github", "--lister-instance-name", "github"]], ) def test_send_to_celery( mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type, extra_cmd_args, ): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) visit_type = next(iter(listed_origins_by_type)) for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) get_queue_length = mocker.patch( "swh.scheduler.celery_backend.config.get_queue_length" ) get_queue_length.return_value = None send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") send_task.return_value = None cmd_args = ["send-to-celery", visit_type] + extra_cmd_args result = invoke(swh_scheduler, args=tuple(cmd_args)) assert result.exit_code == 0 scheduled_tasks = { (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list } expected_tasks = { (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) for origin in listed_origins_by_type[visit_type] } assert expected_tasks == scheduled_tasks def test_update_metrics(swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) assert swh_scheduler.get_metrics() == [] result = invoke(swh_scheduler, args=("update-metrics",)) assert result.exit_code == 0 assert swh_scheduler.get_metrics() != []