diff --git a/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2022 The Software Heritage developers
+ * See the AUTHORS file at the top-level directory of this distribution
+ * License: GNU General Public License version 3, or any later version
+ * See top-level LICENSE file for more information
+ */
+
+package org.softwareheritage.graph.utils;
+
+import com.martiansoftware.jsap.*;
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import it.unimi.dsi.big.webgraph.NodeIterator;
+import org.softwareheritage.graph.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/* Lists all nodes of the types given as argument, in topological order,
+ * from leaves (contents, if selected) to the top (origins, if selected).
+ *
+ * This uses a DFS, so nodes are likely to be close to their neighbors.
+ *
+ * Some extra information is provided to allow more efficient consumption
+ * of the output: number of ancestors, successors, and a sample of two ancestors.
+ *
+ * Sample invocation:
+ *
+ *   $ java -cp ~/swh-environment/swh-graph/java/target/swh-graph-*.jar -Xmx1000G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.TopoSort /dev/shm/swh-graph/default/graph 'rev,rel,snp,ori' \
+ *      | pv --line-mode --wait \
+ *      | zstdmt \
+ *      > /poolswh/softwareheritage/vlorentz/2022-04-25_toposort_rev,rel,snp,ori.txt.zst
+ */
+
+public class TopoSort {
+    private Subgraph graph;
+    private Subgraph transposedGraph;
+
+    public static void main(String[] args) throws IOException, ClassNotFoundException {
+        if (args.length != 2) {
+            System.err.println("Syntax: java org.softwareheritage.graph.utils.TopoSort <path/to/graph> <nodeTypes>");
+            System.exit(1);
+        }
+        String graphPath = args[0];
+        String nodeTypes = args[1];
+
+        TopoSort toposort = new TopoSort();
+
+        toposort.load_graph(graphPath, nodeTypes);
+        toposort.toposortDFS();
+    }
+
+    public void load_graph(String graphBasename, String nodeTypes) throws IOException {
+        System.err.println("Loading graph " + graphBasename + " ...");
+        var underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
+        System.err.println("Selecting subgraphs.");
+        graph = new Subgraph(underlyingGraph, new AllowedNodes(nodeTypes));
+        transposedGraph = graph.transpose();
+        System.err.println("Graph loaded.");
+    }
+
+    /* Prints nodes in topological order, based on a DFS. */
+    public void toposortDFS() {
+        Set<Long> visited = new HashSet<>();
+        Deque<Long> ready = new ArrayDeque<>();
+
+        /* First, push all leaves to the stack */
+        System.err.println("Listing leaves.");
+        long total_nodes = 0;
+        NodeIterator nodeIterator = graph.nodeIterator();
+        /* hasNext() must be checked before nextLong(): fetching first would skip the
+         * last node returned by the iterator, and fail on an empty graph. */
+        while (nodeIterator.hasNext()) {
+            long currentNodeId = nodeIterator.nextLong();
+            total_nodes++;
+            long firstSuccessor = graph.successors(currentNodeId).nextLong();
+            if (firstSuccessor != -1) {
+                /* The node has at least one ancestor, so it is not a leaf. */
+                continue;
+            }
+            ready.push(currentNodeId);
+            if (ready.size() % 10000000 == 0) {
+                float ready_size_f = ready.size();
+                float total_nodes_f = total_nodes;
+                System.err.printf("Listed %.02f B leaves (out of %.02f B nodes)\n", ready_size_f / 1000000000.,
+                        total_nodes_f / 1000000000.);
+            }
+        }
+        System.err.println("Leaves loaded, starting DFS.");
+
+        System.out.format("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2\n");
+        while (!ready.isEmpty()) {
+            long currentNodeId = ready.pop();
+            visited.add(currentNodeId);
+
+            /* Find its successors which are ready */
+            LazyLongIterator successors = transposedGraph.successors(currentNodeId);
+            long successorCount = 0;
+            for (long successorNodeId; (successorNodeId = successors.nextLong()) != -1;) {
+                successorCount++;
+                LazyLongIterator successorAncestors = graph.successors(successorNodeId);
+                boolean isReady = true;
+                for (long successorAncestorNodeId; (successorAncestorNodeId = successorAncestors.nextLong()) != -1;) {
+                    if (!visited.contains(successorAncestorNodeId)) {
+                        /*
+                         * This ancestor of the successor is not yet visited, so the successor is not ready.
+                         */
+                        isReady = false;
+                        break;
+                    }
+                }
+                if (isReady) {
+                    ready.push(successorNodeId);
+                }
+            }
+
+            String[] sampleAncestors = {"", ""};
+            long ancestorCount = 0;
+            LazyLongIterator ancestors = graph.successors(currentNodeId);
+            for (long ancestorNodeId; (ancestorNodeId = ancestors.nextLong()) != -1;) {
+                if (ancestorCount < sampleAncestors.length) {
+                    sampleAncestors[(int) ancestorCount] = graph.getSWHID(ancestorNodeId).toString();
+                }
+                ancestorCount++;
+            }
+
+            /*
+             * Print the node
+             *
+             * TODO: print its depth too?
+             */
+            SWHID currentNodeSWHID = graph.getSWHID(currentNodeId);
+            System.out.format("%s,%d,%d,%s,%s\n", currentNodeSWHID, ancestorCount, successorCount, sampleAncestors[0],
+                    sampleAncestors[1]);
+        }
+    }
+}
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -22,3 +22,6 @@
 
 [mypy-pytest.*]
 ignore_missing_imports = True
