diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -111,12 +111,12 @@ for num, message in enumerate(self.consumer): object_type = message.topic.split('.')[-1] logger.debug('num: %s, object_type: %s, message: %s' % ( - num, object_type, message)) + num+1, object_type, message)) messages[object_type].append(message.value) if num + 1 >= self.max_messages: break - logger.debug('number of messages: %s', num) + logger.debug('number of messages: %s', num+1) new_objects = self.process_objects(messages) self.produce_messages(new_objects) diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -218,7 +218,7 @@ value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, - client_id=test_config['publisher_id']) + group_id="test-consumer") _, kafka_port = request.getfixturevalue('kafka_server') @@ -232,6 +232,13 @@ *kafka_topics, **used_consumer_kwargs, ) + + # Enforce auto_offset_reset=earliest even if the consumer was created + # too soon wrt the server. + while len(consumer.assignment()) == 0: + consumer.poll(timeout_ms=20) + consumer.seek_to_beginning() + return consumer diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -44,14 +44,12 @@ nb_messages = len(objects) - # publisher should poll 1 message and send 1 reified object - publisher.poll(max_messages=nb_messages) + for _ in range(nb_messages): + publisher.poll(max_messages=1) # then (client reads from the messages from output topic) - msgs = [] + num = -1 for num, msg in enumerate(consumer_from_publisher): - msgs.append((msg.topic, msg.key, msg.value)) - expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) assert expected_topic == msg.topic @@ -63,6 +61,8 @@ expected_value = kafka_to_value(value_to_kafka(expected_objects[num])) assert expected_value == msg.value + assert num + 1 == len(expected_objects) + def test_publish( publisher: JournalPublisher,