diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -7,6 +7,7 @@ import concurrent.futures from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor import contextlib +import json import logging import multiprocessing from pathlib import Path @@ -285,7 +286,10 @@ ) futures.append(pool.submit(self.progress_worker, queue=q)) + # Run processes until they all complete, or an error occurs concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION) + + # Propagate potential exceptions for f in futures: if f.running(): continue @@ -318,6 +322,13 @@ ) pbar.update(progress - pbar.n) + # Write final consumer offsets to a save file + ( + self.node_sets_path + / self.obj_type + / f"offsets-final-{int(time.time())}.json" + ).write_text(json.dumps(d)) + def export_worker(self, assignment, progress_queue): worker = JournalProcessorWorker( self.config,