Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/luigi.py
Show First 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | |||||
Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed | Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed | ||||
when downloading. | when downloading. | ||||
The layout is otherwise the same as the file layout. | The layout is otherwise the same as the file layout. | ||||
""" | """ | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
from typing import List | from pathlib import Path | ||||
from typing import Dict, List | |||||
import luigi | import luigi | ||||
from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | ||||
class CompressGraph(luigi.Task): | class CompressGraph(luigi.Task): | ||||
local_export_path = luigi.PathParameter(significant=False) | local_export_path = luigi.PathParameter(significant=False) | ||||
▲ Show 20 Lines • Show All 344 Lines • ▼ Show 20 Lines | def requires(self) -> List[luigi.Task]: | ||||
) | ) | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on the local filesystem.""" | """Returns stamp and meta paths on the local filesystem.""" | ||||
return [self._meta()] | return [self._meta()] | ||||
def _meta(self): | def _meta(self): | ||||
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") | ||||
class TopoSort(luigi.Task): | |||||
"""Creates a file that contains all SWHIDs in topological order from a compressed | |||||
graph.""" | |||||
local_graph_path = luigi.PathParameter() | |||||
topological_order_path = luigi.PathParameter() | |||||
graph_name = luigi.Parameter(default="graph") | |||||
def requires(self) -> List[luigi.Task]: | |||||
"""Returns an instance of :class:`LocalGraph`.""" | |||||
return [LocalGraph(local_graph_path=self.local_graph_path)] | |||||
def output(self) -> luigi.Target: | |||||
""".csv.zst file that contains the topological order.""" | |||||
return luigi.LocalTarget(self.topological_order_path) | |||||
def run(self) -> None: | |||||
"""Runs org.softwareheritage.graph.utils.TopoSort and compresses""" | |||||
import os | |||||
import subprocess | |||||
from .config import check_config | |||||
conf: Dict = {} # TODO: configurable | |||||
conf = check_config(conf) | |||||
env = { | |||||
**os.environ.copy(), | |||||
"JAVA_TOOL_OPTIONS": conf["java_tool_options"], | |||||
"CLASSPATH": conf["classpath"], | |||||
} | |||||
tmp_path = Path(f"{self.topological_order_path}.tmp") | |||||
object_types = "rev,rel,snp,ori" | |||||
class_name = "org.softwareheritage.graph.utils.TopoSort" | |||||
script = f""" | |||||
java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ | |||||
| pv --line-mode --wait \ | |||||
| zstdmt -19 \ | |||||
> '{tmp_path}' | |||||
""" | |||||
subprocess.run(["bash", "-c", script], env=env, check=True) | |||||
# Atomically write the output file | |||||
tmp_path.replace(self.topological_order_path) |