diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1,2 @@
 swh.core >= 0.0.51
+swh.storage >= 0.0.129
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,6 +9,7 @@
 flask
 kombu
 psycopg2
+pyyaml
 vcversioner

 # test dependencies
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -14,10 +14,13 @@
 import datetime

 from swh.core import utils, config
+from swh.storage import get_storage
+from swh.storage.algos.origin import iter_origins
+
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
 from . import get_scheduler, DEFAULT_CONFIG
-from .cli_utils import parse_options
+from .cli_utils import parse_options, schedule_origin_batches


 locale.setlocale(locale.LC_ALL, '')
@@ -51,14 +54,14 @@

 def pretty_print_list(list, indent=0):
     """Pretty-print a list"""
-    return ''.join('%s%s\n' % (' ' * indent, item) for item in list)
+    return ''.join('%s%r\n' % (' ' * indent, item) for item in list)


 def pretty_print_dict(dict, indent=0):
-    """Pretty-print a list"""
-    return ''.join('%s%s: %s\n' %
+    """Pretty-print a dict"""
+    return ''.join('%s%s: %r\n' %
                    (' ' * indent, click.style(key, bold=True), value)
-                   for key, value in dict.items())
+                   for key, value in sorted(dict.items()))


 def pretty_print_run(run, indent=4):
