diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -76,7 +76,7 @@
     def __init__(
             self, brokers, group_id, prefix=None, object_types=None,
             max_messages=0, process_timeout=0, auto_offset_reset='earliest',
-            **kwargs):
+            stop_on_eof=False, **kwargs):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
@@ -112,6 +112,10 @@
             'logger': rdkafka_logger,
         }
 
+        self.stop_on_eof = stop_on_eof
+        if self.stop_on_eof:
+            consumer_settings['enable.partition.eof'] = True
+
         logger.debug('Consumer settings: %s', consumer_settings)
 
         self.consumer = Consumer(consumer_settings)
@@ -127,6 +131,7 @@
 
         self.max_messages = max_messages
         self.process_timeout = process_timeout
+        self.eof_reached = set()
 
         self._object_types = object_types
 
@@ -171,9 +176,10 @@
             if not messages:
                 continue
 
-            # Process messages and add the successfully processed ones to the
-            # message counter.
-            nb_messages += self.handle_messages(messages, worker_fn)
+            nb_processed, at_eof = self.handle_messages(messages, worker_fn)
+            nb_messages += nb_processed
+            if at_eof:
+                break
 
         return nb_messages
 
@@ -184,7 +190,12 @@
         for message in messages:
             error = message.error()
             if error is not None:
-                _error_cb(error)
+                if error.code() == KafkaError._PARTITION_EOF:
+                    self.eof_reached.add(
+                        (message.topic(), message.partition())
+                    )
+                else:
+                    _error_cb(error)
                 continue
             nb_processed += 1
 
@@ -199,7 +210,12 @@
             worker_fn(dict(objects))
         self.consumer.commit()
 
-        return nb_processed
+        at_eof = (self.stop_on_eof and all(
+            (tp.topic, tp.partition) in self.eof_reached
+            for tp in self.consumer.assignment()
+        ))
+
+        return nb_processed, at_eof
 
     def deserialize_message(self, message):
         return self.value_deserializer(message.value())
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -48,3 +48,38 @@
     client.process(worker_fn)
 
     worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
+
+
+def test_client_eof(
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    rev = revisions().example()
+
+    # Fill Kafka
+    producer.produce(
+        topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id),
+        value=value_to_kafka(rev.to_dict()),
+    )
+    producer.flush()
+
+    config = {
+        'brokers': 'localhost:{}'.format(port),
+        'group_id': 'replayer',
+        'prefix': kafka_prefix,
+        'stop_on_eof': True,
+    }
+    client = JournalClient(**config)
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+
+    worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
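
A minimal usage sketch of the new flag for reviewers (the broker address, group id, and worker function below are illustrative placeholders, not part of this patch). With stop_on_eof=True the client enables the 'enable.partition.eof' consumer setting, and process() returns once every assigned partition has reported a partition-EOF event instead of polling indefinitely:

    from swh.journal.client import JournalClient

    def worker_fn(all_objects):
        # all_objects maps an object type (e.g. 'revision') to a list of
        # deserialized message dicts, as assembled by handle_messages()
        for object_type, objects in all_objects.items():
            print(object_type, len(objects))

    client = JournalClient(
        brokers='localhost:9092',      # placeholder broker address
        group_id='one-shot-replayer',  # placeholder consumer group
        stop_on_eof=True,              # turns on 'enable.partition.eof'
    )
    # Drains the subscribed topics, then returns the number of messages
    # processed once all assigned partitions have reached EOF.
    nb_messages = client.process(worker_fn)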