Page MenuHomeSoftware Heritage

D8883.id32087.diff
No OneTemporary

D8883.id32087.diff

diff --git a/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2022 The Software Heritage developers
+ * See the AUTHORS file at the top-level directory of this distribution
+ * License: GNU General Public License version 3, or any later version
+ * See top-level LICENSE file for more information
+ */
+
+package org.softwareheritage.graph.utils;
+
+import com.martiansoftware.jsap.*;
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import it.unimi.dsi.big.webgraph.NodeIterator;
+import org.softwareheritage.graph.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/* Lists all nodes of the types given as argument, in topological order,
+ * from leaves (contents, if selected) to the top (origins, if selected).
+ *
+ * This uses a DFS, so nodes are likely to be close to their neighbors.
+ *
+ * Some extra information is provided to allow more efficient consumption
+ * of the output: number of ancestors, successors, and a sample of two ancestors.
+ *
+ * Sample invocation:
+ *
+ * $ java -cp ~/swh-environment/swh-graph/java/target/swh-graph-*.jar -Xmx1000G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.TopoSort /dev/shm/swh-graph/default/graph 'rev,rel,snp,ori' \
+ * | pv --line-mode --wait \
+ * | zstdmt \
+ * > /poolswh/softwareheritage/vlorentz/2022-04-25_toposort_rev,rel,snp,ori.txt.zst
+ */
+
+public class TopoSort {
+    /* Subgraph restricted to the requested node types; its successors are what this class calls "ancestors". */
+    private Subgraph graph;
+    /* Transpose of the subgraph above; its successors are what this class calls "successors". */
+    private Subgraph transposedGraph;
+
+    /**
+     * Command-line entry point: loads the graph, then prints its nodes in topological order on stdout.
+     *
+     * @param args exactly two values: the graph basename and the node types to keep (the sample invocation
+     *            in the file header uses {@code rev,rel,snp,ori}); the process exits with status 1 otherwise
+     */
+    public static void main(String[] args) throws IOException, ClassNotFoundException {
+        if (args.length != 2) {
+            System.err.println("Syntax: java org.softwareheritage.graph.utils.TopoSort <path/to/graph> <nodeTypes>");
+            System.exit(1);
+        }
+        String graphPath = args[0];
+        String nodeTypes = args[1];
+
+        TopoSort toposort = new TopoSort();
+
+        toposort.load_graph(graphPath, nodeTypes);
+        toposort.toposortDFS();
+    }
+
+    /**
+     * Memory-maps the graph and builds the forward and transposed subgraphs restricted to the given node types.
+     * Progress is reported on stderr.
+     *
+     * @param graphBasename basename of the compressed graph, passed to {@code SwhBidirectionalGraph.loadMapped}
+     * @param nodeTypes node type selection, passed to {@code AllowedNodes}
+     */
+    public void load_graph(String graphBasename, String nodeTypes) throws IOException {
+        System.err.println("Loading graph " + graphBasename + " ...");
+        var underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
+        System.err.println("Selecting subgraphs.");
+        graph = new Subgraph(underlyingGraph, new AllowedNodes(nodeTypes));
+        transposedGraph = graph.transpose();
+        System.err.println("Graph loaded.");
+    }
+
+    /**
+     * Prints nodes in topological order, based on a DFS.
+     *
+     * Output is CSV on stdout, one line per node, with columns
+     * {@code SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2}. A node is printed only after all
+     * of its ancestors have been printed. Progress messages go to stderr.
+     */
+    public void toposortDFS() {
+        HashSet<Long> visited = new HashSet<Long>();
+        /* LIFO stack of nodes whose ancestors have all been visited already. */
+        Deque<Long> ready = new ArrayDeque<>();
+
+        /* First, push all leaves to the stack */
+        System.err.println("Listing leaves.");
+        long total_nodes = 0;
+        NodeIterator nodeIterator = graph.nodeIterator();
+        /*
+         * Test hasNext() before fetching each node. Fetching first and testing afterwards never examines the
+         * last node of the iteration, and calls nextLong() on an empty iterator when the subgraph is empty.
+         */
+        while (nodeIterator.hasNext()) {
+            long currentNodeId = nodeIterator.nextLong();
+            total_nodes++;
+            long firstSuccessor = graph.successors(currentNodeId).nextLong();
+            if (firstSuccessor != -1) {
+                /* The node has an ancestor, so it is not a leaf. */
+                continue;
+            }
+            ready.push(currentNodeId);
+            if (ready.size() % 10000000 == 0) {
+                float ready_size_f = ready.size();
+                float total_nodes_f = total_nodes;
+                System.err.printf("Listed %.02f B leaves (out of %.02f B nodes)\n", ready_size_f / 1000000000.,
+                        total_nodes_f / 1000000000.);
+            }
+        }
+        System.err.println("Leaves loaded, starting DFS.");
+
+        System.out.format("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2\n");
+        while (!ready.isEmpty()) {
+            long currentNodeId = ready.pop();
+            visited.add(currentNodeId);
+
+            /* Find its successors which are ready */
+            LazyLongIterator successors = transposedGraph.successors(currentNodeId);
+            long successorCount = 0;
+            for (long successorNodeId; (successorNodeId = successors.nextLong()) != -1;) {
+                successorCount++;
+                LazyLongIterator successorAncestors = graph.successors(successorNodeId);
+                boolean isReady = true;
+                for (long successorAncestorNodeId; (successorAncestorNodeId = successorAncestors.nextLong()) != -1;) {
+                    if (!visited.contains(successorAncestorNodeId)) {
+                        /*
+                         * This ancestor of the successor is not yet visited, so the successor is not ready.
+                         */
+                        isReady = false;
+                        break;
+                    }
+                }
+                if (isReady) {
+                    ready.push(successorNodeId);
+                }
+            }
+
+            /* Count the ancestors, keeping the SWHIDs of the first two as samples ("" when missing). */
+            String[] sampleAncestors = {"", ""};
+            long ancestorCount = 0;
+            LazyLongIterator ancestors = graph.successors(currentNodeId);
+            for (long ancestorNodeId; (ancestorNodeId = ancestors.nextLong()) != -1;) {
+                if (ancestorCount < sampleAncestors.length) {
+                    sampleAncestors[(int) ancestorCount] = graph.getSWHID(ancestorNodeId).toString();
+                }
+                ancestorCount++;
+            }
+
+            /*
+             * Print the node
+             *
+             * TODO: print its depth too?
+             */
+            SWHID currentNodeSWHID = graph.getSWHID(currentNodeId);
+            System.out.format("%s,%d,%d,%s,%s\n", currentNodeSWHID, ancestorCount, successorCount, sampleAncestors[0],
+                    sampleAncestors[1]);
+        }
+    }
+}
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -22,3 +22,6 @@
[mypy-pytest.*]
ignore_missing_imports = True
+
+[mypy-tqdm.*]
+ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1 +1,2 @@
luigi
+tqdm
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
--- a/requirements-swh-luigi.txt
+++ b/requirements-swh-luigi.txt
@@ -1 +1 @@
-swh.dataset[luigi] >= v0.3.2
+swh.dataset[luigi] >= v1.0.1
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -56,19 +56,33 @@
When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
+
+S3 layout
+---------
+
+As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk.
+However, this is undesirable on at-rest/long-term storage like on S3, because
+some are very sparse (e.g. :file:`graph.property.committer_timestamp.bin` can be
+quickly compressed from 300GB to 1GB).
+
+Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
+when downloading.
+
+The layout is otherwise the same as the file layout.
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from typing import List
+from pathlib import Path
+from typing import Dict, List
import luigi
-from swh.dataset.luigi import Format, LocalExport, ObjectType
+from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter
class CompressGraph(luigi.Task):
- local_export_path = luigi.PathParameter()
+ local_export_path = luigi.PathParameter(significant=False)
local_graph_path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
@@ -184,3 +198,281 @@
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
+
+
+class UploadGraphToS3(luigi.Task):
+    """Uploads a local compressed graph to S3; the graph is compressed first
+    (through :class:`CompressGraph`) if it does not exist locally.
+
+    Example invocation::
+
+        luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
+                --local-graph-path=graph/ \
+                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+    """
+
+    local_graph_path = luigi.PathParameter(significant=False)
+    s3_graph_path = S3PathParameter()
+
+    def requires(self) -> List[luigi.Task]:
+        """Depends on the :class:`CompressGraph` task producing the local files
+        that will be uploaded."""
+        compress_task = CompressGraph(local_graph_path=self.local_graph_path)
+        return [compress_task]
+
+    def output(self) -> List[luigi.Target]:
+        """The only target is :file:`meta/compression.json` on S3, which is
+        uploaded last and therefore acts as the completion stamp."""
+        return [self._meta()]
+
+    def _meta(self):
+        import luigi.contrib.s3
+
+        return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")
+
+    def run(self) -> None:
+        """Uploads every file of the local graph directory, then
+        :file:`meta/compression.json`.
+
+        ``.bin`` files are compressed with ``zstdmt`` into a temporary file and
+        stored on S3 with an extra ``.zst`` suffix; other files are uploaded as-is.
+        """
+        import subprocess
+        import tempfile
+
+        import luigi.contrib.s3
+        import tqdm
+
+        s3 = luigi.contrib.s3.S3Client()
+
+        metadata_file = self.local_graph_path / "meta" / "compression.json"
+        metadata_found = False
+
+        # Walk the whole local tree; the compression metadata is held back until
+        # everything else is uploaded.
+        entries = list(self.local_graph_path.glob("**/*"))
+        total = len(entries)
+        numbered_entries = list(enumerate(entries))
+        for (index, entry) in tqdm.tqdm(
+            numbered_entries, desc="Uploading compressed graph"
+        ):
+            if entry == metadata_file:
+                # Uploaded after the loop
+                metadata_found = True
+                continue
+            if entry.is_dir():
+                continue
+            relative_path = entry.relative_to(self.local_graph_path)
+            self.set_progress_percentage(int(index * 100 / total))
+
+            if entry.suffix != ".bin":
+                self.set_status_message(f"Uploading {relative_path}")
+                s3.put_multipart(
+                    entry, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
+                )
+                continue
+
+            # Large sparse file; store it compressed on S3.
+            with tempfile.NamedTemporaryFile(
+                prefix=entry.stem, suffix=".bin.zst"
+            ) as compressed:
+                self.set_status_message(f"Compressing {relative_path}")
+                zstd_command = [
+                    "zstdmt",
+                    "--force",
+                    "--keep",
+                    entry,
+                    "-o",
+                    compressed.name,
+                ]
+                subprocess.run(zstd_command, check=True)
+                self.set_status_message(f"Uploading {relative_path} (compressed)")
+                s3.put_multipart(
+                    compressed.name,
+                    f"{self.s3_graph_path}/{relative_path}.zst",
+                    ACL="public-read",
+                )
+
+        assert (
+            metadata_found
+        ), "did not see meta/compression.json in directory listing"
+
+        # Write it last, to act as a stamp
+        s3.put(
+            metadata_file,
+            self._meta().path,
+            ACL="public-read",
+        )
+
+
+class DownloadGraphFromS3(luigi.Task):
+    """Downloads a compressed graph from S3 to the local filesystem.
+
+    This performs the inverse operation of :class:`UploadGraphToS3`
+
+    Example invocation::
+
+        luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
+                --local-graph-path=graph/ \
+                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+    """
+
+    local_graph_path = luigi.PathParameter()
+    s3_graph_path = S3PathParameter(significant=False)
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns a :class:`UploadGraphToS3` task, whose output is the copy of
+        the graph on S3 that this task downloads."""
+        return [
+            UploadGraphToS3(
+                local_graph_path=self.local_graph_path,
+                s3_graph_path=self.s3_graph_path,
+            )
+        ]
+
+    def output(self) -> List[luigi.Target]:
+        """Returns the local :file:`meta/compression.json`, which is written last
+        and therefore acts as the completion stamp."""
+        return [self._meta()]
+
+    def _meta(self):
+        # Must be the file run() downloads last, and the one LocalGraph checks for.
+        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
+
+    def run(self) -> None:
+        """Copies all files: first the graph itself, then :file:`meta/compression.json`.
+
+        Files stored as ``.bin.zst`` on S3 are decompressed with ``zstdmt`` and
+        written locally without the ``.zst`` suffix.
+        """
+        import subprocess
+        import tempfile
+
+        import luigi.contrib.s3
+        import tqdm
+
+        client = luigi.contrib.s3.S3Client()
+
+        # client.list() yields keys relative to the queried prefix (they are
+        # joined to both the local and the S3 base path below), so the metadata
+        # file has to be recognized by its relative key, not by its full URL.
+        compression_metadata_key = "meta/compression.json"
+        compression_metadata_path = f"{self.s3_graph_path}/{compression_metadata_key}"
+        seen_compression_metadata = False
+
+        # recursively copy files from S3, and end with compression metadata
+        files = list(client.list(self.s3_graph_path))
+        for (i, file_) in tqdm.tqdm(
+            list(enumerate(files)),
+            desc="Downloading",
+        ):
+            if file_ == compression_metadata_key:
+                # Will copy it last
+                seen_compression_metadata = True
+                continue
+            self.set_progress_percentage(int(i * 100 / len(files)))
+            local_path = self.local_graph_path / file_
+            local_path.parent.mkdir(parents=True, exist_ok=True)
+            if file_.endswith(".bin.zst"):
+                # The file was compressed before uploading to S3, we need it
+                # to be decompressed locally
+                with tempfile.NamedTemporaryFile(
+                    prefix=local_path.stem, suffix=".bin.zst"
+                ) as fd:
+                    self.set_status_message(f"Downloading {file_} (compressed)")
+                    client.get(
+                        f"{self.s3_graph_path}/{file_}",
+                        fd.name,
+                    )
+                    self.set_status_message(f"Decompressing {file_}")
+                    # Drop the trailing ".zst" so the decompressed file gets
+                    # back the ".bin" name it had before the upload.
+                    decompressed_path = local_path.with_suffix("")
+                    subprocess.run(
+                        ["zstdmt", "--force", "-d", fd.name, "-o", str(decompressed_path)],
+                        check=True,
+                    )
+            else:
+                self.set_status_message(f"Downloading {file_}")
+                client.get(
+                    f"{self.s3_graph_path}/{file_}",
+                    str(local_path),
+                )
+
+        assert (
+            seen_compression_metadata
+        ), "did not see meta/compression.json in directory listing"
+
+        # The loop only creates directories for the files it downloads; make sure
+        # the stamp's directory exists even when nothing else lives in it.
+        Path(self._meta().path).parent.mkdir(parents=True, exist_ok=True)
+
+        # Write it last, to act as a stamp
+        client.get(
+            compression_metadata_path,
+            self._meta().path,
+        )
+
+
+class LocalGraph(luigi.Task):
+    """Task that depends on a local compressed graph being present -- either
+    produced directly by :class:`CompressGraph` or fetched by
+    :class:`DownloadGraphFromS3`.
+    """
+
+    local_graph_path = luigi.PathParameter()
+    compression_task_type = luigi.TaskParameter(
+        default=DownloadGraphFromS3,
+        significant=False,
+        description="""The task used to get the compressed graph if it is not present.
+ Should be either ``swh.graph.luigi.CompressGraph`` or
+ ``swh.graph.luigi.DownloadGraphFromS3``.""",
+    )
+
+    def requires(self) -> List[luigi.Task]:
+        """Instantiates :class:`CompressGraph` or :class:`DownloadGraphFromS3`,
+        whichever :attr:`compression_task_type` is a subclass of.
+
+        Raises :exc:`ValueError` for any other task type."""
+
+        task_type = self.compression_task_type
+        # CompressGraph is tried first, as in the documented order of precedence.
+        for supported_type in (CompressGraph, DownloadGraphFromS3):
+            if issubclass(task_type, supported_type):
+                return [supported_type(local_graph_path=self.local_graph_path)]
+
+        type_name = self.compression_task_type.__name__
+        raise ValueError(f"Unexpected compression_task_type: {type_name}")
+
+    def output(self) -> List[luigi.Target]:
+        """The local :file:`meta/compression.json` is the only target."""
+        return [self._meta()]
+
+    def _meta(self):
+        meta_path = self.local_graph_path / "meta" / "compression.json"
+        return luigi.LocalTarget(meta_path)
+
+
+class TopoSort(luigi.Task):
+    """Creates a file that contains all SWHIDs in topological order from a compressed
+    graph."""
+
+    local_graph_path = luigi.PathParameter()
+    topological_order_path = luigi.PathParameter()
+    graph_name = luigi.Parameter(default="graph")
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns an instance of :class:`LocalGraph`."""
+        return [LocalGraph(local_graph_path=self.local_graph_path)]
+
+    def output(self) -> luigi.Target:
+        """.csv.zst file that contains the topological order."""
+        return luigi.LocalTarget(self.topological_order_path)
+
+    def run(self) -> None:
+        """Runs org.softwareheritage.graph.utils.TopoSort and compresses its
+        output with ``zstdmt``.
+
+        The result is first written to ``<topological_order_path>.tmp`` and only
+        renamed to its final name once the whole pipeline succeeded.
+
+        Raises :exc:`subprocess.CalledProcessError` if any command of the
+        pipeline fails."""
+        import os
+        import shlex
+        import subprocess
+
+        from .config import check_config
+
+        conf: Dict = {}  # TODO: configurable
+
+        conf = check_config(conf)
+        env = {
+            **os.environ.copy(),
+            "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+            "CLASSPATH": conf["classpath"],
+        }
+
+        tmp_path = Path(f"{self.topological_order_path}.tmp")
+
+        object_types = "rev,rel,snp,ori"
+        class_name = "org.softwareheritage.graph.utils.TopoSort"
+
+        # Quote through shlex so that paths containing quotes or other shell
+        # metacharacters cannot break out of their argument.
+        graph_basename = shlex.quote(f"{self.local_graph_path}/{self.graph_name}")
+        quoted_object_types = shlex.quote(object_types)
+        quoted_tmp_path = shlex.quote(str(tmp_path))
+
+        # Without pipefail, the pipeline's status is only that of zstdmt: a crash
+        # of java or pv would go unnoticed by check=True, and a truncated file
+        # would be renamed to the final path below.
+        script = f"""
+        set -o pipefail
+        java {class_name} {graph_basename} {quoted_object_types} \
+            | pv --line-mode --wait \
+            | zstdmt -19 \
+            > {quoted_tmp_path}
+        """
+        subprocess.run(["bash", "-c", script], env=env, check=True)
+
+        # Atomically write the output file
+        tmp_path.replace(self.topological_order_path)
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/test_toposort.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+import subprocess
+
+from swh.graph.luigi import TopoSort
+
+# Directory holding the test dataset, located next to this test module.
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+# Every row the task is expected to output for the test graph, without the CSV
+# header. Columns follow the header checked in test_toposort():
+# SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2.
+# The test compares these as a set, so the order of the rows here is not significant.
+EXPECTED_ROWS = """
+swh:1:rev:0000000000000000000000000000000000000003,0,1,,
+swh:1:rev:0000000000000000000000000000000000000009,1,3,swh:1:rev:0000000000000000000000000000000000000003,
+swh:1:rel:0000000000000000000000000000000000000010,1,1,swh:1:rev:0000000000000000000000000000000000000009,
+swh:1:snp:0000000000000000000000000000000000000020,2,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0,swh:1:snp:0000000000000000000000000000000000000020,
+swh:1:rev:0000000000000000000000000000000000000013,1,1,swh:1:rev:0000000000000000000000000000000000000009,
+swh:1:rev:0000000000000000000000000000000000000018,1,1,swh:1:rev:0000000000000000000000000000000000000013,
+swh:1:rel:0000000000000000000000000000000000000019,1,0,swh:1:rev:0000000000000000000000000000000000000018,
+"""
+
+
+def test_toposort(tmpdir):
+    """Runs the TopoSort task on the test graph and checks the produced CSV:
+    header, set of rows, trailing newline, and the rows that must come first
+    and last."""
+    output_path = Path(tmpdir) / "topo_order.csv.zst"
+
+    TopoSort(
+        local_graph_path=DATA_DIR / "compressed",
+        topological_order_path=output_path,
+        graph_name="example",
+    ).run()
+
+    raw_csv = subprocess.check_output(["zstdcat", output_path])
+    header, *rows = raw_csv.decode().split("\n")
+    assert header == "SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2"
+
+    # The only possible first line
+    assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"
+
+    # Both sides contain an empty string: the expected text starts and ends with
+    # a newline, and the output ends with one.
+    expected_rows = set(EXPECTED_ROWS.split("\n"))
+    assert set(rows) == expected_rows
+
+    assert rows.pop() == "", "Missing trailing newline"
+
+    # The only two possible last lines
+    possible_last_rows = [
+        "swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0"
+        ",swh:1:snp:0000000000000000000000000000000000000020,",
+        "swh:1:rel:0000000000000000000000000000000000000019,1,0"
+        ",swh:1:rev:0000000000000000000000000000000000000018,",
+    ]
+    assert rows[-1] in possible_last_rows

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:19 PM (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227419

Event Timeline