diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -13,7 +13,11 @@
 import csv
 
 import click
 
-from typing import Any, Dict
+from typing import Any, Dict, Iterator, Optional
+from itertools import islice
+
+from swh.model.model import Origin
+from swh.storage.interface import StorageInterface
 
 from . import cli
@@ -285,6 +289,27 @@
     click.echo("\n".join(output))
 
 
+def iter_origins(
+    storage: StorageInterface, page_token: Optional[str] = None
+) -> Iterator[Origin]:
+    """Iterate over origins in the storage, optionally starting from page_token.
+
+    The current page_token is regularly echoed during pagination so it can be fed
+    back to the CLI if the process gets interrupted.
+
+    Yields:
+        origin model objects from the storage
+
+    """
+    while True:
+        page_result = storage.origin_list(page_token=page_token)
+        page_token = page_result.next_page_token
+        yield from page_result.results
+        if not page_token:
+            break
+        click.echo(f"page_token: {page_token}\n")
+
+
 @task.command("schedule_origins")
 @click.argument("type", nargs=1, required=True)
 @click.argument("options", nargs=-1)
@@ -298,17 +323,17 @@
     help="Number of origins per task",
 )
 @click.option(
-    "--min-id",
-    default=0,
+    "--page-token",
+    default=None,
     show_default=True,
-    type=int,
-    help="Only schedule tasks for origins whose ID is greater",
+    type=str,
+    help="Only schedule tasks for origins listed after this page_token",
 )
 @click.option(
-    "--max-id",
+    "--limit",
     default=None,
     type=int,
-    help="Only schedule tasks for origins whose ID is lower",
+    help="Limit the scheduling to this number of origins",
 )
 @click.option("--storage-url", "-g", help="URL of the (graph) storage API")
 @click.option(
@@ -319,7 +344,7 @@
 )
 @click.pass_context
 def schedule_origin_metadata_index(
-    ctx, type, options, storage_url, origin_batch_size, min_id, max_id, dry_run
+    ctx, type, options, storage_url, origin_batch_size, page_token, limit, dry_run
 ):
     """Schedules tasks for origins that are already known.
 
@@ -333,7 +358,6 @@
         task schedule_origins index-origin-metadata
     """
     from swh.storage import get_storage
-    from swh.storage.algos.origin import iter_origins
     from .utils import parse_options, schedule_origin_batches
 
     scheduler = ctx.obj["scheduler"]
@@ -345,9 +369,11 @@
     if args:
         raise click.ClickException("Only keywords arguments are allowed.")
 
-    origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
-    origin_urls = (origin.url for origin in origins)
+    origins = iter_origins(storage, page_token=page_token)
+    if limit:
+        origins = islice(origins, limit)
+
+    origin_urls = (origin.url for origin in origins)
 
     schedule_origin_batches(scheduler, type, origin_urls, origin_batch_size, kw)
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -6,17 +6,20 @@
 import datetime
 import re
 import tempfile
-from unittest.mock import patch
 import logging
+from itertools import islice
+
+from unittest.mock import patch
 
 from click.testing import CliRunner
+
 import pytest
 
 from swh.model.model import Origin
 from swh.storage import get_storage
 
 from swh.scheduler.cli import cli
 from swh.scheduler.utils import create_task_dict
-
+from swh.core.api.classes import stream_results
 
 CLI_CONFIG = """
@@ -707,3 +710,88 @@
         task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"}
         for task in tasks
     )
+
+
+def test_task_schedule_origins_with_limit(swh_scheduler, storage):
+    """Tests scheduling origins with a limit on the number of scheduled origins."""
+    _fill_storage_with_origins(storage, 50)
+    limit = 20
+    expected_origins = list(islice(stream_results(storage.origin_list), limit))
+    nb_origins = len(expected_origins)
+
+    assert nb_origins == limit
+    max_task_size = 5
+    nb_tasks, remainder = divmod(nb_origins, max_task_size)
+    assert remainder == 0  # the numbers are chosen so that tasks are evenly filled
+
+    result = invoke(
+        swh_scheduler,
+        False,
+        [
+            "task",
+            "schedule_origins",
+            "swh-test-ping",
+            "--batch-size",
+            str(max_task_size),
+            "--limit",
+            str(limit),
+        ],
+    )
+
+    # Check the output
+    expected = rf"""
+Scheduled {nb_tasks} tasks \({nb_origins} origins\).
+Done.
+""".lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
+
+    tasks = swh_scheduler.search_tasks()
+    _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
+
+
+def test_task_schedule_origins_with_page_token(swh_scheduler, storage):
+    """Tests scheduling origins starting from a given page_token."""
+    nb_total_origins = 50
+    origins = _fill_storage_with_origins(storage, nb_total_origins)
+
+    # prepare the page_token and the expected origins
+    page_result = storage.origin_list(limit=10)
+    assert len(page_result.results) == 10
+    page_token = page_result.next_page_token
+    assert page_token is not None
+
+    # drop the first 10 origins already listed; they will not be scheduled
+    expected_origins = [o for o in origins if o not in page_result.results]
+    nb_origins = len(expected_origins)
+    assert nb_origins == nb_total_origins - len(page_result.results)
+
+    max_task_size = 10
+    nb_tasks, remainder = divmod(nb_origins, max_task_size)
+    assert remainder == 0
+
+    result = invoke(
+        swh_scheduler,
+        False,
+        [
+            "task",
+            "schedule_origins",
+            "swh-test-ping",
+            "--batch-size",
+            str(max_task_size),
+            "--page-token",
+            page_token,
+        ],
+    )
+
+    # Check the output
+    expected = rf"""
+Scheduled {nb_tasks} tasks \({nb_origins} origins\).
+Done.
+""".lstrip() + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) + + # Check tasks + tasks = swh_scheduler.search_tasks() + _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)