diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -14,6 +14,8 @@ import click from typing import Any, Dict +from itertools import islice + from . import cli @@ -298,17 +300,17 @@ help="Number of origins per task", ) @click.option( - "--min-id", + "--page-token", default=0, show_default=True, - type=int, + type=str, help="Only schedule tasks for origins whose ID is greater", ) @click.option( - "--max-id", + "--limit", default=None, type=int, - help="Only schedule tasks for origins whose ID is lower", + help="Limit the scheduling to at most this number of origins", ) @click.option("--storage-url", "-g", help="URL of the (graph) storage API") @click.option( @@ -319,7 +321,7 @@ ) @click.pass_context def schedule_origin_metadata_index( - ctx, type, options, storage_url, origin_batch_size, min_id, max_id, dry_run + ctx, type, options, storage_url, origin_batch_size, page_token, limit, dry_run ): """Schedules tasks for origins that are already known. 
@@ -345,9 +347,11 @@ if args: raise click.ClickException("Only keywords arguments are allowed.") - origins = iter_origins(storage, origin_from=min_id, origin_to=max_id) - origin_urls = (origin.url for origin in origins) + origins = iter_origins(storage, page_token=page_token) + if limit: + origins = islice(origins, limit) + origin_urls = (origin.url for origin in origins) schedule_origin_batches(scheduler, type, origin_urls, origin_batch_size, kw) diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -6,17 +6,20 @@ import datetime import re import tempfile -from unittest.mock import patch import logging +from itertools import islice + +from unittest.mock import patch from click.testing import CliRunner + import pytest from swh.model.model import Origin from swh.storage import get_storage from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict - +from swh.core.api.classes import stream_results CLI_CONFIG = """ scheduler: @@ -707,3 +710,88 @@ task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) + + +def test_task_schedule_origins_with_limit(swh_scheduler, storage): + """Tests that --limit caps the number of origins that get scheduled.""" + _fill_storage_with_origins(storage, 50) + limit = 20 + expected_origins = list(islice(stream_results(storage.origin_list), limit)) + nb_origins = len(expected_origins) + + assert nb_origins == limit + max_task_size = 5 + nb_tasks, remainder = divmod(nb_origins, max_task_size) + assert remainder == 0 # made the numbers go round + + result = invoke( + swh_scheduler, + False, + [ + "task", + "schedule_origins", + "swh-test-ping", + "--batch-size", + max_task_size, + "--limit", + limit, + ], + ) + + # Check the output + expected = rf""" +Scheduled {nb_tasks} tasks \({nb_origins} origins\). +Done. 
+""".lstrip() + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) + + tasks = swh_scheduler.search_tasks() + _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) + + +def test_task_schedule_origins_with_page_token(swh_scheduler, storage): + """Tests that --page-token only schedules origins listed after that token.""" + nb_total_origins = 50 + origins = _fill_storage_with_origins(storage, nb_total_origins) + + # prepare the page_token and the origins we expect to be scheduled + page_result = storage.origin_list(limit=10) + assert len(page_result.results) == 10 + page_token = page_result.next_page_token + assert page_token is not None + + # remove the first 10 origins listed as we won't see those in tasks + expected_origins = [o for o in origins if o not in page_result.results] + nb_origins = len(expected_origins) + assert nb_origins == nb_total_origins - len(page_result.results) + + max_task_size = 10 + nb_tasks, remainder = divmod(nb_origins, max_task_size) + assert remainder == 0 + + result = invoke( + swh_scheduler, + False, + [ + "task", + "schedule_origins", + "swh-test-ping", + "--batch-size", + max_task_size, + "--page-token", + page_token, + ], + ) + + # Check the output + expected = rf""" +Scheduled {nb_tasks} tasks \({nb_origins} origins\). +Done. +""".lstrip() + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) + + # Check tasks + tasks = swh_scheduler.search_tasks() + _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)