diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -184,7 +184,8 @@ and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they - use the same `group-id`. + use the same `group-id`. Set the `KAFKA_GROUP_INSTANCE_ID` + environment variable to enable KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -5,6 +5,7 @@ from collections import defaultdict import logging +import os import time from confluent_kafka import Consumer, KafkaException, KafkaError @@ -101,6 +102,10 @@ if debug_logging and 'debug' not in kwargs: kwargs['debug'] = 'consumer' + group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') + if group_instance_id: + kwargs['group.instance.id'] = group_instance_id + consumer_settings = { **kwargs, 'bootstrap.servers': ','.join(brokers),