diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1,2 @@
 swh.core >= 0.0.51
+swh.storage >= 0.0.129
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -14,10 +14,13 @@
 import datetime
 
 from swh.core import utils, config
+from swh.storage import get_storage
+from swh.storage.algos.origin import iter_origins
+
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
 from . import get_scheduler, DEFAULT_CONFIG
-from .cli_utils import parse_options
+from .cli_utils import parse_options, schedule_origin_batches
 
 
 locale.setlocale(locale.LC_ALL, '')
@@ -288,6 +291,10 @@
 def schedule_task(ctx, type, options, policy, priority, next_run):
     """Schedule one task from arguments.
 
+    The first argument is the name of the task type, further ones are
+    positional and keyword argument(s) of the task, in YAML format.
+    Keyword args are of the form key=value.
+
     Usage sample:
 
     swh-scheduler --database 'service=swh-scheduler' \
@@ -327,6 +334,61 @@
     click.echo('\n'.join(output))
 
 
+@task.command('schedule_origins')
+@click.argument('type', nargs=1, required=True)
+@click.argument('options', nargs=-1)
+@click.option('--batch-size', '-b', 'origin_batch_size',
+              default=10, show_default=True, type=int,
+              help="Number of origins per task")
+@click.option('--min-id',
+              default=0, show_default=True, type=int,
+              help="Only schedule tasks for origins whose ID is greater")
+@click.option('--max-id',
+              default=None, type=int,
+              help="Only schedule tasks for origins whose ID is lower")
+@click.option('--storage-url', '-g',
+              help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+              default=False,
+              help='List only what would be scheduled.')
+@click.pass_context
+def schedule_origin_metadata_index(
+        ctx, type, options, storage_url, origin_batch_size,
+        min_id, max_id, dry_run):
+    """Schedules tasks for origins that are already known.
+
+    The first argument is the name of the task type, further ones are
+    keyword argument(s) of the task in the form key=value, where value is
+    in YAML format.
+
+    Origins are listed from the storage at --storage-url and grouped by
+    --batch-size; one 'oneshot' task is created for each group of origin
+    ids. With --dry-run, nothing is sent to the scheduler.
+
+    Usage sample:
+
+    swh-scheduler --database 'service=swh-scheduler' \
+        task schedule_origins indexer_origin_metadata
+    """
+    scheduler = ctx.obj['scheduler']
+    storage = get_storage('remote', {'url': storage_url})
+    if dry_run:
+        # Without a scheduler, schedule_origin_batches only reports what it
+        # would have scheduled.
+        scheduler = None
+
+    (args, kw) = parse_options(options)
+    if args:
+        raise click.ClickException('Only keywords arguments are allowed.')
+
+    origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
+    # Tasks only receive the ids of the origins.
+    origin_ids = (origin['id'] for origin in origins)
+
+    schedule_origin_batches(
+        scheduler, type, origin_ids, origin_batch_size, kw)
+
+
 @task.command('list-pending')
 @click.argument('task-types', required=True, nargs=-1)
 @click.option('--limit', '-l', required=False, type=click.INT,
diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py
--- a/swh/scheduler/cli_utils.py
+++ b/swh/scheduler/cli_utils.py
@@ -3,9 +3,70 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import itertools
+
 import click
 import yaml
 
+from .utils import create_task_dict
+
+TASK_BATCH_SIZE = 1000  # Number of tasks per query to the scheduler
+
+
+def schedule_origin_batches(
+        scheduler, task_type, origins, origin_batch_size, kwargs):
+    """Creates one 'oneshot' task per group of origins and schedules them.
+
+    Origins are grouped in lists of at most `origin_batch_size` items; each
+    list is the only positional argument of one task. Tasks are sent to the
+    scheduler in chunks of at most TASK_BATCH_SIZE, and the running totals
+    are printed after each chunk.
+
+    Args:
+        scheduler: object whose create_tasks() method is called with each
+            chunk of task dicts; a falsy value (e.g. None) skips that call,
+            which makes this a dry run.
+        task_type: name of the task type, passed to create_task_dict.
+        origins: iterable of origins; it is consumed only once.
+        origin_batch_size: maximum number of origins per task.
+        kwargs: dict of keyword arguments added to every task.
+
+    """
+    # islice() must resume where the previous group stopped, which only
+    # happens with an iterator: on a list it would restart from the first
+    # item on every call and this function would never return.
+    origins = iter(origins)
+    nb_origins = 0
+    nb_tasks = 0
+
+    while True:
+        task_batch = []
+        for _ in range(TASK_BATCH_SIZE):
+            # Group origins
+            origin_batch = list(itertools.islice(origins, origin_batch_size))
+            if not origin_batch:
+                break
+            nb_origins += len(origin_batch)
+
+            # Create a task for these origins
+            task_dict = create_task_dict(
+                task_type, 'oneshot', origin_batch, **kwargs)
+            task_batch.append(task_dict)
+
+        # Schedule a batch of tasks
+        if not task_batch:
+            break
+        nb_tasks += len(task_batch)
+        if scheduler:
+            scheduler.create_tasks(task_batch)
+        click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+    # Print final status.
+    if nb_tasks:
+        click.echo('Done.')
+    else:
+        click.echo('Nothing to do (no origin matched the criteria).')
+
 
 def parse_argument(option):
     try:
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -11,6 +11,9 @@
 from click.testing import CliRunner
 import pytest
 
+from swh.storage.in_memory import Storage
+
 from swh.scheduler.cli import cli
+from swh.scheduler.cli_utils import schedule_origin_batches
 from swh.scheduler.utils import create_task_dict
 
@@ -575,3 +578,117 @@
 '''.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def _fill_storage_with_origins(storage, nb_origins):
+    """Adds nb_origins origins with distinct types and URLs to the storage."""
+    storage.origin_add([
+        {
+            'type': 'type{}'.format(i),
+            'url': 'http://example.com/{}'.format(i),
+        }
+        for i in range(nb_origins)
+    ])
+
+
+@pytest.fixture
+def storage():
+    """An instance of swh.storage.in_memory.Storage that gets injected
+    into the CLI functions."""
+    storage = Storage()
+    with patch('swh.scheduler.cli.get_storage') as get_storage_mock:
+        get_storage_mock.return_value = storage
+        yield storage
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins_dry_run(
+        swh_scheduler, storage):
+    """Tests the scheduling when origin_batch_size*task_batch_size is a
+    divisor of nb_origins."""
+    _fill_storage_with_origins(storage, 90)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', '--dry-run', 'swh-test-ping',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(30 origins\).
+Scheduled 6 tasks \(60 origins\).
+Scheduled 9 tasks \(90 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 0
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins(swh_scheduler, storage):
+    """Tests the scheduling when neither origin_batch_size nor
+    task_batch_size is a divisor of nb_origins."""
+    _fill_storage_with_origins(storage, 70)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(60 origins\).
+Scheduled 4 tasks \(70 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 4
+    assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+    assert tasks[1]['arguments']['args'] == [list(range(21, 41))]
+    assert tasks[2]['arguments']['args'] == [list(range(41, 61))]
+    assert tasks[3]['arguments']['args'] == [list(range(61, 71))]
+    assert all(task['arguments']['kwargs'] == {} for task in tasks)
+
+
+def test_task_schedule_origins_kwargs(swh_scheduler, storage):
+    """Tests support of extra keyword-arguments."""
+    _fill_storage_with_origins(storage, 30)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+        'key1="value1"', 'key2="value2"',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 2 tasks \(30 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 2
+    assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+    assert tasks[1]['arguments']['args'] == [list(range(21, 31))]
+    assert all(task['arguments']['kwargs'] ==
+               {'key1': 'value1', 'key2': 'value2'}
+               for task in tasks)
+
+
+def test_schedule_origin_batches_from_list(capsys):
+    """Tests that origins given as a list are each scheduled only once."""
+    schedule_origin_batches(
+        None, 'swh-test-ping', list(range(1, 26)), 10, {})
+
+    out, _ = capsys.readouterr()
+    assert out == 'Scheduled 3 tasks (25 origins).\nDone.\n'