diff --git a/PKG-INFO b/PKG-INFO index ed98fda..c9ad4f8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,33 +1,33 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 1.3.0 +Version: 1.4.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/requirements-test.txt b/requirements-test.txt index 726caf0..2cc88ed 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,11 +1,10 @@ pytest pytest-mock -celery >= 4.3 hypothesis >= 3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests types-Deprecated diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index ed98fda..c9ad4f8 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,33 +1,33 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 1.3.0 +Version: 1.4.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 7532bff..caa4ae1 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,134 +1,137 @@ .git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini requirements-journal.txt requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/cli_utils.py swh/scheduler/exc.py swh/scheduler/interface.py swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/pika_listener.py swh/scheduler/celery_backend/recurrent_visits.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py +swh/scheduler/cli/add_forge_now.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/journal.py swh/scheduler/cli/origin.py swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py +swh/scheduler/cli/test_cli_utils.py swh/scheduler/cli/utils.py swh/scheduler/simulator/__init__.py swh/scheduler/simulator/common.py swh/scheduler/simulator/origin_scheduler.py swh/scheduler/simulator/origins.py swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/sql/upgrades/02.sql swh/scheduler/sql/upgrades/03.sql swh/scheduler/sql/upgrades/04.sql swh/scheduler/sql/upgrades/05.sql swh/scheduler/sql/upgrades/06.sql swh/scheduler/sql/upgrades/07.sql swh/scheduler/sql/upgrades/08.sql swh/scheduler/sql/upgrades/09.sql swh/scheduler/sql/upgrades/10.sql swh/scheduler/sql/upgrades/11.sql swh/scheduler/sql/upgrades/12.sql swh/scheduler/sql/upgrades/13.sql swh/scheduler/sql/upgrades/14.sql swh/scheduler/sql/upgrades/15.sql swh/scheduler/sql/upgrades/16.sql swh/scheduler/sql/upgrades/17.sql swh/scheduler/sql/upgrades/18.sql swh/scheduler/sql/upgrades/19.sql swh/scheduler/sql/upgrades/20.sql swh/scheduler/sql/upgrades/23.sql swh/scheduler/sql/upgrades/24.sql swh/scheduler/sql/upgrades/25.sql swh/scheduler/sql/upgrades/26.sql swh/scheduler/sql/upgrades/27.sql swh/scheduler/sql/upgrades/28.sql swh/scheduler/sql/upgrades/29.sql swh/scheduler/sql/upgrades/30-bis.sql swh/scheduler/sql/upgrades/30.sql swh/scheduler/sql/upgrades/31.sql swh/scheduler/sql/upgrades/32.sql swh/scheduler/sql/upgrades/33.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py +swh/scheduler/tests/test_cli_add_forge_now.py swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_journal.py swh/scheduler/tests/test_cli_origin.py swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py swh/scheduler/tests/test_config.py swh/scheduler/tests/test_init.py swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py swh/scheduler/tests/test_recurrent_visits.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index 51cc624..49de3b2 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,39 +1,38 @@ attrs attrs-strict celery!=5.0.3,>=4.3 click flask humanize importlib_metadata<5.0 pika>=1.1.0 psycopg2 pyyaml requests sentry-sdk setuptools typing-extensions swh.core[db,http]>=2.9 swh.storage>=0.11.1 [journal] swh.journal [simulator] plotille simpy<4,>=3 [testing] pytest pytest-mock -celery>=4.3 hypothesis>=3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests types-Deprecated swh.journal plotille simpy<4,>=3 diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index 8eeadaa..b4cd057 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,102 +1,111 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group # If you're looking for subcommand imports, they are further down this file to # avoid a circular import! @swh_cli_group.group( name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.option( "--database", "-d", default=None, help="Scheduling database DSN (imply cls is 'postgresql')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). """ try: from psycopg2 import OperationalError except ImportError: class OperationalError(Exception): pass from swh.core import config from swh.scheduler import DEFAULT_CONFIG, get_scheduler ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf["scheduler"]["cls"] = "postgresql" conf["scheduler"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["url"] = url sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s", sched_conf) scheduler = get_scheduler(**sched_conf) except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf -from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa +from . import ( # noqa + add_forge_now, + admin, + celery_monitor, + journal, + origin, + simulator, + task, + task_type, +) def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py new file mode 100644 index 0000000..e1aad07 --- /dev/null +++ b/swh/scheduler/cli/add_forge_now.py @@ -0,0 +1,163 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from . import cli + +if TYPE_CHECKING: + from typing import Dict, List, Optional + + +@cli.group("add-forge-now") +@click.option( + "-p", + "--preset", + "preset", + default="production", + type=click.Choice(["production", "staging"]), + help='Determine preset to use, "production" by default.', +) +@click.pass_context +def add_forge_now(ctx, preset): + """Manipulate add-forge-now requests.""" + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + ctx.obj["preset"] = preset + + +@add_forge_now.command("register-lister") +@click.argument("lister_name", nargs=1, required=True) +@click.argument("options", nargs=-1) +@click.pass_context +def register_lister_cli( + ctx, + lister_name, + options, +): + """Register the lister tasks in the scheduler. + + The specifics of what tasks are registered depends on the add-forge-now --preset + option: + - staging preset: a single oneshot full listing task is scheduled. This "full" + listing is limited to 3 pages and 10 origins per page. The origins are recorded as + disabled (to avoid their recurrent loading). + - production preset: a recurrent full and incremental (if the loader has such a + task) listing task are scheduled. The first run of the full lister is scheduled + immediately, and the first run of the incremental lister is delayed by a day. + + """ + from .utils import lister_task_type, parse_options, task_add + + scheduler = ctx.obj["scheduler"] + preset = ctx.obj["preset"] + + # Map the associated task types for the lister + task_type_names: Dict[str, str] = { + listing_type: lister_task_type(lister_name, listing_type) + for listing_type in ["full", "incremental"] + } + + task_types: Dict[str, Dict] = {} + for listing_type, task_type_name in task_type_names.items(): + task_type = scheduler.get_task_type(task_type_name) + if task_type: + task_types[listing_type] = task_type + + if not task_types: + raise ValueError(f"Unknown lister type {lister_name}.") + + (args, kw) = parse_options(options) + + # Recurring policy on production + if preset == "production": + policy = "recurring" + else: # staging, "full" but limited listing as a oneshot + policy = "oneshot" + kw.update({"max_pages": 3, "max_origins_per_page": 10, "enable_origins": False}) + # We want a "full" listing in production if both incremental and full exists + if "full" in task_types: + task_types.pop("incremental", None) + + from datetime import timedelta + + from swh.scheduler.utils import utcnow + + for listing_type, task_type in task_types.items(): + now = utcnow() + next_run = now if listing_type == "full" else now + timedelta(days=1) + task_add( + scheduler, + task_type_name=task_type["type"], + args=args, + kw=kw, + policy=policy, + next_run=next_run, + ) + + +@add_forge_now.command("schedule-first-visits") +@click.option( + "--type-name", + "-t", + "visit_type_names", + help="Visit/loader type (can be provided multiple times)", + type=str, + multiple=True, +) +@click.option( + "--lister-name", + default=None, + help="Limit origins to those listed from lister with provided name", +) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.pass_context +def schedule_first_visits_cli( + ctx, + visit_type_names: List[str], + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, +): + """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the + associated add_forge_now queue(s). + + """ + from .utils import get_task_type, send_to_celery + + scheduler = ctx.obj["scheduler"] + preset = ctx.obj["preset"] + + visit_type_to_queue: Dict[str, str] = {} + unknown_task_types = [] + for visit_type_name in visit_type_names: + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + unknown_task_types.append(visit_type_name) + continue + queue_name = task_type["backend_name"] + visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}" + + if unknown_task_types: + raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.") + + send_to_celery( + scheduler, + visit_type_to_queue=visit_type_to_queue, + enabled=preset == "production", + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py index 97634af..dfccdcd 100644 --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -1,271 +1,253 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations -from typing import TYPE_CHECKING, Iterable, List, Optional +from typing import TYPE_CHECKING import click from . import cli from ..utils import create_origin_task_dicts if TYPE_CHECKING: + from typing import Iterable, List, Optional from uuid import UUID from ..interface import SchedulerInterface from ..model import ListedOrigin @cli.group("origin") @click.pass_context def origin(ctx): """Manipulate listed origins.""" if not ctx.obj["scheduler"]: raise ValueError("Scheduler class (local/remote) must be instantiated") def format_origins( origins: List[ListedOrigin], fields: Optional[List[str]] = None, with_header: bool = True, ) -> Iterable[str]: """Format a list of origins as CSV. Arguments: origins: list of origins to output fields: optional list of fields to output (defaults to all fields) with_header: if True, output a CSV header. """ import csv from io import StringIO import attr from ..model import ListedOrigin expected_fields = [field.name for field in attr.fields(ListedOrigin)] if not fields: fields = expected_fields unknown_fields = set(fields) - set(expected_fields) if unknown_fields: raise ValueError( "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields) ) output = StringIO() writer = csv.writer(output) def csv_row(data): """Return a single CSV-formatted row. We clear the output buffer after we're done to keep it reasonably sized.""" writer.writerow(data) output.seek(0) ret = output.read().rstrip() output.seek(0) output.truncate() return ret if with_header: yield csv_row(fields) for origin in origins: yield csv_row(str(getattr(origin, field)) for field in fields) @origin.command("grab-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--fields", "-f", default=None, help="Listed origin fields to print on output" ) @click.option( "--with-header/--without-header", is_flag=True, default=True, help="Print the CSV header?", ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def grab_next( ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int ): """Grab the next COUNT origins to visit using the TYPE loader from the listed origins table.""" if fields: parsed_fields: Optional[List[str]] = fields.split(",") else: parsed_fields = None scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) for line in format_origins(origins, fields=parsed_fields, with_header=with_header): click.echo(line) @origin.command("schedule-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def schedule_next(ctx, policy: str, type: str, count: int): """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow - from .task import pretty_print_task + from .utils import pretty_print_task scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) created = scheduler.create_tasks( [ { **task_dict, "policy": "oneshot", "next_run": utcnow(), "retries_left": 1, } for task_dict in create_origin_task_dicts(origins, scheduler) ] ) output = ["Created %d tasks\n" % len(created)] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @origin.command("send-to-celery") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--queue", "-q", help="Target celery queue", type=str, ) @click.option( "--tablesample", help="Table sampling percentage", type=float, ) @click.option( "--only-enabled/--only-disabled", "enabled", is_flag=True, default=True, help="""Determine whether we want to scheduled enabled or disabled origins. As default, we want to reasonably deal with enabled origins. For some edge case though, we might want the disabled ones.""", ) @click.option( "--lister-name", default=None, help="Limit origins to those listed from lister with provided name", ) @click.option( "--lister-instance-name", default=None, help="Limit origins to those listed from lister with instance name", ) -@click.argument("type", type=str) +@click.argument("visit_type_name", type=str) @click.pass_context -def send_to_celery( +def send_to_celery_cli( ctx, policy: str, queue: Optional[str], tablesample: Optional[float], - type: str, + visit_type_name: str, enabled: bool, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, ): - """Send the next origin visits of the TYPE loader to celery, filling the queue.""" - from kombu.utils.uuid import uuid + """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue.""" - from swh.scheduler.celery_backend.config import app, get_available_slots + from .utils import get_task_type, send_to_celery scheduler = ctx.obj["scheduler"] - task_type = scheduler.get_task_type(f"load-{type}") - - task_name = task_type["backend_name"] - queue_name = queue or task_name - - num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + raise ValueError(f"Unknown task type {task_type}.") - click.echo(f"{num_tasks} slots available in celery queue") + queue_name = queue or task_type["backend_name"] - lister_uuid: Optional[str] = None - if lister_name and lister_instance_name: - lister = scheduler.get_lister(lister_name, lister_instance_name) - if lister: - lister_uuid = lister.id - - origins = scheduler.grab_next_visits( - type, - num_tasks, + send_to_celery( + scheduler, + visit_type_to_queue={visit_type_name: queue_name}, policy=policy, tablesample=tablesample, enabled=enabled, - lister_uuid=lister_uuid, + lister_name=lister_name, + lister_instance_name=lister_instance_name, ) - click.echo(f"{len(origins)} visits to send to celery") - for task_dict in create_origin_task_dicts(origins, scheduler): - app.send_task( - task_name, - task_id=uuid(), - args=task_dict["arguments"]["args"], - kwargs=task_dict["arguments"]["kwargs"], - queue=queue_name, - ) - @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for this lister") @click.option( "--instance", default=None, help="Only update metrics for this lister instance" ) @click.pass_context def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): """Update the scheduler metrics on listed origins. Examples: swh scheduler origin update-metrics swh scheduler origin update-metrics --lister github swh scheduler origin update-metrics --lister phabricator --instance llvm """ import json import attr scheduler: SchedulerInterface = ctx.obj["scheduler"] lister_id: Optional[UUID] = None if lister is not None: lister_instance = scheduler.get_lister(name=lister, instance_name=instance) if not lister_instance: click.echo(f"Lister not found: {lister} instance={instance}") ctx.exit(2) assert False # for mypy lister_id = lister_instance.id def dictify_metrics(d): return {k: str(v) for (k, v) in attr.asdict(d).items()} ret = scheduler.update_metrics(lister_id=lister_id) click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True)) diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py index 4ee1860..bc37fb4 100644 --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -1,599 +1,470 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations # WARNING: do not import unnecessary things here to keep cli startup time under # control import locale from typing import TYPE_CHECKING, Iterator, List, Optional import click from . import cli if TYPE_CHECKING: import datetime # importing swh.storage.interface triggers the load of 300+ modules, so... import swh.model.model from swh.storage.interface import StorageInterface locale.setlocale(locale.LC_ALL, "") CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) DATETIME = click.DateTime() -def format_dict(d): - """Recursively format date objects in the dict passed as argument""" - import datetime - - ret = {} - for k, v in d.items(): - if isinstance(v, (datetime.date, datetime.datetime)): - v = v.isoformat() - elif isinstance(v, dict): - v = format_dict(v) - ret[k] = v - return ret - - -def pretty_print_list(list, indent=0): - """Pretty-print a list""" - return "".join("%s%r\n" % (" " * indent, item) for item in list) - - -def pretty_print_dict(dict, indent=0): - """Pretty-print a list""" - return "".join( - "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) - for key, value in sorted(dict.items()) - ) - - -def pretty_print_run(run, indent=4): - fmt = ( - "{indent}{backend_id} [{status}]\n" - "{indent} scheduled: {scheduled} [{started}:{ended}]" - ) - return fmt.format(indent=" " * indent, **format_dict(run)) - - -def pretty_print_task(task, full=False): - """Pretty-print a task - - If 'full' is True, also print the status and priority fields. - - >>> import datetime - >>> task = { - ... 'id': 1234, - ... 'arguments': { - ... 'args': ['foo', 'bar', True], - ... 'kwargs': {'key': 'value', 'key2': 42}, - ... }, - ... 'current_interval': datetime.timedelta(hours=1), - ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), - ... 'policy': 'oneshot', - ... 'priority': None, - ... 'status': 'next_run_not_scheduled', - ... 'type': 'test_task', - ... } - >>> print(click.unstyle(pretty_print_task(task))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - >>> print(click.unstyle(pretty_print_task(task, full=True))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Status: next_run_not_scheduled - Priority:\x20 - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - """ - import humanize - - next_run = task["next_run"] - lines = [ - "%s %s\n" % (click.style("Task", bold=True), task["id"]), - click.style(" Next run: ", bold=True), - "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), - "\n", - click.style(" Interval: ", bold=True), - str(task["current_interval"]), - "\n", - click.style(" Type: ", bold=True), - task["type"] or "", - "\n", - click.style(" Policy: ", bold=True), - task["policy"] or "", - "\n", - ] - if full: - lines += [ - click.style(" Status: ", bold=True), - task["status"] or "", - "\n", - click.style(" Priority: ", bold=True), - task["priority"] or "", - "\n", - ] - lines += [ - click.style(" Args:\n", bold=True), - pretty_print_list(task["arguments"]["args"], indent=4), - click.style(" Keyword args:\n", bold=True), - pretty_print_dict(task["arguments"]["kwargs"], indent=4), - ] - - return "".join(lines) - - @cli.group("task") @click.pass_context def task(ctx): """Manipulate tasks.""" pass @task.command("schedule") @click.option( "--columns", "-c", multiple=True, default=["type", "args", "kwargs", "next_run"], type=click.Choice(["type", "args", "kwargs", "policy", "next_run"]), help="columns present in the CSV file", ) @click.option("--delimiter", "-d", default=",") @click.argument("file", type=click.File(encoding="utf-8")) @click.pass_context def schedule_tasks(ctx, columns, delimiter, file): """Schedule tasks from a CSV input file. The following columns are expected, and can be set through the -c option: - type: the type of the task to be scheduled (mandatory) - args: the arguments passed to the task (JSON list, defaults to an empty list) - kwargs: the keyword arguments passed to the task (JSON object, defaults to an empty dict) - next_run: the date at which the task should run (datetime, defaults to now) The CSV can be read either from a named file, or from stdin (use - as filename). Use sample: cat scheduling-task.txt | \ python3 -m swh.scheduler.cli \ --database 'service=swh-scheduler-dev' \ task schedule \ --columns type --columns kwargs --columns policy \ --delimiter ';' - """ import csv import json from swh.scheduler.utils import utcnow + from .utils import pretty_print_task + tasks = [] now = utcnow() scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") reader = csv.reader(file, delimiter=delimiter) for line in reader: task = dict(zip(columns, line)) args = json.loads(task.pop("args", "[]")) kwargs = json.loads(task.pop("kwargs", "{}")) task["arguments"] = { "args": args, "kwargs": kwargs, } task["next_run"] = task.get("next_run", now) tasks.append(task) created = scheduler.create_tasks(tasks) output = [ "Created %d tasks\n" % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @task.command("add") @click.argument("task_type_name", nargs=1, required=True) @click.argument("options", nargs=-1) @click.option( "--policy", "-p", default="recurring", type=click.Choice(["recurring", "oneshot"]) ) @click.option( "--priority", "-P", default=None, type=click.Choice(["low", "normal", "high"]) ) @click.option("--next-run", "-n", default=None) @click.pass_context def schedule_task(ctx, task_type_name, options, policy, priority, next_run): """Schedule one task from arguments. The first argument is the name of the task type. Flag options (policy, priority) are task configuration. Further options are positional and keyword argument(s) of the task, in YAML format. Keyword args are of the form key=value. Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task add list-pypi swh-scheduler --database 'service=swh-scheduler' \ task add list-debian-distribution --policy=oneshot distribution=stretch Note: if the priority is not given, the task won't have the priority set, which is considered as the lowest priority level. """ - from swh.scheduler.utils import utcnow - - from .utils import parse_options + from .utils import parse_options, task_add scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") - if scheduler.get_task_type(task_type_name) is None: - raise ValueError(f"Unknown task type {task_type_name}.") - - now = utcnow() + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task name {task_type_name}.") (args, kw) = parse_options(options) - task = { - "type": task_type_name, - "policy": policy, - "priority": priority, - "arguments": { - "args": args, - "kwargs": kw, - }, - "next_run": next_run or now, - } - created = scheduler.create_tasks([task]) - - output = [ - "Created %d tasks\n" % len(created), - ] - for task in created: - output.append(pretty_print_task(task)) - - click.echo("\n".join(output)) + task_add( + scheduler, + task_type_name=task_type_name, + policy=policy, + priority=priority, + next_run=next_run, + args=args, + kw=kw, + ) def iter_origins( # use string annotations to prevent some pkg loading storage: StorageInterface, page_token: Optional[str] = None, ) -> Iterator[swh.model.model.Origin]: """Iterate over origins in the storage. Optionally starting from page_token. This logs regularly an info message during pagination with the page_token. This, in order to feed it back to the cli if the process interrupted. Yields origin model objects from the storage """ while True: page_result = storage.origin_list(page_token=page_token) page_token = page_result.next_page_token yield from page_result.results if not page_token: break click.echo(f"page_token: {page_token}\n") @task.command("schedule_origins") @click.argument("type", nargs=1, required=True) @click.argument("options", nargs=-1) @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--page-token", default=0, show_default=True, type=str, help="Only schedule tasks for origins whose ID is greater", ) @click.option( "--limit", default=None, type=int, help="Limit the tasks scheduling up to this number of tasks", ) @click.option("--storage-url", "-g", help="URL of the (graph) storage API") @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule_origin_metadata_index( ctx, type, options, storage_url, origin_batch_size, page_token, limit, dry_run ): """Schedules tasks for origins that are already known. The first argument is the name of the task type, further ones are keyword argument(s) of the task in the form key=value, where value is in YAML format. Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task schedule_origins index-origin-metadata """ from itertools import islice from swh.storage import get_storage from .utils import parse_options, schedule_origin_batches scheduler = ctx.obj["scheduler"] storage = get_storage("remote", url=storage_url) if dry_run: scheduler = None (args, kw) = parse_options(options) if args: raise click.ClickException("Only keywords arguments are allowed.") origins = iter_origins(storage, page_token=page_token) if limit: origins = islice(origins, limit) origin_urls = (origin.url for origin in origins) schedule_origin_batches(scheduler, type, origin_urls, origin_batch_size, kw) @task.command("list-pending") @click.argument("task-types", required=True, nargs=-1) @click.option( "--limit", "-l", "num_tasks", required=False, type=click.INT, help="The maximum number of tasks to fetch", ) @click.option( "--before", "-b", required=False, type=DATETIME, help="List all jobs supposed to run before the given date", ) @click.pass_context def list_pending_tasks(ctx, task_types, num_tasks, before): """List tasks with no priority that are going to be run. You can override the number of tasks to fetch with the --limit flag. """ + from .utils import pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") output = [] for task_type in task_types: pending = scheduler.peek_ready_tasks( task_type, timestamp=before, num_tasks=num_tasks, ) output.append("Found %d %s tasks\n" % (len(pending), task_type)) for task in pending: output.append(pretty_print_task(task)) click.echo("\n".join(output)) @task.command("list") @click.option( "--task-id", "-i", default=None, multiple=True, metavar="ID", help="List only tasks whose id is ID.", ) @click.option( "--task-type", "-t", default=None, multiple=True, metavar="TYPE", help="List only tasks of type TYPE", ) @click.option( "--limit", "-l", required=False, type=click.INT, help="The maximum number of tasks to fetch.", ) @click.option( "--status", "-s", multiple=True, metavar="STATUS", type=click.Choice( ("next_run_not_scheduled", "next_run_scheduled", "completed", "disabled") ), default=None, help="List tasks whose status is STATUS.", ) @click.option( "--policy", "-p", default=None, type=click.Choice(["recurring", "oneshot"]), help="List tasks whose policy is POLICY.", ) @click.option( "--priority", "-P", default=None, multiple=True, type=click.Choice(["all", "low", "normal", "high"]), help="List tasks whose priority is PRIORITY.", ) @click.option( "--before", "-b", required=False, type=DATETIME, metavar="DATETIME", help="Limit to tasks supposed to run before the given date.", ) @click.option( "--after", "-a", required=False, type=DATETIME, metavar="DATETIME", help="Limit to tasks supposed to run after the given date.", ) @click.option( "--list-runs", "-r", is_flag=True, default=False, help="Also list past executions of each task.", ) @click.pass_context def list_tasks( ctx, task_id, task_type, limit, status, policy, priority, before, after, list_runs ): """List tasks.""" from operator import itemgetter + from .utils import pretty_print_run, pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") if not task_type: task_type = [x["type"] for x in scheduler.get_task_types()] # if task_id is not given, default value for status is # 'next_run_not_scheduled' # if task_id is given, default status is 'all' if task_id is None and status is None: status = ["next_run_not_scheduled"] if status and "all" in status: status = None if priority and "all" in priority: priority = None output = [] tasks = scheduler.search_tasks( task_id=task_id, task_type=task_type, status=status, priority=priority, policy=policy, before=before, after=after, limit=limit, ) if list_runs: runs = {t["id"]: [] for t in tasks} for r in scheduler.get_task_runs([task["id"] for task in tasks]): runs[r["task"]].append(r) else: runs = {} output.append("Found %d tasks\n" % (len(tasks))) for task in sorted(tasks, key=itemgetter("id")): output.append(pretty_print_task(task, full=True)) if runs.get(task["id"]): output.append(click.style(" Executions:", bold=True)) for run in sorted(runs[task["id"]], key=itemgetter("id")): output.append(pretty_print_run(run, indent=4)) click.echo("\n".join(output)) @task.command("respawn") @click.argument("task-ids", required=True, nargs=-1) @click.option( "--next-run", "-n", required=False, type=DATETIME, metavar="DATETIME", default=None, help="Re spawn the selected tasks at this date", ) @click.pass_context def respawn_tasks(ctx, task_ids: List[str], next_run: datetime.datetime): """Respawn tasks. Respawn tasks given by their ids (see the 'task list' command to find task ids) at the given date (immediately by default). Eg. swh-scheduler task respawn 1 3 12 """ from swh.scheduler.utils import utcnow scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") if next_run is None: next_run = utcnow() output = [] task_ids_int = [int(id_) for id_ in task_ids] scheduler.set_status_tasks( task_ids_int, status="next_run_not_scheduled", next_run=next_run ) output.append("Respawn tasks %s\n" % (task_ids_int,)) click.echo("\n".join(output)) diff --git a/swh/scheduler/cli/test_cli_utils.py b/swh/scheduler/cli/test_cli_utils.py new file mode 100644 index 0000000..755407e --- /dev/null +++ b/swh/scheduler/cli/test_cli_utils.py @@ -0,0 +1,17 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.scheduler.cli.utils import lister_task_type + + +@pytest.mark.parametrize( + "lister_name,listing_type", [("foo", "full"), ("bar", "incremental")] +) +def test_lister_task_type(lister_name, listing_type): + assert lister_task_type(lister_name, listing_type) == ( + f"list-{lister_name}-{listing_type}" + ) diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py index 4832212..cb2e865 100644 --- a/swh/scheduler/cli/utils.py +++ b/swh/scheduler/cli/utils.py @@ -1,102 +1,336 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control +from __future__ import annotations + +from typing import TYPE_CHECKING + import click +if TYPE_CHECKING: + from typing import Dict, List, Optional, Tuple + + from swh.scheduler.interface import SchedulerInterface + + TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler def schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs): from itertools import islice from swh.scheduler.utils import create_task_dict nb_origins = 0 nb_tasks = 0 while True: task_batch = [] for _ in range(TASK_BATCH_SIZE): # Group origins origin_batch = [] for origin in islice(origins, origin_batch_size): origin_batch.append(origin) nb_origins += len(origin_batch) if not origin_batch: break # Create a task for these origins args = [origin_batch] task_dict = create_task_dict(task_type, "oneshot", *args, **kwargs) task_batch.append(task_dict) # Schedule a batch of tasks if not task_batch: break nb_tasks += len(task_batch) if scheduler: scheduler.create_tasks(task_batch) click.echo("Scheduled %d tasks (%d origins)." % (nb_tasks, nb_origins)) # Print final status. if nb_tasks: click.echo("Done.") else: click.echo("Nothing to do (no origin metadata matched the criteria).") def parse_argument(option): import yaml if option == "": # yaml.safe_load("") returns None return "" try: return yaml.safe_load(option) except Exception: raise click.ClickException("Invalid argument: {}".format(option)) -def parse_options(options): +def parse_options(options: List[str]) -> Tuple[List[str], Dict]: """Parses options from a CLI as YAML and turns it into Python args and kwargs. >>> parse_options([]) ([], {}) >>> parse_options(['foo', 'bar']) (['foo', 'bar'], {}) >>> parse_options(['[foo, bar]']) ([['foo', 'bar']], {}) >>> parse_options(['"foo"', '"bar"']) (['foo', 'bar'], {}) >>> parse_options(['foo="bar"']) ([], {'foo': 'bar'}) >>> parse_options(['"foo"', 'bar="baz"']) (['foo'], {'bar': 'baz'}) >>> parse_options(['42', 'bar=False']) ([42], {'bar': False}) >>> parse_options(['42', 'bar=false']) ([42], {'bar': False}) >>> parse_options(['foo', '']) (['foo', ''], {}) >>> parse_options(['foo', 'bar=']) (['foo'], {'bar': ''}) >>> parse_options(['foo', 'null']) (['foo', None], {}) >>> parse_options(['foo', 'bar=null']) (['foo'], {'bar': None}) >>> parse_options(['42', '"foo']) Traceback (most recent call last): ... click.exceptions.ClickException: Invalid argument: "foo """ kw_pairs = [x.split("=", 1) for x in options if "=" in x] args = [parse_argument(x) for x in options if "=" not in x] kw = {k: parse_argument(v) for (k, v) in kw_pairs} return (args, kw) + + +def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]: + "Given a visit type, return its associated task type." + return scheduler.get_task_type(f"load-{visit_type}") + + +def send_to_celery( + scheduler: SchedulerInterface, + visit_type_to_queue: Dict[str, str], + enabled: bool = True, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, + policy: str = "oldest_scheduled_first", + tablesample: Optional[float] = None, +): + """Utility function to read tasks from the scheduler and send those directly to + celery. + + Args: + visit_type_to_queue: Optional mapping of visit/loader type (e.g git, svn, ...) + to queue to send task to. + enabled: Determine whether we want to list enabled or disabled origins. As + default, we want reasonably enabled origins. For some edge case, we might + want the others. + lister_name: Determine the list of origins listed from the lister with name + lister_instance_name: Determine the list of origins listed from the lister + with instance name + policy: the scheduling policy used to select which visits to schedule + tablesample: the percentage of the table on which we run the query + (None: no sampling) + + """ + + from kombu.utils.uuid import uuid + + from swh.scheduler.celery_backend.config import app, get_available_slots + + from ..utils import create_origin_task_dicts + + for visit_type_name, queue_name in visit_type_to_queue.items(): + task_type = get_task_type(scheduler, visit_type_name) + assert task_type is not None + task_name = task_type["backend_name"] + num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + + click.echo(f"{num_tasks} slots available in celery queue") + + origins = scheduler.grab_next_visits( + visit_type_name, + num_tasks, + policy=policy, + tablesample=tablesample, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) + + click.echo(f"{len(origins)} visits to send to celery") + for task_dict in create_origin_task_dicts(origins, scheduler): + app.send_task( + task_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + +def pretty_print_list(list, indent=0): + """Pretty-print a list""" + return "".join("%s%r\n" % (" " * indent, item) for item in list) + + +def pretty_print_dict(dict, indent=0): + """Pretty-print a list""" + return "".join( + "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) + for key, value in sorted(dict.items()) + ) + + +def format_dict(d): + """Recursively format date objects in the dict passed as argument""" + import datetime + + ret = {} + for k, v in d.items(): + if isinstance(v, (datetime.date, datetime.datetime)): + v = v.isoformat() + elif isinstance(v, dict): + v = format_dict(v) + ret[k] = v + return ret + + +def pretty_print_run(run, indent=4): + fmt = ( + "{indent}{backend_id} [{status}]\n" + "{indent} scheduled: {scheduled} [{started}:{ended}]" + ) + return fmt.format(indent=" " * indent, **format_dict(run)) + + +def pretty_print_task(task, full=False): + """Pretty-print a task + + If 'full' is True, also print the status and priority fields. + + >>> import datetime + >>> task = { + ... 'id': 1234, + ... 'arguments': { + ... 'args': ['foo', 'bar', True], + ... 'kwargs': {'key': 'value', 'key2': 42}, + ... }, + ... 'current_interval': datetime.timedelta(hours=1), + ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), + ... 'policy': 'oneshot', + ... 'priority': None, + ... 'status': 'next_run_not_scheduled', + ... 'type': 'test_task', + ... } + >>> print(click.unstyle(pretty_print_task(task))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + >>> print(click.unstyle(pretty_print_task(task, full=True))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Status: next_run_not_scheduled + Priority:\x20 + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + """ + import humanize + + next_run = task["next_run"] + lines = [ + "%s %s\n" % (click.style("Task", bold=True), task["id"]), + click.style(" Next run: ", bold=True), + "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), + "\n", + click.style(" Interval: ", bold=True), + str(task["current_interval"]), + "\n", + click.style(" Type: ", bold=True), + task["type"] or "", + "\n", + click.style(" Policy: ", bold=True), + task["policy"] or "", + "\n", + ] + if full: + lines += [ + click.style(" Status: ", bold=True), + task["status"] or "", + "\n", + click.style(" Priority: ", bold=True), + task["priority"] or "", + "\n", + ] + lines += [ + click.style(" Args:\n", bold=True), + pretty_print_list(task["arguments"]["args"], indent=4), + click.style(" Keyword args:\n", bold=True), + pretty_print_dict(task["arguments"]["kwargs"], indent=4), + ] + + return "".join(lines) + + +def task_add( + scheduler: SchedulerInterface, + task_type_name: str, + args: List[str], + kw: Dict, + policy: str, + priority: Optional[str] = None, + next_run: Optional[str] = None, +): + """Add a task task_type_name in the scheduler.""" + from swh.scheduler.utils import utcnow + + task = { + "type": task_type_name, + "policy": policy, + "priority": priority, + "arguments": { + "args": args, + "kwargs": kw, + }, + "next_run": next_run or utcnow(), + } + created = scheduler.create_tasks([task]) + + output = [f"Created {len(created)} tasks\n"] + for task in created: + output.append(pretty_print_task(task)) + + click.echo("\n".join(output)) + + +def lister_task_type(lister_name: str, lister_type: str) -> str: + return f"list-{lister_name}-{lister_type}" diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py new file mode 100644 index 0000000..69fe9e1 --- /dev/null +++ b/swh/scheduler/tests/test_cli_add_forge_now.py @@ -0,0 +1,164 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Tuple + +import attr +import pytest + +from swh.scheduler.cli.utils import lister_task_type +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.tests.common import TASK_TYPES +from swh.scheduler.tests.test_cli import invoke as basic_invoke + + +def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): + return basic_invoke( + scheduler, args=["add-forge-now", *args], catch_exceptions=catch_exceptions + ) + + +def test_schedule_first_visits_cli_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + args=( + "schedule-first-visits", + "-t", + "unknown-vt0", + "--type-name", + "unknown-visit-type1", + ), + ) + + +@pytest.mark.parametrize( + "cmd_args, subcmd_args", + [ + ([], []), + ([], ["--lister-name", "github", "--lister-instance-name", "github"]), + (["--preset", "staging"], []), + ], +) +def test_schedule_first_visits_cli( + mocker, + swh_scheduler, + swh_scheduler_celery_app, + listed_origins_by_type, + cmd_args, + subcmd_args, +): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + visit_type = next(iter(listed_origins_by_type)) + + # enabled origins by default except when --staging flag is provided + enabled = "staging" not in cmd_args + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins( + (attr.evolve(o, enabled=enabled) for o in origins) + ) + + get_queue_length = mocker.patch( + "swh.scheduler.celery_backend.config.get_queue_length" + ) + get_queue_length.return_value = None + + send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") + send_task.return_value = None + + command_args = ( + cmd_args + ["schedule-first-visits", "--type-name", visit_type] + subcmd_args + ) + + result = invoke(swh_scheduler, args=tuple(command_args)) + assert result.exit_code == 0 + + scheduled_tasks = { + (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list + } + + expected_tasks = { + (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) + for origin in listed_origins_by_type[visit_type] + } + + assert scheduled_tasks == expected_tasks + + +def _create_task_type( + swh_scheduler: SchedulerInterface, lister_name: str, listing_type: str = "full" +) -> Dict: + task_type = { + "type": lister_task_type(lister_name, listing_type), # only relevant bit + "description": f"{listing_type} listing", + "backend_name": "swh.example.backend", + "default_interval": "1 day", + "min_interval": "1 day", + "max_interval": "1 day", + "backoff_factor": "1", + "max_queue_length": "100", + "num_retries": 3, + } + swh_scheduler.create_task_type(task_type) + task_type = swh_scheduler.get_task_type(task_type["type"]) + assert task_type is not None + return task_type + + +@pytest.mark.parametrize("preset", ["staging", "production"]) +def test_schedule_register_lister(swh_scheduler, stored_lister, preset): + # given + assert stored_lister is not None + lister_name = stored_lister.name + # Let's create all possible associated lister task types + full = _create_task_type(swh_scheduler, lister_name, "full") + incremental = _create_task_type(swh_scheduler, lister_name, "incremental") + + # Let's trigger the registering of that lister + result = invoke( + swh_scheduler, + [ + "--preset", + preset, + "register-lister", + lister_name, + "url=https://example.org", + ], + ) + + output = result.output.lstrip() + + expected_msgs = [] + if preset == "production": + # 2 tasks: 1 full + 1 incremental (tomorrow) with recurring policy + expected_msgs = ["Policy: recurring", incremental["type"], "Next run: tomorrow"] + else: + # 1 task full with policy oneshot + expected_msgs = ["Policy: oneshot"] + + # In any case, there is the full listing type too + expected_msgs.append(full["type"]) + + assert len(expected_msgs) > 0 + for msg in expected_msgs: + assert msg in output + + +def test_register_lister_unknown_task_type(swh_scheduler): + """When scheduling unknown task type, the cli should raise.""" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + [ + "register-lister", + "unknown-lister-type-should-raise", + ], + ) diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py index 1a4949e..b6bb85c 100644 --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -1,169 +1,177 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Tuple import pytest from swh.scheduler.cli.origin import format_origins from swh.scheduler.tests.common import TASK_TYPES from swh.scheduler.tests.test_cli import invoke as basic_invoke def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): return basic_invoke( scheduler, args=["origin", *args], catch_exceptions=catch_exceptions ) def test_cli_origin(swh_scheduler): """Check that swh scheduler origin returns its help text""" result = invoke(swh_scheduler) assert "Commands:" in result.stdout def test_format_origins_basic(listed_origins): listed_origins = listed_origins[:100] basic_output = list(format_origins(listed_origins)) # 1 header line + all origins assert len(basic_output) == len(listed_origins) + 1 no_header_output = list(format_origins(listed_origins, with_header=False)) assert basic_output[1:] == no_header_output def test_format_origins_fields_unknown(listed_origins): listed_origins = listed_origins[:10] it = format_origins(listed_origins, fields=["unknown_field"]) with pytest.raises(ValueError, match="unknown_field"): next(it) def test_format_origins_fields(listed_origins): listed_origins = listed_origins[:10] fields = ["lister_id", "url", "visit_type"] output = list(format_origins(listed_origins, fields=fields)) assert output[0] == ",".join(fields) for i, origin in enumerate(listed_origins): assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}" def test_grab_next(swh_scheduler, listed_origins_by_type): NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested # XXX: should test all of 'listed_origins_by_type' here... visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 out_lines = result.stdout.splitlines() assert len(out_lines) == NUM_RESULTS + 1 fields = out_lines[0].split(",") returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]] # Check that we've received origins we had listed in the first place assert set(origin["url"] for origin in returned_origins) <= set( origin.url for origin in listed_origins_by_type[visit_type] ) def test_schedule_next(swh_scheduler, listed_origins_by_type): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 # pull all tasks out of the scheduler tasks = swh_scheduler.search_tasks() assert len(tasks) == NUM_RESULTS scheduled_tasks = { (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks } all_possible_tasks = { (f"load-{origin.visit_type}", origin.url) for origin in listed_origins_by_type[visit_type] } assert scheduled_tasks <= all_possible_tasks +def test_send_to_celery_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type")) + + @pytest.mark.parametrize( "extra_cmd_args", [[], ["--lister-name", "github", "--lister-instance-name", "github"]], ) def test_send_to_celery( mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type, extra_cmd_args, ): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) visit_type = next(iter(listed_origins_by_type)) for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) get_queue_length = mocker.patch( "swh.scheduler.celery_backend.config.get_queue_length" ) get_queue_length.return_value = None send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") send_task.return_value = None cmd_args = ["send-to-celery", visit_type] + extra_cmd_args result = invoke(swh_scheduler, args=tuple(cmd_args)) assert result.exit_code == 0 scheduled_tasks = { (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list } expected_tasks = { (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) for origin in listed_origins_by_type[visit_type] } assert expected_tasks == scheduled_tasks def test_update_metrics(swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) assert swh_scheduler.get_metrics() == [] result = invoke(swh_scheduler, args=("update-metrics",)) assert result.exit_code == 0 assert swh_scheduler.get_metrics() != []