diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,5 @@ pytest pytest-mock -celery >= 4.3 hypothesis >= 3.11.0 swh.lister swh.storage[testing] diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -85,7 +85,16 @@ ctx.obj["config"] = conf -from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa +from . import ( # noqa + add_forge_now, + admin, + celery_monitor, + journal, + origin, + simulator, + task, + task_type, +) def main(): diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/cli/add_forge_now.py @@ -0,0 +1,161 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from . 
import cli + +if TYPE_CHECKING: + from typing import Dict, List, Optional + + +@cli.group("add-forge-now") +@click.option( + "-e", + "--environment", + "environment", + default="production", + type=click.Choice(["production", "staging"]), + help="Determine environment to use, production by default.", +) +@click.pass_context +def add_forge_now(ctx, environment): + """Manipulate add-forge-now requests.""" + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + ctx.obj["environment"] = environment + + +@add_forge_now.command("register-lister") +@click.argument("lister_name", nargs=1, required=True) +@click.argument("options", nargs=-1) +@click.pass_context +def register_lister_cli( + ctx, + lister_name, + options, +): + """Register the lister tasks in the scheduler. + + Depending on the environment, we'll want different policies: + - staging: only the "full" but limited listing (as "oneshot" task) of disabled + origins + - production: both "full" and "incremental" (if that exists) listing (as "recurring" + task). The "full" will be triggered asap, the "incremental" will be triggered the + next day. 
+ + """ + from .utils import lister_task_type, parse_options, task_add + + scheduler = ctx.obj["scheduler"] + environment = ctx.obj["environment"] + + # Map the associated task types for the lister + task_type_names: Dict[str, str] = { + listing_type: lister_task_type(lister_name, listing_type) + for listing_type in ["full", "incremental"] + } + + task_types: Dict[str, Dict] = {} + for listing_type, task_type_name in task_type_names.items(): + task_type = scheduler.get_task_type(task_type_name) + if task_type: + task_types[listing_type] = task_type + + if not task_types: + raise ValueError(f"Unknown lister type {lister_name}.") + + (args, kw) = parse_options(options) + + # Recurring policy on production + if environment == "production": + policy = "recurring" + else: # staging, "full" but limited listing as a oneshot + policy = "oneshot" + kw.update({"max_pages": 3, "max_origins_per_page": 10, "enable_origins": False}) + # We want only the "full" listing in staging if both incremental and full exist + if "full" in task_types: + task_types.pop("incremental", None) + + from datetime import timedelta + + from swh.scheduler.utils import utcnow + + for listing_type, task_type in task_types.items(): + now = utcnow() + next_run = now if listing_type == "full" else now + timedelta(days=1) + task_add( + scheduler, + task_type_name=task_type["type"], + args=args, + kw=kw, + policy=policy, + next_run=next_run, + ) + + +@add_forge_now.command("schedule-first-visits") +@click.option( + "--type-name", + "-t", + "visit_type_names", + help="Visit/loader type (can be provided multiple times)", + type=str, + multiple=True, +) +@click.option( + "--lister-name", + default=None, + help="Limit origins to those listed from lister with provided name", +) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.pass_context +def schedule_first_visits_cli( + ctx, + visit_type_names: List[str], + lister_name: 
Optional[str] = None, + lister_instance_name: Optional[str] = None, +): + """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the + associated add_forge_now queue(s). + + """ + from .utils import get_task_type, send_to_celery + + scheduler = ctx.obj["scheduler"] + environment = ctx.obj["environment"] + + visit_type_to_queue: Dict[str, str] = {} + unknown_task_types = [] + for visit_type_name in visit_type_names: + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + unknown_task_types.append(visit_type_name) + continue + queue_name = task_type["backend_name"] + visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}" + + if unknown_task_types: + raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.") + + send_to_celery( + scheduler, + visit_type_to_queue=visit_type_to_queue, + enabled=environment == "production", + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -1,11 +1,11 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations -from typing import TYPE_CHECKING, Iterable, List, Optional +from typing import TYPE_CHECKING import click @@ -13,6 +13,7 @@ from ..utils import create_origin_task_dicts if TYPE_CHECKING: + from typing import Iterable, List, Optional from uuid import UUID from ..interface import SchedulerInterface @@ -121,7 +122,7 @@ """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow - from .task import pretty_print_task + from .utils import pretty_print_task 
scheduler = ctx.obj["scheduler"] @@ -180,59 +181,40 @@ default=None, help="Limit origins to those listed from lister with instance name", ) -@click.argument("type", type=str) +@click.argument("visit_type_name", type=str) @click.pass_context -def send_to_celery( +def send_to_celery_cli( ctx, policy: str, queue: Optional[str], tablesample: Optional[float], - type: str, + visit_type_name: str, enabled: bool, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, ): - """Send the next origin visits of the TYPE loader to celery, filling the queue.""" - from kombu.utils.uuid import uuid + """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue.""" - from swh.scheduler.celery_backend.config import app, get_available_slots + from .utils import get_task_type, send_to_celery scheduler = ctx.obj["scheduler"] - task_type = scheduler.get_task_type(f"load-{type}") - - task_name = task_type["backend_name"] - queue_name = queue or task_name - - num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + raise ValueError(f"Unknown task type {visit_type_name}.") - click.echo(f"{num_tasks} slots available in celery queue") + queue_name = queue or task_type["backend_name"] - lister_uuid: Optional[str] = None - if lister_name and lister_instance_name: - lister = scheduler.get_lister(lister_name, lister_instance_name) - if lister: - lister_uuid = lister.id - - origins = scheduler.grab_next_visits( - type, - num_tasks, + send_to_celery( + scheduler, + visit_type_to_queue={visit_type_name: queue_name}, policy=policy, tablesample=tablesample, enabled=enabled, - lister_uuid=lister_uuid, + lister_name=lister_name, + lister_instance_name=lister_instance_name, ) - click.echo(f"{len(origins)} visits to send to celery") - for task_dict in create_origin_task_dicts(origins, scheduler): - app.send_task( - task_name, - task_id=uuid(), - 
args=task_dict["arguments"]["args"], - kwargs=task_dict["arguments"]["kwargs"], - queue=queue_name, - ) - @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for this lister") diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -27,128 +27,6 @@ DATETIME = click.DateTime() -def format_dict(d): - """Recursively format date objects in the dict passed as argument""" - import datetime - - ret = {} - for k, v in d.items(): - if isinstance(v, (datetime.date, datetime.datetime)): - v = v.isoformat() - elif isinstance(v, dict): - v = format_dict(v) - ret[k] = v - return ret - - -def pretty_print_list(list, indent=0): - """Pretty-print a list""" - return "".join("%s%r\n" % (" " * indent, item) for item in list) - - -def pretty_print_dict(dict, indent=0): - """Pretty-print a list""" - return "".join( - "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) - for key, value in sorted(dict.items()) - ) - - -def pretty_print_run(run, indent=4): - fmt = ( - "{indent}{backend_id} [{status}]\n" - "{indent} scheduled: {scheduled} [{started}:{ended}]" - ) - return fmt.format(indent=" " * indent, **format_dict(run)) - - -def pretty_print_task(task, full=False): - """Pretty-print a task - - If 'full' is True, also print the status and priority fields. - - >>> import datetime - >>> task = { - ... 'id': 1234, - ... 'arguments': { - ... 'args': ['foo', 'bar', True], - ... 'kwargs': {'key': 'value', 'key2': 42}, - ... }, - ... 'current_interval': datetime.timedelta(hours=1), - ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), - ... 'policy': 'oneshot', - ... 'priority': None, - ... 'status': 'next_run_not_scheduled', - ... 'type': 'test_task', - ... } - >>> print(click.unstyle(pretty_print_task(task))) - Task 1234 - Next run: ... 
(2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - >>> print(click.unstyle(pretty_print_task(task, full=True))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Status: next_run_not_scheduled - Priority:\x20 - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - """ - import humanize - - next_run = task["next_run"] - lines = [ - "%s %s\n" % (click.style("Task", bold=True), task["id"]), - click.style(" Next run: ", bold=True), - "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), - "\n", - click.style(" Interval: ", bold=True), - str(task["current_interval"]), - "\n", - click.style(" Type: ", bold=True), - task["type"] or "", - "\n", - click.style(" Policy: ", bold=True), - task["policy"] or "", - "\n", - ] - if full: - lines += [ - click.style(" Status: ", bold=True), - task["status"] or "", - "\n", - click.style(" Priority: ", bold=True), - task["priority"] or "", - "\n", - ] - lines += [ - click.style(" Args:\n", bold=True), - pretty_print_list(task["arguments"]["args"], indent=4), - click.style(" Keyword args:\n", bold=True), - pretty_print_dict(task["arguments"]["kwargs"], indent=4), - ] - - return "".join(lines) - - @cli.group("task") @click.pass_context def task(ctx): @@ -202,6 +80,8 @@ from swh.scheduler.utils import utcnow + from .utils import pretty_print_task + tasks = [] now = utcnow() scheduler = ctx.obj["scheduler"] @@ -261,39 +141,26 @@ which is considered as the lowest priority level. 
""" - from swh.scheduler.utils import utcnow - - from .utils import parse_options + from .utils import parse_options, task_add scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") - if scheduler.get_task_type(task_type_name) is None: - raise ValueError(f"Unknown task type {task_type_name}.") - - now = utcnow() + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task name {task_type_name}.") (args, kw) = parse_options(options) - task = { - "type": task_type_name, - "policy": policy, - "priority": priority, - "arguments": { - "args": args, - "kwargs": kw, - }, - "next_run": next_run or now, - } - created = scheduler.create_tasks([task]) - - output = [ - "Created %d tasks\n" % len(created), - ] - for task in created: - output.append(pretty_print_task(task)) - - click.echo("\n".join(output)) + task_add( + scheduler, + task_type_name=task_type_name, + policy=policy, + priority=priority, + next_run=next_run, + args=args, + kw=kw, + ) def iter_origins( # use string annotations to prevent some pkg loading @@ -412,6 +279,8 @@ You can override the number of tasks to fetch with the --limit flag. 
""" + from .utils import pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") @@ -511,6 +380,8 @@ """List tasks.""" from operator import itemgetter + from .utils import pretty_print_run, pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") diff --git a/swh/scheduler/cli/test_cli_utils.py b/swh/scheduler/cli/test_cli_utils.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/cli/test_cli_utils.py @@ -0,0 +1,17 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.scheduler.cli.utils import lister_task_type + + +@pytest.mark.parametrize( + "lister_name,listing_type", [("foo", "full"), ("bar", "incremental")] +) +def test_lister_task_type(lister_name, listing_type): + assert lister_task_type(lister_name, listing_type) == ( + f"list-{lister_name}-{listing_type}" + ) diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py --- a/swh/scheduler/cli/utils.py +++ b/swh/scheduler/cli/utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,8 +6,18 @@ # WARNING: do not import unnecessary things here to keep cli startup time under # control +from __future__ import annotations + +from typing import TYPE_CHECKING + import click +if TYPE_CHECKING: + from typing import Dict, List, Optional, Tuple + + from swh.scheduler.interface import SchedulerInterface + + TASK_BATCH_SIZE = 1000 # Number 
of tasks per query to the scheduler @@ -63,7 +73,7 @@ raise click.ClickException("Invalid argument: {}".format(option)) -def parse_options(options): +def parse_options(options: List[str]) -> Tuple[List[str], Dict]: """Parses options from a CLI as YAML and turns it into Python args and kwargs. @@ -100,3 +110,227 @@ args = [parse_argument(x) for x in options if "=" not in x] kw = {k: parse_argument(v) for (k, v) in kw_pairs} return (args, kw) + + +def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]: + "Given a visit type, return its associated task type." + return scheduler.get_task_type(f"load-{visit_type}") + + +def send_to_celery( + scheduler: SchedulerInterface, + visit_type_to_queue: Dict[str, str], + enabled: bool = True, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, + policy: str = "oldest_scheduled_first", + tablesample: Optional[float] = None, +): + """Utility function to read tasks from the scheduler and send those directly to + celery. + + Args: + visit_type_to_queue: Mapping of visit/loader type (e.g. git, svn, ...) + to the queue to send tasks to. + enabled: Determine whether we want to list enabled or disabled origins. By + default, we want enabled origins. For some edge cases, we might + want the disabled ones. 
+ lister_name: Determine the list of origins listed from the lister with name + lister_instance_name: Determine the list of origins listed from the lister + with instance name + policy: the scheduling policy used to select which visits to schedule + tablesample: the percentage of the table on which we run the query + (None: no sampling) + + """ + + from kombu.utils.uuid import uuid + + from swh.scheduler.celery_backend.config import app, get_available_slots + + from ..utils import create_origin_task_dicts + + for visit_type_name, queue_name in visit_type_to_queue.items(): + task_type = get_task_type(scheduler, visit_type_name) + assert task_type is not None + task_name = task_type["backend_name"] + num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + + click.echo(f"{num_tasks} slots available in celery queue") + + origins = scheduler.grab_next_visits( + visit_type_name, + num_tasks, + policy=policy, + tablesample=tablesample, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) + + click.echo(f"{len(origins)} visits to send to celery") + for task_dict in create_origin_task_dicts(origins, scheduler): + app.send_task( + task_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + +def pretty_print_list(list, indent=0): + """Pretty-print a list""" + return "".join("%s%r\n" % (" " * indent, item) for item in list) + + +def pretty_print_dict(dict, indent=0): + """Pretty-print a dict""" + return "".join( + "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) + for key, value in sorted(dict.items()) + ) + + +def format_dict(d): + """Recursively format date objects in the dict passed as argument""" + import datetime + + ret = {} + for k, v in d.items(): + if isinstance(v, (datetime.date, datetime.datetime)): + v = v.isoformat() + elif isinstance(v, dict): + v = format_dict(v) + ret[k] = v + return ret + + +def 
pretty_print_run(run, indent=4): + fmt = ( + "{indent}{backend_id} [{status}]\n" + "{indent} scheduled: {scheduled} [{started}:{ended}]" + ) + return fmt.format(indent=" " * indent, **format_dict(run)) + + +def pretty_print_task(task, full=False): + """Pretty-print a task + + If 'full' is True, also print the status and priority fields. + + >>> import datetime + >>> task = { + ... 'id': 1234, + ... 'arguments': { + ... 'args': ['foo', 'bar', True], + ... 'kwargs': {'key': 'value', 'key2': 42}, + ... }, + ... 'current_interval': datetime.timedelta(hours=1), + ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), + ... 'policy': 'oneshot', + ... 'priority': None, + ... 'status': 'next_run_not_scheduled', + ... 'type': 'test_task', + ... } + >>> print(click.unstyle(pretty_print_task(task))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + >>> print(click.unstyle(pretty_print_task(task, full=True))) + Task 1234 + Next run: ... 
(2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Status: next_run_not_scheduled + Priority:\x20 + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + """ + import humanize + + next_run = task["next_run"] + lines = [ + "%s %s\n" % (click.style("Task", bold=True), task["id"]), + click.style(" Next run: ", bold=True), + "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), + "\n", + click.style(" Interval: ", bold=True), + str(task["current_interval"]), + "\n", + click.style(" Type: ", bold=True), + task["type"] or "", + "\n", + click.style(" Policy: ", bold=True), + task["policy"] or "", + "\n", + ] + if full: + lines += [ + click.style(" Status: ", bold=True), + task["status"] or "", + "\n", + click.style(" Priority: ", bold=True), + task["priority"] or "", + "\n", + ] + lines += [ + click.style(" Args:\n", bold=True), + pretty_print_list(task["arguments"]["args"], indent=4), + click.style(" Keyword args:\n", bold=True), + pretty_print_dict(task["arguments"]["kwargs"], indent=4), + ] + + return "".join(lines) + + +def task_add( + scheduler: SchedulerInterface, + task_type_name: str, + args: List[str], + kw: Dict, + policy: str, + priority: Optional[str] = None, + next_run: Optional[str] = None, +): + """Add a task task_type_name in the scheduler.""" + from swh.scheduler.utils import utcnow + + task = { + "type": task_type_name, + "policy": policy, + "priority": priority, + "arguments": { + "args": args, + "kwargs": kw, + }, + "next_run": next_run or utcnow(), + } + created = scheduler.create_tasks([task]) + + output = [f"Created {len(created)} tasks\n"] + for task in created: + output.append(pretty_print_task(task)) + + click.echo("\n".join(output)) + + +def lister_task_type(lister_name: str, lister_type: str) -> str: + return f"list-{lister_name}-{lister_type}" diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py new file mode 100644 --- 
/dev/null +++ b/swh/scheduler/tests/test_cli_add_forge_now.py @@ -0,0 +1,164 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Tuple + +import attr +import pytest + +from swh.scheduler.cli.utils import lister_task_type +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.tests.common import TASK_TYPES +from swh.scheduler.tests.test_cli import invoke as basic_invoke + + +def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): + return basic_invoke( + scheduler, args=["add-forge-now", *args], catch_exceptions=catch_exceptions + ) + + +def test_schedule_first_visits_cli_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + args=( + "schedule-first-visits", + "-t", + "unknown-vt0", + "--type-name", + "unknown-visit-type1", + ), + ) + + +@pytest.mark.parametrize( + "cmd_args, subcmd_args", + [ + ([], []), + ([], ["--lister-name", "github", "--lister-instance-name", "github"]), + (["--environment", "staging"], []), + ], +) +def test_schedule_first_visits_cli( + mocker, + swh_scheduler, + swh_scheduler_celery_app, + listed_origins_by_type, + cmd_args, + subcmd_args, +): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + visit_type = next(iter(listed_origins_by_type)) + + # enabled origins by default except when "--environment staging" is provided + enabled = "staging" not in cmd_args + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins( + (attr.evolve(o, enabled=enabled) for o in origins) + ) + + get_queue_length = mocker.patch( + "swh.scheduler.celery_backend.config.get_queue_length" + ) + 
get_queue_length.return_value = None + + send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") + send_task.return_value = None + + command_args = ( + cmd_args + ["schedule-first-visits", "--type-name", visit_type] + subcmd_args + ) + + result = invoke(swh_scheduler, args=tuple(command_args)) + assert result.exit_code == 0 + + scheduled_tasks = { + (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list + } + + expected_tasks = { + (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) + for origin in listed_origins_by_type[visit_type] + } + + assert scheduled_tasks == expected_tasks + + +def _create_task_type( + swh_scheduler: SchedulerInterface, lister_name: str, listing_type: str = "full" +) -> Dict: + task_type = { + "type": lister_task_type(lister_name, listing_type), # only relevant bit + "description": f"{listing_type} listing", + "backend_name": "swh.example.backend", + "default_interval": "1 day", + "min_interval": "1 day", + "max_interval": "1 day", + "backoff_factor": "1", + "max_queue_length": "100", + "num_retries": 3, + } + swh_scheduler.create_task_type(task_type) + task_type = swh_scheduler.get_task_type(task_type["type"]) + assert task_type is not None + return task_type + + +@pytest.mark.parametrize("environment", ["staging", "production"]) +def test_schedule_register_lister(swh_scheduler, stored_lister, environment): + # given + assert stored_lister is not None + lister_name = stored_lister.name + # Let's create all possible associated lister task types + full = _create_task_type(swh_scheduler, lister_name, "full") + incremental = _create_task_type(swh_scheduler, lister_name, "incremental") + + # Let's trigger the registering of that lister + result = invoke( + swh_scheduler, + [ + "--environment", + environment, + "register-lister", + lister_name, + "url=https://example.org", + ], + ) + + output = result.output.lstrip() + + expected_msgs = [] + if environment == "production": + # 2 tasks: 1 full + 1 
incremental (tomorrow) with recurring policy + expected_msgs = ["Policy: recurring", incremental["type"], "Next run: tomorrow"] + else: + # 1 task full with policy oneshot + expected_msgs = ["Policy: oneshot"] + + # In any case, there is the full listing type too + expected_msgs.append(full["type"]) + + assert len(expected_msgs) > 0 + for msg in expected_msgs: + assert msg in output + + +def test_register_lister_unknown_task_type(swh_scheduler): + """When scheduling unknown task type, the cli should raise.""" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + [ + "register-lister", + "unknown-lister-type-should-raise", + ], + ) diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -114,6 +114,14 @@ assert scheduled_tasks <= all_possible_tasks +def test_send_to_celery_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type")) + + @pytest.mark.parametrize( "extra_cmd_args", [[], ["--lister-name", "github", "--lister-instance-name", "github"]],