diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
index 61ab1a5..97634af 100644
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -1,258 +1,271 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from __future__ import annotations

 from typing import TYPE_CHECKING, Iterable, List, Optional

 import click

 from . import cli
 from ..utils import create_origin_task_dicts

 if TYPE_CHECKING:
     from uuid import UUID

     from ..interface import SchedulerInterface
     from ..model import ListedOrigin


 @cli.group("origin")
 @click.pass_context
 def origin(ctx):
     """Manipulate listed origins."""
     if not ctx.obj["scheduler"]:
         raise ValueError("Scheduler class (local/remote) must be instantiated")


 def format_origins(
     origins: List[ListedOrigin],
     fields: Optional[List[str]] = None,
     with_header: bool = True,
 ) -> Iterable[str]:
     """Format a list of origins as CSV.

     Arguments:
        origins: list of origins to output
        fields: optional list of fields to output (defaults to all fields)
        with_header: if True, output a CSV header.
     """
     import csv
     from io import StringIO

     import attr

     from ..model import ListedOrigin

     expected_fields = [field.name for field in attr.fields(ListedOrigin)]
     if not fields:
         fields = expected_fields

     unknown_fields = set(fields) - set(expected_fields)
     if unknown_fields:
         raise ValueError(
             "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields)
         )

     output = StringIO()
     writer = csv.writer(output)

     def csv_row(data):
         """Return a single CSV-formatted row. We clear the output buffer after
         we're done to keep it reasonably sized."""
         writer.writerow(data)
         output.seek(0)
         ret = output.read().rstrip()
         output.seek(0)
         output.truncate()
         return ret

     if with_header:
         yield csv_row(fields)

     for origin in origins:
         yield csv_row(str(getattr(origin, field)) for field in fields)


 @origin.command("grab-next")
 @click.option(
     "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
 )
 @click.option(
     "--fields", "-f", default=None, help="Listed origin fields to print on output"
 )
 @click.option(
     "--with-header/--without-header",
     is_flag=True,
     default=True,
     help="Print the CSV header?",
 )
 @click.argument("type", type=str)
 @click.argument("count", type=int)
 @click.pass_context
 def grab_next(
     ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int
 ):
     """Grab the next COUNT origins to visit using the TYPE loader from the
     listed origins table."""

     if fields:
         parsed_fields: Optional[List[str]] = fields.split(",")
     else:
         parsed_fields = None

     scheduler = ctx.obj["scheduler"]

     origins = scheduler.grab_next_visits(type, count, policy=policy)
     for line in format_origins(origins, fields=parsed_fields, with_header=with_header):
         click.echo(line)


 @origin.command("schedule-next")
 @click.option(
     "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
 )
 @click.argument("type", type=str)
 @click.argument("count", type=int)
 @click.pass_context
 def schedule_next(ctx, policy: str, type: str, count: int):
     """Send the next COUNT origin visits of the TYPE loader to the scheduler as
     one-shot tasks."""
     from ..utils import utcnow
     from .task import pretty_print_task

     scheduler = ctx.obj["scheduler"]

     origins = scheduler.grab_next_visits(type, count, policy=policy)

     created = scheduler.create_tasks(
         [
             {
                 **task_dict,
                 "policy": "oneshot",
                 "next_run": utcnow(),
                 "retries_left": 1,
             }
             for task_dict in create_origin_task_dicts(origins, scheduler)
         ]
     )

     output = ["Created %d tasks\n" % len(created)]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo_via_pager("\n".join(output))


 @origin.command("send-to-celery")
 @click.option(
     "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
 )
 @click.option(
     "--queue",
     "-q",
     help="Target celery queue",
     type=str,
 )
 @click.option(
     "--tablesample",
     help="Table sampling percentage",
     type=float,
 )
 @click.option(
     "--only-enabled/--only-disabled",
     "enabled",
     is_flag=True,
     default=True,
     help="""Determine whether we want to scheduled enabled or disabled origins. As
     default, we want to reasonably deal with enabled origins. For some edge case
     though, we might want the disabled ones.""",
 )
 @click.option(
-    "--lister-uuid",
+    "--lister-name",
     default=None,
-    help="Limit origins to those listed from such lister",
+    help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+    "--lister-instance-name",
+    default=None,
+    help="Limit origins to those listed from lister with instance name",
 )
 @click.argument("type", type=str)
 @click.pass_context
 def send_to_celery(
     ctx,
     policy: str,
     queue: Optional[str],
     tablesample: Optional[float],
     type: str,
     enabled: bool,
-    lister_uuid: Optional[str] = None,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
 ):
     """Send the next origin visits of the TYPE loader to celery, filling the queue."""
     from kombu.utils.uuid import uuid

     from swh.scheduler.celery_backend.config import app, get_available_slots

     scheduler = ctx.obj["scheduler"]

     task_type = scheduler.get_task_type(f"load-{type}")

     task_name = task_type["backend_name"]

     queue_name = queue or task_name

     num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])

     click.echo(f"{num_tasks} slots available in celery queue")
+
+    lister_uuid: Optional[str] = None
+    if lister_name and lister_instance_name:
+        lister = scheduler.get_lister(lister_name, lister_instance_name)
+        if lister:
+            lister_uuid = lister.id
+
     origins = scheduler.grab_next_visits(
         type,
         num_tasks,
         policy=policy,
         tablesample=tablesample,
         enabled=enabled,
         lister_uuid=lister_uuid,
     )

     click.echo(f"{len(origins)} visits to send to celery")
     for task_dict in create_origin_task_dicts(origins, scheduler):
         app.send_task(
             task_name,
             task_id=uuid(),
             args=task_dict["arguments"]["args"],
             kwargs=task_dict["arguments"]["kwargs"],
             queue=queue_name,
         )


 @origin.command("update-metrics")
 @click.option("--lister", default=None, help="Only update metrics for this lister")
 @click.option(
     "--instance", default=None, help="Only update metrics for this lister instance"
 )
 @click.pass_context
 def update_metrics(ctx, lister: Optional[str], instance: Optional[str]):
     """Update the scheduler metrics on listed origins.

     Examples:
        swh scheduler origin update-metrics
        swh scheduler origin update-metrics --lister github
        swh scheduler origin update-metrics --lister phabricator --instance llvm
     """
     import json

     import attr

     scheduler: SchedulerInterface = ctx.obj["scheduler"]

     lister_id: Optional[UUID] = None
     if lister is not None:
         lister_instance = scheduler.get_lister(name=lister, instance_name=instance)
         if not lister_instance:
             click.echo(f"Lister not found: {lister} instance={instance}")
             ctx.exit(2)
             assert False  # for mypy

         lister_id = lister_instance.id

     def dictify_metrics(d):
         return {k: str(v) for (k, v) in attr.asdict(d).items()}

     ret = scheduler.update_metrics(lister_id=lister_id)

     click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True))
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
index d25c4f6..e256b9d 100644
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -1,127 +1,127 @@
 # Copyright (C) 2017-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import copy
 import datetime
 from typing import Dict, List, Optional

 TEMPLATES = {
     "test-git": {
         "type": "load-test-git",
         "arguments": {
             "args": [],
             "kwargs": {},
         },
         "next_run": None,
     },
     "test-hg": {
         "type": "load-test-hg",
         "arguments": {
             "args": [],
             "kwargs": {},
         },
         "next_run": None,
         "policy": "oneshot",
     },
 }

 TASK_TYPES = {
     "test-git": {
         "type": "load-test-git",
         "description": "Update a git repository",
         "backend_name": "swh.loader.git.tasks.UpdateGitRepository",
         "default_interval": datetime.timedelta(days=64),
         "min_interval": datetime.timedelta(hours=12),
         "max_interval": datetime.timedelta(days=64),
         "backoff_factor": 2,
         "max_queue_length": None,
         "num_retries": 7,
         "retry_delay": datetime.timedelta(hours=2),
     },
     "test-hg": {
         "type": "load-test-hg",
         "description": "Update a mercurial repository",
         "backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository",
         "default_interval": datetime.timedelta(days=64),
         "min_interval": datetime.timedelta(hours=12),
         "max_interval": datetime.timedelta(days=64),
         "backoff_factor": 2,
         "max_queue_length": None,
         "num_retries": 7,
         "retry_delay": datetime.timedelta(hours=2),
     },
 }


 def _task_from_template(
     template: Dict,
     next_run: datetime.datetime,
     priority: Optional[str],
     *args,
     **kwargs,
 ) -> Dict:
     ret = copy.deepcopy(template)
     ret["next_run"] = next_run
     if priority:
         ret["priority"] = priority
     if args:
         ret["arguments"]["args"] = list(args)
     if kwargs:
         ret["arguments"]["kwargs"] = kwargs
     return ret


 def tasks_from_template(
     template: Dict,
     max_timestamp: datetime.datetime,
     num: Optional[int] = None,
     priority: Optional[str] = None,
     num_priorities: Dict[Optional[str], int] = {},
 ) -> List[Dict]:
     """Build ``num`` tasks from template"""
     assert bool(num) != bool(num_priorities), "mutually exclusive"
     if not num_priorities:
         assert num is not None  # to please mypy
         num_priorities = {None: num}
     tasks: List[Dict] = []
     for (priority, num) in num_priorities.items():
         for _ in range(num):
             i = len(tasks)
             tasks.append(
                 _task_from_template(
                     template,
                     max_timestamp - datetime.timedelta(microseconds=i),
                     priority,
                     "argument-%03d" % i,
                     **{"kwarg%03d" % i: "bogus-kwarg"},
                 )
             )
     return tasks


 def tasks_with_priority_from_template(
     template: Dict, max_timestamp: datetime.datetime, num: int, priority: str
 ) -> List[Dict]:
     """Build tasks with priority from template"""
     return [
         _task_from_template(
             template,
             max_timestamp - datetime.timedelta(microseconds=i),
             priority,
             "argument-%03d" % i,
             **{"kwarg%03d" % i: "bogus-kwarg"},
         )
         for i in range(num)
     ]


 LISTERS = (
-    {"name": "github"},
+    {"name": "github", "instance_name": "github"},
     {"name": "gitlab", "instance_name": "gitlab"},
     {"name": "gitlab", "instance_name": "freedesktop"},
     {"name": "npm"},
     {"name": "pypi"},
 )
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
index 90644c5..1a4949e 100644
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -1,162 +1,169 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Tuple

 import pytest

 from swh.scheduler.cli.origin import format_origins
 from swh.scheduler.tests.common import TASK_TYPES
 from swh.scheduler.tests.test_cli import invoke as basic_invoke


 def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False):
     return basic_invoke(
         scheduler, args=["origin", *args], catch_exceptions=catch_exceptions
     )


 def test_cli_origin(swh_scheduler):
     """Check that swh scheduler origin returns its help text"""

     result = invoke(swh_scheduler)

     assert "Commands:" in result.stdout


 def test_format_origins_basic(listed_origins):
     listed_origins = listed_origins[:100]

     basic_output = list(format_origins(listed_origins))
     # 1 header line + all origins
     assert len(basic_output) == len(listed_origins) + 1

     no_header_output = list(format_origins(listed_origins, with_header=False))
     assert basic_output[1:] == no_header_output


 def test_format_origins_fields_unknown(listed_origins):
     listed_origins = listed_origins[:10]

     it = format_origins(listed_origins, fields=["unknown_field"])

     with pytest.raises(ValueError, match="unknown_field"):
         next(it)


 def test_format_origins_fields(listed_origins):
     listed_origins = listed_origins[:10]
     fields = ["lister_id", "url", "visit_type"]

     output = list(format_origins(listed_origins, fields=fields))
     assert output[0] == ",".join(fields)

     for i, origin in enumerate(listed_origins):
         assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}"


 def test_grab_next(swh_scheduler, listed_origins_by_type):
     NUM_RESULTS = 10
     # Strict inequality to check that grab_next_visits doesn't return more
     # results than requested

     # XXX: should test all of 'listed_origins_by_type' here...
     visit_type = next(iter(listed_origins_by_type))
     assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS

     for origins in listed_origins_by_type.values():
         swh_scheduler.record_listed_origins(origins)

     result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS)))
     assert result.exit_code == 0

     out_lines = result.stdout.splitlines()
     assert len(out_lines) == NUM_RESULTS + 1

     fields = out_lines[0].split(",")
     returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]]

     # Check that we've received origins we had listed in the first place
     assert set(origin["url"] for origin in returned_origins) <= set(
         origin.url for origin in listed_origins_by_type[visit_type]
     )


 def test_schedule_next(swh_scheduler, listed_origins_by_type):
     for task_type in TASK_TYPES.values():
         swh_scheduler.create_task_type(task_type)

     NUM_RESULTS = 10
     # Strict inequality to check that grab_next_visits doesn't return more
     # results than requested
     visit_type = next(iter(listed_origins_by_type))
     assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS

     for origins in listed_origins_by_type.values():
         swh_scheduler.record_listed_origins(origins)

     result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS)))
     assert result.exit_code == 0

     # pull all tasks out of the scheduler
     tasks = swh_scheduler.search_tasks()
     assert len(tasks) == NUM_RESULTS

     scheduled_tasks = {
         (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks
     }
     all_possible_tasks = {
         (f"load-{origin.visit_type}", origin.url)
         for origin in listed_origins_by_type[visit_type]
     }

     assert scheduled_tasks <= all_possible_tasks


+@pytest.mark.parametrize(
+    "extra_cmd_args",
+    [[], ["--lister-name", "github", "--lister-instance-name", "github"]],
+)
 def test_send_to_celery(
     mocker,
     swh_scheduler,
     swh_scheduler_celery_app,
     listed_origins_by_type,
+    extra_cmd_args,
 ):
     for task_type in TASK_TYPES.values():
         swh_scheduler.create_task_type(task_type)

     visit_type = next(iter(listed_origins_by_type))

     for origins in listed_origins_by_type.values():
         swh_scheduler.record_listed_origins(origins)

     get_queue_length = mocker.patch(
         "swh.scheduler.celery_backend.config.get_queue_length"
     )
     get_queue_length.return_value = None

     send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
     send_task.return_value = None

-    result = invoke(swh_scheduler, args=("send-to-celery", visit_type))
+    cmd_args = ["send-to-celery", visit_type] + extra_cmd_args
+
+    result = invoke(swh_scheduler, args=tuple(cmd_args))
     assert result.exit_code == 0

     scheduled_tasks = {
         (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list
     }

     expected_tasks = {
         (TASK_TYPES[origin.visit_type]["backend_name"], origin.url)
         for origin in listed_origins_by_type[visit_type]
     }

     assert expected_tasks == scheduled_tasks


 def test_update_metrics(swh_scheduler, listed_origins):
     swh_scheduler.record_listed_origins(listed_origins)

     assert swh_scheduler.get_metrics() == []

     result = invoke(swh_scheduler, args=("update-metrics",))

     assert result.exit_code == 0
     assert swh_scheduler.get_metrics() != []
diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py
index e518984..2d46058 100644
--- a/swh/scheduler/tests/test_utils.py
+++ b/swh/scheduler/tests/test_utils.py
@@ -1,191 +1,191 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import timezone
 from unittest.mock import patch
 import uuid

 from swh.scheduler import model, utils

 from .common import LISTERS


 @patch("swh.scheduler.utils.datetime")
 def test_create_oneshot_task_dict_simple(mock_datetime):
     mock_datetime.now.return_value = "some-date"

     actual_task = utils.create_oneshot_task_dict("some-task-type")

     expected_task = {
         "policy": "oneshot",
         "type": "some-task-type",
         "next_run": "some-date",
         "arguments": {
             "args": [],
             "kwargs": {},
         },
     }

     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)


 @patch("swh.scheduler.utils.datetime")
 def test_create_oneshot_task_dict_other_call(mock_datetime):
     mock_datetime.now.return_value = "some-other-date"

     actual_task = utils.create_oneshot_task_dict(
         "some-task-type", "arg0", "arg1", priority="high", other_stuff="normal"
     )

     expected_task = {
         "policy": "oneshot",
         "type": "some-task-type",
         "next_run": "some-other-date",
         "arguments": {
             "args": ("arg0", "arg1"),
             "kwargs": {"other_stuff": "normal"},
         },
         "priority": "high",
     }

     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)


 @patch("swh.scheduler.utils.datetime")
 def test_create_task_dict(mock_datetime):
     mock_datetime.now.return_value = "date"

     actual_task = utils.create_task_dict(
         "task-type",
         "recurring",
         "arg0",
         "arg1",
         priority="low",
         other_stuff="normal",
         retries_left=3,
     )

     expected_task = {
         "policy": "recurring",
         "type": "task-type",
         "next_run": "date",
         "arguments": {
             "args": ("arg0", "arg1"),
             "kwargs": {"other_stuff": "normal"},
         },
         "priority": "low",
         "retries_left": 3,
     }

     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)


 def test_create_origin_task_dict():
     lister = model.Lister(**LISTERS[1], id=uuid.uuid4())
     origin = model.ListedOrigin(
         lister_id=lister.id,
         url="http://example.com/",
         visit_type="git",
     )

     task = utils.create_origin_task_dict(origin, lister)
     assert task == {
         "type": "load-git",
         "arguments": {
             "args": [],
             "kwargs": {
                 "url": "http://example.com/",
                 "lister_name": LISTERS[1]["name"],
                 "lister_instance_name": LISTERS[1]["instance_name"],
             },
         },
     }

     loader_args = {"foo": "bar", "baz": {"foo": "bar"}}

     origin_w_args = model.ListedOrigin(
         lister_id=lister.id,
         url="http://example.com/svn/",
         visit_type="svn",
         extra_loader_arguments=loader_args,
     )

     task_w_args = utils.create_origin_task_dict(origin_w_args, lister)
     assert task_w_args == {
         "type": "load-svn",
         "arguments": {
             "args": [],
             "kwargs": {
                 "url": "http://example.com/svn/",
                 "lister_name": LISTERS[1]["name"],
                 "lister_instance_name": LISTERS[1]["instance_name"],
                 **loader_args,
             },
         },
     }


 def test_create_origin_task_dicts(swh_scheduler):
     listers = []
     for lister_args in LISTERS:
         listers.append(swh_scheduler.get_or_create_lister(**lister_args))

     origin1 = model.ListedOrigin(
         lister_id=listers[0].id,
         url="http://example.com/1",
         visit_type="git",
     )
     origin2 = model.ListedOrigin(
         lister_id=listers[0].id,
         url="http://example.com/2",
         visit_type="git",
     )
     origin3 = model.ListedOrigin(
         lister_id=listers[1].id,
         url="http://example.com/3",
         visit_type="git",
     )

     origins = [origin1, origin2, origin3]

     tasks = utils.create_origin_task_dicts(origins, swh_scheduler)

     assert tasks == [
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/1",
                     "lister_name": LISTERS[0]["name"],
-                    "lister_instance_name": None,
+                    "lister_instance_name": LISTERS[0]["instance_name"],
                 },
             },
         },
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/2",
                     "lister_name": LISTERS[0]["name"],
-                    "lister_instance_name": None,
+                    "lister_instance_name": LISTERS[0]["instance_name"],
                 },
             },
         },
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/3",
                     "lister_name": LISTERS[1]["name"],
                     "lister_instance_name": LISTERS[1]["instance_name"],
                 },
             },
         },
     ]
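
Review note: the new `--lister-name`/`--lister-instance-name` pair only filters when both flags are given and the pair matches a registered lister; otherwise `send-to-celery` falls back to an unfiltered `grab_next_visits` call. A minimal sketch of that resolution step, assuming an already-instantiated scheduler backend (the helper name below is illustrative, not part of the change):

```python
from typing import Optional


def resolve_lister_uuid(
    scheduler, lister_name: Optional[str], lister_instance_name: Optional[str]
):
    """Mirror the send-to-celery logic: both flags must be provided, and an
    unknown (name, instance_name) pair silently means "no lister filtering"."""
    if lister_name and lister_instance_name:
        lister = scheduler.get_lister(lister_name, lister_instance_name)
        if lister:
            return lister.id  # id of the matching lister
    return None


# e.g. restrict the next git visits to origins listed by github/github:
# lister_uuid = resolve_lister_uuid(scheduler, "github", "github")
# origins = scheduler.grab_next_visits(
#     "git", 100, policy="oldest_scheduled_first", lister_uuid=lister_uuid
# )
```

Whether the silent fallback (no error when the lister is unknown) is the desired behaviour may be worth a follow-up; the parametrized test only exercises the matching github/github case.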
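If coverage for that fallback is wanted, a hypothetical follow-up test could look like the sketch below; it reuses the `invoke` helper and fixtures from `test_cli_origin.py` and asserts that an unmatched lister pair leaves the selection unfiltered (this is not part of the change under review):

```python
def test_send_to_celery_unknown_lister(
    mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type
):
    # Hypothetical follow-up test: an unknown lister name/instance pair means
    # lister_uuid stays None, so send-to-celery applies no filtering at all.
    for task_type in TASK_TYPES.values():
        swh_scheduler.create_task_type(task_type)

    visit_type = next(iter(listed_origins_by_type))
    for origins in listed_origins_by_type.values():
        swh_scheduler.record_listed_origins(origins)

    mocker.patch(
        "swh.scheduler.celery_backend.config.get_queue_length"
    ).return_value = None
    send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
    send_task.return_value = None

    result = invoke(
        swh_scheduler,
        args=(
            "send-to-celery",
            "--lister-name",
            "no-such-lister",
            "--lister-instance-name",
            "no-such-instance",
            visit_type,
        ),
    )
    assert result.exit_code == 0

    # No lister matched, so no filtering was applied: every recorded origin of
    # this visit type is still eligible to be sent.
    sent_urls = {call[1]["kwargs"]["url"] for call in send_task.call_args_list}
    assert sent_urls == {o.url for o in listed_origins_by_type[visit_type]}
```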