diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,4 +3,4 @@
 swh.objstorage >= 0.0.28
 swh.scheduler >= 0.0.47
 swh.storage >= 0.0.156
-swh.journal >= 0.0.17
+swh.journal >= 0.0.27
diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -193,12 +193,12 @@
               help='Prefix of Kafka topic names to read from.')
 @click.option('--group-id', type=str,
               help='Consumer/group id for reading from Kafka.')
-@click.option('--max-messages', '-m', default=None, type=int,
+@click.option('--stop-after-objects', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.pass_context
 def journal_client(ctx, scheduler_url, origin_metadata_task_type,
-                   brokers, prefix, group_id, max_messages):
+                   brokers, prefix, group_id, stop_after_objects):
     """Listens for new objects from the SWH Journal, and schedules tasks
     to run relevant indexers (currently, only origin-intrinsic-metadata)
     on these new objects."""
@@ -212,7 +212,7 @@
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
         object_types=['origin_visit'],
-        max_messages=max_messages,
+        stop_after_objects=stop_after_objects,
     )
 
     worker_fn = functools.partial(
@@ -225,7 +225,7 @@
     nb_messages = 0
     last_log_time = 0
     try:
-        while not max_messages or nb_messages < max_messages:
+        while not stop_after_objects or nb_messages < stop_after_objects:
             nb_messages += client.process(worker_fn)
             if time.monotonic() - last_log_time >= 60:
                 print('Processed %d messages.' % nb_messages)
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -338,7 +338,7 @@
                 return_value=consumer):
             result = invoke(indexer_scheduler, False, [
                 'journal-client',
-                '--max-messages', '1',
+                '--stop-after-objects', '1',
                 '--broker', '192.0.2.1',
                 '--prefix', 'swh.journal.objects',
                 '--group-id', 'test-consumer',
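
For context on the renamed option's semantics: the client's process() returns how
many objects a single call handled, and the loop in the cli.py hunk above exits once
the running total reaches the threshold, or never when the option is unset. Below is
a minimal, self-contained sketch of that stopping logic; FakeClient and replay are
hypothetical names introduced for illustration, standing in for the real consumer
returned by get_journal_client from swh.journal.

    import functools
    import time


    class FakeClient:
        # Hypothetical stand-in for the swh.journal client: each
        # process() call hands one fake origin_visit to worker_fn
        # and reports that one object was handled.
        def process(self, worker_fn):
            worker_fn([{'origin': 'https://example.org/repo'}])
            return 1


    def replay(client, worker_fn, stop_after_objects=None):
        # Mirrors the loop in swh/indexer/cli.py: run forever when
        # stop_after_objects is falsy, otherwise stop once the
        # running object count reaches the threshold.
        nb_messages = 0
        last_log_time = 0
        while not stop_after_objects or nb_messages < stop_after_objects:
            nb_messages += client.process(worker_fn)
            if time.monotonic() - last_log_time >= 60:
                print('Processed %d messages.' % nb_messages)
                last_log_time = time.monotonic()
        return nb_messages


    if __name__ == '__main__':
        worker_fn = functools.partial(print, 'scheduling for:')
        assert replay(FakeClient(), worker_fn, stop_after_objects=3) == 3

On the CLI side, an invocation that previously passed --max-messages 1 now passes
--stop-after-objects 1, as the updated test shows; the -m short form is unchanged,
so existing scripts using -m keep working.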