@@ -75,8 +78,8 @@
     >>> task = {
     ...     'id': 1234,
     ...     'arguments': {
-    ...         'args': ['foo', 'bar'],
-    ...         'kwargs': {'key': 'value'},
+    ...         'args': ['foo', 'bar', True],
+    ...         'kwargs': {'key': 'value', 'key2': 42},
     ...     },
     ...     'current_interval': datetime.timedelta(hours=1),
     ...     'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
@@ -92,10 +95,12 @@
       Type: test_task
       Policy: oneshot
       Args:
-        foo
-        bar
+        'foo'
+        'bar'
+        True
       Keyword args:
-        key: value
+        key: 'value'
+        key2: 42

     >>> print(click.unstyle(pretty_print_task(task, full=True)))
     Task 1234
@@ -106,10 +111,12 @@
       Status: next_run_not_scheduled
       Priority:\x20
       Args:
-        foo
-        bar
+        'foo'
+        'bar'
+        True
       Keyword args:
-        key: value
+        key: 'value'
+        key2: 42

     """
     next_run = arrow.get(task['next_run'])
@@ -284,6 +291,10 @@
 def schedule_task(ctx, type, options, policy, priority, next_run):
     """Schedule one task from arguments.

+    The first argument is the name of the task type, further ones are
+    positional and keyword argument(s) of the task, in YAML format.
+    Keyword args are of the form key=value.
+
     Usage sample:

     swh-scheduler --database 'service=swh-scheduler' \
@@ -323,6 +334,54 @@
     click.echo('\n'.join(output))


+@task.command('schedule_origins')
+@click.argument('type', nargs=1, required=True)
+@click.argument('options', nargs=-1)
+@click.option('--batch-size', '-b', 'origin_batch_size',
+              default=10, show_default=True, type=int,
+              help="Number of origins per task")
+@click.option('--min-id',
+              default=0, show_default=True, type=int,
+              help="Only schedule tasks for origins whose ID is greater")
+@click.option('--max-id',
+              default=None, type=int,
+              help="Only schedule tasks for origins whose ID is lower")
+@click.option('--storage-url', '-g',
+              help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+              default=False,
+              help='List only what would be scheduled.')
+@click.pass_context
+def schedule_origin_metadata_index(
+        ctx, type, options, storage_url, origin_batch_size,
+        min_id, max_id, dry_run):
+    """Schedules tasks for origins that are already known.
+
+    The first argument is the name of the task type, further ones are
+    keyword argument(s) of the task in the form key=value, where value is
+    in YAML format.
+
+    Usage sample:
+
+    swh-scheduler --database 'service=swh-scheduler' \
+        task schedule_origins indexer_origin_metadata
+    """
+    scheduler = ctx.obj['scheduler']
+    storage = get_storage('remote', {'url': storage_url})
+    if dry_run:
+        scheduler = None
+
+    (args, kw) = parse_options(options)
+    if args:
+        raise click.ClickException('Only keywords arguments are allowed.')
+
+    origins = iter_origins(storage, origin_from=min_id, origin_to=max_id)
+    origin_ids = (origin['id'] for origin in origins)
+
+    schedule_origin_batches(
+        scheduler, type, origin_ids, origin_batch_size, kw)
+
+
 @task.command('list-pending')
 @click.argument('task-types', required=True, nargs=-1)
 @click.option('--limit', '-l', required=False, type=click.INT,
diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py
--- a/swh/scheduler/cli_utils.py
+++ b/swh/scheduler/cli_utils.py
@@ -3,19 +3,103 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import itertools
+
+import click
+import yaml
+
+from .utils import create_task_dict
+
+TASK_BATCH_SIZE = 1000  # Number of tasks per query to the scheduler
+
+
+def schedule_origin_batches(
+        scheduler, task_type, origins, origin_batch_size, kwargs):
+    """Schedule one 'oneshot' task per batch of origins.
+
+    Origins are grouped into lists of at most `origin_batch_size` items;
+    each list becomes the single positional argument of a task of type
+    `task_type`, with `kwargs` as its keyword arguments. Tasks are sent
+    to the scheduler TASK_BATCH_SIZE at a time, and progress is echoed
+    after each batch.
+
+    Args:
+        scheduler: scheduler backend, or None to only report what would
+            be scheduled (dry run)
+        task_type (str): name of the task type to schedule
+        origins: iterable of origins (the CLI passes origin ids)
+        origin_batch_size (int): number of origins per task
+        kwargs (dict): keyword arguments passed to every task
+    """
+    # islice() must consume one shared iterator: on a list it would
+    # restart from the first item on every call and never terminate.
+    origins = iter(origins)
+    nb_origins = 0
+    nb_tasks = 0
+
+    while True:
+        task_batch = []
+        for _ in range(TASK_BATCH_SIZE):
+            # Group origins
+            origin_batch = list(itertools.islice(origins, origin_batch_size))
+            nb_origins += len(origin_batch)
+            if not origin_batch:
+                break
+
+            # Create a task for these origins
+            args = [origin_batch]
+            task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs)
+            task_batch.append(task_dict)
+
+        # Schedule a batch of tasks
+        if not task_batch:
+            break
+        nb_tasks += len(task_batch)
+        if scheduler is not None:
+            scheduler.create_tasks(task_batch)
+        click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+    # Print final status.
+    if nb_tasks:
+        click.echo('Done.')
+    else:
+        click.echo('Nothing to do (no origin matched the criteria).')
+

 def parse_options(options):
-    """Parses options from a CLI and turns it into Python args and kwargs.
+    """Parses options from a CLI as YAML and turns it into Python
+    args and kwargs.
+
+    Options containing '=' become keyword arguments (split on the first
+    '='); the others are positional arguments. Every value is parsed with
+    yaml.safe_load, so no arbitrary Python object can be constructed.
+
+    Raises:
+        click.ClickException: if a value is not valid YAML.

     >>> parse_options([])
     ([], {})
     >>> parse_options(['foo', 'bar'])
     (['foo', 'bar'], {})
-    >>> parse_options(['foo=bar'])
+    >>> parse_options(['[foo, bar]'])
+    ([['foo', 'bar']], {})
+    >>> parse_options(['"foo"', '"bar"'])
+    (['foo', 'bar'], {})
+    >>> parse_options(['foo="bar"'])
     ([], {'foo': 'bar'})
-    >>> parse_options(['foo', 'bar=baz'])
+    >>> parse_options(['"foo"', 'bar="baz"'])
     (['foo'], {'bar': 'baz'})
+    >>> parse_options(['42', 'bar=False'])
+    ([42], {'bar': False})
+    >>> parse_options(['42', 'bar=false'])
+    ([42], {'bar': False})
     """
-    args = [x for x in options if '=' not in x]
-    kw = dict(x.split('=', 1) for x in options if '=' in x)
+    kw_pairs = [x.split('=', 1) for x in options if '=' in x]
+    try:
+        # safe_load: these strings come straight from the command line.
+        args = [yaml.safe_load(x) for x in options if '=' not in x]
+        kw = {k: yaml.safe_load(v) for (k, v) in kw_pairs}
+    except yaml.YAMLError as exc:
+        # PyYAML reports malformed input with YAMLError, not ValueError.
+        raise click.ClickException('Invalid argument: %s' % exc) from exc
     return (args, kw)
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -9,6 +9,9 @@
 from unittest.mock import patch

 from click.testing import CliRunner
+import pytest
+
+from swh.storage.in_memory import Storage

 from swh.scheduler.cli import cli
 from swh.scheduler.utils import create_task_dict
@@ -61,7 +64,7 @@
   Args:
     \['arg1', 'arg2'\]
   Keyword args:
-    key: value
+    key: 'value'

 Task 2
   Next run: just now \(.*\)
@@ -71,7 +74,7 @@
   Args:
     \['arg3', 'arg4'\]
   Keyword args:
-    key: value
+    key: 'value'

 '''.lstrip()
     assert result.exit_code == 0, result.output
@@ -99,10 +102,10 @@
   Type: swh-test-ping
   Policy: oneshot
   Args:
-    arg1
-    arg2
+    'arg1'
+    'arg2'
   Keyword args:
-    key: value
+    key: 'value'

 '''.lstrip()
     assert result.exit_code == 0, result.output
@@ -112,7 +115,7 @@
 def test_schedule_task(swh_scheduler):
     result = invoke(swh_scheduler, False, [
         'task', 'add',
-        'swh-test-ping', 'arg1', 'arg2', 'key=value',
+        'swh-test-ping', '"arg1"', '"arg2"', 'key="value"',
     ])
     expected = r'''
 \[INFO\] swh.core.config -- Loading config file .*
@@ -124,10 +127,10 @@
   Type: swh-test-ping
   Policy: recurring
   Args:
-    arg1
-    arg2
+    'arg1'
+    'arg2'
   Keyword args:
-    key: value
+    key: 'value'

 '''.lstrip()
     assert result.exit_code == 0, result.output
@@ -167,7 +170,7 @@
   Policy: oneshot
   Args:
   Keyword args:
-    key: value
+    key: 'value'

 '''.lstrip()
     assert result.exit_code == 0, result.output
@@ -189,3 +192,107 @@
 '''.lstrip()
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def _fill_storage_with_origins(storage, nb_origins):
+    storage.origin_add([
+        {
+            'type': 'type{}'.format(i),
+            'url': 'http://example.com/{}'.format(i),
+        }
+        for i in range(nb_origins)
+    ])
+
+
+@pytest.fixture
+def storage():
+    """An instance of swh.storage.in_memory.Storage that gets injected
+    into the CLI functions."""
+    storage = Storage()
+    with patch('swh.scheduler.cli.get_storage') as get_storage_mock:
+        get_storage_mock.return_value = storage
+        yield storage
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins_dry_run(
+        swh_scheduler, storage):
+    """Tests the scheduling when origin_batch_size*task_batch_size is a
+    divisor of nb_origins."""
+    _fill_storage_with_origins(storage, 90)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', '--dry-run', 'swh-test-ping',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(30 origins\).
+Scheduled 6 tasks \(60 origins\).
+Scheduled 9 tasks \(90 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 0
+
+
+@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
+def test_task_schedule_origins(swh_scheduler, storage):
+    """Tests the scheduling when neither origin_batch_size or
+    task_batch_size is a divisor of nb_origins."""
+    _fill_storage_with_origins(storage, 70)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 3 tasks \(60 origins\).
+Scheduled 4 tasks \(70 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 4
+    assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+    assert tasks[1]['arguments']['args'] == [list(range(21, 41))]
+    assert tasks[2]['arguments']['args'] == [list(range(41, 61))]
+    assert tasks[3]['arguments']['args'] == [list(range(61, 71))]
+    assert all(task['arguments']['kwargs'] == {} for task in tasks)
+
+
+def test_task_schedule_origins_kwargs(swh_scheduler, storage):
+    """Tests support of extra keyword-arguments."""
+    _fill_storage_with_origins(storage, 30)
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'schedule_origins', 'swh-test-ping',
+        '--batch-size', '20',
+        'key1="value1"', 'key2="value2"',
+    ])
+
+    # Check the output
+    expected = r'''^\[INFO\] swh.core.config -- Loading config file .*
+Scheduled 2 tasks \(30 origins\).
+Done.
+'''
+    assert result.exit_code == 0, result.output
+    assert re.match(expected, result.output, re.MULTILINE), repr(result.output)
+
+    # Check scheduled tasks
+    tasks = swh_scheduler.search_tasks()
+    assert len(tasks) == 2
+    assert tasks[0]['arguments']['args'] == [list(range(1, 21))]
+    assert tasks[1]['arguments']['args'] == [list(range(21, 31))]
+    assert all(task['arguments']['kwargs'] ==
+               {'key1': 'value1', 'key2': 'value2'}
+               for task in tasks)