Page Menu · Home · Software Heritage

D6235.id22557.diff
No One · Temporary

D6235.id22557.diff

diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -12,9 +12,9 @@
import multiprocessing
from pathlib import Path
import time
-from typing import Any, Dict, Mapping, Sequence, Tuple, Type
+from typing import Any, Dict, List, Mapping, Sequence, Tuple, Type
-from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition
import tqdm
from swh.dataset.exporter import Exporter
@@ -56,6 +56,7 @@
self.progress_queue = progress_queue
self.refresh_every = refresh_every
self.assignment = assignment
+ self._messages_to_commit: List[Message] = []
self.count = None
self.topic_name = None
self.reset_offsets = reset_offsets
@@ -85,11 +86,14 @@
finally:
self.progress_queue.put(None)
- def handle_offset(self, partition_id, offset):
+ def handle_offset(self, message):
"""
Check whether the client has reached the end of the current
partition, and trigger a reassignment if that is the case.
"""
+ offset = message.offset()
+ partition_id = message.partition()
+
if offset < 0: # Uninitialized partition offset
return
@@ -99,6 +103,10 @@
if offset >= self.offset_ranges[partition_id][1] - 1:
if partition_id in self.assignment:
self.progress_queue.put({partition_id: offset})
+ # unsubscribe from partition but make sure current message's
+ # offset will be committed after executing the worker_fn in
+ # process(); see handle_messages() below
+ self._messages_to_commit.append(message)
self.unsubscribe([partition_id])
def deserialize_message(self, message):
@@ -108,10 +116,17 @@
We also return the raw objects instead of deserializing them because we
will need the partition ID later.
"""
- self.handle_offset(message.partition(), message.offset())
+ self.handle_offset(message)
self.count += 1
return message
+ def handle_messages(self, messages, worker_fn):
+ nb_processed, at_eof = super().handle_messages(messages, worker_fn)
+ for msg in self._messages_to_commit:
+ self.consumer.commit(message=msg)
+ self._messages_to_commit.clear()
+ return nb_processed, at_eof
+
class ParallelJournalProcessor:
"""

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:31 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3232838

Event Timeline