diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py
--- a/swh/dataset/journalprocessor.py
+++ b/swh/dataset/journalprocessor.py
@@ -68,25 +68,14 @@
             [TopicPartition(self.topic_name, pid) for pid in self.assignment]
         )
 
-    def process(self, *args, **kwargs):
+    def process(self, worker_fn):
         self.count = 0
         try:
-            self.handle_committed_offsets()
             if self.assignment:
-                super().process(*args, **kwargs)
+                super().process(worker_fn)
         finally:
             self.progress_queue.put(None)
 
-    def handle_committed_offsets(self,):
-        """
-        Handle already committed partition offsets before starting processing.
-        """
-        committed = self.consumer.committed(
-            [TopicPartition(self.topic_name, pid) for pid in self.assignment]
-        )
-        for tp in committed:
-            self.handle_offset(tp.partition, tp.offset)
-
     def handle_offset(self, partition_id, offset):
         """
         Check whether the client has reached the end of the current
@@ -153,8 +142,13 @@
 
     def get_offsets(self):
         """
-        First pass to fetch all the current low and high offsets of each
+        Compute (lo, hi) offset boundaries for all partitions.
+
+        First pass to fetch all the current low and high watermark offsets of each
         partition to define the consumption boundaries.
+
+        If available, use committed offsets as lo offset boundaries.
+        Partitions with nothing left to consume (hi <= lo) are omitted.
         """
         if self.offsets is None:
             client = JournalClient(
@@ -168,10 +162,25 @@
 
             self.offsets = {}
 
+            # LOW watermark offset: The offset of the earliest message in the
+            # topic/partition. If no messages have been written to the topic,
+            # the low watermark offset is set to 0. The low watermark will also
+            # be 0 if one message has been written to the partition (with
+            # offset 0).
+            # HIGH watermark offset: the offset of the latest message in the
+            # topic/partition available for consumption + 1
+
             def fetch_insert_partition_id(partition_id):
                 tp = TopicPartition(topic_name, partition_id)
                 (lo, hi) = client.consumer.get_watermark_offsets(tp)
-                if lo != hi:
+                if hi > lo:
+                    # Resume from the last committed offset when there is one.
+                    # confluent-kafka reports a negative sentinel
+                    # (OFFSET_INVALID) for partitions without a committed
+                    # offset, so max() then keeps the low watermark.
+                    committed = client.consumer.committed([tp])[0]
+                    lo = max(lo, committed.offset)
+                if hi > lo:
                     self.offsets[partition_id] = (lo, hi)
 
             with concurrent.futures.ThreadPoolExecutor(
@@ -184,6 +193,7 @@
                         desc=" - Partition offsets",
                     )
                 )
+            client.close()
         return self.offsets
 
     def run(self):