D6235: Commit kafka messages whose offset has reached the high limit
D6235.id22557.diff
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -12,9 +12,9 @@
 import multiprocessing
 from pathlib import Path
 import time
-from typing import Any, Dict, Mapping, Sequence, Tuple, Type
+from typing import Any, Dict, List, Mapping, Sequence, Tuple, Type
 
-from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition
 import tqdm
 
 from swh.dataset.exporter import Exporter
@@ -56,6 +56,7 @@
         self.progress_queue = progress_queue
         self.refresh_every = refresh_every
         self.assignment = assignment
+        self._messages_to_commit: List[Message] = []
         self.count = None
         self.topic_name = None
         self.reset_offsets = reset_offsets
@@ -85,11 +86,14 @@
         finally:
             self.progress_queue.put(None)
 
-    def handle_offset(self, partition_id, offset):
+    def handle_offset(self, message):
         """
         Check whether the client has reached the end of the current
         partition, and trigger a reassignment if that is the case.
         """
+        offset = message.offset()
+        partition_id = message.partition()
+
         if offset < 0:  # Uninitialized partition offset
             return
 
@@ -99,6 +103,10 @@
         if offset >= self.offset_ranges[partition_id][1] - 1:
             if partition_id in self.assignment:
                 self.progress_queue.put({partition_id: offset})
+                # unsubscribe from partition but make sure current message's
+                # offset will be committed after executing the worker_fn in
+                # process(); see handle_messages() below
+                self._messages_to_commit.append(message)
                 self.unsubscribe([partition_id])
 
     def deserialize_message(self, message):
@@ -108,10 +116,17 @@
         We also return the raw objects instead of deserializing them because we
         will need the partition ID later.
         """
-        self.handle_offset(message.partition(), message.offset())
+        self.handle_offset(message)
         self.count += 1
         return message
 
+    def handle_messages(self, messages, worker_fn):
+        nb_processed, at_eof = super().handle_messages(messages, worker_fn)
+        for msg in self._messages_to_commit:
+            self.consumer.commit(message=msg)
+        self._messages_to_commit.clear()
+        return nb_processed, at_eof
+
 
 class ParallelJournalProcessor:
     """