diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index 56c7f88..c5244be 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,208 +1,177 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from swh.core import config from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_task_dict +from swh.scheduler.cli_utils import schedule_origin_batches from swh.storage import get_storage from swh.indexer import metadata_dictionary from swh.indexer.storage import get_indexer_storage from swh.indexer.storage.api.server import load_and_check_config, app CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) -TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler - @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Indexer CLI interface """ ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj['config'] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = { 'cls': 'remote', 'args': {'url': url} } elif config_key not in config: raise click.ClickException( 'Missing configuration for {}'.format(config_key)) return getter(**config[config_key]) @cli.group('mapping') def mapping(): pass @mapping.command('list') def mapping_list(): """Prints the list of known mappings.""" mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command('list-terms') @click.option('--exclude-mapping', multiple=True, help='Exclude the given mapping from the output') @click.option('--concise', is_flag=True, default=False, help='Don\'t print the list of mappings supporting each term.') def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo('{}:'.format(property_name)) click.echo('\t' + ', '.join(sorted(supported_mappings))) @cli.group('schedule') @click.option('--scheduler-url', '-s', default=None, help="URL of the scheduler API") @click.option('--indexer-storage-url', '-i', default=None, help="URL of the indexer storage API") @click.option('--storage-url', '-g', default=None, help="URL of the (graph) storage API") @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, - help='Default to list only what would be scheduled.') + help='List only what would be scheduled.') @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate indexer tasks via SWH Scheduler's API.""" ctx.obj['indexer_storage'] = _get_api( get_indexer_storage, ctx.obj['config'], 'indexer_storage', indexer_storage_url ) ctx.obj['storage'] = _get_api( get_storage, ctx.obj['config'], 'storage', storage_url ) ctx.obj['scheduler'] = _get_api( get_scheduler, ctx.obj['config'], 'scheduler', scheduler_url ) if dry_run: ctx.obj['scheduler'] = None def list_origins_by_producer(idx_storage, mappings, tool_ids): start = 0 limit = 10000 while True: origins = list( idx_storage.origin_intrinsic_metadata_search_by_producer( start=start, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None)) if not origins: break start = origins[-1]+1 yield from origins @schedule.command('reindex_origin_metadata') @click.option('--batch-size', '-b', 'origin_batch_size', default=10, show_default=True, type=int, help="Number of origins per task") @click.option('--tool-id', '-t', 'tool_ids', type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.") @click.option('--mapping', '-m', 'mappings', multiple=True, help="Mapping(s) that should be re-scheduled (eg. 'npm', " "'gemspec', 'maven')") @click.option('--task-type', default='indexer_origin_metadata', show_default=True, help="Name of the task type to schedule.") @click.pass_context def schedule_origin_metadata_reindex( - ctx, origin_batch_size, mappings, tool_ids, task_type): + ctx, origin_batch_size, tool_ids, mappings, task_type): """Schedules indexing tasks for origins that were already indexed.""" idx_storage = ctx.obj['indexer_storage'] scheduler = ctx.obj['scheduler'] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) - kwargs = {"policy_update": "update-dups", "parse_ids": False} - nb_origins = 0 - nb_tasks = 0 - while True: - task_batch = [] - for _ in range(TASK_BATCH_SIZE): - # Group origins - origin_batch = [] - for (_, origin) in zip(range(origin_batch_size), origins): - origin_batch.append(origin) - nb_origins += len(origin_batch) - if not origin_batch: - break - - # Create a task for these origins - args = [origin_batch] - task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs) - task_batch.append(task_dict) - - # Schedule a batch of tasks - if not task_batch: - break - nb_tasks += len(task_batch) - if scheduler: - scheduler.create_tasks(task_batch) - click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins)) - - # Print final status. - if nb_tasks: - click.echo('Done.') - else: - click.echo('Nothing to do (no origin metadata matched the criteria).') + kwargs = {"policy_update": "update-dups", "parse_ids": False} + schedule_origin_batches( + scheduler, task_type, origins, origin_batch_size, kwargs) @cli.command('api-server') @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5007, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def api_server(config_path, host, port, debug): api_cfg = load_and_check_config(config_path, type='any') app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return cli(auto_envvar_prefix='SWH_INDEXER') if __name__ == '__main__': main() diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index d58dd3d..d6aaf02 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,315 +1,315 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import reduce import re import tempfile from unittest.mock import patch from click.testing import CliRunner from swh.model.hashutil import hash_to_bytes from swh.indexer.cli import cli CLI_CONFIG = ''' scheduler: cls: foo args: {} storage: cls: memory args: {} indexer_storage: cls: memory args: {} ''' def fill_idx_storage(idx_storage, nb_rows): tools = [ { 'tool_name': 'tool %d' % i, 'tool_version': '0.0.1', 'tool_configuration': {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ { 'id': origin_id, 'from_revision': hash_to_bytes('abcd{:0>4}'.format(origin_id)), 'indexer_configuration_id': tools[origin_id % 2]['id'], 'metadata': {'name': 'origin %d' % origin_id}, 'mappings': ['mapping%d' % (origin_id % 10)] } for origin_id in range(nb_rows) ] revision_metadata = [ { 'id': hash_to_bytes('abcd{:0>4}'.format(origin_id)), 'indexer_configuration_id': tools[origin_id % 2]['id'], 'metadata': {'name': 'origin %d' % origin_id}, 'mappings': ['mapping%d' % (origin_id % 10)] } for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool['id'] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type indexer_origin_metadata).""" return reduce( set.union, (set(task['arguments']['args'][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {"policy_update": "update-dups", "parse_ids": False} assert {task['type'] for task in tasks} == {'indexer_origin_metadata'} assert all(len(task['arguments']['args']) == 1 for task in tasks) assert all(task['arguments']['kwargs'] == expected_kwargs for task in tasks) assert _origins_in_task_args(tasks) == set(origins) def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \ tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler result = runner.invoke(cli, ['-C' + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_mapping_list(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list', ]) expected_output = '\n'.join([ 'codemeta', 'gemspec', 'maven', 'npm', 'pkg-info', '', ]) assert result.exit_code == 0, result.output assert result.output == expected_output def test_mapping_list_terms(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list-terms', ]) assert result.exit_code == 0, result.output assert re.search(r'http://schema.org/url:\n.*npm', result.output) assert re.search(r'http://schema.org/url:\n.*codemeta', result.output) assert re.search( r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta', result.output) def test_mapping_list_terms_exclude(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list-terms', '--exclude-mapping', 'codemeta' ]) assert result.exit_code == 0, result.output assert re.search(r'http://schema.org/url:\n.*npm', result.output) assert not re.search(r'http://schema.org/url:\n.*codemeta', result.output) assert not re.search( r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta', result.output) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_empty_db( indexer_scheduler, idx_storage, storage): result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', ]) expected_output = ( 'Nothing to do (no origin metadata matched the criteria).\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_divisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (60 origins).\n' 'Scheduled 9 tasks (90 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_dry_run( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke(indexer_scheduler, False, [ 'schedule', '--dry-run', 'reindex_origin_metadata', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (60 origins).\n' 'Scheduled 9 tasks (90 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_nondivisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--batch-size', '20', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (60 origins).\n' 'Scheduled 4 tasks (70 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_mapping( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--mapping', 'mapping1', ]) # Check the output expected_output = ( 'Scheduled 2 tasks (11 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins( tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_two_mappings( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--mapping', 'mapping1', '--mapping', 'mapping2', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (22 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102]) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_tool( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--tool-id', str(tool_ids[0]), ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (55 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins( tasks, [x*2 for x in range(55)])