diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -60,6 +60,7 @@
     ]),
     # Number of messages to batch process
     'max_messages': ('int', 100),
+    'consumer_timeout_ms': ('int', 5000),
     'auto_offset_reset': ('str', 'earliest')
 }
@@ -88,6 +89,8 @@
                 'Option \'object_types\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)

+        self.max_messages = self.config['max_messages']
+
         self.consumer = KafkaConsumer(
             bootstrap_servers=self.config['brokers'],
             key_deserializer=kafka_to_key,
@@ -95,6 +98,8 @@
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
             group_id=self.config['consumer_id'],
+            max_poll_records=self.max_messages,
+            consumer_timeout_ms=self.config['consumer_timeout_ms'],
         )

         self.consumer.subscribe(
@@ -102,8 +107,6 @@
              for object_type in object_types],
         )

-        self.max_messages = self.config['max_messages']
-
     def process(self):
         """Main entry point to process event message reception.
@@ -117,8 +120,9 @@
             if num + 1 >= self.max_messages:
                 break

-        self.process_objects(messages)
-        self.consumer.commit()
+        if messages:
+            self.process_objects(messages)
+            self.consumer.commit()

     # Override the following method in the sub-classes
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -34,10 +34,10 @@
     def __init__(self, config):
         self.log = logging.getLogger(__name__)
         self.config = config
+        self.max_messages = self.config['max_messages']
         self.check_config(config)
         self._prepare_storage(config)
         self._prepare_journal(config)
-        self.max_messages = self.config['max_messages']

     def check_config(self, config):
         """Check the configuration is fine.
@@ -60,6 +60,11 @@
            actually be able to discuss with the journal.

         """
+
+        consumer_timeout_ms = 5000
+        if 'consumer_timeout_ms' in self.config:
+            consumer_timeout_ms = self.config['consumer_timeout_ms']
+
         # yes, the temporary topics contain values that are actually _keys_
         self.consumer = KafkaConsumer(
             bootstrap_servers=config['brokers'],
@@ -67,6 +72,8 @@
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=config['consumer_id'],
+            max_poll_records=self.max_messages,
+            consumer_timeout_ms=consumer_timeout_ms,
         )

         self.producer = KafkaProducer(
@@ -116,11 +123,11 @@
             if num + 1 >= self.max_messages:
                 break

-        self.log.debug('number of messages: %s', num+1)
-
-        new_objects = self.process_objects(messages)
-        self.produce_messages(new_objects)
-        self.consumer.commit()
+        if messages:
+            self.log.debug('number of messages: %s', num+1)
+            new_objects = self.process_objects(messages)
+            self.produce_messages(new_objects)
+            self.consumer.commit()

     def process_objects(self, messages):
         """Given a dict of messages {object type: [object id]}, reify those