+
+[mypy-tqdm.*]
+ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1 +1,2 @@
 luigi
+tqdm
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
--- a/requirements-swh-luigi.txt
+++ b/requirements-swh-luigi.txt
@@ -1 +1 @@
-swh.dataset[luigi] >= v0.3.2
+swh.dataset[luigi] >= v1.0.1
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -56,19 +56,33 @@
 
 When the compression pipeline is run in separate steps, each of the steps is recorded
 as an object in the root list.
+
+S3 layout
+---------
+
+As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk.
+However, this is undesirable on at-rest/long-term storage like on S3, because
+some are very sparse (e.g. :file:`graph.property.committer_timestamp.bin` can be
+quickly compressed from 300GB to 1GB).
+
+Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
+when downloading.
+
+The layout is otherwise the same as the local file layout.
""" # WARNING: do not import unnecessary things here to keep cli startup time under # control -from typing import List +from pathlib import Path +from typing import Dict, List import luigi -from swh.dataset.luigi import Format, LocalExport, ObjectType +from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter class CompressGraph(luigi.Task): - local_export_path = luigi.PathParameter() + local_export_path = luigi.PathParameter(significant=False) local_graph_path = luigi.PathParameter() batch_size = luigi.IntParameter( default=0, @@ -184,3 +198,281 @@ ) with self._compression_meta().open("w") as fd: json.dump(meta, fd, indent=4) + + +class UploadGraphToS3(luigi.Task): + """Uploads a local compressed graphto S3; creating automatically if it does + not exist. + + Example invocation:: + + luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \ + --local-graph-path=graph/ \ + --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/ + """ + + local_graph_path = luigi.PathParameter(significant=False) + s3_graph_path = S3PathParameter() + + def requires(self) -> List[luigi.Task]: + """Returns a :class:`CompressGraph` task that writes local files at the + expected location.""" + return [ + CompressGraph( + local_graph_path=self.local_graph_path, + ) + ] + + def output(self) -> List[luigi.Target]: + """Returns stamp and meta paths on S3.""" + return [self._meta()] + + def _meta(self): + import luigi.contrib.s3 + + return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json") + + def run(self) -> None: + """Copies all files: first the graph itself, then :file:`meta/compression.json`.""" + import subprocess + import tempfile + + import luigi.contrib.s3 + import tqdm + + compression_metadata_path = self.local_graph_path / "meta" / "compression.json" + seen_compression_metadata = False + + client = luigi.contrib.s3.S3Client() + + # recursively copy local files to S3, and end with compression metadata + paths = 
list(self.local_graph_path.glob("**/*")) + for (i, path) in tqdm.tqdm( + list(enumerate(paths)), + desc="Uploading compressed graph", + ): + if path == compression_metadata_path: + # Write it last + seen_compression_metadata = True + continue + if path.is_dir(): + continue + relative_path = path.relative_to(self.local_graph_path) + self.set_progress_percentage(int(i * 100 / len(paths))) + + if path.suffix == ".bin": + # Large sparse file; store it compressed on S3. + with tempfile.NamedTemporaryFile( + prefix=path.stem, suffix=".bin.zst" + ) as fd: + self.set_status_message(f"Compressing {relative_path}") + subprocess.run( + ["zstdmt", "--force", "--keep", path, "-o", fd.name], check=True + ) + self.set_status_message(f"Uploading {relative_path} (compressed)") + client.put_multipart( + fd.name, + f"{self.s3_graph_path}/{relative_path}.zst", + ACL="public-read", + ) + else: + self.set_status_message(f"Uploading {relative_path}") + client.put_multipart( + path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read" + ) + + assert ( + seen_compression_metadata + ), "did not see meta/compression.json in directory listing" + + # Write it last, to act as a stamp + client.put( + compression_metadata_path, + self._meta().path, + ACL="public-read", + ) + + +class DownloadGraphFromS3(luigi.Task): + """Downloads a local dataset graph from S3. 
+ + This performs the inverse operation of :class:`UploadGraphToS3` + + Example invocation:: + + luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \ + --local-graph-path=graph/ \ + --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/ + """ + + local_graph_path = luigi.PathParameter() + s3_graph_path = S3PathParameter(significant=False) + + def requires(self) -> List[luigi.Task]: + """Returns a :class:`ExportGraph` task that writes local files at the + expected location.""" + return [ + UploadGraphToS3( + local_graph_path=self.local_graph_path, + s3_graph_path=self.s3_graph_path, + ) + ] + + def output(self) -> List[luigi.Target]: + """Returns stamp and meta paths on the local filesystem.""" + return [self._meta()] + + def _meta(self): + return luigi.LocalTarget(self.local_graph_path / "meta" / "export.json") + + def run(self) -> None: + """Copies all files: first the graph itself, then :file:`meta/compression.json`.""" + import subprocess + import tempfile + + import luigi.contrib.s3 + import tqdm + + client = luigi.contrib.s3.S3Client() + + compression_metadata_path = f"{self.s3_graph_path}/meta/compression.json" + seen_compression_metadata = False + + # recursively copy local files to S3, and end with compression metadata + files = list(client.list(self.s3_graph_path)) + for (i, file_) in tqdm.tqdm( + list(enumerate(files)), + desc="Downloading", + ): + if file_ == compression_metadata_path: + # Will copy it last + seen_compression_metadata = True + continue + self.set_progress_percentage(int(i * 100 / len(files))) + local_path = self.local_graph_path / file_ + local_path.parent.mkdir(parents=True, exist_ok=True) + if file_.endswith(".bin.zst"): + # The file was compressed before uploading to S3, we need it + # to be decompressed locally + with tempfile.NamedTemporaryFile( + prefix=local_path.stem, suffix=".bin.zst" + ) as fd: + self.set_status_message(f"Downloading {file_} (compressed)") + client.get( + 
f"{self.s3_graph_path}/{file_}", + fd.name, + ) + self.set_status_message(f"Decompressing {file_}") + subprocess.run( + ["zstdmt", "--force", "-d", fd.name, "-o", local_path], + check=True, + ) + else: + self.set_status_message(f"Downloading {file_}") + client.get( + f"{self.s3_graph_path}/{file_}", + str(local_path), + ) + + assert ( + seen_compression_metadata + ), "did not see meta/compression.json in directory listing" + + # Write it last, to act as a stamp + client.get( + compression_metadata_path, + self._meta().path, + ) + + +class LocalGraph(luigi.Task): + """Task that depends on a local dataset being present -- either directly from + :class:`ExportGraph` or via :class:`DownloadGraphFromS3`. + """ + + local_graph_path = luigi.PathParameter() + compression_task_type = luigi.TaskParameter( + default=DownloadGraphFromS3, + significant=False, + description="""The task used to get the compressed graph if it is not present. + Should be either ``swh.graph.luigi.CompressGraph`` or + ``swh.graph.luigi.DownloadGraphFromS3``.""", + ) + + def requires(self) -> List[luigi.Task]: + """Returns an instance of either :class:`CompressGraph` or + :class:`DownloadGraphFromS3` depending on the value of + :attr:`compression_task_type`.""" + + if issubclass(self.compression_task_type, CompressGraph): + return [ + CompressGraph( + local_graph_path=self.local_graph_path, + ) + ] + elif issubclass(self.compression_task_type, DownloadGraphFromS3): + return [ + DownloadGraphFromS3( + local_graph_path=self.local_graph_path, + ) + ] + else: + raise ValueError( + f"Unexpected compression_task_type: " + f"{self.compression_task_type.__name__}" + ) + + def output(self) -> List[luigi.Target]: + """Returns stamp and meta paths on the local filesystem.""" + return [self._meta()] + + def _meta(self): + return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json") + + +class TopoSort(luigi.Task): + """Creates a file that contains all SWHIDs in topological order from a compressed 
+ graph.""" + + local_graph_path = luigi.PathParameter() + topological_order_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + + def requires(self) -> List[luigi.Task]: + """Returns an instance of :class:`LocalGraph`.""" + return [LocalGraph(local_graph_path=self.local_graph_path)] + + def output(self) -> luigi.Target: + """.csv.zst file that contains the topological order.""" + return luigi.LocalTarget(self.topological_order_path) + + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" + import os + import subprocess + + from .config import check_config + + conf: Dict = {} # TODO: configurable + + conf = check_config(conf) + env = { + **os.environ.copy(), + "JAVA_TOOL_OPTIONS": conf["java_tool_options"], + "CLASSPATH": conf["classpath"], + } + + tmp_path = Path(f"{self.topological_order_path}.tmp") + + object_types = "rev,rel,snp,ori" + class_name = "org.softwareheritage.graph.utils.TopoSort" + script = f""" + java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ + | pv --line-mode --wait \ + | zstdmt -19 \ + > '{tmp_path}' + """ + subprocess.run(["bash", "-c", script], env=env, check=True) + + # Atomically write the output file + tmp_path.replace(self.topological_order_path) diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py new file mode 100644 --- /dev/null +++ b/swh/graph/tests/test_toposort.py @@ -0,0 +1,58 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from pathlib import Path +import subprocess + +from swh.graph.luigi import TopoSort + +DATA_DIR = Path(__file__).parents[0] / "dataset" + + +EXPECTED_ROWS = """ +swh:1:rev:0000000000000000000000000000000000000003,0,1,, 
+swh:1:rev:0000000000000000000000000000000000000009,1,3,swh:1:rev:0000000000000000000000000000000000000003, +swh:1:rel:0000000000000000000000000000000000000010,1,1,swh:1:rev:0000000000000000000000000000000000000009, +swh:1:snp:0000000000000000000000000000000000000020,2,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010 +swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0,swh:1:snp:0000000000000000000000000000000000000020, +swh:1:rev:0000000000000000000000000000000000000013,1,1,swh:1:rev:0000000000000000000000000000000000000009, +swh:1:rev:0000000000000000000000000000000000000018,1,1,swh:1:rev:0000000000000000000000000000000000000013, +swh:1:rel:0000000000000000000000000000000000000019,1,0,swh:1:rev:0000000000000000000000000000000000000018, +""" + + +def test_toposort(tmpdir): + tmpdir = Path(tmpdir) + + topological_order_path = tmpdir / "topo_order.csv.zst" + + task = TopoSort( + local_graph_path=DATA_DIR / "compressed", + topological_order_path=topological_order_path, + graph_name="example", + ) + + task.run() + + csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode() + + lines = csv_text.split("\n") + (header, *rows) = lines + assert header == "SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2" + + # The only possible first line + assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,," + + assert set(rows) == set(EXPECTED_ROWS.split("\n")) + + assert rows.pop() == "", "Missing trailing newline" + + # The only two possible last lines + assert rows[-1] in [ + "swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0" + ",swh:1:snp:0000000000000000000000000000000000000020,", + "swh:1:rel:0000000000000000000000000000000000000019,1,0" + ",swh:1:rev:0000000000000000000000000000000000000018,", + ]