diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -12,9 +12,9 @@
 import multiprocessing
 from pathlib import Path
 import time
-from typing import Any, Container, Dict, Mapping, Optional, Sequence, Tuple, Type
+from typing import Any, Container, Dict, List, Mapping, Optional, Sequence, Tuple, Type
 
-from confluent_kafka import TopicPartition
+from confluent_kafka import Message, TopicPartition
 import tqdm
 
 from swh.dataset.exporter import Exporter
@@ -55,6 +55,7 @@
         self.progress_queue = progress_queue
         self.refresh_every = refresh_every
         self.assignment = assignment
+        self._messages_to_commit: List[Message] = []
         self.count = None
         self.topic_name: Optional[str] = None
         kwargs["stop_on_eof"] = True  # Stop when the assignment is empty
@@ -83,11 +84,14 @@
         finally:
             self.progress_queue.put(None)
 
-    def handle_offset(self, partition_id, offset):
+    def handle_offset(self, message):
         """
         Check whether the client has reached the end of the current
         partition, and trigger a reassignment if that is the case.
         """
+        offset = message.offset()
+        partition_id = message.partition()
+
         if offset < 0:  # Uninitialized partition offset
             return
 
@@ -97,6 +101,10 @@
         if offset >= self.offset_ranges[partition_id][1] - 1:
             if partition_id in self.assignment:
                 self.progress_queue.put({partition_id: offset})
+                # Unsubscribe from the partition, but make sure the current
+                # message's offset gets committed after worker_fn has run in
+                # process(); see handle_messages() below.
+                self._messages_to_commit.append(message)
                 self.unsubscribe([partition_id])
 
     def deserialize_message(self, message):
@@ -106,10 +114,27 @@
         We also return the raw objects instead of deserializing them
         because we will need the partition ID later.
         """
-        self.handle_offset(message.partition(), message.offset())
+        self.handle_offset(message)
         self.count += 1
         return message
 
+    def handle_messages(self, messages, worker_fn):
+        """Override of the parent handle_messages() to get a chance to commit messages.
+
+        Make sure messages properly handled by `worker_fn` (executed in
+        super()) do get committed in Kafka, even if their originating
+        partition has been unsubscribed from.
+
+        This helps keep a consistent view of the consumption of each
+        partition at the end of the export process (EOF).
+
+        """
+        nb_processed, at_eof = super().handle_messages(messages, worker_fn)
+        for msg in self._messages_to_commit:
+            self.consumer.commit(message=msg)
+        self._messages_to_commit.clear()
+        return nb_processed, at_eof
+
 
 class ParallelJournalProcessor:
     """
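
For context, the core idea of the patch is to defer offset commits until after `worker_fn` has actually processed the batch, even for partitions that were dropped mid-batch. Below is a minimal, self-contained sketch of that deferred-commit pattern with confluent_kafka; it is not the swh-dataset code itself, and the broker address, group id, and topic name are hypothetical placeholders.

```python
from typing import Callable, List

from confluent_kafka import Consumer, Message

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumption: a local test broker
    "group.id": "deferred-commit-sketch",   # hypothetical consumer group
    "enable.auto.commit": False,            # offsets are committed by hand
})
consumer.subscribe(["some.topic"])          # hypothetical topic name

# Messages whose partition was dropped mid-batch; their offsets must still
# be committed once the batch has been processed.
messages_to_commit: List[Message] = []

def handle_batch(messages: List[Message],
                 worker_fn: Callable[[List[Message]], None]) -> None:
    """Process a batch first, then commit the offsets of messages whose
    partition was dropped during processing (mirrors the overridden
    handle_messages() in the patch)."""
    worker_fn(messages)
    for msg in messages_to_commit:
        # commit(message=msg) records the offset *following* msg, so the
        # consumer group's committed position stays consistent at EOF.
        consumer.commit(message=msg)
    messages_to_commit.clear()
```

Committing after `worker_fn` rather than at unsubscription time ensures an offset is never recorded for a message that was not fully exported.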