Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/webgraph.py
Show All 11 Lines | |||||
import logging | import logging | ||||
import os | import os | ||||
from pathlib import Path | from pathlib import Path | ||||
import subprocess | import subprocess | ||||
from typing import Dict, List, Set | from typing import Dict, List, Set | ||||
from swh.graph.config import check_config_compress | from swh.graph.config import check_config_compress | ||||
logger = logging.getLogger(__name__) | |||||
class CompressionStep(Enum): | class CompressionStep(Enum): | ||||
EXTRACT_NODES = 1 | EXTRACT_NODES = 1 | ||||
MPH = 2 | MPH = 2 | ||||
BV = 3 | BV = 3 | ||||
BFS = 4 | BFS = 4 | ||||
PERMUTE_BFS = 5 | PERMUTE_BFS = 5 | ||||
TRANSPOSE_BFS = 6 | TRANSPOSE_BFS = 6 | ||||
▲ Show 20 Lines • Show All 229 Lines • ▼ Show 20 Lines | CompressionStep.CLEAN_TMP: [ | ||||
"{out_dir}/{graph_name}-bfs.properties", | "{out_dir}/{graph_name}-bfs.properties", | ||||
"{out_dir}/{graph_name}-llp.order", | "{out_dir}/{graph_name}-llp.order", | ||||
"{tmp_dir}", | "{tmp_dir}", | ||||
], | ], | ||||
} | } | ||||
def do_step(step, conf):
    """Run a single compression step as a subprocess, teeing its output to a
    dedicated per-step log file.

    The step's command line is looked up in ``STEP_ARGV`` and formatted with
    the values in ``conf``.  The subprocess's stdout and stderr are captured
    line by line and forwarded to a child logger whose ``FileHandler`` writes
    under ``{out_dir}/logs``.

    Args:
        step: the :class:`CompressionStep` to run
        conf: compression configuration dict (as returned by
            ``check_config_compress``); must provide at least ``out_dir``,
            ``graph_name``, ``java_tool_options`` and ``classpath``

    Returns:
        the subprocess's exit code (always 0, as any other value raises)

    Raises:
        RuntimeError: if the step's subprocess exits with a non-zero code
    """
    log_dir = Path(conf["out_dir"]) / "logs"
    log_dir.mkdir(exist_ok=True)

    # Separate "steps" logger hierarchy, so per-step output can be filtered
    # or redirected independently of this module's own logger.
    step_logger = logger.getChild(f"steps.{step.name.lower()}")
    # One log file per step invocation, disambiguated with a millisecond
    # timestamp so reruns do not clobber earlier logs.
    step_handler = logging.FileHandler(
        log_dir
        / (
            f"{conf['graph_name']}"
            f"-{int(datetime.now().timestamp() * 1000)}"
            f"-{str(step).lower()}.log"
        )
    )
    step_logger.addHandler(step_handler)
    try:
        step_start_time = datetime.now()
        step_logger.info(
            "Starting compression step %s at %s", step, step_start_time
        )

        cmd = " ".join(STEP_ARGV[step]).format(**conf)
        cmd_env = os.environ.copy()
        cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
        cmd_env["CLASSPATH"] = conf["classpath"]
        process = subprocess.Popen(
            ["/bin/bash", "-c", cmd],
            env=cmd_env,
            encoding="utf8",
            stdout=subprocess.PIPE,
            # Interleave stderr with stdout so the log preserves ordering.
            stderr=subprocess.STDOUT,
        )
        step_logger.info("Running: %s", cmd)
        with process.stdout as stdout:
            for line in stdout:
                step_logger.info(line.rstrip())
        rc = process.wait()
        if rc != 0:
            raise RuntimeError(
                f"Compression step {step} returned non-zero exit code {rc}"
            )
        step_end_time = datetime.now()
        step_duration = step_end_time - step_start_time
        step_logger.info(
            "Compression step %s finished at %s (in %s)",
            step,
            step_end_time,
            step_duration,
        )
        return rc
    finally:
        # Detach and close the per-step file handler on every exit path —
        # including the RuntimeError raised on failure — so the log file is
        # not kept open until interpreter shutdown.
        step_logger.removeHandler(step_handler)
        step_handler.close()
Not Done Inline ActionsYou should add step_logger.removeHandler(step_handler) and step_handler.close() when this function ends — inside a finally: block, so they also run when the step fails and raises — to avoid keeping the per-step logfiles open until interpreter shutdown. olasd: You should add `step_logger.removeHandler(step_handler)` and `step_handler.close()` when this…
def compress( | def compress( | ||||
graph_name: str, | graph_name: str, | ||||
in_dir: Path, | in_dir: Path, | ||||
out_dir: Path, | out_dir: Path, | ||||
steps: Set[CompressionStep] = set(COMP_SEQ), | steps: Set[CompressionStep] = set(COMP_SEQ), | ||||
conf: Dict[str, str] = {}, | conf: Dict[str, str] = {}, | ||||
Show All 25 Lines | ): | ||||
""" | """ | ||||
if not steps: | if not steps: | ||||
steps = set(COMP_SEQ) | steps = set(COMP_SEQ) | ||||
conf = check_config_compress(conf, graph_name, in_dir, out_dir) | conf = check_config_compress(conf, graph_name, in_dir, out_dir) | ||||
compression_start_time = datetime.now() | compression_start_time = datetime.now() | ||||
logging.info(f"starting compression at {compression_start_time}") | logger.info("Starting compression at %s", compression_start_time) | ||||
seq_no = 0 | seq_no = 0 | ||||
for step in COMP_SEQ: | for step in COMP_SEQ: | ||||
if step not in steps: | if step not in steps: | ||||
logging.debug(f"skipping compression step {step}") | logger.debug("Skipping compression step %s", step) | ||||
continue | continue | ||||
seq_no += 1 | seq_no += 1 | ||||
step_start_time = datetime.now() | logger.info("Running compression step %s (%s/%s)", step, seq_no, len(steps)) | ||||
logging.info( | |||||
f"starting compression step {step} " | |||||
f"({seq_no}/{len(steps)}) at {step_start_time}" | |||||
) | |||||
do_step(step, conf) | do_step(step, conf) | ||||
step_end_time = datetime.now() | |||||
step_duration = step_end_time - step_start_time | |||||
logging.info( | |||||
f"completed compression step {step} " | |||||
f"({seq_no}/{len(steps)}) " | |||||
f"at {step_end_time} in {step_duration}" | |||||
) | |||||
compression_end_time = datetime.now() | compression_end_time = datetime.now() | ||||
compression_duration = compression_end_time - compression_start_time | compression_duration = compression_end_time - compression_start_time | ||||
logging.info(f"completed compression in {compression_duration}") | logger.info("Completed compression in %s", compression_duration) |
I would call that f"steps.{step.name}" to make it clear that this is a separate logger hierarchy for steps.