diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -7,7 +7,6 @@
 import logging
 import mmap
 import os
-import time
 
 import click
 
@@ -67,8 +66,8 @@
 
 
 @cli.command()
-@click.option('--max-messages', '-m', default=None, type=int,
-              help='Maximum number of objects to replay. Default is to '
+@click.option('--stop-after-objects', '-n', default=None, type=int,
+              help='Stop after processing this many objects. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to. '
@@ -80,13 +79,12 @@
               help='Name of the group id for reading from Kafka. '
                    '(deprecated, use the config file instead)')
 @click.pass_context
-def replay(ctx, brokers, prefix, group_id, max_messages):
+def replay(ctx, brokers, prefix, group_id, stop_after_objects):
     """Fill a Storage by reading a Journal.

    There can be several 'replayers' filling a Storage as long as they use
    the same `group-id`.
    """
-    logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
     try:
         storage = get_storage(**conf.pop('storage'))
@@ -95,23 +93,14 @@
 
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
-        max_messages=max_messages)
+        stop_after_objects=stop_after_objects)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
 
     if notify:
         notify('READY=1')
 
     try:
-        nb_messages = 0
-        last_log_time = 0
-        while not max_messages or nb_messages < max_messages:
-            nb_messages += client.process(worker_fn)
-            if notify:
-                notify('WATCHDOG=1')
-            if time.time() - last_log_time >= 60:
-                # Log at most once per minute.
-                logger.info('Processed %d messages.' % nb_messages)
-                last_log_time = time.time()
+        client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
@@ -163,8 +152,8 @@
 
 
 @cli.command('content-replay')
-@click.option('--max-messages', '-m', default=None, type=int,
-              help='Maximum number of objects to replay. Default is to '
+@click.option('--stop-after-objects', '-n', default=None, type=int,
+              help='Stop after processing this many objects. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to.'
@@ -181,7 +170,7 @@
               help='Check whether the destination contains the object before '
                    'copying.')
 @click.pass_context
-def content_replay(ctx, max_messages,
+def content_replay(ctx, stop_after_objects,
                    brokers, prefix, group_id, exclude_sha1_file, check_dst):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
@@ -205,7 +194,6 @@
     ObjStorage before copying an object. You can turn that off if you
     know you're copying to an empty ObjStorage.
     """
-    logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
     try:
         objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
@@ -232,7 +220,7 @@
 
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
-        max_messages=max_messages, object_types=('content',))
+        stop_after_objects=stop_after_objects, object_types=('content',))
     worker_fn = functools.partial(
         process_replay_objects_content,
         src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn,
@@ -242,16 +230,7 @@
         notify('READY=1')
 
     try:
-        nb_messages = 0
-        last_log_time = 0
-        while not max_messages or nb_messages < max_messages:
-            nb_messages += client.process(worker_fn)
-            if notify:
-                notify('WATCHDOG=1')
-            if time.time() - last_log_time >= 60:
-                # Log at most once per minute.
-                logger.info('Processed %d messages.'
-                            % nb_messages)
-                last_log_time = time.time()
+        client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -58,9 +58,9 @@
 
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
-    object type using a specific topic under that prefix. If the 'prefix'
+    object type using a specific topic under that prefix. If the `prefix`
     argument is None (default value), it will take the default value
-    'swh.journal.objects'.
+    `'swh.journal.objects'`.
 
     Clients subscribe to events specific to each object type as listed in the
     `object_types` argument (if unset, defaults to all accepted object types).
@@ -69,8 +69,17 @@
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.
 
-    Messages are processed by the `worker_fn` callback passed to the
-    `process` method, in batches of maximum `max_messages`.
+    Messages are processed by the `worker_fn` callback passed to the `process`
+    method, in batches of maximum 20 messages (currently hardcoded). If set,
+    the processing stops after processing `stop_after_objects` messages in
+    total.
+
+    `stop_on_eof` stops the processing when the client has reached the end of
+    each partition in turn.
+
+    `auto_offset_reset` sets the behavior of the client when the consumer group
+    initializes: `'earliest'` (the default) processes all objects since the
+    inception of the topics; `''`
 
     Any other named argument is passed directly to KafkaConsumer().
 
@@ -81,7 +90,7 @@
             group_id: str, prefix: Optional[str] = None,
             object_types: Optional[List[str]] = None,
-            max_messages: Optional[int] = None,
+            stop_after_objects: Optional[int] = None,
             process_timeout: Optional[float] = None,
             auto_offset_reset: str = 'earliest',
             stop_on_eof: bool = False,
@@ -168,7 +177,7 @@
 
         self.consumer.subscribe(topics=topics)
 
-        self.max_messages = max_messages
+        self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, str]] = set()
@@ -184,7 +193,7 @@
             argument.
         """
         start_time = time.monotonic()
-        nb_messages = 0
+        total_objects_processed = 0
 
         while True:
             # timeout for message poll
@@ -203,24 +212,29 @@
                 timeout = self.process_timeout - elapsed
 
-            num_messages = 20
+            batch_size = 20
 
-            if self.max_messages:
-                if nb_messages >= self.max_messages:
+            if self.stop_after_objects:
+                if total_objects_processed >= self.stop_after_objects:
                     break
-                num_messages = min(num_messages, self.max_messages-nb_messages)
+
+                # clamp batch size to avoid overrunning stop_after_objects
+                batch_size = min(
+                    self.stop_after_objects-total_objects_processed,
+                    batch_size,
+                )
 
             messages = self.consumer.consume(
-                timeout=timeout, num_messages=num_messages)
+                timeout=timeout, num_messages=batch_size)
             if not messages:
                 continue
 
-            nb_processed, at_eof = self.handle_messages(messages, worker_fn)
-            nb_messages += nb_processed
+            batch_processed, at_eof = self.handle_messages(messages, worker_fn)
+            total_objects_processed += batch_processed
             if at_eof:
                 break
 
-        return nb_messages
+        return total_objects_processed
 
     def handle_messages(self, messages, worker_fn):
         objects: Dict[str, List[Any]] = defaultdict(list)
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -197,7 +197,7 @@
 TEST_CONFIG = {
     'consumer_id': 'swh.journal.consumer',
     'object_types': OBJECT_TYPE_KEYS.keys(),
-    'max_messages': 1,  # will read 1 message and stops
+    'stop_after_objects': 1,  # will read 1 object and stop
     'storage': {'cls': 'memory', 'args': {}},
 }
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -115,7 +115,7 @@
         '--broker', '127.0.0.1:%d' % port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', '1',
+        '--stop-after-objects', '1',
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
@@ -191,7 +191,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
@@ -225,7 +225,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
@@ -264,7 +264,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
     ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
@@ -313,7 +313,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
         '--exclude-sha1-file', fd.name,
     ])
     expected = r'Done.\n'
@@ -365,7 +365,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
         '--check-dst' if check_dst else '--no-check-dst',
     ])
     expected = r'Done.\n'
@@ -450,7 +450,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
         '--check-dst',
     ])
     expected = r'Done.\n'
@@ -524,7 +524,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
@@ -585,7 +585,7 @@
         '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', kafka_consumer_group,
         '--prefix', kafka_prefix,
-        '--max-messages', str(NUM_CONTENTS),
+        '--stop-after-objects', str(NUM_CONTENTS),
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -41,7 +41,7 @@
         brokers='localhost:%d' % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        max_messages=1,
+        stop_after_objects=1,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)
@@ -75,7 +75,7 @@
         brokers='localhost:%d' % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        max_messages=1,
+        stop_after_objects=None,
         stop_on_eof=True,
     )
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -76,7 +76,7 @@
         brokers='localhost:%d' % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        max_messages=nb_sent,
+        stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
@@ -146,14 +146,13 @@
         writer.send('origin_visit', 'foo', visit)
 
     queue_size = len(queue)
-    assert replayer.max_messages is None
-    replayer.max_messages = queue_size
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
 
     storage = get_storage(**storage_config)
 
     worker_fn = functools.partial(process_replay_objects, storage=storage)
-    nb_messages = 0
-    while nb_messages < queue_size:
-        nb_messages += replayer.process(worker_fn)
+
+    replayer.process(worker_fn)
 
     actual_visits = list(storage.origin_visit_get('http://example.com/'))
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -82,14 +82,13 @@
     queue_size = len(queue)
     assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
 
-    assert replayer.max_messages is None
-    replayer.max_messages = queue_size
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
 
     storage2 = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
-    nb_messages = 0
-    while nb_messages < queue_size:
-        nb_messages += replayer.process(worker_fn)
+
+    replayer.process(worker_fn)
 
     assert replayer.consumer.committed
@@ -137,8 +136,8 @@
     queue_size = len(queue)
     assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
 
-    assert replayer.max_messages is None
-    replayer.max_messages = queue_size
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
 
     storage2 = get_storage(**storage_config)
@@ -148,9 +147,8 @@
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage1,
                                   dst=objstorage2)
-    nb_messages = 0
-    while nb_messages < queue_size:
-        nb_messages += replayer.process(worker_fn)
+
+    replayer.process(worker_fn)
 
     # only content with status visible will be copied in storage2
     expected_objstorage_state = {
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -70,6 +70,6 @@
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
         self.process_timeout = None
-        self.max_messages = None
+        self.stop_after_objects = None
         self.value_deserializer = kafka_to_value
         self.stop_on_eof = False
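Usage note (not part of the patch): a minimal sketch of how a caller drives the client after this change, now that the processing loop lives in JournalClient.process() and the old max_messages loop is gone. The broker address, consumer group, object count, and worker function below are illustrative placeholders, not values from this repository.

    from swh.journal.client import JournalClient

    # Construct a client the same way the tests above do; stop_after_objects
    # replaces max_messages and is honored inside process() itself.
    client = JournalClient(
        brokers='localhost:9092',         # placeholder broker address
        group_id='example-replayer',      # placeholder consumer group
        prefix='swh.journal.objects',     # default topic prefix
        object_types=['content'],
        stop_after_objects=100,           # stop once 100 objects are processed
    )

    def worker_fn(all_objects):
        # The callback receives a dict mapping each object type to the batch
        # of deserialized objects read from the corresponding topic.
        for object_type, objects in all_objects.items():
            print('received %d %s objects' % (len(objects), object_type))

    # process() loops internally and returns the total number of objects
    # handled before stopping (stop_after_objects reached, or end of stream
    # when stop_on_eof is set).
    total = client.process(worker_fn)
    print('processed %d objects' % total)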