diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -57,6 +57,7 @@ self.assignment = assignment self.count = None self.topic_name = None + kwargs["stop_on_eof"] = True # Stop when the assignment is empty super().__init__(*args, **kwargs) def subscribe(self): @@ -73,8 +74,6 @@ self.handle_committed_offsets() if self.assignment: super().process(*args, **kwargs) - except EOFError: - pass finally: self.progress_queue.put(None) @@ -92,7 +91,6 @@ """ Check whether the client has reached the end of the current partition, and trigger a reassignment if that is the case. - Raise EOFError if all the partitions have reached the end. """ if offset < 0: # Uninitialized partition offset return @@ -107,9 +105,6 @@ ] self.subscribe() # Actually, unsubscribes from the partition_id - if not self.assignment: - raise EOFError - def deserialize_message(self, message): """ Override of the message deserialization to hook the handling of the