diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -11,7 +11,18 @@ import multiprocessing from pathlib import Path import time -from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type +from typing import ( + Any, + Container, + Dict, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, + Type, +) from confluent_kafka import Message, TopicPartition import tqdm @@ -56,6 +67,8 @@ self.refresh_every = refresh_every self.assignment = assignment self._messages_to_commit: List[Message] = [] + self._partitions_to_unsubscribe: Set[int] = set() + self.count = None self.topic_name: Optional[str] = None kwargs["stop_on_eof"] = True # Stop when the assignment is empty @@ -105,7 +118,15 @@ # offset will be committed after executing the worker_fn in # process(); see handle_messages() below self._messages_to_commit.append(message) - self.unsubscribe([partition_id]) + # delay the unsubscription to handle_messages() to prevent + # rdkafka errors like + # + # rd_kafka_assignment_partition_stopped: + # Assertion `rktp->rktp_started' failed + # + # in case the unsubscription from partition_id actually tries + # to subscribe to an already depleted partition. + self._partitions_to_unsubscribe.add(partition_id) def deserialize_message(self, message, object_type=None): """ @@ -133,6 +154,11 @@ for msg in self._messages_to_commit: self.consumer.commit(message=msg) self._messages_to_commit.clear() + if self._partitions_to_unsubscribe: + partitions = list(self._partitions_to_unsubscribe) + self._partitions_to_unsubscribe.clear() + self.unsubscribe(partitions) + return nb_processed, at_eof