diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -57,6 +57,7 @@ ]), # Number of messages to batch process 'max_messages': ('int', 100), + 'consumer_timeout_ms': ('int', 5000), 'auto_offset_reset': ('str', 'earliest') } @@ -83,6 +84,8 @@ 'Option \'object_types\' only accepts %s.' % ACCEPTED_OFFSET_RESET) + self.max_messages = self.config['max_messages'] + self.consumer = KafkaConsumer( bootstrap_servers=self.config['brokers'], key_deserializer=kafka_to_key, @@ -90,6 +93,8 @@ auto_offset_reset=auto_offset_reset, enable_auto_commit=False, group_id=self.config['consumer_id'], + max_poll_records=self.max_messages, + consumer_timeout_ms=self.config['consumer_timeout_ms'], ) self.consumer.subscribe( @@ -97,8 +102,6 @@ for object_type in object_types], ) - self.max_messages = self.config['max_messages'] - def process(self): """Main entry point to process event message reception. @@ -112,8 +115,9 @@ if num + 1 >= self.max_messages: break - self.process_objects(messages) - self.consumer.commit() + if messages: + self.process_objects(messages) + self.consumer.commit() # Override the following method in the sub-classes