diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -7,7 +7,7 @@ from swh.core import config from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_task_dict +from swh.scheduler.cli_utils import schedule_origin_batches from swh.storage import get_storage from swh.indexer import metadata_dictionary @@ -17,8 +17,6 @@ CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) -TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler - @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, @@ -91,7 +89,7 @@ help="URL of the (graph) storage API") @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, - help='Default to list only what would be scheduled.') + help='List only what would be scheduled.') @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): @@ -146,45 +144,16 @@ help="Name of the task type to schedule.") @click.pass_context def schedule_origin_metadata_reindex( - ctx, origin_batch_size, mappings, tool_ids, task_type): + ctx, origin_batch_size, tool_ids, mappings, task_type): """Schedules indexing tasks for origins that were already indexed.""" idx_storage = ctx.obj['indexer_storage'] scheduler = ctx.obj['scheduler'] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) - kwargs = {"policy_update": "update-dups", "parse_ids": False} - nb_origins = 0 - nb_tasks = 0 - while True: - task_batch = [] - for _ in range(TASK_BATCH_SIZE): - # Group origins - origin_batch = [] - for (_, origin) in zip(range(origin_batch_size), origins): - origin_batch.append(origin) - nb_origins += len(origin_batch) - if not origin_batch: - break - - # Create a task for these origins - args = [origin_batch] - task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs) - task_batch.append(task_dict) - - # Schedule a batch of tasks - if not task_batch: - break - nb_tasks += len(task_batch) - if scheduler: - scheduler.create_tasks(task_batch) - click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins)) - - # Print final status. - if nb_tasks: - click.echo('Done.') - else: - click.echo('Nothing to do (no origin metadata matched the criteria).') + kwargs = {"policy_update": "update-dups", "parse_ids": False} + schedule_origin_batches( + scheduler, task_type, origins, origin_batch_size, kwargs) @cli.command('api-server') diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -134,7 +134,7 @@ result.output) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_empty_db( indexer_scheduler, idx_storage, storage): result = invoke(indexer_scheduler, False, [ @@ -149,7 +149,7 @@ assert len(tasks) == 0 -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_divisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a @@ -176,7 +176,7 @@ _assert_tasks_for_origins(tasks, range(90)) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_dry_run( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a @@ -202,7 +202,7 @@ assert len(tasks) == 0 -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_nondivisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when neither origin_batch_size or @@ -229,7 +229,7 @@ _assert_tasks_for_origins(tasks, range(70)) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_mapping( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a @@ -257,7 +257,7 @@ [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_two_mappings( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a @@ -286,7 +286,7 @@ 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102]) -@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_tool( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a