Page MenuHomeSoftware Heritage

D7463.diff
No OneTemporary

D7463.diff

diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -11,7 +11,18 @@
import multiprocessing
from pathlib import Path
import time
-from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type
+from typing import (
+ Any,
+ Container,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Type,
+)
from confluent_kafka import Message, TopicPartition
import tqdm
@@ -56,6 +67,8 @@
self.refresh_every = refresh_every
self.assignment = assignment
self._messages_to_commit: List[Message] = []
+ self._partitions_to_unsubscribe: Set[int] = set()
+
self.count = None
self.topic_name: Optional[str] = None
kwargs["stop_on_eof"] = True # Stop when the assignment is empty
@@ -105,7 +118,15 @@
# offset will be committed after executing the worker_fn in
# process(); see handle_messages() below
self._messages_to_commit.append(message)
- self.unsubscribe([partition_id])
+ # delay the unsubscription to handle_messages() to prevent
+ # rdkafka errors like
+ #
+ # rd_kafka_assignment_partition_stopped:
+ # Assertion `rktp->rktp_started' failed
+ #
+ # in case the unsubscription from partition_id does actually try
+ # to subscribe to an already depleted partition.
+ self._partitions_to_unsubscribe.add(partition_id)
def deserialize_message(self, message, object_type=None):
"""
@@ -133,6 +154,11 @@
for msg in self._messages_to_commit:
self.consumer.commit(message=msg)
self._messages_to_commit.clear()
+ if self._partitions_to_unsubscribe:
+ partitions = list(self._partitions_to_unsubscribe)
+ self._partitions_to_unsubscribe.clear()
+ self.unsubscribe(partitions)
+
return nb_processed, at_eof

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 1:34 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225263

Event Timeline