D6247.diff
D6247: Commit kafka messages whose offset has reached the high limit
diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -12,9 +12,9 @@
import multiprocessing
from pathlib import Path
import time
-from typing import Any, Container, Dict, Mapping, Optional, Sequence, Tuple, Type
+from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type

-from confluent_kafka import TopicPartition
+from confluent_kafka import Message, TopicPartition
import tqdm

from swh.dataset.exporter import Exporter
@@ -55,6 +55,7 @@
self.progress_queue = progress_queue
self.refresh_every = refresh_every
self.assignment = assignment
+ self._messages_to_commit: List[Message] = []
self.count = None
self.topic_name: Optional[str] = None
kwargs["stop_on_eof"] = True # Stop when the assignment is empty
@@ -83,11 +84,14 @@
finally:
self.progress_queue.put(None)
- def handle_offset(self, partition_id, offset):
+ def handle_offset(self, message):
"""
Check whether the client has reached the end of the current
partition, and trigger a reassignment if that is the case.
"""
+ offset = message.offset()
+ partition_id = message.partition()
+
if offset < 0: # Uninitialized partition offset
return
@@ -97,6 +101,10 @@
if offset >= self.offset_ranges[partition_id][1] - 1:
if partition_id in self.assignment:
self.progress_queue.put({partition_id: offset})
+ # unsubscribe from partition but make sure current message's
+ # offset will be committed after executing the worker_fn in
+ # process(); see handle_messages() below
+ self._messages_to_commit.append(message)
self.unsubscribe([partition_id])
def deserialize_message(self, message):
@@ -106,10 +114,27 @@
We also return the raw objects instead of deserializing them because we
will need the partition ID later.
"""
- self.handle_offset(message.partition(), message.offset())
+ self.handle_offset(message)
self.count += 1
return message
+ def handle_messages(self, messages, worker_fn):
+ """Override of the handle_messages() method to get a chance to commit messages.
+
+ Make sure messages properly handled by `worker_fn` (executed in
+ super()) do get committed in Kafka, even if their originating partition
+ has been unsubscribed from.
+
+ This helps keep a consistent view of the consumption of each
+ partition at the end of the export process (EOF).
+
+ """
+ nb_processed, at_eof = super().handle_messages(messages, worker_fn)
+ for msg in self._messages_to_commit:
+ self.consumer.commit(message=msg)
+ self._messages_to_commit.clear()
+ return nb_processed, at_eof
+
class ParallelJournalProcessor:
"""