diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -89,6 +89,7 @@ if offset >= self.offset_ranges[partition_id][1] - 1: if partition_id in self.assignment: + self.progress_queue.put({partition_id: offset}) self.assignment = [ pid for pid in self.assignment if pid != partition_id ] @@ -204,7 +205,7 @@ offsets = self.get_offsets() to_assign = list(offsets.keys()) if not to_assign: - print(" - Journal export ({self.obj_type}): skipped (nothing to export)") + print(f" - Export ({self.obj_type}): skipped (nothing to export)") return manager = multiprocessing.Manager() q = manager.Queue() @@ -240,7 +241,7 @@ d = {} active_workers = self.processes offset_diff = sum((hi - lo) for lo, hi in self.offsets.values()) - desc = f" - Journal export ({self.obj_type})" + desc = f" - Export ({self.obj_type})" with tqdm.tqdm(total=offset_diff, desc=desc) as pbar: while active_workers: item = queue.get() @@ -248,10 +249,8 @@ active_workers -= 1 continue d.update(item) - progress = sum(n - self.offsets[p][0] for p, n in d.items()) - pbar.set_postfix( - active_workers=active_workers, total_workers=self.processes - ) + progress = sum(n + 1 - self.offsets[p][0] for p, n in d.items()) + pbar.set_postfix(workers=f"{active_workers}/{self.processes}") pbar.update(progress - pbar.n) def export_worker(self, assignment, progress_queue):