diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -184,7 +184,8 @@
     and retrieving objects from an existing source ObjStorage.
 
     There can be several 'replayers' filling a given ObjStorage as long as they
-    use the same `group-id`.
+    use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID`
+    environment variable to use KIP-345 static group membership.
 
     This service retrieves object ids to copy from the 'content' topic. It will
     only copy object's content if the object's description in the kafka
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -5,6 +5,7 @@
 
 from collections import defaultdict
 import logging
+import os
 import time
 
 from confluent_kafka import Consumer, KafkaException, KafkaError
@@ -101,6 +102,35 @@
         if debug_logging and 'debug' not in kwargs:
             kwargs['debug'] = 'consumer'
 
+        # Static group instance id management
+        group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
+        if group_instance_id:
+            kwargs['group.instance.id'] = group_instance_id
+
+        if 'group.instance.id' in kwargs:
+            # When doing static consumer group membership, set a higher default
+            # session timeout. The session timeout is the duration after which
+            # the broker considers that a consumer has left the consumer group
+            # for good, and triggers a rebalance. Considering our current
+            # processing pattern, 10 minutes gives the consumer ample time to
+            # restart before that happens.
+            if 'session.timeout.ms' not in kwargs:
+                kwargs['session.timeout.ms'] = 10 * 60 * 1000  # 10 minutes
+
+        if 'session.timeout.ms' in kwargs:
+            # When the session timeout is set, rdkafka requires the max poll
+            # interval to be set to a higher value; the max poll interval is
+            # rdkafka's way of figuring out whether the client's message
+            # processing thread has stalled: when the max poll interval lapses
+            # between two calls to consumer.poll(), rdkafka leaves the consumer
+            # group and terminates the connection to the brokers.
+            #
+            # We default to 1.5 times the session timeout
+            if 'max.poll.interval.ms' not in kwargs:
+                kwargs['max.poll.interval.ms'] = (
+                    kwargs['session.timeout.ms'] // 2 * 3
+                )
+
         consumer_settings = {
             **kwargs,
             'bootstrap.servers': ','.join(brokers),
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -56,13 +56,15 @@
     yield storage
 
 
-def invoke(catch_exceptions, args):
+def invoke(catch_exceptions, args, env=None):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
-        result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
+        result = runner.invoke(
+            cli, args, obj={'log_level': logging.DEBUG}, env=env,
+        )
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
@@ -185,6 +187,52 @@
         assert objstorages['dst'].get(sha1) == content
 
 
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_static_group_id(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    # Set up log capture to fish the consumer settings out of the log messages
+    caplog.set_level(logging.DEBUG, 'swh.journal.client')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '10',
+    ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    consumer_settings = None
+    for record in caplog.records:
+        if 'Consumer settings' in record.message:
+            consumer_settings = record.args
+            break
+
+    assert consumer_settings is not None, (
+        'Failed to get consumer settings out of the consumer log. '
+        'See log capture for details.'
+    )
+    assert consumer_settings['group.instance.id'] == 'static-group-instance-id'
+    assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000
+    assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
+
+
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_exclude(
         objstorages,