diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -12,9 +12,9 @@
 import multiprocessing
 from pathlib import Path
 import time
-from typing import Any, Dict, Mapping, Sequence, Tuple, Type
+from typing import Any, Dict, List, Mapping, Sequence, Tuple, Type
 
-from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, TopicPartition
+from confluent_kafka import OFFSET_BEGINNING, OFFSET_STORED, Message, TopicPartition
 import tqdm
 
 from swh.dataset.exporter import Exporter
@@ -56,6 +56,8 @@
         self.progress_queue = progress_queue
         self.refresh_every = refresh_every
         self.assignment = assignment
+        # end-of-partition messages, committed then cleared by handle_messages()
+        self._messages_to_commit: List[Message] = []
         self.count = None
         self.topic_name = None
         self.reset_offsets = reset_offsets
@@ -85,11 +87,14 @@
         finally:
             self.progress_queue.put(None)
 
-    def handle_offset(self, partition_id, offset):
+    def handle_offset(self, message):
         """
         Check whether the client has reached the end of the current
         partition, and trigger a reassignment if that is the case.
         """
+        offset = message.offset()
+        partition_id = message.partition()
+
         if offset < 0:  # Uninitialized partition offset
             return
 
@@ -99,6 +104,10 @@
         if offset >= self.offset_ranges[partition_id][1] - 1:
             if partition_id in self.assignment:
                 self.progress_queue.put({partition_id: offset})
+                # unsubscribe from partition but make sure current message's
+                # offset will be committed after executing the worker_fn in
+                # process(); see handle_messages() below
+                self._messages_to_commit.append(message)
                 self.unsubscribe([partition_id])
 
     def deserialize_message(self, message):
@@ -108,10 +117,27 @@
         We also return the raw objects instead of deserializing them because
         we will need the partition ID later.
         """
-        self.handle_offset(message.partition(), message.offset())
+        self.handle_offset(message)
         self.count += 1
         return message
 
+    def handle_messages(self, messages, worker_fn):
+        """
+        Process a batch of messages, then commit the end-of-partition
+        messages recorded by handle_offset().
+
+        The offsets of those messages are committed here, once the parent
+        implementation (which receives worker_fn) has returned, rather than
+        when handle_offset() sees them and unsubscribes from their partitions.
+
+        Return the (nb_processed, at_eof) tuple of the parent implementation.
+        """
+        nb_processed, at_eof = super().handle_messages(messages, worker_fn)
+        for msg in self._messages_to_commit:
+            self.consumer.commit(message=msg)
+        self._messages_to_commit.clear()
+        return nb_processed, at_eof
+
 
 class ParallelJournalProcessor:
     """