diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,6 +1,6 @@
 swh.core >= 0.0.53
 swh.model >= 0.0.15
 swh.objstorage >= 0.0.28
-swh.scheduler >= 0.0.39
+swh.scheduler >= 0.0.47
 swh.storage >= 0.0.123
 swh.journal >= 0.0.6
diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -1,25 +1,170 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 
+from swh.core import config
+from swh.scheduler import get_scheduler
+from swh.scheduler.utils import create_task_dict
+from swh.storage import get_storage
+from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.api.server import load_and_check_config, app
 
 
-@click.command()
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+TASK_BATCH_SIZE = 1000  # Number of tasks per query to the scheduler
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+    """Software Heritage Indexer CLI interface
+    """
+    ctx.ensure_object(dict)
+
+    conf = config.read(config_file)
+    ctx.obj['config'] = conf
+
+
+def _get_api(getter, config, config_key, url):
+    if url:
+        config[config_key] = {
+            'cls': 'remote',
+            'args': {'url': url}
+        }
+    elif config_key not in config:
+        raise click.ClickException(
+            'Missing configuration for {}'.format(config_key))
+    return getter(**config[config_key])
+
+
+@cli.group('schedule')
+@click.option('--scheduler-url', '-s', default=None,
+              help="URL of the scheduler API")
+@click.option('--indexer-storage-url', '-i', default=None,
+              help="URL of the indexer storage API")
+@click.option('--storage-url', '-g', default=None,
+              help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+              default=False,
+              help='Default to list only what would be scheduled.')
+@click.pass_context
+def schedule(ctx, scheduler_url, storage_url, indexer_storage_url,
+             dry_run):
+    """Manipulate indexer tasks via SWH Scheduler's API."""
+    ctx.obj['indexer_storage'] = _get_api(
+        get_indexer_storage,
+        ctx.obj['config'],
+        'indexer_storage',
+        indexer_storage_url
+    )
+    ctx.obj['storage'] = _get_api(
+        get_storage,
+        ctx.obj['config'],
+        'storage',
+        storage_url
+    )
+    ctx.obj['scheduler'] = _get_api(
+        get_scheduler,
+        ctx.obj['config'],
+        'scheduler',
+        scheduler_url
+    )
+    if dry_run:
+        ctx.obj['scheduler'] = None
+
+
+def list_origins_by_producer(idx_storage, mappings, tool_ids):
+    start = 0
+    limit = 10000
+    while True:
+        origins = list(
+            idx_storage.origin_intrinsic_metadata_search_by_producer(
+                start=start, limit=limit, ids_only=True,
+                mappings=mappings or None, tool_ids=tool_ids or None))
+        if not origins:
+            break
+        start = origins[-1]+1
+        yield from origins
+
+
+@schedule.command('reindex_origin_metadata')
+@click.option('--batch-size', '-b', 'origin_batch_size',
+              default=10, show_default=True, type=int,
+              help="Number of origins per task")
+@click.option('--tool-id', '-t', 'tool_ids', type=int, multiple=True,
+              help="Restrict search of old metadata to this/these tool ids.")
+@click.option('--mapping', '-m', 'mappings', multiple=True,
help="Mapping(s) that should be re-scheduled (eg. 'npm', " + "'gemspec', 'maven')") +@click.option('--task-type', + default='indexer_origin_metadata', show_default=True, + help="Name of the task type to schedule.") +@click.pass_context +def schedule_origin_metadata_reindex( + ctx, origin_batch_size, mappings, tool_ids, task_type): + """Schedules indexing tasks for origins that were already indexed.""" + idx_storage = ctx.obj['indexer_storage'] + scheduler = ctx.obj['scheduler'] + + origins = list_origins_by_producer(idx_storage, mappings, tool_ids) + kwargs = {"policy_update": "update-dups", "parse_ids": False} + nb_origins = 0 + nb_tasks = 0 + + while True: + task_batch = [] + for _ in range(TASK_BATCH_SIZE): + # Group origins + origin_batch = [] + for (_, origin) in zip(range(origin_batch_size), origins): + origin_batch.append(origin) + nb_origins += len(origin_batch) + if not origin_batch: + break + + # Create a task for these origins + args = [origin_batch] + task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs) + task_batch.append(task_dict) + + # Schedule a batch of tasks + if not task_batch: + break + nb_tasks += len(task_batch) + if scheduler: + scheduler.create_tasks(task_batch) + click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins)) + + # Print final status. + if nb_tasks: + click.echo('Done.') + else: + click.echo('Nothing to do (no origin metadata matched the criteria).') + + +@cli.command('api-server') @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5007, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") -def main(config_path, host, port, debug): +def api_server(config_path, host, port, debug): api_cfg = load_and_check_config(config_path, type='any') app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) +def main(): + return cli(auto_envvar_prefix='SWH_INDEXER') + + if __name__ == '__main__': main() diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/test_cli.py @@ -0,0 +1,278 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import reduce +import tempfile +from unittest.mock import patch + +from click.testing import CliRunner + +from swh.model.hashutil import hash_to_bytes + +from swh.indexer.cli import cli + + +CLI_CONFIG = ''' +scheduler: + cls: foo + args: {} +storage: + cls: memory + args: {} +indexer_storage: + cls: memory + args: {} +''' + + +def fill_idx_storage(idx_storage, nb_rows): + tools = [ + { + 'tool_name': 'tool %d' % i, + 'tool_version': '0.0.1', + 'tool_configuration': {}, + } + for i in range(2) + ] + tools = idx_storage.indexer_configuration_add(tools) + + origin_metadata = [ + { + 'origin_id': origin_id, + 'from_revision': hash_to_bytes('abcd{:0>4}'.format(origin_id)), + 'indexer_configuration_id': tools[origin_id % 2]['id'], + 'metadata': {'name': 'origin %d' % origin_id}, + 'mappings': ['mapping%d' % (origin_id % 10)] + } + for origin_id in range(nb_rows) + ] + revision_metadata = [ + { + 'id': hash_to_bytes('abcd{:0>4}'.format(origin_id)), + 'indexer_configuration_id': tools[origin_id % 2]['id'], + 
+            'metadata': {'name': 'origin %d' % origin_id},
+            'mappings': ['mapping%d' % (origin_id % 10)]
+        }
+        for origin_id in range(nb_rows)
+    ]
+
+    idx_storage.revision_metadata_add(revision_metadata)
+    idx_storage.origin_intrinsic_metadata_add(origin_metadata)
+
+    return [tool['id'] for tool in tools]
+
+
+def _origins_in_task_args(tasks):
+    """Returns the set of origins contained in the arguments of the
+    provided tasks (assumed to be of type indexer_origin_metadata)."""
+    return reduce(
+        set.union,
+        (set(task['arguments']['args'][0]) for task in tasks),
+        set()
+    )
+
+
+def _assert_tasks_for_origins(tasks, origins):
+    expected_kwargs = {"policy_update": "update-dups", "parse_ids": False}
+    assert {task['type'] for task in tasks} == {'indexer_origin_metadata'}
+    assert all(len(task['arguments']['args']) == 1 for task in tasks)
+    assert all(task['arguments']['kwargs'] == expected_kwargs
+               for task in tasks)
+    assert _origins_in_task_args(tasks) == set(origins)
+
+
+def invoke(scheduler, catch_exceptions, args):
+    runner = CliRunner()
+    with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \
+            tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
+        config_fd.write(CLI_CONFIG)
+        config_fd.seek(0)
+        get_scheduler_mock.return_value = scheduler
+        result = runner.invoke(cli, ['-C' + config_fd.name] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_empty_db(
+        indexer_scheduler, idx_storage, storage):
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+    ])
+    expected_output = (
+        'Nothing to do (no origin metadata matched the criteria).\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 0
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_divisor(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the re-indexing when origin_batch_size*task_batch_size is a
+    divisor of nb_origins."""
+    fill_idx_storage(idx_storage, 90)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 3 tasks (30 origins).\n'
+        'Scheduled 6 tasks (60 origins).\n'
+        'Scheduled 9 tasks (90 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 9
+    _assert_tasks_for_origins(tasks, range(90))
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_dry_run(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the dry-run mode: the same batches are reported, but no
+    task is actually created."""
+    fill_idx_storage(idx_storage, 90)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', '--dry-run', 'reindex_origin_metadata',
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 3 tasks (30 origins).\n'
+        'Scheduled 6 tasks (60 origins).\n'
+        'Scheduled 9 tasks (90 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 0
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_nondivisor(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the re-indexing when neither origin_batch_size nor
+    task_batch_size is a divisor of nb_origins."""
+    fill_idx_storage(idx_storage, 70)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+        '--batch-size', '20',
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 3 tasks (60 origins).\n'
+        'Scheduled 4 tasks (70 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 4
+    _assert_tasks_for_origins(tasks, range(70))
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_filter_one_mapping(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the re-indexing when restricted to a single mapping with
+    --mapping."""
+    fill_idx_storage(idx_storage, 110)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+        '--mapping', 'mapping1',
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 2 tasks (11 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 2
+    _assert_tasks_for_origins(
+        tasks,
+        [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_filter_two_mappings(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the re-indexing when restricted to two mappings with
+    --mapping."""
+    fill_idx_storage(idx_storage, 110)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+        '--mapping', 'mapping1', '--mapping', 'mapping2',
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 3 tasks (22 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 3
+    _assert_tasks_for_origins(
+        tasks,
+        [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101,
+         2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102])
+
+
+@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3)
+def test_origin_metadata_reindex_filter_one_tool(
+        indexer_scheduler, idx_storage, storage):
+    """Tests the re-indexing when restricted to a single tool with
+    --tool-id."""
+    tool_ids = fill_idx_storage(idx_storage, 110)
+
+    result = invoke(indexer_scheduler, False, [
+        'schedule', 'reindex_origin_metadata',
+        '--tool-id', str(tool_ids[0]),
+    ])
+
+    # Check the output
+    expected_output = (
+        'Scheduled 3 tasks (30 origins).\n'
+        'Scheduled 6 tasks (55 origins).\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 6
+    _assert_tasks_for_origins(
+        tasks,
+        [x*2 for x in range(55)])
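
Note (reviewer aid, not part of the patch): a minimal sketch of the payload one scheduled task carries, assuming the create_task_dict(type, policy, *args, **kwargs) helper from swh.scheduler >= 0.0.47 behaves as the new command expects; the origin ids below are made up for illustration.

# Illustrative sketch only -- mirrors what schedule_origin_metadata_reindex
# builds for each batch of origins before handing it to the scheduler.
from swh.scheduler.utils import create_task_dict

origin_batch = [1, 2, 3]  # hypothetical origin ids grouped into one task

task = create_task_dict(
    'indexer_origin_metadata',   # task type (default of --task-type)
    'oneshot',                   # scheduling policy used by the command
    origin_batch,                # positional args: one list of origin ids
    policy_update='update-dups',
    parse_ids=False)

# task['type'] is 'indexer_origin_metadata' and task['arguments'] bundles the
# args/kwargs above; scheduler.create_tasks() receives such dicts in batches
# of at most TASK_BATCH_SIZE.
print(task['type'], task['arguments'])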