Page MenuHomeSoftware Heritage

D2751.diff
No OneTemporary

D2751.diff

diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -184,7 +184,8 @@
and retrieving objects from an existing source ObjStorage.
There can be several 'replayers' filling a given ObjStorage as long as they
- use the same `group-id`.
+ use the same `group-id`. You can set the `KAFKA_GROUP_INSTANCE_ID`
+ environment variable to enable KIP-345 static group membership.
This service retrieves object ids to copy from the 'content' topic. It will
only copy object's content if the object's description in the kafka
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -5,6 +5,7 @@
from collections import defaultdict
import logging
+import os
import time
from confluent_kafka import Consumer, KafkaException, KafkaError
@@ -101,6 +102,35 @@
if debug_logging and 'debug' not in kwargs:
kwargs['debug'] = 'consumer'
+ # Static group instance id management
+ group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
+ if group_instance_id:
+ kwargs['group.instance.id'] = group_instance_id
+
+ if 'group.instance.id' in kwargs:
+ # When doing static consumer group membership, set a higher default
+ # session timeout. The session timeout is the duration after which
+ # the broker considers that a consumer has left the consumer group
+ # for good, and triggers a rebalance. Considering our current
+ # processing pattern, 10 minutes gives the consumer ample time to
+ # restart before that happens.
+ if 'session.timeout.ms' not in kwargs:
+ kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes
+
+ if 'session.timeout.ms' in kwargs:
+ # When the session timeout is set, rdkafka requires the max poll
+ # interval to be set to a higher value; the max poll interval is
+ # rdkafka's way of figuring out whether the client's message
+ # processing thread has stalled: when the max poll interval lapses
+ # between two calls to consumer.poll(), rdkafka leaves the consumer
+ # group and terminates the connection to the brokers.
+ #
+ # We default to 1.5 times the session timeout.
+ if 'max.poll.interval.ms' not in kwargs:
+ kwargs['max.poll.interval.ms'] = (
+ kwargs['session.timeout.ms'] // 2 * 3
+ )
+
consumer_settings = {
**kwargs,
'bootstrap.servers': ','.join(brokers),
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -56,13 +56,15 @@
yield storage
-def invoke(catch_exceptions, args):
+def invoke(catch_exceptions, args, env=None):
runner = CliRunner()
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
args = ['-C' + config_fd.name] + args
- result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
+ result = runner.invoke(
+ cli, args, obj={'log_level': logging.DEBUG}, env=env,
+ )
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
@@ -185,6 +187,52 @@
assert objstorages['dst'].get(sha1) == content
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_static_group_id(
+ objstorages,
+ storage,
+ kafka_prefix: str,
+ kafka_server: Tuple[Popen, int],
+ caplog):
+ (_, kafka_port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
+
+ contents = _fill_objstorage_and_kafka(
+ kafka_port, kafka_prefix, objstorages)
+
+ # Setup log capture to fish the consumer settings out of the log messages
+ caplog.set_level(logging.DEBUG, 'swh.journal.client')
+
+ result = invoke(False, [
+ 'content-replay',
+ '--broker', '127.0.0.1:%d' % kafka_port,
+ '--group-id', 'test-cli-consumer',
+ '--prefix', kafka_prefix,
+ '--max-messages', '10',
+ ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
+ expected = r'Done.\n'
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ consumer_settings = None
+ for record in caplog.records:
+ if 'Consumer settings' in record.message:
+ consumer_settings = record.args
+ break
+
+ assert consumer_settings is not None, (
+ 'Failed to get consumer settings out of the consumer log. '
+ 'See log capture for details.'
+ )
+ assert consumer_settings['group.instance.id'] == 'static-group-instance-id'
+ assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000
+ assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000
+
+ for (sha1, content) in contents.items():
+ assert sha1 in objstorages['dst'], sha1
+ assert objstorages['dst'].get(sha1) == content
+
+
@_patch_objstorages(['src', 'dst'])
def test_replay_content_exclude(
objstorages,

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:41 PM (2 w, 17 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223120

Event Timeline