diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py --- a/swh/scheduler/cli/add_forge_now.py +++ b/swh/scheduler/cli/add_forge_now.py @@ -26,6 +26,45 @@ raise ValueError("Scheduler class (local/remote) must be instantiated") +@add_forge_now.command("register-lister") +@click.argument("task_type_name", nargs=1, required=True) +@click.argument("options", nargs=-1) +@click.option( + "--production/--staging", + "enabled", + is_flag=True, + default=True, + help="""Determine whether we want to schedule enabled origins (on production) or + disabled ones (on staging).""", +) +@click.pass_context +def register_lister_cli( + ctx, + task_type_name, + options, + enabled, +): + from .utils import parse_options, task_add + + scheduler = ctx.obj["scheduler"] + + (args, kw) = parse_options(options) + + if enabled: # production + policy = "recurring" + else: # staging + policy = "oneshot" + kw.update({"max_pages": 1, "max_origins_per_page": 15, "enable_origins": False}) + + task_add( + scheduler, + task_type_name=task_type_name, + args=args, + kw=kw, + policy=policy, + ) + + @add_forge_now.command("schedule-first-visits") @click.option( "--type-name", diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -122,7 +122,7 @@ """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow - from .task import pretty_print_task + from .utils import pretty_print_task scheduler = ctx.obj["scheduler"] diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -27,128 +27,6 @@ DATETIME = click.DateTime() -def format_dict(d): - """Recursively format date objects in the dict passed as argument""" - import datetime - - ret = {} - for k, v in d.items(): - if isinstance(v, (datetime.date, datetime.datetime)): - v = v.isoformat() - elif 
isinstance(v, dict): - v = format_dict(v) - ret[k] = v - return ret - - -def pretty_print_list(list, indent=0): - """Pretty-print a list""" - return "".join("%s%r\n" % (" " * indent, item) for item in list) - - -def pretty_print_dict(dict, indent=0): - """Pretty-print a list""" - return "".join( - "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) - for key, value in sorted(dict.items()) - ) - - -def pretty_print_run(run, indent=4): - fmt = ( - "{indent}{backend_id} [{status}]\n" - "{indent} scheduled: {scheduled} [{started}:{ended}]" - ) - return fmt.format(indent=" " * indent, **format_dict(run)) - - -def pretty_print_task(task, full=False): - """Pretty-print a task - - If 'full' is True, also print the status and priority fields. - - >>> import datetime - >>> task = { - ... 'id': 1234, - ... 'arguments': { - ... 'args': ['foo', 'bar', True], - ... 'kwargs': {'key': 'value', 'key2': 42}, - ... }, - ... 'current_interval': datetime.timedelta(hours=1), - ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), - ... 'policy': 'oneshot', - ... 'priority': None, - ... 'status': 'next_run_not_scheduled', - ... 'type': 'test_task', - ... } - >>> print(click.unstyle(pretty_print_task(task))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - >>> print(click.unstyle(pretty_print_task(task, full=True))) - Task 1234 - Next run: ... 
(2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Status: next_run_not_scheduled - Priority:\x20 - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - """ - import humanize - - next_run = task["next_run"] - lines = [ - "%s %s\n" % (click.style("Task", bold=True), task["id"]), - click.style(" Next run: ", bold=True), - "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), - "\n", - click.style(" Interval: ", bold=True), - str(task["current_interval"]), - "\n", - click.style(" Type: ", bold=True), - task["type"] or "", - "\n", - click.style(" Policy: ", bold=True), - task["policy"] or "", - "\n", - ] - if full: - lines += [ - click.style(" Status: ", bold=True), - task["status"] or "", - "\n", - click.style(" Priority: ", bold=True), - task["priority"] or "", - "\n", - ] - lines += [ - click.style(" Args:\n", bold=True), - pretty_print_list(task["arguments"]["args"], indent=4), - click.style(" Keyword args:\n", bold=True), - pretty_print_dict(task["arguments"]["kwargs"], indent=4), - ] - - return "".join(lines) - - @cli.group("task") @click.pass_context def task(ctx): @@ -202,6 +80,8 @@ from swh.scheduler.utils import utcnow + from .utils import pretty_print_task + tasks = [] now = utcnow() scheduler = ctx.obj["scheduler"] @@ -261,39 +141,22 @@ which is considered as the lowest priority level. 
""" - from swh.scheduler.utils import utcnow - - from .utils import parse_options + from .utils import parse_options, task_add scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") - if scheduler.get_task_type(task_type_name) is None: - raise ValueError(f"Unknown task type {task_type_name}.") - - now = utcnow() - (args, kw) = parse_options(options) - task = { - "type": task_type_name, - "policy": policy, - "priority": priority, - "arguments": { - "args": args, - "kwargs": kw, - }, - "next_run": next_run or now, - } - created = scheduler.create_tasks([task]) - - output = [ - "Created %d tasks\n" % len(created), - ] - for task in created: - output.append(pretty_print_task(task)) - - click.echo("\n".join(output)) + task_add( + scheduler, + task_type_name=task_type_name, + policy=policy, + priority=priority, + next_run=next_run, + args=args, + kw=kw, + ) def iter_origins( # use string annotations to prevent some pkg loading @@ -412,6 +275,8 @@ You can override the number of tasks to fetch with the --limit flag. 
""" + from .utils import pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") @@ -511,6 +376,8 @@ """List tasks.""" from operator import itemgetter + from .utils import pretty_print_run, pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py --- a/swh/scheduler/cli/utils.py +++ b/swh/scheduler/cli/utils.py @@ -13,7 +13,7 @@ import click if TYPE_CHECKING: - from typing import Dict, Optional + from typing import Dict, List, Optional, Tuple from swh.scheduler.interface import SchedulerInterface @@ -73,7 +73,7 @@ raise click.ClickException("Invalid argument: {}".format(option)) -def parse_options(options): +def parse_options(options: List[str]) -> Tuple[List[str], Dict]: """Parses options from a CLI as YAML and turns it into Python args and kwargs. 
@@ -177,3 +177,158 @@ kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) + + +def pretty_print_list(list, indent=0): + """Pretty-print a list""" + return "".join("%s%r\n" % (" " * indent, item) for item in list) + + +def pretty_print_dict(dict, indent=0): + """Pretty-print a dict""" + return "".join( + "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) + for key, value in sorted(dict.items()) + ) + + +def format_dict(d): + """Recursively format date objects in the dict passed as argument""" + import datetime + + ret = {} + for k, v in d.items(): + if isinstance(v, (datetime.date, datetime.datetime)): + v = v.isoformat() + elif isinstance(v, dict): + v = format_dict(v) + ret[k] = v + return ret + + +def pretty_print_run(run, indent=4): + fmt = ( + "{indent}{backend_id} [{status}]\n" + "{indent} scheduled: {scheduled} [{started}:{ended}]" + ) + return fmt.format(indent=" " * indent, **format_dict(run)) + + +def pretty_print_task(task, full=False): + """Pretty-print a task + + If 'full' is True, also print the status and priority fields. + + >>> import datetime + >>> task = { + ... 'id': 1234, + ... 'arguments': { + ... 'args': ['foo', 'bar', True], + ... 'kwargs': {'key': 'value', 'key2': 42}, + ... }, + ... 'current_interval': datetime.timedelta(hours=1), + ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), + ... 'policy': 'oneshot', + ... 'priority': None, + ... 'status': 'next_run_not_scheduled', + ... 'type': 'test_task', + ... } + >>> print(click.unstyle(pretty_print_task(task))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + >>> print(click.unstyle(pretty_print_task(task, full=True))) + Task 1234 + Next run: ... 
(2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Status: next_run_not_scheduled + Priority:\x20 + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + """ + import humanize + + next_run = task["next_run"] + lines = [ + "%s %s\n" % (click.style("Task", bold=True), task["id"]), + click.style(" Next run: ", bold=True), + "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), + "\n", + click.style(" Interval: ", bold=True), + str(task["current_interval"]), + "\n", + click.style(" Type: ", bold=True), + task["type"] or "", + "\n", + click.style(" Policy: ", bold=True), + task["policy"] or "", + "\n", + ] + if full: + lines += [ + click.style(" Status: ", bold=True), + task["status"] or "", + "\n", + click.style(" Priority: ", bold=True), + task["priority"] or "", + "\n", + ] + lines += [ + click.style(" Args:\n", bold=True), + pretty_print_list(task["arguments"]["args"], indent=4), + click.style(" Keyword args:\n", bold=True), + pretty_print_dict(task["arguments"]["kwargs"], indent=4), + ] + + return "".join(lines) + + +def task_add( + scheduler: SchedulerInterface, + task_type_name: str, + args: List[str], + kw: Dict, + policy: str, + priority: Optional[str] = None, + next_run: Optional[str] = None, +): + from swh.scheduler.utils import utcnow + + if scheduler.get_task_type(task_type_name) is None: + raise ValueError(f"Unknown task type {task_type_name}.") + + task = { + "type": task_type_name, + "policy": policy, + "priority": priority, + "arguments": { + "args": args, + "kwargs": kw, + }, + "next_run": next_run or utcnow(), + } + created = scheduler.create_tasks([task]) + + output = [f"Created {len(created)} tasks\n"] + for task in created: + output.append(pretty_print_task(task)) + + click.echo("\n".join(output)) diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py --- a/swh/scheduler/tests/test_cli_add_forge_now.py +++ 
b/swh/scheduler/tests/test_cli_add_forge_now.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import re from typing import Tuple import attr @@ -86,3 +87,45 @@ } assert expected_tasks == scheduled_tasks + + +def test_schedule_register_lister(swh_scheduler): + result = invoke( + swh_scheduler, + [ + "register-lister", + "swh-test-ping", + "arg1", + "arg2", + "key=value", + ], + ) + expected = r""" +Created 1 tasks + +Task 1 + Next run: today \(.*\) + Interval: 1 day, 0:00:00 + Type: swh-test-ping + Policy: recurring + Args: + 'arg1' + 'arg2' + Keyword args: + key: 'value' + +""".lstrip() + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + +def test_register_lister_unknown_task_type(swh_scheduler): + """When scheduling an unknown task type, the CLI should raise.""" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + [ + "register-lister", + "unknown-task-type-should-raise", + ], + )