diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -100,3 +100,37 @@ origins = scheduler.grab_next_visits(count, policy=policy) for line in format_origins(origins, fields=parsed_fields, with_header=with_header): click.echo(line) + + +@origin.command("schedule-next") +@click.option( + "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" +) +@click.argument("count", type=int) +@click.pass_context +def schedule_next(ctx, policy: str, count: int): + """Send the next COUNT origin visits to the scheduler as one-shot tasks.""" + from ..utils import utcnow + from .task import pretty_print_task + + scheduler = ctx.obj["scheduler"] + + origins = scheduler.grab_next_visits(count, policy=policy) + + created = scheduler.create_tasks( + [ + { + **origin.as_task_dict(), + "policy": "oneshot", + "next_run": utcnow(), + "retries_left": 1, + } + for origin in origins + ] + ) + + output = ["Created %d tasks\n" % len(created)] + for task in created: + output.append(pretty_print_task(task)) + + click.echo_via_pager("\n".join(output)) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -171,6 +171,15 @@ metadata={"auto_now": True}, ) + def as_task_dict(self): + return { + "type": f"load-{self.visit_type}", + "arguments": { + "args": [], + "kwargs": {"url": self.url, **self.extra_loader_arguments}, + }, + } + ListedOriginPageToken = Tuple[UUID, str] diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -8,6 +8,7 @@ import pytest from swh.scheduler.cli.origin import format_origins +from swh.scheduler.tests.common import TASK_TYPES from swh.scheduler.tests.test_cli import invoke as basic_invoke @@ -74,3 +75,29 @@ assert set(origin["url"] for origin in returned_origins) <= set( origin.url for origin in listed_origins ) + + +def test_schedule_next(swh_scheduler, listed_origins): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + num_origins = 10 + assert len(listed_origins) >= num_origins + + swh_scheduler.record_listed_origins(listed_origins) + + result = invoke(swh_scheduler, args=("schedule-next", str(num_origins))) + assert result.exit_code == 0 + + # pull all tasks out of the scheduler + tasks = swh_scheduler.search_tasks() + assert len(tasks) == num_origins + + scheduled_tasks = { + (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks + } + all_possible_tasks = { + (f"load-{origin.visit_type}", origin.url) for origin in listed_origins + } + + assert scheduled_tasks <= all_possible_tasks diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py --- a/swh/scheduler/tests/test_model.py +++ b/swh/scheduler/tests/test_model.py @@ -1,9 +1,10 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import uuid import attr @@ -92,3 +93,31 @@ test1 = attr.ib(type=str) assert TestModel2.primary_key_columns() == ("col1", "col2") + + +def test_listed_origin_as_task_dict(): + origin = model.ListedOrigin( + lister_id=uuid.uuid4(), url="http://example.com/", visit_type="git", + ) + + task = origin.as_task_dict() + assert task == { + "type": "load-git", + "arguments": {"args": [], "kwargs": {"url": "http://example.com/"}}, + } + + origin_w_args = model.ListedOrigin( + lister_id=uuid.uuid4(), + url="http://example.com/svn/", + visit_type="svn", + extra_loader_arguments={"foo": "bar"}, + ) + + task_w_args = origin_w_args.as_task_dict() + assert task_w_args == { + "type": "load-svn", + "arguments": { + "args": [], + "kwargs": {"url": "http://example.com/svn/", "foo": "bar"}, + }, + }