diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 779bcb8..92f4c7a 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,289 +1,369 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import pytest
 import logging
 import random
 import string
 
 from confluent_kafka import Consumer
+from confluent_kafka.admin import AdminClient, ConfigResource
+
 from hypothesis.strategies import one_of
 from subprocess import Popen
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Optional, Tuple
 
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, KAFKA_SERVER_CONFIG_TEMPLATE,
     ZOOKEEPER_CONFIG_TEMPLATE,
 )
 
 from swh.model import hypothesis_strategies as strategies
 from swh.model.hashutil import MultiHash, hash_to_bytes
 
 
 logger = logging.getLogger(__name__)
 
 CONTENTS = [
     {
         **MultiHash.from_data(b'foo').digest(),
         'length': 3,
         'status': 'visible',
     },
 ]
 
 duplicate_content1 = {
     'length': 4,
     'sha1': hash_to_bytes(
         '44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
     'sha1_git': b'another-foo',
     'blake2s256': b'another-bar',
     'sha256': b'another-baz',
     'status': 'visible',
 }
 
 # Craft a simulated sha1 collision: same sha1, different sha1_git
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1['sha1_git'])
 sha1_array[0] += 1
 duplicate_content2['sha1_git'] = bytes(sha1_array)
 
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 
 COMMITTERS = [
     {
         'fullname': b'foo',
         'name': b'foo',
         'email': b'',
     },
     {
         'fullname': b'bar',
         'name': b'bar',
         'email': b'',
     }
 ]
 
 DATES = [
     {
         'timestamp': {
             'seconds': 1234567891,
             'microseconds': 0,
         },
         'offset': 120,
         'negative_utc': False,
     },
     {
         'timestamp': {
             'seconds': 1234567892,
             'microseconds': 0,
         },
         'offset': 120,
         'negative_utc': False,
     }
 ]
 
 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': DATES[0],
         'committer': COMMITTERS[0],
         'author': COMMITTERS[0],
         'committer_date': DATES[0],
         'type': 'git',
         'directory': b'\x01' * 20,
         'synthetic': False,
         'metadata': None,
         'parents': [],
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': DATES[1],
         'committer': COMMITTERS[1],
         'author': COMMITTERS[1],
         'committer_date': DATES[1],
         'type': 'hg',
         'directory': b'\x02' * 20,
         'synthetic': False,
         'metadata': None,
         'parents': [],
     },
 ]
 
 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': False,
         },
         'author': COMMITTERS[0],
         'target_type': 'revision',
         'target': b'\x04' * 20,
         'message': b'foo',
         'synthetic': False,
     },
 ]
 
 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
     }
 ]
 
 ORIGIN_VISITS = [
     {
         'origin': ORIGINS[0]['url'],
         'date': '2013-05-07 04:20:39.369271+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'foo': 'bar'},
         'type': 'git',
     },
     {
         'origin': ORIGINS[0]['url'],
         'date': '2018-11-27 17:20:39+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'baz': 'qux'},
         'type': 'git',
     }
 ]
 
 # Map from object type to tuple (id field, <object instances to test>)
 OBJECT_TYPE_KEYS = {
     'content': ('sha1', CONTENTS),
     'revision': ('id', REVISIONS),
     'release': ('id', RELEASES),
     'origin': (None, ORIGINS),
     'origin_visit': (None, ORIGIN_VISITS),
 }  # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]]
 
 
 KAFKA_ROOT = (os.environ.get('SWH_KAFKA_ROOT')
               or os.path.dirname(__file__) + '/kafka')
 if not os.path.exists(KAFKA_ROOT):
     msg = ('Development error: %s must exist and point to an '
            'existing kafka installation' % KAFKA_ROOT)
     raise ValueError(msg)
 
 KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin'
 
 KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')
 
 ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n'
 KAFKA_CONFIG_TEMPLATE = (
     KAFKA_SERVER_CONFIG_TEMPLATE + '\nmessage.max.bytes=104857600\n'
 )
 
 # These calls define session-scoped zookeeper and kafka fixtures
 zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN,
                                         zk_config_template=ZK_CONFIG_TEMPLATE,
                                         scope='session')
 os.environ['KAFKA_LOG4J_OPTS'] = \
     '-Dlog4j.configuration=file:%s/log4j.properties' % \
     os.path.dirname(__file__)
-kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc',
-                                 kafka_config_template=KAFKA_CONFIG_TEMPLATE,
-                                 scope='session')
+session_kafka_server = make_kafka_server(
+    KAFKA_BIN, 'zookeeper_proc',
+    kafka_config_template=KAFKA_CONFIG_TEMPLATE,
+    scope='session')
 
 kafka_logger = logging.getLogger('kafka')
 kafka_logger.setLevel(logging.WARN)
 
 
 @pytest.fixture(scope='function')
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
 
 
 @pytest.fixture(scope='function')
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix
 
 
+@pytest.fixture(scope='session')
+def kafka_admin_client(
+        session_kafka_server: Tuple[Popen, int]) -> AdminClient:
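+    """Confluent Kafka AdminClient connected to the session broker."""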
+    return AdminClient({
+        'bootstrap.servers': 'localhost:%s' % session_kafka_server[1]
+    })
+
+
+@pytest.fixture(scope='function')
+def kafka_server_config_overrides() -> Dict[str, str]:
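+    """Broker configuration overrides, applied by the kafka_server fixture.
+
+    Empty by default; override this fixture in a test module to alter
+    broker settings for the duration of a test.
+    """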
+    return {}
+
+
+@pytest.fixture(scope='function')
+def kafka_server(
+        session_kafka_server: Tuple[Popen, int],
+        kafka_admin_client: AdminClient,
+        kafka_server_config_overrides: Dict[str, str],
+) -> Iterator[Tuple[Popen, int]]:
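+    """Kafka broker fixture honoring per-test broker config overrides.
+
+    If kafka_server_config_overrides is non-empty, the requested broker
+    settings are altered before the broker is handed to the test, then
+    restored afterwards.
+    """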
+    # No overrides requested: hand back the session broker unchanged
+    if not kafka_server_config_overrides:
+        yield session_kafka_server
+        return
+
+    # list_topics() (here restricted to a single topic) is the cheapest
+    # request the kafka_admin_client offers that returns the cluster
+    # metadata, which we need to learn the numeric id of the broker
+    # spawned by pytest_kafka.
+    metadata = kafka_admin_client.list_topics('__consumer_offsets')
+    broker_ids = [str(broker) for broker in metadata.brokers.keys()]
+    assert len(broker_ids) == 1, (
+        "Expected exactly one broker in the kafka cluster")
+
+    # Pull the current broker configuration. describe_configs and
+    # alter_configs return a dict mapping each queried ConfigResource to a
+    # concurrent.futures.Future, hence the use of .result() below.
+    broker = ConfigResource('broker', broker_ids[0])
+    futures = kafka_admin_client.describe_configs([broker])
+    original_config = futures[broker].result()
+
+    # Stage the settings we need to change on the broker ConfigResource,
+    # recording their original values in the to_restore dict
+    to_restore = {}
+    for key, new_value in kafka_server_config_overrides.items():
+        if key not in original_config:
+            raise ValueError(f"Cannot override unknown configuration {key}")
+        orig_value = original_config[key].value
+        if orig_value == new_value:
+            continue
+        if original_config[key].is_read_only:
+            raise ValueError(f"Cannot override read-only configuration {key}")
+
+        broker.set_config(key, new_value)
+        to_restore[key] = orig_value
+
+    # to_restore is empty if every "override" matches the original value; in
+    # that case there is no config alteration to wait for. result() raises a
+    # KafkaException if the settings change failed.
+    if to_restore:
+        futures = kafka_admin_client.alter_configs([broker])
+        futures[broker].result()
+
+    yield session_kafka_server
+
+    # Restore the original setting values now that the test is done. Again,
+    # result() raises a KafkaException if the settings change failed.
+    if to_restore:
+        for key, orig_value in to_restore.items():
+            broker.set_config(key, orig_value)
+
+        futures = kafka_admin_client.alter_configs([broker])
+        futures[broker].result()
+
+
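+# A minimal usage sketch (hypothetical test, not part of this module): a
+# test file can override kafka_server_config_overrides to run against a
+# broker with altered settings, e.g.:
+#
+#     @pytest.fixture(scope='function')
+#     def kafka_server_config_overrides():
+#         # message.max.bytes is a dynamically alterable broker setting
+#         return {'message.max.bytes': '1048576'}
+#
+#     def test_big_messages(kafka_server):
+#         ...  # the broker now runs with the overridden setting
+
+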
 TEST_CONFIG = {
     'consumer_id': 'swh.journal.consumer',
     'object_types': OBJECT_TYPE_KEYS.keys(),
     'stop_after_objects': 1,  # will read 1 object and stop
     'storage': {'cls': 'memory', 'args': {}},
 }
 
 
 @pytest.fixture
 def test_config(kafka_server: Tuple[Popen, int],
                 kafka_prefix: str):
     """Test configuration needed for producer/consumer
 
     """
     _, port = kafka_server
     return {
         **TEST_CONFIG,
         'brokers': ['127.0.0.1:{}'.format(port)],
         'prefix': kafka_prefix + '.swh.journal.objects',
     }
 
 
 @pytest.fixture
 def consumer(
     kafka_server: Tuple[Popen, int],
     test_config: Dict,
     kafka_consumer_group: str,
 ) -> Iterator[Consumer]:
     """Get a connected Kafka consumer.
 
     """
     _, kafka_port = kafka_server
     consumer = Consumer({
         'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
         'auto.offset.reset': 'earliest',
         'enable.auto.commit': True,
         'group.id': kafka_consumer_group,
     })
 
     kafka_topics = [
         '%s.%s' % (test_config['prefix'], object_type)
         for object_type in test_config['object_types']
     ]
 
     consumer.subscribe(kafka_topics)
 
     yield consumer
 
     consumer.close()
 
 
 def objects_d():
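     """Return a hypothesis strategy generating (object_type, object_dict)
     tuples for every supported journal object type."""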
     return one_of(
         strategies.origins().map(
             lambda x: ('origin', x.to_dict())),
         strategies.origin_visits().map(
             lambda x: ('origin_visit', x.to_dict())),
         strategies.snapshots().map(
             lambda x: ('snapshot', x.to_dict())),
         strategies.releases().map(
             lambda x: ('release', x.to_dict())),
         strategies.revisions().map(
             lambda x: ('revision', x.to_dict())),
         strategies.directories().map(
             lambda x: ('directory', x.to_dict())),
         strategies.skipped_contents().map(
             lambda x: ('skipped_content', x.to_dict())),
         strategies.present_contents().map(
             lambda x: ('content', x.to_dict())),
     )
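 
 
 # A minimal usage sketch (hypothetical): objects_d() is meant to be consumed
 # with hypothesis, e.g.:
 #
 #     from hypothesis import given
 #
 #     @given(objects_d())
 #     def test_roundtrip(obj):
 #         object_type, object_d = obj
 #         ...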