diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py --- a/swh/graph/luigi.py +++ b/swh/graph/luigi.py @@ -137,6 +137,10 @@ output_directory = self.local_graph_path graph_name = "graph" + def progress_cb(percentage: int, step: webgraph.CompressionStep): + self.set_progress_percentage(percentage) + self.set_status_message(f"Running {step.name} (step #{step.value})") + start_date = datetime.datetime.now(tz=datetime.timezone.utc) webgraph.compress( graph_name, diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -13,7 +13,7 @@ import os from pathlib import Path import subprocess -from typing import Dict, List, Set +from typing import Callable, Dict, List, Set from swh.graph.config import check_config_compress @@ -323,6 +323,7 @@ out_dir: Path, steps: Set[CompressionStep] = set(COMP_SEQ), conf: Dict[str, str] = {}, + progress_cb: Callable[[int, CompressionStep], None] = lambda percentage, step: None, ): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation @@ -348,6 +349,8 @@ virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir + progress_cb: a callable taking a percentage and a step as arguments, + which is called every time a step starts. """ if not steps: @@ -364,6 +367,7 @@ continue seq_no += 1 logger.info("Running compression step %s (%s/%s)", step, seq_no, len(steps)) + progress_cb(seq_no * 100 // len(steps), step) do_step(step, conf) compression_end_time = datetime.now() compression_duration = compression_end_time - compression_start_time