diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
index 4e0e399..5986ec3 100644
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -1,177 +1,181 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks
===========
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-dataset's.

Unlike the CLI, this requires the graph to be named ``graph``.
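
For example, a :class:`CompressGraph` run can be scheduled directly from Python
(a minimal sketch; the paths below are placeholders):

.. code-block:: python

    import luigi

    from swh.graph.luigi import CompressGraph

    task = CompressGraph(
        local_export_path="/srv/example/export",
        local_graph_path="/srv/example/compressed",
    )
    luigi.build([task], local_scheduler=True)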

File layout
-----------

In addition to files documented in :ref:`graph-compression` (e.g. :file:`graph.graph`,
:file:`graph.mph`, ...), tasks in this module produce this directory structure::

    swh_<date>[_<flavor>]/
        graph.graph
        graph.mph
        ...
        meta/
            export.json
            compression.json

``meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.

``meta/compression.json`` contains information about the compression itself,
for provenance tracking. For example:

.. code-block:: json

    [
        {
            "steps": null,
            "compression_start": "2022-11-08T11:00:54.998799+00:00",
            "compression_end": "2022-11-08T11:05:53.105519+00:00",
            "object_type": [
                "origin",
                "origin_visit"
            ],
            "hostname": "desktop5",
            "conf": {},
            "tool": {
                "name": "swh.graph",
                "version": "2.2.0"
            }
        }
    ]

When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from typing import List
import luigi
from swh.dataset.luigi import Format, LocalExport, ObjectType
class CompressGraph(luigi.Task):
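    """Compresses a local ORC graph dataset export into the WebGraph on-disk
    representation, and writes the ``meta/*.json`` provenance files."""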
local_export_path = luigi.PathParameter()
local_graph_path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
significant=False,
description="""
Size of work batches to use while compressing.
Larger is faster, but consumes more resources.
""",
)
object_types = list(ObjectType)
# To make this configurable, we could use this:
# object_types = luigi.EnumListParameter(
# enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
# )
# then use swh.dataset.luigi._export_metadata_has_object_types to check in
# .meta/export.json that all objects are present before skipping the task
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`LocalExport` task."""
return [
LocalExport(
local_export_path=self.local_export_path,
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
)
]
def output(self) -> List[luigi.LocalTarget]:
"""Returns the ``meta/*.json`` targets"""
return [self._export_meta(), self._compression_meta()]
def _export_meta(self) -> luigi.Target:
"""Returns the metadata on the dataset export"""
return luigi.LocalTarget(self.local_graph_path / "meta/export.json")
def _compression_meta(self) -> luigi.Target:
"""Returns the metadata on the compression pipeline"""
return luigi.LocalTarget(self.local_graph_path / "meta/compression.json")
def run(self):
"""Runs the full compression pipeline, then writes :file:`meta/compression.json`
This does not support running individual steps yet."""
import datetime
import json
import socket
import pkg_resources
from swh.graph import webgraph
conf = {} # TODO: make this configurable
steps = None # TODO: make this configurable
if self.batch_size:
conf["batch_size"] = self.batch_size
# Delete stamps. Otherwise interrupting this compression pipeline may leave
# stamps from a previous successful compression
if self._export_meta().exists():
self._export_meta().remove()
if self._compression_meta().exists():
self._compression_meta().remove()
output_directory = self.local_graph_path
graph_name = "graph"
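+        # Report progress to the Luigi scheduler as each compression step starts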
+ def progress_cb(percentage: int, step: webgraph.CompressionStep):
+ self.set_progress_percentage(percentage)
+ self.set_status_message(f"Running {step.name} (step #{step.value})")
+
start_date = datetime.datetime.now(tz=datetime.timezone.utc)
webgraph.compress(
graph_name,
self.local_export_path / "orc",
output_directory,
steps,
conf,
+            progress_cb,
)
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
# Copy dataset export metadata
with self._export_meta().open("w") as write_fd:
with (self.local_export_path / "meta" / "export.json").open() as read_fd:
write_fd.write(read_fd.read())
# Append metadata about this compression pipeline
if self._compression_meta().exists():
with self._compression_meta().open("w") as fd:
meta = json.load(fd)
else:
meta = []
meta.append(
{
"steps": steps,
"compression_start": start_date.isoformat(),
"compression_end": end_date.isoformat(),
"object_type": [object_type.name for object_type in self.object_types],
"hostname": socket.getfqdn(),
"conf": conf,
"tool": {
"name": "swh.graph",
"version": pkg_resources.get_distribution("swh.graph").version,
},
}
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index c188b83..bf74dca 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,370 +1,374 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import subprocess
-from typing import Dict, List, Set
+from typing import Callable, Dict, List, Set
from swh.graph.config import check_config_compress
logger = logging.getLogger(__name__)
class CompressionStep(Enum):
EXTRACT_NODES = 1
MPH = 2
BV = 3
BFS = 4
PERMUTE_BFS = 5
TRANSPOSE_BFS = 6
SIMPLIFY = 7
LLP = 8
PERMUTE_LLP = 9
OBL = 10
COMPOSE_ORDERS = 11
STATS = 12
TRANSPOSE = 13
TRANSPOSE_OBL = 14
MAPS = 15
EXTRACT_PERSONS = 16
MPH_PERSONS = 17
NODE_PROPERTIES = 18
MPH_LABELS = 19
FCL_LABELS = 20
EDGE_LABELS = 21
EDGE_LABELS_OBL = 22
EDGE_LABELS_TRANSPOSE_OBL = 23
CLEAN_TMP = 24
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
# Mapping from compression steps to shell commands implementing them. Commands
# will be executed by the shell, so be careful with meta characters. They are
# specified here as lists of tokens only for readability: they are joined into
# a single command line before execution. In commands, {tokens} will be
# interpolated with configuration values, see :func:`compress`.
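# For instance, with a hypothetical conf such as
# {"java": "java", "out_dir": "/srv/graph", "graph_name": "graph", ...},
# the STATS step below would expand to the command:
#   java it.unimi.dsi.big.webgraph.Stats /srv/graph/graph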
STEP_ARGV: Dict[CompressionStep, List[str]] = {
CompressionStep.EXTRACT_NODES: [
"{java}",
"org.softwareheritage.graph.compress.ExtractNodes",
"--format",
"orc",
"--temp-dir",
"{tmp_dir}",
"{in_dir}",
"{out_dir}/{graph_name}",
],
CompressionStep.MPH: [
"{java}",
"it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"--decompressor",
"com.github.luben.zstd.ZstdInputStream",
"{out_dir}/{graph_name}.mph",
"{out_dir}/{graph_name}.nodes.csv.zst",
],
CompressionStep.BV: [
"{java}",
"org.softwareheritage.graph.compress.ScatteredArcsORCGraph",
"--temp-dir",
"{tmp_dir}",
"--function",
"{out_dir}/{graph_name}.mph",
"{in_dir}",
"{out_dir}/{graph_name}-base",
],
CompressionStep.BFS: [
"{java}",
"it.unimi.dsi.law.big.graph.BFS",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs.order",
],
CompressionStep.PERMUTE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.SIMPLIFY: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"simplify",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{out_dir}/{graph_name}-bfs-simplified",
],
CompressionStep.LLP: [
"{java}",
"it.unimi.dsi.law.big.graph.LayeredLabelPropagation",
"-g",
"{llp_gammas}",
"{out_dir}/{graph_name}-bfs-simplified",
"{out_dir}/{graph_name}-llp.order",
],
CompressionStep.PERMUTE_LLP: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-llp.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}",
],
CompressionStep.COMPOSE_ORDERS: [
"{java}",
"org.softwareheritage.graph.compress.ComposePermutations",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-llp.order",
"{out_dir}/{graph_name}.order",
],
CompressionStep.STATS: [
"{java}",
"it.unimi.dsi.big.webgraph.Stats",
"{out_dir}/{graph_name}",
],
CompressionStep.TRANSPOSE: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}-transposed",
],
CompressionStep.MAPS: [
"{java}",
"org.softwareheritage.graph.compress.NodeMapBuilder",
"{out_dir}/{graph_name}",
"{tmp_dir}",
"< {out_dir}/{graph_name}.nodes.csv.zst",
],
CompressionStep.EXTRACT_PERSONS: [
"{java}",
"org.softwareheritage.graph.compress.ExtractPersons",
"--temp-dir",
"{tmp_dir}",
"{in_dir}",
"{out_dir}/{graph_name}",
],
CompressionStep.MPH_PERSONS: [
"{java}",
"it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
"--byte-array",
"--decompressor",
"com.github.luben.zstd.ZstdInputStream",
"--temp-dir",
"{tmp_dir}",
"{out_dir}/{graph_name}.persons.mph",
"{out_dir}/{graph_name}.persons.csv.zst",
],
CompressionStep.NODE_PROPERTIES: [
"{java}",
"org.softwareheritage.graph.compress.WriteNodeProperties",
"{in_dir}",
"{out_dir}/{graph_name}",
],
CompressionStep.MPH_LABELS: [
"{java}",
"it.unimi.dsi.sux4j.mph.LcpMonotoneMinimalPerfectHashFunction",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"--decompressor",
"com.github.luben.zstd.ZstdInputStream",
"{out_dir}/{graph_name}.labels.mph",
"{out_dir}/{graph_name}.labels.csv.zst",
],
CompressionStep.FCL_LABELS: [
"{java}",
"it.unimi.dsi.big.util.MappedFrontCodedStringBigList",
"--decompressor",
"com.github.luben.zstd.ZstdInputStream",
"{out_dir}/{graph_name}.labels.fcl",
"< {out_dir}/{graph_name}.labels.csv.zst",
],
CompressionStep.EDGE_LABELS: [
"{java}",
"org.softwareheritage.graph.compress.LabelMapBuilder",
"--temp-dir",
"{tmp_dir}",
"{in_dir}",
"{out_dir}/{graph_name}",
],
CompressionStep.EDGE_LABELS_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph",
"--list",
"{out_dir}/{graph_name}-labelled",
],
CompressionStep.EDGE_LABELS_TRANSPOSE_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph",
"--list",
"{out_dir}/{graph_name}-transposed-labelled",
],
CompressionStep.CLEAN_TMP: [
"rm",
"-rf",
"{out_dir}/{graph_name}-base.graph",
"{out_dir}/{graph_name}-base.offsets",
"{out_dir}/{graph_name}-base.properties",
"{out_dir}/{graph_name}-bfs-simplified.graph",
"{out_dir}/{graph_name}-bfs-simplified.offsets",
"{out_dir}/{graph_name}-bfs-simplified.properties",
"{out_dir}/{graph_name}-bfs-transposed.graph",
"{out_dir}/{graph_name}-bfs-transposed.offsets",
"{out_dir}/{graph_name}-bfs-transposed.properties",
"{out_dir}/{graph_name}-bfs.graph",
"{out_dir}/{graph_name}-bfs.offsets",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-bfs.properties",
"{out_dir}/{graph_name}-llp.order",
"{tmp_dir}",
],
}
def do_step(step, conf):
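    """Runs a single compression step in a subshell, streaming its output to a
    dedicated log file under ``{out_dir}/logs/``. Raises :exc:`RuntimeError` if
    the command exits with a non-zero status; returns the exit code otherwise."""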
log_dir = Path(conf["out_dir"]) / "logs"
log_dir.mkdir(exist_ok=True)
step_logger = logger.getChild(f"steps.{step.name.lower()}")
step_handler = logging.FileHandler(
log_dir
/ (
f"{conf['graph_name']}"
f"-{int(datetime.now().timestamp() * 1000)}"
f"-{str(step).lower()}.log"
)
)
step_logger.addHandler(step_handler)
step_start_time = datetime.now()
step_logger.info("Starting compression step %s at %s", step, step_start_time)
cmd = " ".join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
cmd_env["CLASSPATH"] = conf["classpath"]
process = subprocess.Popen(
["/bin/bash", "-c", cmd],
env=cmd_env,
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
step_logger.info("Running: %s", cmd)
with process.stdout as stdout:
for line in stdout:
step_logger.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError(f"Compression step {step} returned non-zero exit code {rc}")
step_end_time = datetime.now()
step_duration = step_end_time - step_start_time
step_logger.info(
"Compression step %s finished at %s (in %s)",
step,
step_end_time,
step_duration,
)
step_logger.removeHandler(step_handler)
step_handler.close()
return rc
def compress(
graph_name: str,
in_dir: Path,
out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {},
+ progress_cb: Callable[[int, CompressionStep], None] = lambda percentage, step: None,
):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
          - batch_size: batch size for `WebGraph transformations
            <https://webgraph.di.unimi.it/>`_; defaults to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
+        progress_cb: a callable taking a percentage and a step as arguments,
+          which is called every time a step starts.
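+
+    A minimal sketch of a progress callback (illustrative only; the paths below
+    are placeholders)::
+
+        def log_progress(percentage: int, step: CompressionStep) -> None:
+            print(f"compression {percentage}% - starting {step}")
+
+        compress("graph", Path("export/orc"), Path("compressed"),
+                 progress_cb=log_progress)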
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logger.info("Starting compression at %s", compression_start_time)
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logger.debug("Skipping compression step %s", step)
continue
seq_no += 1
logger.info("Running compression step %s (%s/%s)", step, seq_no, len(steps))
+ progress_cb(seq_no * 100 // len(steps), step)
do_step(step, conf)
compression_end_time = datetime.now()
compression_duration = compression_end_time - compression_start_time
logger.info("Completed compression in %s", compression_duration)