diff --git a/swh/journal/client.py b/swh/journal/client.py
index 028f94b..a8c4ef5 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,355 +1,356 @@
 # Copyright (C) 2017-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 from importlib import import_module
 import logging
 import os
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
 
 from confluent_kafka import Consumer, KafkaError, KafkaException
 
 from swh.core.statsd import statsd
 from swh.journal import DEFAULT_PREFIX
 
 from .serializers import kafka_to_value
 
 logger = logging.getLogger(__name__)
 rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")
 
 
 # Accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
 
 # Errors that Kafka raises too often without being useful; we therefore
 # lower their log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
     KafkaError._NO_OFFSET,
 ]
 
 JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total"
 JOURNAL_STATUS_METRIC = "swh_journal_client_status"
 
 
 def get_journal_client(cls: str, **kwargs: Any):
     """Factory function to instantiate a journal client object.
 
     Currently, only the "kafka" journal client is supported.
     """
     if cls == "kafka":
         if "stats_cb" in kwargs:
             stats_cb = kwargs["stats_cb"]
             if isinstance(stats_cb, str):
                 try:
                     module_path, func_name = stats_cb.split(":")
                 except ValueError:
                     raise ValueError(
                         "Invalid stats_cb configuration option: "
                         "it should be a string like 'path.to.module:function'"
                     )
                 try:
                     module = import_module(module_path, package=__package__)
                 except ModuleNotFoundError:
                     raise ValueError(
                         "Invalid stats_cb configuration option: "
                         f"module {module_path} not found"
                     )
                 try:
                     kwargs["stats_cb"] = getattr(module, func_name)
                 except AttributeError:
                     raise ValueError(
                         "Invalid stats_cb configuration option: "
                         f"function {func_name} not found in module {module_path}"
                     )
         return JournalClient(**kwargs)
     raise ValueError("Unknown journal client class `%s`" % cls)
 
 
 def _error_cb(error):
     if error.fatal():
         raise KafkaException(error)
     if error.code() in _SPAMMY_ERRORS:
         logger.debug("Received non-fatal kafka error: %s", error)
     else:
         logger.info("Received non-fatal kafka error: %s", error)
 
 
 def _on_commit(error, partitions):
     if error is not None:
         _error_cb(error)
 
 
 class JournalClient:
     """A base client for the Software Heritage journal.
 
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix. If the `prefix`
     argument is None (the default), it will take the value
     `'swh.journal.objects'`.
 
     Clients subscribe to events specific to each object type as listed in the
     `object_types` argument (if unset, defaults to all existing kafka topics under
     the prefix).
 
     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.
 
     Messages are processed by the `worker_fn` callback passed to the `process`
     method, in batches of maximum `batch_size` messages (defaults to 200).
 
     The objects passed to the `worker_fn` callback are the result of the kafka
     message converted by the `value_deserializer` function. By default (if this
     argument is not given), it will produce dicts (using the `kafka_to_value`
     function). The signature of the function is:
 
         `value_deserializer(object_type: str, kafka_msg: bytes) -> Any`
 
     If the value returned by `value_deserializer` is None, it is ignored and
     not passed to the `worker_fn` function.
 
     If set, the processing stops after processing `stop_after_objects` messages
     in total.
 
     `stop_on_eof` stops the processing when the client has reached the end of
     each partition in turn.
 
     `auto_offset_reset` sets the behavior of the client when the consumer group
     initializes: `'earliest'` (the default) processes all objects since the
     inception of the topics; `'latest'` only processes objects published after
     the client subscribes.
 
     Any other named argument is passed directly to KafkaConsumer().
 
     """
 
     def __init__(
         self,
         brokers: Union[str, List[str]],
         group_id: str,
         prefix: Optional[str] = None,
         object_types: Optional[List[str]] = None,
         privileged: bool = False,
         stop_after_objects: Optional[int] = None,
         batch_size: int = 200,
         process_timeout: Optional[float] = None,
         auto_offset_reset: str = "earliest",
         stop_on_eof: bool = False,
         value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
         **kwargs,
     ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 "Option 'auto_offset_reset' only accept %s, not %s"
                 % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
             )
 
         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")
         if value_deserializer:
             self.value_deserializer = value_deserializer
         else:
             self.value_deserializer = lambda _, value: kafka_to_value(value)
 
         if isinstance(brokers, str):
             brokers = [brokers]
 
         debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
         if debug_logging and "debug" not in kwargs:
             kwargs["debug"] = "consumer"
 
         # Static group instance id management
         group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
         if group_instance_id:
             kwargs["group.instance.id"] = group_instance_id
 
         if "group.instance.id" in kwargs:
             # When doing static consumer group membership, set a higher default
             # session timeout. The session timeout is the duration after which
             # the broker considers that a consumer has left the consumer group
             # for good, and triggers a rebalance. Considering our current
             # processing pattern, 10 minutes gives the consumer ample time to
             # restart before that happens.
             if "session.timeout.ms" not in kwargs:
                 kwargs["session.timeout.ms"] = 10 * 60 * 1000  # 10 minutes
 
         if "session.timeout.ms" in kwargs:
             # When the session timeout is set, rdkafka requires the max poll
             # interval to be set to a higher value; the max poll interval is
             # rdkafka's way of figuring out whether the client's message
             # processing thread has stalled: when the max poll interval lapses
             # between two calls to consumer.poll(), rdkafka leaves the consumer
             # group and terminates the connection to the brokers.
             #
             # We default to 1.5 times the session timeout
             if "max.poll.interval.ms" not in kwargs:
                 kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3
 
         consumer_settings = {
             **kwargs,
             "bootstrap.servers": ",".join(brokers),
             "auto.offset.reset": auto_offset_reset,
             "group.id": group_id,
             "on_commit": _on_commit,
             "error_cb": _error_cb,
             "enable.auto.commit": False,
             "logger": rdkafka_logger,
         }
 
         self.stop_on_eof = stop_on_eof
         if self.stop_on_eof:
             consumer_settings["enable.partition.eof"] = True
 
         logger.debug("Consumer settings: %s", consumer_settings)
 
         self.consumer = Consumer(consumer_settings)
         if privileged:
             privileged_prefix = f"{prefix}_privileged"
         else:  # do not attempt to subscribe to privileged topics
             privileged_prefix = f"{prefix}"
         existing_topics = [
             topic
             for topic in self.consumer.list_topics(timeout=10).topics.keys()
             if (
                 topic.startswith(f"{prefix}.")
                 or topic.startswith(f"{privileged_prefix}.")
             )
         ]
         if not existing_topics:
             raise ValueError(
                 f"The prefix {prefix} does not match any existing topic "
                 "on the kafka broker"
             )
 
         if not object_types:
             object_types = list({topic.split(".")[-1] for topic in existing_topics})
 
         self.subscription = []
         unknown_types = []
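         # For each requested object type, prefer the privileged topic (when the
         # client is privileged and that topic exists) and fall back to the
         # regular one; object types matching no topic are reported as unknown.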
         for object_type in object_types:
             topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
             for topic in topics:
                 if topic in existing_topics:
                     self.subscription.append(topic)
                     break
             else:
                 unknown_types.append(object_type)
         if unknown_types:
             raise ValueError(
                 f"Topic(s) for object types {','.join(unknown_types)} "
                 "are unknown on the kafka broker"
             )
 
         logger.debug(f"Upstream topics: {existing_topics}")
         self.subscribe()
 
         self.stop_after_objects = stop_after_objects
 
         self.eof_reached: Set[Tuple[str, str]] = set()
         self.batch_size = batch_size
 
         if process_timeout is not None:
             raise DeprecationWarning(
                 "'process_timeout' argument is not supported anymore by "
                 "JournalClient; please remove it from your configuration.",
             )
 
     def subscribe(self):
         """Subscribe to topics listed in self.subscription
 
         This can be overridden if you need, for instance, to manually assign partitions.
         """
         logger.debug(f"Subscribing to: {self.subscription}")
         self.consumer.subscribe(topics=self.subscription)
 
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
 
         Args:
             worker_fn (Callable[[Dict[str, List[dict]]], Any]): function called
                 with each batch of messages, as a dict mapping object types to
                 lists of deserialized messages.
         """
         total_objects_processed = 0
         # timeout for message poll
         timeout = 1.0
 
         with statsd.status_gauge(
             JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"]
         ) as set_status:
             set_status("idle")
             while True:
                 batch_size = self.batch_size
                 if self.stop_after_objects:
                     if total_objects_processed >= self.stop_after_objects:
                         break
 
                     # clamp batch size to avoid overrunning stop_after_objects
                     batch_size = min(
-                        self.stop_after_objects - total_objects_processed, batch_size,
+                        self.stop_after_objects - total_objects_processed,
+                        batch_size,
                     )
 
                 set_status("waiting")
                 while True:
                     messages = self.consumer.consume(
                         timeout=timeout, num_messages=batch_size
                     )
                     if messages:
                         break
 
                 set_status("processing")
                 batch_processed, at_eof = self.handle_messages(messages, worker_fn)
 
                 set_status("idle")
                 # report the number of handled messages
                 statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed)
                 total_objects_processed += batch_processed
                 if at_eof:
                     break
 
         return total_objects_processed
 
     def handle_messages(self, messages, worker_fn):
         objects: Dict[str, List[Any]] = defaultdict(list)
         nb_processed = 0
 
         for message in messages:
             error = message.error()
             if error is not None:
                 if error.code() == KafkaError._PARTITION_EOF:
                     self.eof_reached.add((message.topic(), message.partition()))
                 else:
                     _error_cb(error)
                 continue
             if message.value() is None:
                 # ignore messages with no payload; these can be generated in tests
                 continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
             deserialized_object = self.deserialize_message(
                 message, object_type=object_type
             )
             if deserialized_object is not None:
                 objects[object_type].append(deserialized_object)
 
         if objects:
             worker_fn(dict(objects))
         self.consumer.commit()
 
         at_eof = self.stop_on_eof and all(
             (tp.topic, tp.partition) in self.eof_reached
             for tp in self.consumer.assignment()
         )
 
         return nb_processed, at_eof
 
     def deserialize_message(self, message, object_type=None):
         return self.value_deserializer(object_type, message.value())
 
     def close(self):
         self.consumer.close()
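
A minimal consumption sketch against the client above, for orientation: the broker
address, consumer group and prefix are placeholders, and the worker function simply
counts the deserialized objects per type.

    from swh.journal.client import JournalClient

    def count_objects(all_objects):
        # all_objects maps object types to lists of deserialized messages
        for object_type, objects in all_objects.items():
            print(f"{object_type}: {len(objects)} objects")

    client = JournalClient(
        brokers=["kafka1.example.org:9092"],  # placeholder broker
        group_id="my-consumer-group",         # placeholder consumer group
        prefix="swh.journal.objects",         # the default prefix
        object_types=["revision"],
        stop_after_objects=1000,              # stop once 1000 messages were handled
    )
    total = client.process(count_objects)
    client.close()
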
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
index eb82b69..3476d4e 100644
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -1,261 +1,257 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import random
 import string
 from typing import Any, Collection, Dict, Iterator, Optional
 
 import attr
 from confluent_kafka import Consumer, KafkaException, Producer
 from confluent_kafka.admin import AdminClient
 import pytest
 
 from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 
 
 def ensure_lists(value: Any) -> Any:
     """
     >>> ensure_lists(["foo", 42])
     ['foo', 42]
     >>> ensure_lists(("foo", 42))
     ['foo', 42]
     >>> ensure_lists({"a": ["foo", 42]})
     {'a': ['foo', 42]}
     >>> ensure_lists({"a": ("foo", 42)})
     {'a': ['foo', 42]}
     """
     if isinstance(value, (tuple, list)):
         return list(map(ensure_lists, value))
     elif isinstance(value, dict):
         return dict(ensure_lists(list(value.items())))
     else:
         return value
 
 
 def consume_messages(consumer, kafka_prefix, expected_messages):
     """Consume expected_messages from the consumer;
     Sort them all into a consumed_objects dict"""
     consumed_messages = defaultdict(list)
 
     fetched_messages = 0
     retries_left = 1000
 
     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError(
                 "Timed out fetching messages from kafka. "
                 f"Only {fetched_messages}/{expected_messages} fetched"
             )
 
         msg = consumer.poll(timeout=0.1)
 
         if not msg:
             retries_left -= 1
             continue
 
         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue
 
         fetched_messages += 1
         topic = msg.topic()
         assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
             f"{kafka_prefix}_privileged."
         ), "Unexpected topic"
         object_type = topic[len(kafka_prefix + ".") :]
 
         consumed_messages[object_type].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )
 
     return consumed_messages
 
 
 def assert_all_objects_consumed(
     consumed_messages: Dict, exclude: Optional[Collection] = None
 ):
     """Check whether all objects from TEST_OBJECTS have been consumed
 
     `exclude` can be a list of object types for which we do not want to compare the
     values (e.g. for anonymized objects).
 
     """
     for object_type, known_objects in TEST_OBJECTS.items():
         known_keys = [obj.unique_key() for obj in known_objects]
 
         if not consumed_messages[object_type]:
             return
 
         (received_keys, received_values) = zip(*consumed_messages[object_type])
 
         if object_type in ("content", "skipped_content"):
             for value in received_values:
                 value.pop("ctime", None)
         if object_type == "content":
             known_objects = [attr.evolve(o, data=None) for o in known_objects]
 
         for key in known_keys:
             assert key in received_keys, (
                 f"expected {object_type} key {pprint_key(key)} "
                 "absent from consumed messages"
             )
 
         if exclude and object_type in exclude:
             continue
 
         for value in known_objects:
             expected_value = value.to_dict()
             if value.object_type in ("content", "skipped_content"):
                 expected_value.pop("ctime", None)
             assert ensure_lists(expected_value) in received_values, (
                 f"expected {object_type} value {value!r} is "
                 "absent from consumed messages"
             )
 
 
 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
 
 
 @pytest.fixture(scope="function")
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix
 
 
 @pytest.fixture(scope="function")
 def object_types():
     """Set of object types to precreate topics for."""
     return set(TEST_OBJECTS.keys())
 
 
 @pytest.fixture(scope="function")
 def privileged_object_types():
     """Set of object types to precreate privileged topics for."""
     return {"revision", "release"}
 
 
 @pytest.fixture(scope="function")
 def kafka_server(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ) -> str:
     """A kafka server with existing topics
 
     Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type
     from the ``object_types`` list.
 
     Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
     object_type from the ``privileged_object_types`` list.
 
     """
     topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     ]
 
     # unfortunately, the Mock broker does not support the CreateTopics admin API, so we
     # have to create topics using a Producer.
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "bootstrap producer",
             "acks": "all",
         }
     )
     for topic in topics:
         producer.produce(topic=topic, value=None)
     for i in range(10):
         if producer.flush(0.1) == 0:
             break
 
     return kafka_server_base
 
 
 @pytest.fixture(scope="session")
 def kafka_server_base() -> Iterator[str]:
     """Create a mock kafka cluster suitable for tests.
 
     Yield a connection string.
 
     Note: this is a generator to keep the mock broker alive during the whole test
     session.
 
     see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
     """
     admin = AdminClient({"test.mock.num.brokers": "1"})
 
     metadata = admin.list_topics()
     brokers = [str(broker) for broker in metadata.brokers.values()]
     assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
 
     broker_connstr, broker_id = brokers[0].split("/")
     yield broker_connstr
 
 
 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
     "stop_on_eof": True,
     "storage": {"cls": "memory", "args": {}},
 }
 
 
 @pytest.fixture
 def test_config(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
-    """Test configuration needed for producer/consumer
-
-    """
+    """Test configuration needed for producer/consumer"""
     return {
         **TEST_CONFIG,
         "object_types": object_types,
         "privileged_object_types": privileged_object_types,
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }
 
 
 @pytest.fixture
 def consumer(
     kafka_server: str, test_config: Dict, kafka_consumer_group: str
 ) -> Consumer:
-    """Get a connected Kafka consumer.
-
-    """
+    """Get a connected Kafka consumer."""
     consumer = Consumer(
         {
             "bootstrap.servers": kafka_server,
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
         }
     )
     prefix = test_config["prefix"]
     kafka_topics = [
         f"{prefix}.{object_type}" for object_type in test_config["object_types"]
     ] + [
         f"{prefix}_privileged.{object_type}"
         for object_type in test_config["privileged_object_types"]
     ]
     consumer.subscribe(kafka_topics)
 
     yield consumer
 
     # Explicitly perform the commit operation on the consumer before closing it
     # to avoid possible hang since confluent-kafka v1.6.0
     consumer.commit()
     consumer.close()
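
A hedged sketch of how the fixtures and helpers above are typically combined in a
downstream test; the production step is assumed to have happened already (e.g. via a
journal writer configured against kafka_server and kafka_prefix) and is not shown.

    from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
    from swh.model.tests.swh_model_data import TEST_OBJECTS

    def test_roundtrip(consumer, kafka_prefix, kafka_server):
        # assumed: TEST_OBJECTS were already published to the topics under kafka_prefix
        expected = sum(len(objs) for objs in TEST_OBJECTS.values())
        consumed = consume_messages(consumer, kafka_prefix, expected)
        assert_all_objects_consumed(consumed)
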
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 007e652..3d032d5 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,397 +1,409 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Dict, List, cast
 from unittest.mock import MagicMock
 
 from confluent_kafka import Producer
 import pytest
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
 from swh.model.model import Content, Revision
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 
 REV = {
     "message": b"something cool",
     "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
     "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
     "date": {
         "timestamp": {"seconds": 123456789, "microseconds": 123},
         "offset": 120,
         "negative_utc": False,
     },
     "committer_date": {
         "timestamp": {"seconds": 123123456, "microseconds": 0},
         "offset": 0,
         "negative_utc": False,
     },
     "type": "git",
     "directory": (
         b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
         b"\x01\x02\x03\x04\x05"
     ),
     "synthetic": False,
     "metadata": None,
     "parents": [],
     "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
 }
 
 
 def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka
     producer.produce(
-        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
+        topic=kafka_prefix + ".revision",
+        key=REV["id"],
+        value=value_to_kafka(REV),
     )
     producer.flush()
 
     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)
 
     worker_fn.assert_called_once_with({"revision": [REV]})
 
 
 @pytest.mark.parametrize("count", [1, 2])
 def test_client_stop_after_objects(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka
     revisions = cast(List[Revision], TEST_OBJECTS["revision"])
     for rev in revisions:
         producer.produce(
             topic=kafka_prefix + ".revision",
             key=rev.id,
             value=value_to_kafka(rev.to_dict()),
         )
     producer.flush()
 
     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=False,
         stop_after_objects=count,
     )
 
     worker_fn = MagicMock()
     client.process(worker_fn)
 
     # the code below is not pretty, but it is needed: we have to deal with
     # dicts (so no set) whose values can be lists or tuples, and we do not
     # know for sure how many worker_fn calls will happen while consuming the
     # topic...
     worker_fn.assert_called()
     revs = []  # list of (unique) rev dicts we got from the client
     for call in worker_fn.call_args_list:
         callrevs = call[0][0]["revision"]
         for rev in callrevs:
             assert Revision.from_dict(rev) in revisions
             if rev not in revs:
                 revs.append(rev)
     assert len(revs) == count
 
 
 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
-    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: str,
+    batch_size: int,
 ):
     num_objects = 2 * batch_size + 1
     assert num_objects < 256, "Too many objects, generation will fail"
 
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
 
     # Fill Kafka
     for content in contents:
         producer.produce(
             topic=kafka_prefix + ".content",
             key=key_to_kafka(content.sha1),
             value=value_to_kafka(content.to_dict()),
         )
 
     producer.flush()
 
     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         batch_size=batch_size,
     )
 
     collected_output: List[Dict] = []
 
     def worker_fn(objects):
         received = objects["content"]
         assert len(received) <= batch_size
         collected_output.extend(received)
 
     client.process(worker_fn)
 
     expected_output = [content.to_dict() for content in contents]
     assert len(collected_output) == len(expected_output)
 
     for output in collected_output:
         assert output in expected_output
 
 
 @pytest.fixture()
 def kafka_producer(kafka_prefix: str, kafka_server_base: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".something",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value1"),
     )
     producer.produce(
         topic=kafka_prefix + ".else",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value2"),
     )
     producer.flush()
     return producer
 
 
 def test_client_subscribe_all(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_on_eof=True,
     )
     assert set(client.subscription) == {
         f"{kafka_prefix}.something",
         f"{kafka_prefix}.else",
     }
 
     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with(
-        {"something": ["value1"], "else": ["value2"],}
+        {
+            "something": ["value1"],
+            "else": ["value2"],
+        }
     )
 
 
 def test_client_subscribe_one_topic(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
         stop_on_eof=True,
         object_types=["else"],
     )
     assert client.subscription == [f"{kafka_prefix}.else"]
 
     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with({"else": ["value2"]})
 
 
 def test_client_subscribe_absent_topic(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
             stop_on_eof=True,
             object_types=["really"],
         )
 
 
 def test_client_subscribe_absent_prefix(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_on_eof=True,
         )
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
             stop_on_eof=True,
             object_types=["else"],
         )
 
 
 def test_client_subscriptions_with_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka with a revision object on both the regular prefix (normally
     # holding anonymized objects in this case) and the privileged one
     producer.produce(
-        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
+        topic=kafka_prefix + ".revision",
+        key=REV["id"],
+        value=value_to_kafka(REV),
     )
     producer.produce(
         topic=kafka_prefix + "_privileged.revision",
         key=REV["id"],
         value=value_to_kafka(REV),
     )
     producer.flush()
 
     # without privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         privileged=False,
     )
     # we only subscribed to "standard" topics
     assert client.subscription == [kafka_prefix + ".revision"]
 
     # with privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         privileged=True,
     )
     # we only subscribed to "privileged" topics
     assert client.subscription == [kafka_prefix + "_privileged.revision"]
 
 
 def test_client_subscriptions_without_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka with revision objects only on the standard prefix
     producer.produce(
-        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
+        topic=kafka_prefix + ".revision",
+        key=REV["id"],
+        value=value_to_kafka(REV),
     )
     producer.flush()
 
     # without privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         privileged=False,
     )
     # we only subscribed to the standard prefix
     assert client.subscription == [kafka_prefix + ".revision"]
 
     # with privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         privileged=True,
     )
     # we also only subscribed to the standard prefix, since there is no
     # privileged prefix on the kafka broker
     assert client.subscription == [kafka_prefix + ".revision"]
 
 
 def test_client_with_deserializer(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     # Fill Kafka
     revisions = cast(List[Revision], TEST_OBJECTS["revision"])
     for rev in revisions:
         producer.produce(
             topic=kafka_prefix + ".revision",
             key=rev.id,
             value=value_to_kafka(rev.to_dict()),
         )
     producer.flush()
 
     def custom_deserializer(object_type, msg):
         assert object_type == "revision"
         obj = kafka_to_value(msg)
         # filter the first revision
         if obj["id"] == revisions[0].id:
             return None
         return Revision.from_dict(obj)
 
     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         value_deserializer=custom_deserializer,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)
 
     # a commit seems to be needed to prevent a race condition in which
     # worker_fn has not yet been called at this point (not sure how)
     client.consumer.commit()
 
     # Check the first revision has not been passed to worker_fn
     processed_revisions = set(worker_fn.call_args[0][0]["revision"])
     assert revisions[0] not in processed_revisions
     assert all(rev in processed_revisions for rev in revisions[1:])
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
index ece39ba..b5966da 100644
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -1,73 +1,76 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Iterator
 
 from confluent_kafka.admin import AdminClient
 
 
 def test_kafka_server(kafka_server_base: str):
     ip, port_str = kafka_server_base.split(":")
     assert ip == "127.0.0.1"
     assert int(port_str)
 
     admin = AdminClient({"bootstrap.servers": kafka_server_base})
 
     topics = admin.list_topics()
 
     assert len(topics.brokers) == 1
 
 
 def test_kafka_server_with_topics(
     kafka_server: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
     admin = AdminClient({"bootstrap.servers": kafka_server})
 
     # check unprivileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}.")
     }
     assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
 
     # check privileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
         if topic.startswith(f"{kafka_prefix}_privileged.")
     }
     assert topics == {
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     }
 
 
 def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
     assert test_config == {
         "consumer_id": "swh.journal.consumer",
         "stop_on_eof": True,
         "storage": {"cls": "memory", "args": {}},
         "object_types": {
             "content",
             "directory",
             "extid",
             "metadata_authority",
             "metadata_fetcher",
             "origin",
             "origin_visit",
             "origin_visit_status",
             "raw_extrinsic_metadata",
             "release",
             "revision",
             "snapshot",
             "skipped_content",
         },
-        "privileged_object_types": {"release", "revision",},
+        "privileged_object_types": {
+            "release",
+            "revision",
+        },
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }
diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py
index b94825f..5dde7bd 100644
--- a/swh/journal/tests/test_serializers.py
+++ b/swh/journal/tests/test_serializers.py
@@ -1,115 +1,119 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import OrderedDict
 from datetime import datetime, timedelta, timezone
 import itertools
 from typing import Iterable
 
 import pytest
 
 from swh.journal import serializers
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 
 
 def test_key_to_kafka_repeatable():
     """Check the kafka key encoding is repeatable"""
     base_dict = {
         "a": "foo",
         "b": "bar",
         "c": "baz",
     }
 
     key = serializers.key_to_kafka(base_dict)
 
     for dict_keys in itertools.permutations(base_dict):
         d = OrderedDict()
         for k in dict_keys:
             d[k] = base_dict[k]
 
         assert key == serializers.key_to_kafka(d)
 
 
 def test_pprint_key():
     """Test whether get_key works on all our objects"""
     for object_type, objects in TEST_OBJECTS.items():
         for obj in objects:
             key = obj.unique_key()
             pprinted_key = serializers.pprint_key(key)
             assert isinstance(pprinted_key, str)
 
             if isinstance(key, dict):
                 assert (pprinted_key[0], pprinted_key[-1]) == ("{", "}")
                 for dict_key in key.keys():
                     assert f"{dict_key}:" in pprinted_key
 
             if isinstance(key, bytes):
                 assert pprinted_key == key.hex()
 
 
 def test_kafka_to_key():
-    """Standard back and forth serialization with keys
-
-    """
+    """Standard back and forth serialization with keys"""
     # All KeyType(s)
     keys: Iterable[serializers.KeyType] = [
-        {"a": "foo", "b": "bar", "c": "baz",},
-        {"a": b"foobarbaz",},
+        {
+            "a": "foo",
+            "b": "bar",
+            "c": "baz",
+        },
+        {
+            "a": b"foobarbaz",
+        },
         b"foo",
     ]
     for object_type, objects in TEST_OBJECTS.items():
         for obj in objects:
             key = obj.unique_key()
             keys.append(key)
 
     for key in keys:
         ktk = serializers.key_to_kafka(key)
         v = serializers.kafka_to_key(ktk)
 
         assert v == key
 
 
 # limits of the int values supported by msgpack
-MININT = -(2 ** 63)
-MAXINT = 2 ** 64 - 1
+MININT = -(2**63)
+MAXINT = 2**64 - 1
 
 intvalues = [
     MININT * 2,
     MININT - 1,
     MININT,
     MININT + 1,
     -10,
     0,
     10,
     MAXINT - 1,
     MAXINT,
     MAXINT + 1,
     MAXINT * 2,
 ]
 
 
 @pytest.mark.parametrize("value", intvalues)
 def test_encode_int(value):
     assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value
 
 
 datevalues = [
     datetime.now(tz=timezone.utc),
     datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))),
     datetime.now(tz=timezone(timedelta(hours=23, minutes=59))),
     datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc),
     datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc),
 ]
 
 
 @pytest.mark.parametrize("value", datevalues)
 def test_encode_datetime(value):
     assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value
 
 
 @pytest.mark.parametrize("value", datevalues)
 def test_encode_datetime_bw(value):
     bwdate = {b"swhtype": "datetime", b"d": value.isoformat()}
     assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value
diff --git a/swh/journal/tests/test_stream.py b/swh/journal/tests/test_stream.py
index c9bfc90..b11b8c4 100644
--- a/swh/journal/tests/test_stream.py
+++ b/swh/journal/tests/test_stream.py
@@ -1,47 +1,49 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import io
 
 import msgpack
 
 from swh.journal.serializers import msgpack_ext_hook
 from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 
 
 def test_write_additions_with_test_objects():
     outs = io.BytesIO()
 
     writer = get_journal_writer(
-        cls="stream", value_sanitizer=model_object_dict_sanitizer, output_stream=outs,
+        cls="stream",
+        value_sanitizer=model_object_dict_sanitizer,
+        output_stream=outs,
     )
     expected = []
 
     n = 0
     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
 
         for object in objects:
             objd = object.to_dict()
             if object_type == "content":
                 objd.pop("data")
 
             expected.append((object_type, objd))
         n += len(objects)
 
     outs.seek(0, 0)
     unpacker = msgpack.Unpacker(
         outs,
         raw=False,
         ext_hook=msgpack_ext_hook,
         strict_map_key=False,
         use_list=False,
         timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
     )
 
     for i, (objtype, objd) in enumerate(unpacker, start=1):
         assert (objtype, objd) in expected
     assert len(expected) == i
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 2434146..35f12c5 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,273 +1,275 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import time
 from typing import (
     Any,
     Callable,
     Dict,
     Generic,
     Iterable,
     List,
     NamedTuple,
     Optional,
     Type,
     TypeVar,
 )
 
 from confluent_kafka import KafkaException, Producer
 
 from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka
 
 from . import ValueProtocol
 
 logger = logging.getLogger(__name__)
 
 
 class DeliveryTag(NamedTuple):
     """Unique tag allowing us to check for a message delivery"""
 
     topic: str
     kafka_key: bytes
 
 
 class DeliveryFailureInfo(NamedTuple):
     """Verbose information for failed deliveries"""
 
     object_type: str
     key: KeyType
     message: str
     code: str
 
 
 def get_object_type(topic: str) -> str:
     """Get the object type from a topic string"""
     return topic.rsplit(".", 1)[-1]
 
 
 class KafkaDeliveryError(Exception):
     """Delivery failed on some kafka messages."""
 
     def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
         self.message = message
         self.delivery_failures = list(delivery_failures)
 
     def pretty_failures(self) -> str:
         return ", ".join(
             f"{f.object_type} {pprint_key(f.key)} ({f.message})"
             for f in self.delivery_failures
         )
 
     def __str__(self):
         return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
 
 
 TValue = TypeVar("TValue", bound=ValueProtocol)
 
 
 class KafkaJournalWriter(Generic[TValue]):
     """This class is used to write serialized versions of value objects to a
     series of Kafka topics. The type parameter `TValue`, which must implement the
     `ValueProtocol`, is the type of values this writer will write.
     Typically, `TValue` will be `swh.model.model.BaseModel`.
 
     Topics used to send objects representations are built from a ``prefix`` plus the
     type of the object:
 
       ``{prefix}.{object_type}``
 
     Objects can be sent as is, or can be anonymized. The anonymization feature, when
     activated, will write anonymized versions of value objects in the main topic, and
     stock (non-anonymized) objects will be sent to a dedicated (privileged) set of
     topics:
 
       ``{prefix}_privileged.{object_type}``
 
     The anonymization of a value object is the result of calling its
     ``anonymize()`` method. An object is considered anonymizable if this
     method returns a (non-None) value.
 
     Args:
       brokers: list of broker addresses and ports.
       prefix: the prefix used to build the topic names for objects.
       client_id: the id of the writer sent to kafka.
       value_sanitizer: a function that takes the object type and the dict
         representation of an object as arguments, and returns another dict
         that should actually be stored in the journal (e.g. removing keys
         that do not belong there)
       producer_config: extra configuration keys passed to the `Producer`.
       flush_timeout: timeout, in seconds, after which the `flush` operation
         will fail if some message deliveries are still pending.
       producer_class: override for the kafka producer class.
       anonymize: if True, activate the anonymization feature.
 
     """
 
     def __init__(
         self,
         brokers: Iterable[str],
         prefix: str,
         client_id: str,
         value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]],
         producer_config: Optional[Dict] = None,
         flush_timeout: float = 120,
         producer_class: Type[Producer] = Producer,
         anonymize: bool = False,
     ):
         self._prefix = prefix
         self._prefix_privileged = f"{self._prefix}_privileged"
         self.anonymize = anonymize
 
         if not producer_config:
             producer_config = {}
 
         if "message.max.bytes" not in producer_config:
             producer_config = {
                 "message.max.bytes": 100 * 1024 * 1024,
                 **producer_config,
             }
 
         self.producer = producer_class(
             {
                 "bootstrap.servers": ",".join(brokers),
                 "client.id": client_id,
                 "on_delivery": self._on_delivery,
                 "error_cb": self._error_cb,
                 "logger": logger,
                 "acks": "all",
                 **producer_config,
             }
         )
 
         # Delivery management
         self.flush_timeout = flush_timeout
 
         # delivery tag -> original object "key" mapping
         self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
 
         # List of (object_type, key, error_msg, error_name) for failed deliveries
         self.delivery_failures: List[DeliveryFailureInfo] = []
 
         self.value_sanitizer = value_sanitizer
 
     def _error_cb(self, error):
         if error.fatal():
             raise KafkaException(error)
         logger.info("Received non-fatal kafka error: %s", error)
 
     def _on_delivery(self, error, message):
         (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
         sent_key = self.deliveries_pending.pop(delivery_tag, None)
 
         if error is not None:
             self.delivery_failures.append(
                 DeliveryFailureInfo(
                     get_object_type(topic), sent_key, error.str(), error.name()
                 )
             )
 
     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
         max_attempts = 5
         last_exception: Optional[Exception] = None
         for attempt in range(max_attempts):
             try:
                 self.producer.produce(
-                    topic=topic, key=kafka_key, value=value_to_kafka(value),
+                    topic=topic,
+                    key=kafka_key,
+                    value=value_to_kafka(value),
                 )
             except BufferError as e:
                 last_exception = e
                 wait = 1 + 3 * attempt
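                 # linear backoff: waits of 1s, 4s, 7s, 10s and 13s across the five
                 # attempts, polling the producer in between to drain its local queue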
 
                 if logger.isEnabledFor(logging.DEBUG):  # pprint_key is expensive
                     logger.debug(
                         "BufferError producing %s %s; waiting for %ss",
                         get_object_type(topic),
                         pprint_key(kafka_key),
                         wait,
                     )
                 self.producer.poll(wait)
             else:
                 self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
                 return
 
         # We reach this point if all delivery attempts have failed
         self.delivery_failures.append(
             DeliveryFailureInfo(
                 get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR"
             )
         )
 
     def delivery_error(self, message) -> KafkaDeliveryError:
         """Get all failed deliveries, and clear them"""
         ret = self.delivery_failures
         self.delivery_failures = []
 
         while self.deliveries_pending:
             delivery_tag, orig_key = self.deliveries_pending.popitem()
             (topic, kafka_key) = delivery_tag
             ret.append(
                 DeliveryFailureInfo(
                     get_object_type(topic),
                     orig_key,
                     "No delivery before flush() timeout",
                     "SWH_FLUSH_TIMEOUT",
                 )
             )
 
         return KafkaDeliveryError(message, ret)
 
     def flush(self):
         start = time.monotonic()
 
         self.producer.flush(self.flush_timeout)
 
         while self.deliveries_pending:
             if time.monotonic() - start > self.flush_timeout:
                 break
             self.producer.poll(0.1)
 
         if self.deliveries_pending:
             # Delivery timeout
             raise self.delivery_error(
                 "flush() exceeded timeout (%ss)" % self.flush_timeout,
             )
         elif self.delivery_failures:
             raise self.delivery_error("Failed deliveries after flush()")
 
     def _write_addition(self, object_type: str, object_: TValue) -> None:
         """Write a single object to the journal"""
         key = object_.unique_key()
 
         if self.anonymize:
             anon_object_ = object_.anonymize()
             if anon_object_:  # can be either None, or an anonymized object
                 # if the object is anonymizable, send the non-anonymized version in the
                 # privileged channel
                 topic = f"{self._prefix_privileged}.{object_type}"
                 dict_ = self.value_sanitizer(object_type, object_.to_dict())
                 logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
                 self.send(topic, key=key, value=dict_)
                 object_ = anon_object_
 
         topic = f"{self._prefix}.{object_type}"
         dict_ = self.value_sanitizer(object_type, object_.to_dict())
         logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
         self.send(topic, key=key, value=dict_)
 
     def write_addition(self, object_type: str, object_: TValue) -> None:
         """Write a single object to the journal"""
         self._write_addition(object_type, object_)
         self.flush()
 
     write_update = write_addition
 
     def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None:
         """Write a set of objects to the journal"""
         for object_ in objects:
             self._write_addition(object_type, object_)
 
         self.flush()
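
For orientation, a minimal sketch of instantiating and using this writer; the broker
and client id are placeholders, and the pass-through sanitizer is an assumption (real
deployments plug in a model-aware sanitizer).

    from swh.journal.writer.kafka import KafkaJournalWriter
    from swh.model.model import Origin

    def passthrough_sanitizer(object_type, d):
        # placeholder: store the dict as-is
        return d

    writer = KafkaJournalWriter(
        brokers=["kafka1.example.org:9092"],  # placeholder broker
        prefix="swh.journal.objects",
        client_id="my-journal-writer",
        value_sanitizer=passthrough_sanitizer,
        anonymize=False,
    )
    # write_additions() flushes internally and raises KafkaDeliveryError if
    # some deliveries failed
    writer.write_additions("origin", [Origin(url="https://example.org/repo.git")])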