diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -10,9 +10,11 @@
 import string
 
 from confluent_kafka import Consumer
+from confluent_kafka.admin import AdminClient, ConfigResource
+
 from hypothesis.strategies import one_of
 from subprocess import Popen
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Optional, Tuple
 
 from pathlib import Path
 from pytest_kafka import (
@@ -197,9 +199,9 @@
 os.environ['KAFKA_LOG4J_OPTS'] = \
     '-Dlog4j.configuration=file:%s/log4j.properties' % \
     os.path.dirname(__file__)
-kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc',
-                                 kafka_config_template=KAFKA_CONFIG_TEMPLATE,
-                                 scope='session')
+session_kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc',
+                                         kafka_config_template=KAFKA_CONFIG_TEMPLATE,
+                                         scope='session')
 
 kafka_logger = logging.getLogger('kafka')
 kafka_logger.setLevel(logging.WARN)
@@ -217,6 +219,84 @@
     return "test-consumer-%s" % kafka_prefix
 
 
+@pytest.fixture(scope='session')
+def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
+    return AdminClient({
+        'bootstrap.servers': 'localhost:%s' % session_kafka_server[1]
+    })
+
+
+@pytest.fixture(scope='function')
+def kafka_server_config_overrides() -> Dict[str, str]:
+    return {}
+
+
+@pytest.fixture(scope='function')
+def kafka_server(
+        session_kafka_server: Tuple[Popen, int],
+        kafka_admin_client: AdminClient,
+        kafka_server_config_overrides: Dict[str, str],
+) -> Iterator[Tuple[Popen, int]]:
+    # No overrides, we can just return the original broker connection
+    if not kafka_server_config_overrides:
+        yield session_kafka_server
+        return
+
+    # This is the minimal operation that the kafka_admin_client gives to
+    # retrieve the cluster metadata, which we need to get the numeric id of the
+    # broker spawned by pytest_kafka.
+    metadata = kafka_admin_client.list_topics('__consumer_offsets')
+    broker_ids = [str(broker) for broker in metadata.brokers.keys()]
+    assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
+
+    # Pull the current broker configuration. describe_configs and alter_configs
+    # generate a dict containing one concurrent.future per queried
+    # ConfigResource, hence the use of .result()
+    broker = ConfigResource('broker', broker_ids[0])
+    futures = kafka_admin_client.describe_configs([broker])
+    original_config = futures[broker].result()
+
+    # Gather the list of settings we need to change in the broker
+    # ConfigResource, and their original values in the to_restore dict
+    to_restore = {}
+    for key, new_value in kafka_server_config_overrides.items():
+        if key not in original_config:
+            raise ValueError(f"Cannot override unknown configuration {key}")
+        orig_value = original_config[key].value
+        if orig_value == new_value:
+            continue
+        if original_config[key].is_read_only:
+            raise ValueError(f"Cannot override read-only configuration {key}")
+
+        broker.set_config(key, new_value)
+        to_restore[key] = orig_value
+
+    # to_restore will be empty if all the config "overrides" are equal to the
+    # original value. No need to wait for a config alteration if that's the
+    # case. The result() will raise a KafkaException if the settings change
+    # failed.
+    if to_restore:
+        futures = kafka_admin_client.alter_configs([broker])
+        try:
+            futures[broker].result()
+        except Exception:
+            raise
+
+    yield session_kafka_server
+
+    # Now we can restore the old setting values. Again, the result() will raise
+    # a KafkaException if the settings change failed.
+    if to_restore:
+        for key, orig_value in to_restore.items():
+            broker.set_config(key, orig_value)
+
+        futures = kafka_admin_client.alter_configs([broker])
+        try:
+            futures[broker].result()
+        except Exception:
+            raise
+
+
 TEST_CONFIG = {
     'consumer_id': 'swh.journal.consumer',
     'object_types': OBJECT_TYPE_KEYS.keys(),