diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -184,9 +184,16 @@ @pytest.fixture(scope='function') def kafka_prefix(): + """Pick a random prefix for kafka topics on each call""" return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) +@pytest.fixture(scope='function') +def kafka_consumer_group(kafka_prefix: str): + """Pick a random consumer group for kafka consumers on each call""" + return "test-consumer-%s" % kafka_prefix + + TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), @@ -213,7 +220,7 @@ def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, - kafka_prefix: str, + kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. @@ -223,7 +230,7 @@ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, - 'group.id': "test-consumer-%s" % kafka_prefix, + 'group.id': kafka_consumer_group, }) kafka_topics = [ diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -74,6 +74,7 @@ def test_replay( storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -102,7 +103,7 @@ result = invoke(False, [ 'replay', '--broker', '127.0.0.1:%d' % port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', '1', ]) @@ -167,6 +168,7 @@ objstorages, storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -177,7 +179,7 @@ result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', str(NUM_CONTENTS), ]) @@ -195,6 +197,7 @@ objstorages, storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server @@ -209,7 +212,7 @@ result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', str(NUM_CONTENTS), ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}) @@ -241,6 +244,7 @@ objstorages, storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -257,7 +261,7 @@ result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', str(NUM_CONTENTS), '--exclude-sha1-file', fd.name, @@ -286,6 +290,7 @@ objstorages, storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int], check_dst: bool, expected_copied: int, @@ -308,7 +313,7 @@ result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', str(NUM_CONTENTS), '--check-dst' if check_dst else '--no-check-dst', diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -17,6 +17,7 @@ def test_client( kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -38,7 +39,7 @@ config = { 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': 'replayer', + 'group_id': kafka_consumer_group, 'prefix': kafka_prefix, 'max_messages': 1, } @@ -52,6 +53,7 @@ def test_client_eof( kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -73,7 +75,7 @@ config = { 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': 'replayer', + 'group_id': kafka_consumer_group, 'prefix': kafka_prefix, 'stop_on_eof': True, } diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -35,6 +35,7 @@ def test_storage_play( kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' @@ -73,7 +74,7 @@ # Fill the storage from Kafka config = { 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': 'replayer', + 'group_id': kafka_consumer_group, 'prefix': kafka_prefix, 'max_messages': nb_sent, }