diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -89,6 +89,7 @@ if offset >= self.offset_ranges[partition_id][1] - 1: if partition_id in self.assignment: + self.progress_queue.put({partition_id: offset}) self.assignment = [ pid for pid in self.assignment if pid != partition_id ] @@ -234,7 +235,7 @@ d = {} active_workers = self.processes offset_diff = sum((hi - lo) for lo, hi in self.offsets.values()) - desc = f" - Journal export ({self.obj_type})" + desc = f" - Export ({self.obj_type})" with tqdm.tqdm(total=offset_diff, desc=desc) as pbar: while active_workers: item = queue.get() @@ -242,10 +243,8 @@ active_workers -= 1 continue d.update(item) - progress = sum(n - self.offsets[p][0] for p, n in d.items()) - pbar.set_postfix( - active_workers=active_workers, total_workers=self.processes - ) + progress = sum(n + 1 - self.offsets[p][0] for p, n in d.items()) + pbar.set_postfix(workers=f"{active_workers}/{self.processes}",) pbar.update(progress - pbar.n) def export_worker(self, assignment, progress_queue):