Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9341827
D8883.id32087.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
20 KB
Subscribers
None
D8883.id32087.diff
View Options
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/TopoSort.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2022 The Software Heritage developers
+ * See the AUTHORS file at the top-level directory of this distribution
+ * License: GNU General Public License version 3, or any later version
+ * See top-level LICENSE file for more information
+ */
+
+package org.softwareheritage.graph.utils;
+
+import com.martiansoftware.jsap.*;
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import it.unimi.dsi.big.webgraph.NodeIterator;
+import org.softwareheritage.graph.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/* Lists all nodes of the types given as argument, in topological order,
+ * from leaves (contents, if selected) to the top (origins, if selected).
+ *
+ * This uses a DFS, so nodes are likely to be close to their neighbors.
+ *
+ * Some extra information is provided to allow more efficient consumption
+ * of the output: number of ancestors, successors, and a sample of two ancestors.
+ * Here, a node's "ancestors" are its successors in the forward graph, and its
+ * "successors" are its successors in the transposed graph.
+ *
+ * Sample invocation:
+ *
+ * $ java -cp ~/swh-environment/swh-graph/java/target/swh-graph-*.jar -Xmx1000G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.TopoSort /dev/shm/swh-graph/default/graph 'rev,rel,snp,ori' \
+ *   | pv --line-mode --wait \
+ *   | zstdmt \
+ *   > /poolswh/softwareheritage/vlorentz/2022-04-25_toposort_rev,rel,snp,ori.txt.zst
+ */
+
+public class TopoSort {
+    private Subgraph graph;
+    private Subgraph transposedGraph;
+
+    public static void main(String[] args) throws IOException, ClassNotFoundException {
+        if (args.length != 2) {
+            System.err.println("Syntax: java org.softwareheritage.graph.utils.TopoSort <path/to/graph> <nodeTypes>");
+            System.exit(1);
+        }
+        String graphPath = args[0];
+        String nodeTypes = args[1];
+
+        TopoSort toposort = new TopoSort();
+
+        toposort.load_graph(graphPath, nodeTypes);
+        toposort.toposortDFS();
+    }
+
+    /* Loads the graph at graphBasename, restricted to the node types listed in
+     * nodeTypes (comma-separated, eg. "rev,rel,snp,ori"), and its transposition. */
+    public void load_graph(String graphBasename, String nodeTypes) throws IOException {
+        System.err.println("Loading graph " + graphBasename + " ...");
+        var underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
+        System.err.println("Selecting subgraphs.");
+        graph = new Subgraph(underlyingGraph, new AllowedNodes(nodeTypes));
+        transposedGraph = graph.transpose();
+        System.err.println("Graph loaded.");
+    }
+
+    /*
+     * Prints nodes in topological order, based on a DFS.
+     *
+     * The output is CSV on stdout; a node is only printed after all of its ancestors
+     * (successors in the forward graph) have been printed. Progress goes to stderr.
+     */
+    public void toposortDFS() {
+        HashSet<Long> visited = new HashSet<Long>();
+        /* Nodes whose ancestors have all been visited, used as a LIFO stack (hence the DFS). */
+        Deque<Long> ready = new ArrayDeque<>();
+
+        /* First, push all leaves to the stack */
+        System.err.println("Listing leaves.");
+        long total_nodes = 0;
+        NodeIterator nodeIterator = graph.nodeIterator();
+        /*
+         * hasNext() must be checked before each nextLong(): calling nextLong() first
+         * skips the last node, and throws on an empty graph.
+         */
+        while (nodeIterator.hasNext()) {
+            long currentNodeId = nodeIterator.nextLong();
+            total_nodes++;
+            long firstSuccessor = graph.successors(currentNodeId).nextLong();
+            if (firstSuccessor != -1) {
+                /* The node has an ancestor, so it is not a leaf. */
+                continue;
+            }
+            ready.push(currentNodeId);
+            if (ready.size() % 10000000 == 0) {
+                System.err.printf("Listed %.02f B leaves (out of %.02f B nodes)\n", ready.size() / 1000000000.,
+                        total_nodes / 1000000000.);
+            }
+        }
+        System.err.println("Leaves loaded, starting DFS.");
+
+        System.out.format("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2\n");
+        while (!ready.isEmpty()) {
+            long currentNodeId = ready.pop();
+            visited.add(currentNodeId);
+
+            /*
+             * Find its successors which are ready: those whose ancestors are now all visited.
+             */
+            LazyLongIterator successors = transposedGraph.successors(currentNodeId);
+            long successorCount = 0;
+            for (long successorNodeId; (successorNodeId = successors.nextLong()) != -1;) {
+                successorCount++;
+                LazyLongIterator successorAncestors = graph.successors(successorNodeId);
+                boolean isReady = true;
+                for (long successorAncestorNodeId; (successorAncestorNodeId = successorAncestors.nextLong()) != -1;) {
+                    if (!visited.contains(successorAncestorNodeId)) {
+                        /*
+                         * This ancestor of the successor is not yet visited, so the successor is not ready.
+                         */
+                        isReady = false;
+                        break;
+                    }
+                }
+                if (isReady) {
+                    ready.push(successorNodeId);
+                }
+            }
+
+            String[] sampleAncestors = {"", ""};
+            long ancestorCount = 0;
+            LazyLongIterator ancestors = graph.successors(currentNodeId);
+            for (long ancestorNodeId; (ancestorNodeId = ancestors.nextLong()) != -1;) {
+                if (ancestorCount < sampleAncestors.length) {
+                    sampleAncestors[(int) ancestorCount] = graph.getSWHID(ancestorNodeId).toString();
+                }
+                ancestorCount++;
+            }
+
+            /*
+             * Print the node
+             *
+             * TODO: print its depth too?
+             */
+            SWHID currentNodeSWHID = graph.getSWHID(currentNodeId);
+            System.out.format("%s,%d,%d,%s,%s\n", currentNodeSWHID, ancestorCount, successorCount, sampleAncestors[0],
+                    sampleAncestors[1]);
+        }
+
+        if (visited.size() != total_nodes) {
+            /* On a DAG every node eventually becomes ready, so a mismatch suggests a cycle. */
+            System.err.printf("Warning: printed only %d out of %d nodes.\n", visited.size(), total_nodes);
+        }
+    }
+}
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -22,3 +22,6 @@
[mypy-pytest.*]
ignore_missing_imports = True
+
+[mypy-tqdm.*]
+ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1 +1,2 @@
luigi
+tqdm
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
--- a/requirements-swh-luigi.txt
+++ b/requirements-swh-luigi.txt
@@ -1 +1 @@
-swh.dataset[luigi] >= v0.3.2
+swh.dataset[luigi] >= v1.0.1
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -56,19 +56,33 @@
When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
+
+S3 layout
+---------
+
+As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk.
+However, this is undesirable on at-rest/long-term storage like on S3, because
+some are very sparse (eg. :file:`graph.property.committer_timestamp.bin` can be
+quickly compressed from 300GB to 1GB).
+
+Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
+when downloading.
+
+The layout is otherwise the same as the file layout.
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
-from typing import List
+from pathlib import Path
+from typing import Dict, List
import luigi
-from swh.dataset.luigi import Format, LocalExport, ObjectType
+from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter
class CompressGraph(luigi.Task):
- local_export_path = luigi.PathParameter()
+ local_export_path = luigi.PathParameter(significant=False)
local_graph_path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
@@ -184,3 +198,281 @@
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
+
+
+class UploadGraphToS3(luigi.Task):
+ """Uploads a local compressed graphto S3; creating automatically if it does
+ not exist.
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
+ --local-graph-path=graph/ \
+ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+ """
+
+ local_graph_path = luigi.PathParameter(significant=False)
+ s3_graph_path = S3PathParameter()
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns a :class:`CompressGraph` task that writes local files at the
+ expected location."""
+ return [
+ CompressGraph(
+ local_graph_path=self.local_graph_path,
+ )
+ ]
+
+ def output(self) -> List[luigi.Target]:
+ """Returns stamp and meta paths on S3."""
+ return [self._meta()]
+
+ def _meta(self):
+ import luigi.contrib.s3
+
+ return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")
+
+ def run(self) -> None:
+ """Copies all files: first the graph itself, then :file:`meta/compression.json`."""
+ import subprocess
+ import tempfile
+
+ import luigi.contrib.s3
+ import tqdm
+
+ compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
+ seen_compression_metadata = False
+
+ client = luigi.contrib.s3.S3Client()
+
+ # recursively copy local files to S3, and end with compression metadata
+ paths = list(self.local_graph_path.glob("**/*"))
+ for (i, path) in tqdm.tqdm(
+ list(enumerate(paths)),
+ desc="Uploading compressed graph",
+ ):
+ if path == compression_metadata_path:
+ # Write it last
+ seen_compression_metadata = True
+ continue
+ if path.is_dir():
+ continue
+ relative_path = path.relative_to(self.local_graph_path)
+ self.set_progress_percentage(int(i * 100 / len(paths)))
+
+ if path.suffix == ".bin":
+ # Large sparse file; store it compressed on S3.
+ with tempfile.NamedTemporaryFile(
+ prefix=path.stem, suffix=".bin.zst"
+ ) as fd:
+ self.set_status_message(f"Compressing {relative_path}")
+ subprocess.run(
+ ["zstdmt", "--force", "--keep", path, "-o", fd.name], check=True
+ )
+ self.set_status_message(f"Uploading {relative_path} (compressed)")
+ client.put_multipart(
+ fd.name,
+ f"{self.s3_graph_path}/{relative_path}.zst",
+ ACL="public-read",
+ )
+ else:
+ self.set_status_message(f"Uploading {relative_path}")
+ client.put_multipart(
+ path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
+ )
+
+ assert (
+ seen_compression_metadata
+ ), "did not see meta/compression.json in directory listing"
+
+ # Write it last, to act as a stamp
+ client.put(
+ compression_metadata_path,
+ self._meta().path,
+ ACL="public-read",
+ )
+
+
+class DownloadGraphFromS3(luigi.Task):
+ """Downloads a local dataset graph from S3.
+
+ This performs the inverse operation of :class:`UploadGraphToS3`
+
+ Example invocation::
+
+ luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
+ --local-graph-path=graph/ \
+ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+ """
+
+ local_graph_path = luigi.PathParameter()
+ s3_graph_path = S3PathParameter(significant=False)
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns a :class:`ExportGraph` task that writes local files at the
+ expected location."""
+ return [
+ UploadGraphToS3(
+ local_graph_path=self.local_graph_path,
+ s3_graph_path=self.s3_graph_path,
+ )
+ ]
+
+ def output(self) -> List[luigi.Target]:
+ """Returns stamp and meta paths on the local filesystem."""
+ return [self._meta()]
+
+ def _meta(self):
+ return luigi.LocalTarget(self.local_graph_path / "meta" / "export.json")
+
+ def run(self) -> None:
+ """Copies all files: first the graph itself, then :file:`meta/compression.json`."""
+ import subprocess
+ import tempfile
+
+ import luigi.contrib.s3
+ import tqdm
+
+ client = luigi.contrib.s3.S3Client()
+
+ compression_metadata_path = f"{self.s3_graph_path}/meta/compression.json"
+ seen_compression_metadata = False
+
+ # recursively copy local files to S3, and end with compression metadata
+ files = list(client.list(self.s3_graph_path))
+ for (i, file_) in tqdm.tqdm(
+ list(enumerate(files)),
+ desc="Downloading",
+ ):
+ if file_ == compression_metadata_path:
+ # Will copy it last
+ seen_compression_metadata = True
+ continue
+ self.set_progress_percentage(int(i * 100 / len(files)))
+ local_path = self.local_graph_path / file_
+ local_path.parent.mkdir(parents=True, exist_ok=True)
+ if file_.endswith(".bin.zst"):
+ # The file was compressed before uploading to S3, we need it
+ # to be decompressed locally
+ with tempfile.NamedTemporaryFile(
+ prefix=local_path.stem, suffix=".bin.zst"
+ ) as fd:
+ self.set_status_message(f"Downloading {file_} (compressed)")
+ client.get(
+ f"{self.s3_graph_path}/{file_}",
+ fd.name,
+ )
+ self.set_status_message(f"Decompressing {file_}")
+ subprocess.run(
+ ["zstdmt", "--force", "-d", fd.name, "-o", local_path],
+ check=True,
+ )
+ else:
+ self.set_status_message(f"Downloading {file_}")
+ client.get(
+ f"{self.s3_graph_path}/{file_}",
+ str(local_path),
+ )
+
+ assert (
+ seen_compression_metadata
+ ), "did not see meta/compression.json in directory listing"
+
+ # Write it last, to act as a stamp
+ client.get(
+ compression_metadata_path,
+ self._meta().path,
+ )
+
+
+class LocalGraph(luigi.Task):
+ """Task that depends on a local dataset being present -- either directly from
+ :class:`ExportGraph` or via :class:`DownloadGraphFromS3`.
+ """
+
+ local_graph_path = luigi.PathParameter()
+ compression_task_type = luigi.TaskParameter(
+ default=DownloadGraphFromS3,
+ significant=False,
+ description="""The task used to get the compressed graph if it is not present.
+ Should be either ``swh.graph.luigi.CompressGraph`` or
+ ``swh.graph.luigi.DownloadGraphFromS3``.""",
+ )
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of either :class:`CompressGraph` or
+ :class:`DownloadGraphFromS3` depending on the value of
+ :attr:`compression_task_type`."""
+
+ if issubclass(self.compression_task_type, CompressGraph):
+ return [
+ CompressGraph(
+ local_graph_path=self.local_graph_path,
+ )
+ ]
+ elif issubclass(self.compression_task_type, DownloadGraphFromS3):
+ return [
+ DownloadGraphFromS3(
+ local_graph_path=self.local_graph_path,
+ )
+ ]
+ else:
+ raise ValueError(
+ f"Unexpected compression_task_type: "
+ f"{self.compression_task_type.__name__}"
+ )
+
+ def output(self) -> List[luigi.Target]:
+ """Returns stamp and meta paths on the local filesystem."""
+ return [self._meta()]
+
+ def _meta(self):
+ return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
+
+
+class TopoSort(luigi.Task):
+ """Creates a file that contains all SWHIDs in topological order from a compressed
+ graph."""
+
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`LocalGraph`."""
+ return [LocalGraph(local_graph_path=self.local_graph_path)]
+
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the topological order."""
+ return luigi.LocalTarget(self.topological_order_path)
+
+ def run(self) -> None:
+ """Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
+ import os
+ import subprocess
+
+ from .config import check_config
+
+ conf: Dict = {} # TODO: configurable
+
+ conf = check_config(conf)
+ env = {
+ **os.environ.copy(),
+ "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+ "CLASSPATH": conf["classpath"],
+ }
+
+ tmp_path = Path(f"{self.topological_order_path}.tmp")
+
+ object_types = "rev,rel,snp,ori"
+ class_name = "org.softwareheritage.graph.utils.TopoSort"
+ script = f"""
+ java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
+ | pv --line-mode --wait \
+ | zstdmt -19 \
+ > '{tmp_path}'
+ """
+ subprocess.run(["bash", "-c", script], env=env, check=True)
+
+ # Atomically write the output file
+ tmp_path.replace(self.topological_order_path)
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/test_toposort.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+import subprocess
+
+from swh.graph.luigi import TopoSort
+
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+EXPECTED_ROWS = """
+swh:1:rev:0000000000000000000000000000000000000003,0,1,,
+swh:1:rev:0000000000000000000000000000000000000009,1,3,swh:1:rev:0000000000000000000000000000000000000003,
+swh:1:rel:0000000000000000000000000000000000000010,1,1,swh:1:rev:0000000000000000000000000000000000000009,
+swh:1:snp:0000000000000000000000000000000000000020,2,1,swh:1:rev:0000000000000000000000000000000000000009,swh:1:rel:0000000000000000000000000000000000000010
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0,swh:1:snp:0000000000000000000000000000000000000020,
+swh:1:rev:0000000000000000000000000000000000000013,1,1,swh:1:rev:0000000000000000000000000000000000000009,
+swh:1:rev:0000000000000000000000000000000000000018,1,1,swh:1:rev:0000000000000000000000000000000000000013,
+swh:1:rel:0000000000000000000000000000000000000019,1,0,swh:1:rev:0000000000000000000000000000000000000018,
+"""
+
+
+def test_toposort(tmpdir):
+ tmpdir = Path(tmpdir)
+
+ topological_order_path = tmpdir / "topo_order.csv.zst"
+
+ task = TopoSort(
+ local_graph_path=DATA_DIR / "compressed",
+ topological_order_path=topological_order_path,
+ graph_name="example",
+ )
+
+ task.run()
+
+ csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode()
+
+ lines = csv_text.split("\n")
+ (header, *rows) = lines
+ assert header == "SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2"
+
+ # The only possible first line
+ assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"
+
+ assert set(rows) == set(EXPECTED_ROWS.split("\n"))
+
+ assert rows.pop() == "", "Missing trailing newline"
+
+ # The only two possible last lines
+ assert rows[-1] in [
+ "swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,1,0"
+ ",swh:1:snp:0000000000000000000000000000000000000020,",
+ "swh:1:rel:0000000000000000000000000000000000000019,1,0"
+ ",swh:1:rev:0000000000000000000000000000000000000018,",
+ ]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 12:19 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227419
Attached To
D8883: Add a script to generate a topological sort
Event Timeline
Log In to Comment