diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -71,7 +71,8 @@ self.count = 0 try: self.handle_committed_offsets() - super().process(*args, **kwargs) + if self.assignment: + super().process(*args, **kwargs) except EOFError: pass finally: @@ -104,7 +105,7 @@ self.assignment = [ pid for pid in self.assignment if pid != partition_id ] - self.subscribe() + self.subscribe() # Actually, unsubscribes from the partition_id if not self.assignment: raise EOFError @@ -175,7 +176,8 @@ def fetch_insert_partition_id(partition_id): tp = TopicPartition(topic_name, partition_id) (lo, hi) = client.consumer.get_watermark_offsets(tp) - self.offsets[partition_id] = (lo, hi) + if lo != hi: + self.offsets[partition_id] = (lo, hi) with concurrent.futures.ThreadPoolExecutor( max_workers=self.processes