diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -10,9 +10,11 @@
 import string
 
 from confluent_kafka import Consumer
+from confluent_kafka.admin import AdminClient, ConfigResource
+
 from hypothesis.strategies import one_of
 from subprocess import Popen
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Optional, Tuple
 
 from pathlib import Path
 from pytest_kafka import (
@@ -197,9 +199,9 @@
 os.environ['KAFKA_LOG4J_OPTS'] = \
     '-Dlog4j.configuration=file:%s/log4j.properties' % \
     os.path.dirname(__file__)
-kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc',
-                                 kafka_config_template=KAFKA_CONFIG_TEMPLATE,
-                                 scope='session')
+session_kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc',
+                                         kafka_config_template=KAFKA_CONFIG_TEMPLATE,
+                                         scope='session')
 
 kafka_logger = logging.getLogger('kafka')
 kafka_logger.setLevel(logging.WARN)
@@ -217,6 +219,78 @@
     return "test-consumer-%s" % kafka_prefix
 
 
+@pytest.fixture(scope='session')
+def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
+    return AdminClient({
+        'bootstrap.servers': 'localhost:%s' % session_kafka_server[1]
+    })
+
+
+@pytest.fixture(scope='function')
+def kafka_server_config_overrides() -> Dict[str, str]:
+    return {}
+
+
+@pytest.fixture(scope='function')
+def kafka_server(
+        session_kafka_server: Tuple[Popen, int],
+        kafka_admin_client: AdminClient,
+        kafka_server_config_overrides: Dict[str, str],
+) -> Iterator[Tuple[Popen, int]]:
+    # No overrides, we can just return the original broker connection
+    if not kafka_server_config_overrides:
+        yield session_kafka_server
+        return
+
+    # This is the most lightweight operation the kafka_admin_client offers to
+    # retrieve the cluster metadata, which we need to get the numeric id of
+    # the broker spawned by pytest_kafka.
+    metadata = kafka_admin_client.list_topics('__consumer_offsets')
+    broker_ids = [str(broker) for broker in metadata.brokers.keys()]
+    assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
+
+    # Pull the current broker configuration. describe_configs and
+    # alter_configs return a dict containing one concurrent.futures.Future
+    # per queried ConfigResource, hence the use of .result()
+    broker = ConfigResource('broker', broker_ids[0])
+    futures = kafka_admin_client.describe_configs([broker])
+    original_config = futures[broker].result()
+
+    # Gather the list of settings we need to change in the broker
+    # ConfigResource, and their original values in the to_restore dict
+    to_restore = {}
+    for key, new_value in kafka_server_config_overrides.items():
+        if key not in original_config:
+            raise ValueError(f"Cannot override unknown configuration {key}")
+        orig_value = original_config[key].value
+        if orig_value == new_value:
+            continue
+        if original_config[key].is_read_only:
+            raise ValueError(f"Cannot override read-only configuration {key}")
+
+        broker.set_config(key, new_value)
+        to_restore[key] = orig_value
+
+    # to_restore will be empty if all the config "overrides" are equal to the
+    # original values. No need to wait for a config alteration if that's the
+    # case. The result() will raise a KafkaException if the settings change
+    # failed.
+    if to_restore:
+        futures = kafka_admin_client.alter_configs([broker])
+        futures[broker].result()
+
+    yield session_kafka_server
+
+    # Now we can restore the old setting values. Again, the result() will
+    # raise a KafkaException if the settings change failed.
+    if to_restore:
+        for key, orig_value in to_restore.items():
+            broker.set_config(key, orig_value)
+
+        futures = kafka_admin_client.alter_configs([broker])
+        futures[broker].result()
+
+
 TEST_CONFIG = {
     'consumer_id': 'swh.journal.consumer',
     'object_types': OBJECT_TYPE_KEYS.keys(),
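
Usage sketch (not part of the patch): a test module can temporarily change broker settings by shadowing the kafka_server_config_overrides fixture; pytest's fixture resolution then makes the function-scoped kafka_server apply and later restore the override. The setting name message.max.bytes and the test body below are illustrative assumptions, not taken from this diff:

import pytest

@pytest.fixture(scope='function')
def kafka_server_config_overrides():
    # Hypothetical example: message.max.bytes is a dynamically alterable,
    # non-read-only broker setting; any such key known to the broker works.
    return {'message.max.bytes': '104857600'}

def test_with_larger_messages(kafka_server):
    _, port = kafka_server
    # The session broker at localhost:<port> now runs with the override,
    # which the kafka_server fixture restores once this test returns.
    assert port > 0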