D2751: Add support for the static consumer group feature to journal client
D2751.diff
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -184,7 +184,8 @@
     and retrieving objects from an existing source ObjStorage.
 
     There can be several 'replayers' filling a given ObjStorage as long as they
-    use the same `group-id`.
+    use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID`
+    environment variable to use KIP-345 static group membership.
 
     This service retrieves object ids to copy from the 'content' topic. It will
     only copy object's content if the object's description in the kafka
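
The docstring above introduces the KAFKA_GROUP_INSTANCE_ID knob without showing what it does to the consumer configuration. As a minimal sketch (not part of the diff; the broker address, group id, and instance id are placeholders, and it assumes a librdkafka/broker combination with KIP-345 support), this is roughly what the patched client ends up handing to confluent_kafka:

    import os

    from confluent_kafka import Consumer

    consumer_settings = {
        'bootstrap.servers': '127.0.0.1:9092',  # placeholder broker
        'group.id': 'content-replayer',         # shared by all replayers
    }

    group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
    if group_instance_id:
        # KIP-345 static membership: a stable, per-process id. A consumer
        # restarting under the same id within the session timeout rejoins
        # the group without triggering a rebalance.
        consumer_settings['group.instance.id'] = group_instance_id

    consumer = Consumer(consumer_settings)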
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -5,6 +5,7 @@
 from collections import defaultdict
 import logging
+import os
 import time
 
 from confluent_kafka import Consumer, KafkaException, KafkaError
 
@@ -101,6 +102,35 @@
         if debug_logging and 'debug' not in kwargs:
             kwargs['debug'] = 'consumer'
 
+        # Static group instance id management
+        group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
+        if group_instance_id:
+            kwargs['group.instance.id'] = group_instance_id
+
+        if 'group.instance.id' in kwargs:
+            # When doing static consumer group membership, set a higher default
+            # session timeout. The session timeout is the duration after which
+            # the broker considers that a consumer has left the consumer group
+            # for good, and triggers a rebalance. Considering our current
+            # processing pattern, 10 minutes gives the consumer ample time to
+            # restart before that happens.
+            if 'session.timeout.ms' not in kwargs:
+                kwargs['session.timeout.ms'] = 10 * 60 * 1000  # 10 minutes
+
+        if 'session.timeout.ms' in kwargs:
+            # When the session timeout is set, rdkafka requires the max poll
+            # interval to be set to a higher value; the max poll interval is
+            # rdkafka's way of figuring out whether the client's message
+            # processing thread has stalled: when the max poll interval lapses
+            # between two calls to consumer.poll(), rdkafka leaves the consumer
+            # group and terminates the connection to the brokers.
+            #
+            # We default to 1.5 times the session timeout
+            if 'max.poll.interval.ms' not in kwargs:
+                kwargs['max.poll.interval.ms'] = (
+                    kwargs['session.timeout.ms'] // 2 * 3
+                )
+
         consumer_settings = {
             **kwargs,
             'bootstrap.servers': ','.join(brokers),
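
To make the defaulting rules above easy to check in isolation, here is a standalone restatement; the helper name apply_static_membership_defaults is hypothetical, not part of the diff:

    def apply_static_membership_defaults(kwargs, group_instance_id=None):
        # Hypothetical helper mirroring the logic added to the client above.
        if group_instance_id:
            kwargs['group.instance.id'] = group_instance_id
        if 'group.instance.id' in kwargs:
            # Static members get a 10 minute session timeout by default.
            kwargs.setdefault('session.timeout.ms', 10 * 60 * 1000)
        if 'session.timeout.ms' in kwargs:
            # The max poll interval defaults to 1.5x the session timeout
            # (integer arithmetic: // 2 * 3).
            kwargs.setdefault('max.poll.interval.ms',
                              kwargs['session.timeout.ms'] // 2 * 3)
        return kwargs

    # 600000 ms session timeout -> 900000 ms max poll interval, the exact
    # values the new test below asserts.
    assert apply_static_membership_defaults({}, 'replayer-1') == {
        'group.instance.id': 'replayer-1',
        'session.timeout.ms': 600000,
        'max.poll.interval.ms': 900000,
    }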
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -56,13 +56,15 @@
     yield storage
 
 
-def invoke(catch_exceptions, args):
+def invoke(catch_exceptions, args, env=None):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
-        result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
+        result = runner.invoke(
+            cli, args, obj={'log_level': logging.DEBUG}, env=env,
+        )
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
@@ -185,6 +187,52 @@
         assert objstorages['dst'].get(sha1) == content
 
 
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_static_group_id(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    # Setup log capture to fish the consumer settings out of the log messages
+    caplog.set_level(logging.DEBUG, 'swh.journal.client')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '10',
+    ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    consumer_settings = None
+    for record in caplog.records:
+        if 'Consumer settings' in record.message:
+            consumer_settings = record.args
+            break
+
+    assert consumer_settings is not None, (
+        'Failed to get consumer settings out of the consumer log. '
+        'See log capture for details.'
+    )
+    assert consumer_settings['group.instance.id'] == 'static-group-instance-id'
+    assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000
+    assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
+
+
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_exclude(
         objstorages,
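
Putting it together, a hedged usage sketch (not part of the diff): driving the content replayer the way the new test does, via click's CliRunner, with static membership enabled through the environment. The config file path, broker address, group id, and instance id are placeholders:

    import logging

    from click.testing import CliRunner

    from swh.journal.cli import cli

    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['-C/etc/softwareheritage/journal.yml',  # placeholder config file
         'content-replay',
         '--broker', '127.0.0.1:9092',
         '--group-id', 'content-replayer',
         '--prefix', 'swh.journal.objects',
         '--max-messages', '10'],
        obj={'log_level': logging.INFO},
        env={'KAFKA_GROUP_INSTANCE_ID': 'replayer-host-1'},
    )
    assert result.exit_code == 0, result.output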