+ public static final Map<byte[], Integer> byteNameToIntMap = Map.of("cnt".getBytes(StandardCharsets.UTF_8), 0,
+ "dir".getBytes(StandardCharsets.UTF_8), 1, "ori".getBytes(StandardCharsets.UTF_8), 2,
+ "rel".getBytes(StandardCharsets.UTF_8), 3, "rev".getBytes(StandardCharsets.UTF_8), 4,
+ "snp".getBytes(StandardCharsets.UTF_8), 5);
+
+ /**
+ * Converts byte array name to the int code of the corresponding SWH node type. Used for
+ * performance-critical deserialization.
+ *
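+ * For example, {@code byteNameToInt("rev".getBytes())} returns {@code 4}.
+ *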
+ * @param name node type represented as a byte array (e.g. b"cnt")
+ * @return the int code of the corresponding node type, or -1 if the name is not a valid node type
+ * @see org.softwareheritage.graph.Node.Type
+ */
+ public static int byteNameToInt(byte[] name) {
+ if (Arrays.equals(name, "cnt".getBytes())) {
+ return 0;
+ } else if (Arrays.equals(name, "dir".getBytes())) {
+ return 1;
+ } else if (Arrays.equals(name, "ori".getBytes())) {
+ return 2;
+ } else if (Arrays.equals(name, "rel".getBytes())) {
+ return 3;
+ } else if (Arrays.equals(name, "rev".getBytes())) {
+ return 4;
+ } else if (Arrays.equals(name, "snp".getBytes())) {
+ return 5;
+ } else
+ return -1;
+ }
+
/**
* Parses SWH node type possible values from formatted string (see the
* API syntax).
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java
@@ -0,0 +1,186 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdInputStream;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * A graph dataset in (zstd-compressed) CSV format.
+ *
+ * <p>
+ * This format does not contain any properties apart from the SWHIDs of the nodes, and optionally
+ * the labels of the edges and the permissions of the directory entries.
+ *
+ * <p>
+ * The structure of the dataset is as follows: one directory per object type, each containing:
+ *
+ * <ul>
+ * <li>a number of files <code>*.nodes.csv.zst</code> containing the SWHIDs of the objects stored in
+ * the graph, one per line.</li>
+ * <li>a number of files <code>*.edges.csv.zst</code> containing the edges of the graph, one per
+ * line. The format of each edge is as follows:
+ * <code>SRC_SWHID DST_SWHID [BASE64_LABEL] [INT_PERMISSION]</code>.</li>
+ * </ul>
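+ *
+ * <p>
+ * A minimal usage sketch (the dataset path below is illustrative):
+ *
+ * <pre>{@code
+ * CSVEdgeDataset dataset = new CSVEdgeDataset("/path/to/dataset");
+ * dataset.readEdges(
+ *     node -> System.out.println(new String(node)),
+ *     (src, dst, label, perm) -> System.out.println(new String(src) + " " + new String(dst)));
+ * }</pre>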
+ */
+public class CSVEdgeDataset implements GraphDataset {
+ final static Logger logger = LoggerFactory.getLogger(CSVEdgeDataset.class);
+
+ final private File datasetDir;
+
+ public CSVEdgeDataset(String datasetPath) {
+ this(new File(datasetPath));
+ }
+
+ public CSVEdgeDataset(File datasetDir) {
+ if (!datasetDir.exists()) {
+ throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist");
+ }
+ this.datasetDir = datasetDir;
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ File[] allTables = datasetDir.listFiles();
+ if (allTables == null) {
+ return;
+ }
+ for (File tableFile : allTables) {
+ File[] allCsvFiles = tableFile.listFiles();
+ if (allCsvFiles == null) {
+ continue;
+ }
+ for (File csvFile : allCsvFiles) {
+ if (csvFile.getName().endsWith(".edges.csv.zst")) {
+ readEdgesCsvZst(csvFile.getPath(), edgeCb);
+ } else if (csvFile.getName().endsWith(".nodes.csv.zst")) {
+ readNodesCsvZst(csvFile.getPath(), nodeCb);
+ }
+ }
+ }
+ }
+
+ public static void readEdgesCsvZst(String csvZstPath, GraphDataset.EdgeCallback cb) throws IOException {
+ InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath)));
+ readEdgesCsv(csvInputStream, cb);
+ }
+
+ public static void readEdgesCsv(InputStream csvInputStream, GraphDataset.EdgeCallback cb) throws IOException {
+ FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream);
+
+ Charset charset = StandardCharsets.US_ASCII;
+ byte[] array = new byte[1024];
+ for (long line = 0;; line++) {
+ int start = 0, len;
+ while ((len = csvReader.readLine(array, start, array.length - start,
+ FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
+ start += len;
+ array = ByteArrays.grow(array, array.length + 1);
+ }
+ if (len == -1)
+ break; // EOF
+ final int lineLength = start + len;
+
+ // Skip whitespace at the start of the line.
+ int offset = 0;
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ continue;
+ }
+ if (array[0] == '#')
+ continue;
+
+ // Scan source id.
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ss = Arrays.copyOfRange(array, start, offset);
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ logger.error("Error at line " + line + ": no target");
+ continue;
+ }
+
+ // Scan target ID
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ts = Arrays.copyOfRange(array, start, offset);
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ // Scan label
+ byte[] ls = null;
+ if (offset < lineLength) {
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ ls = Arrays.copyOfRange(array, start, offset);
+ }
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ // Scan permission
+ int permission = 0;
+ if (offset < lineLength) {
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ permission = Integer.parseInt(new String(array, start, offset - start, charset));
+ }
+
+ cb.onEdge(ss, ts, ls, permission);
+ }
+ }
+
+ public static void readNodesCsvZst(String csvZstPath, GraphDataset.NodeCallback cb) throws IOException {
+ InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath)));
+ readNodesCsv(csvInputStream, cb);
+ }
+
+ public static void readNodesCsv(InputStream csvInputStream, GraphDataset.NodeCallback cb) throws IOException {
+ FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream);
+
+ byte[] array = new byte[1024];
+ for (long line = 0;; line++) {
+ int start = 0, len;
+ while ((len = csvReader.readLine(array, start, array.length - start,
+ FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
+ start += len;
+ array = ByteArrays.grow(array, array.length + 1);
+ }
+ if (len == -1)
+ break; // EOF
+ final int lineLength = start + len;
+
+ // Skip whitespace at the start of the line.
+ int offset = 0;
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ continue;
+ }
+ if (array[0] == '#')
+ continue;
+
+ // Scan source id.
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ss = Arrays.copyOfRange(array, start, offset);
+
+ cb.onNode(ss);
+ }
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
rename from java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
rename to java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
--- a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
@@ -1,4 +1,4 @@
-package org.softwareheritage.graph.utils;
+package org.softwareheritage.graph.compress;
import com.martiansoftware.jsap.*;
import it.unimi.dsi.Util;
@@ -11,7 +11,7 @@
* CLI program used to compose two on-disk permutations.
*
* It takes two on-disk permutations as parameters, p1 and p2, and writes on disk (p1 o p2) at the
- * given location. This is useful for multi-step compression (e.g. Unordered -> BFS -> LLP), as it
+ * given location. This is useful for multi-step compression (e.g., Unordered -> BFS -> LLP), as it
* can be used to merge all the intermediate permutations.
*/
public class ComposePermutations {
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
@@ -0,0 +1,294 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import com.martiansoftware.jsap.*;
+import org.softwareheritage.graph.Node;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+// Java (python3k) new String() 1445.33s user 99.59s system 186% cpu 13:47.20 total
+// Java (python3k) new String() 421.99s user 105.45s system 178% cpu 14:14.05 total
+// Java (python3k) Array.copyOf() + string 1002.04s user 161.91s system 272% cpu 7:07.41 total
+// Java (python3) only writes: 887.68s user 97.80s system 270% cpu 6:03.76 total
+// Java (python3) no label type counting: 879.08s user 101.84s system 273% cpu 5:59.13 total
+
+// Shell (python3k) 641.48s user 340.56s system 387% cpu 4:13.68 total | 8.02GiB 0:01:52
+
+/**
+ * Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that
+ * are not stored as actual objects in the graph, but only referred to by the edges.
+ * Additionally, extract the set of all unique edge labels in the graph.
+ *
+ * <ul>
+ * <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a
+ * zst-compressed sorted list of SWHIDs, one per line.</li>
+ * <li>The set of edge labels is written in <code>${outputBasename}.labels.csv.zst</code>, as a
+ * zst-compressed sorted list of labels encoded in base64, one per line.</li>
+ * <li>The number of unique nodes referred to in the graph is written in a text file,
+ * <code>${outputBasename}.nodes.count.txt</code></li>
+ * <li>The number of unique edges referred to in the graph is written in a text file,
+ * <code>${outputBasename}.edges.count.txt</code></li>
+ * <li>The number of unique edge labels is written in a text file,
+ * <code>${outputBasename}.labels.count.txt</code></li>
+ * <li>Statistics on the number of nodes of each type are written in a text file,
+ * <code>${outputBasename}.nodes.stats.txt</code></li>
+ * <li>Statistics on the number of edges of each type are written in a text file,
+ * <code>${outputBasename}.edges.stats.txt</code></li>
+ * </ul>
+ *
+ * <p>
+ * Rationale: Because the graph can contain holes, loose objects and dangling
+ * objects, some nodes that are referred to as destinations in the edge relationships might not
+ * actually be stored in the graph itself. However, to compress the graph using a graph compression
+ * library, it is necessary to have a list of all the nodes in the graph, including the
+ * ones that are simply referred to by the edges but not actually stored as concrete objects.
+ *
+ * <p>
+ * This class reads the entire graph dataset, and uses <code>sort -u</code> to extract the set of
+ * all the unique nodes and unique labels that will be needed as an input for the compression
+ * process.
+ *
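+ *
+ * <p>
+ * An illustrative invocation (the class path setup and the paths are hypothetical):
+ *
+ * <pre>{@code
+ * java org.softwareheritage.graph.compress.ExtractNodes --format orc /path/to/dataset /path/to/output/graph
+ * }</pre>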
+ */
+public class ExtractNodes {
+ private static JSAPResult parseArgs(String[] args) {
+ JSAPResult config = null;
+ try {
+ SimpleJSAP jsap = new SimpleJSAP(ExtractNodes.class.getName(), "", new Parameter[]{
+ new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the edges dataset"),
+ new UnflaggedOption("outputBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
+ "Basename of the output files"),
+
+ new FlaggedOption("format", JSAP.STRING_PARSER, "orc", JSAP.NOT_REQUIRED, 'f', "format",
+ "Format of the input dataset (orc, csv)"),
+ new FlaggedOption("sortBufferSize", JSAP.STRING_PARSER, "30%", JSAP.NOT_REQUIRED, 'S',
+ "sort-buffer-size", "Size of the memory buffer used by sort"),
+ new FlaggedOption("sortTmpDir", JSAP.STRING_PARSER, null, JSAP.NOT_REQUIRED, 'T',
+ "sort-temporary-directory", "Path to the temporary directory used by sort")});
+
+ config = jsap.parse(args);
+ if (jsap.messagePrinted()) {
+ System.exit(1);
+ }
+ } catch (JSAPException e) {
+ e.printStackTrace();
+ }
+ return config;
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ JSAPResult parsedArgs = parseArgs(args);
+ String datasetPath = parsedArgs.getString("dataset");
+ String outputBasename = parsedArgs.getString("outputBasename");
+
+ String datasetFormat = parsedArgs.getString("format");
+ String sortBufferSize = parsedArgs.getString("sortBufferSize");
+ String sortTmpDir = parsedArgs.getString("sortTmpDir", null);
+
+ // Open edge dataset
+ GraphDataset dataset;
+ if (datasetFormat.equals("orc")) {
+ dataset = new ORCGraphDataset(datasetPath);
+ } else if (datasetFormat.equals("csv")) {
+ dataset = new CSVEdgeDataset(datasetPath);
+ } else {
+ throw new IllegalArgumentException("Unknown dataset format: " + datasetFormat);
+ }
+
+ // Spawn node sorting process
+ Process nodeSort = spawnSort(sortBufferSize, sortTmpDir);
+ BufferedOutputStream nodeSortStdin = new BufferedOutputStream(nodeSort.getOutputStream());
+ BufferedInputStream nodeSortStdout = new BufferedInputStream(nodeSort.getInputStream());
+ OutputStream nodesFileOutputStream = new ZstdOutputStream(
+ new BufferedOutputStream(new FileOutputStream(outputBasename + ".nodes.csv.zst")));
+ NodesOutputThread nodesOutputThread = new NodesOutputThread(nodeSortStdout, nodesFileOutputStream);
+ nodesOutputThread.start();
+
+ // Spawn label sorting process
+ Process labelSort = spawnSort(sortBufferSize, sortTmpDir);
+ BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream());
+ BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream());
+ OutputStream labelsFileOutputStream = new ZstdOutputStream(
+ new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst")));
+ LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream);
+ labelsOutputThread.start();
+
+ // Read the dataset and write the nodes and labels to the sorting processes
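+ // Counters are wrapped in arrays so that the lambdas below can mutate them.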
+ long[] edgeCount = {0};
+ long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length];
+ dataset.readEdges((node) -> {
+ nodeSortStdin.write(node);
+ nodeSortStdin.write('\n');
+ }, (src, dst, label, perm) -> {
+ nodeSortStdin.write(src);
+ nodeSortStdin.write('\n');
+ nodeSortStdin.write(dst);
+ nodeSortStdin.write('\n');
+ if (label != null) {
+ labelSortStdin.write(label);
+ labelSortStdin.write('\n');
+ }
+ edgeCount[0]++;
+ // Extract type of src and dst from their SWHID: swh:1:XXX
+ byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3);
+ byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3);
+ int srcType = Node.Type.byteNameToInt(srcTypeBytes);
+ int dstType = Node.Type.byteNameToInt(dstTypeBytes);
+ if (srcType != -1 && dstType != -1) {
+ edgeCountByType[srcType][dstType]++;
+ } else {
+ System.err
+ .println("Invalid edge type: " + new String(srcTypeBytes) + " -> " + new String(dstTypeBytes));
+ System.exit(1);
+ }
+ });
+
+ // Wait for sorting processes to finish
+ nodeSortStdin.close();
+ nodeSort.waitFor();
+ labelSortStdin.close();
+ labelSort.waitFor();
+
+ nodesOutputThread.join();
+ labelsOutputThread.join();
+
+ // Write node, edge and label counts/statistics
+ printEdgeCounts(outputBasename, edgeCount[0], edgeCountByType);
+ printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts());
+ printLabelCounts(outputBasename, labelsOutputThread.getLabelCount());
+ }
+
+ private static Process spawnSort(String sortBufferSize, String sortTmpDir) throws IOException {
+ ProcessBuilder sortProcessBuilder = new ProcessBuilder();
+ sortProcessBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
+ ArrayList<String> command = new ArrayList<>(List.of("sort", "-u", "--buffer-size", sortBufferSize));
+ if (sortTmpDir != null) {
+ command.add("--temporary-directory");
+ command.add(sortTmpDir);
+ }
+ sortProcessBuilder.command(command);
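+ // Force the byte-wise C locale so that the sort order is deterministic and matches plain
+ // byte comparisons, regardless of the system locale.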
+ Map<String, String> env = sortProcessBuilder.environment();
+ env.put("LC_ALL", "C");
+ env.put("LC_COLLATE", "C");
+ env.put("LANG", "C");
+
+ return sortProcessBuilder.start();
+ }
+
+ private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt");
+ nodeCountWriter.println(edgeCount);
+ nodeCountWriter.close();
+
+ PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".edges.stats.txt");
+ TreeMap<String, Long> edgeTypeCountsMap = new TreeMap<>();
+ for (Node.Type src : Node.Type.values()) {
+ for (Node.Type dst : Node.Type.values()) {
+ long cnt = edgeTypeCounts[Node.Type.toInt(src)][Node.Type.toInt(dst)];
+ if (cnt > 0)
+ edgeTypeCountsMap.put(src.toString().toLowerCase() + ":" + dst.toString().toLowerCase(), cnt);
+ }
+ }
+ for (Map.Entry<String, Long> entry : edgeTypeCountsMap.entrySet()) {
+ nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue());
+ }
+ nodeTypesCountWriter.close();
+ }
+
+ private static void printNodeCounts(String basename, long nodeCount, long[] nodeTypeCounts) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".nodes.count.txt");
+ nodeCountWriter.println(nodeCount);
+ nodeCountWriter.close();
+
+ PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".nodes.stats.txt");
+ TreeMap<String, Long> nodeTypeCountsMap = new TreeMap<>();
+ for (Node.Type v : Node.Type.values()) {
+ nodeTypeCountsMap.put(v.toString().toLowerCase(), nodeTypeCounts[Node.Type.toInt(v)]);
+ }
+ for (Map.Entry<String, Long> entry : nodeTypeCountsMap.entrySet()) {
+ nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue());
+ }
+ nodeTypesCountWriter.close();
+ }
+
+ private static void printLabelCounts(String basename, long labelCount) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".labels.count.txt");
+ nodeCountWriter.println(labelCount);
+ nodeCountWriter.close();
+ }
+
+ private static class NodesOutputThread extends Thread {
+ private final InputStream sortedNodesStream;
+ private final OutputStream nodesOutputStream;
+
+ private long nodeCount = 0;
+ private final long[] nodeTypeCounts = new long[Node.Type.values().length];
+
+ NodesOutputThread(InputStream sortedNodesStream, OutputStream nodesOutputStream) {
+ this.sortedNodesStream = sortedNodesStream;
+ this.nodesOutputStream = nodesOutputStream;
+ }
+
+ @Override
+ public void run() {
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(sortedNodesStream, StandardCharsets.UTF_8));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ nodesOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
+ nodesOutputStream.write('\n');
+ nodeCount++;
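+ // SWHIDs have the form "swh:1:<type>:<hash>", so the third colon-separated
+ // field is the node type.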
+ Node.Type nodeType = Node.Type.fromStr(line.split(":")[2]);
+ nodeTypeCounts[Node.Type.toInt(nodeType)]++;
+ }
+ nodesOutputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getNodeCount() {
+ return nodeCount;
+ }
+
+ public long[] getNodeTypeCounts() {
+ return nodeTypeCounts;
+ }
+ }
+
+ private static class LabelsOutputThread extends Thread {
+ private final InputStream sortedLabelsStream;
+ private final OutputStream labelsOutputStream;
+
+ private long labelCount = 0;
+
+ LabelsOutputThread(InputStream sortedLabelsStream, OutputStream labelsOutputStream) {
+ this.labelsOutputStream = labelsOutputStream;
+ this.sortedLabelsStream = sortedLabelsStream;
+ }
+
+ @Override
+ public void run() {
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(sortedLabelsStream, StandardCharsets.UTF_8));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ labelsOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
+ labelsOutputStream.write('\n');
+ labelCount++;
+ }
+ labelsOutputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getLabelCount() {
+ return labelCount;
+ }
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java
@@ -0,0 +1,44 @@
+package org.softwareheritage.graph.compress;
+
+import java.io.IOException;
+
+/**
+ * GraphDataset is a common interface to represent on-disk graph datasets in various formats,
+ * usually extracted from the SWH archive with the swh-dataset tool.
+ */
+public interface GraphDataset {
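+ /** Callback invoked for each node encountered while reading the dataset. */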
+ interface NodeCallback {
+ void onNode(byte[] node) throws IOException;
+ }
+
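+ /**
+ * Callback invoked for each edge encountered while reading the dataset; {@code label} may be
+ * {@code null} when the edge carries no label.
+ */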
+ interface EdgeCallback {
+ void onEdge(byte[] src, byte[] dst, byte[] label, long permission) throws IOException;
+ }
+
+ /**
+ * Read the graph dataset and call the callback methods for each node and edge encountered.
+ *
+ * <ul>
+ * <li>The node callback is called for each object stored in the graph.</li>
+ * <li>The edge callback is called for each relationship (between two nodes) stored in the
+ * graph.</li>
+ * </ul>
+ *
+ * <p>
+ * Note that because the graph can contain holes, loose objects and dangling objects, the edge
+ * callback may be called with parameters representing nodes that are not stored in the graph. This
+ * is because some nodes that are referred to as destinations in the dataset might not be present in
+ * the archive (e.g., a revision entry in a directory pointing to a revision that we have not
+ * crawled yet).
+ *
+ * <p>
+ * In order to generate a complete set of all the nodes that are referred to in the graph
+ * dataset, see the {@link ExtractNodes} class.
+ *
+ *
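+ * <p>
+ * A minimal usage sketch (the concrete dataset class and the path are illustrative):
+ *
+ * <pre>{@code
+ * GraphDataset dataset = new CSVEdgeDataset("/path/to/dataset");
+ * long[] counts = {0, 0};
+ * dataset.readEdges(
+ *     node -> counts[0]++,
+ *     (src, dst, label, perm) -> counts[1]++);
+ * }</pre>
+ *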
+ * @param nodeCb callback for each node
+ * @param edgeCb callback for each edge
+ */
+ void readEdges(NodeCallback nodeCb, EdgeCallback edgeCb) throws IOException;
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
@@ -0,0 +1,545 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import com.google.common.primitives.Bytes;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * A graph dataset in ORC format.
+ *
+ * This format of dataset is a full export of the graph, including all the edge and node properties.
+ *
+ * For convenience purposes, this class also provides a main method to print all the edges of the
+ * graph, so that the output can be piped to
+ * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph}.
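+ *
+ * <p>
+ * For example, to convert an ORC dataset to the CSV edge format (the paths are illustrative):
+ *
+ * <pre>{@code
+ * ORCGraphDataset.exportToCsvDataset("/path/to/dataset/orc", "/path/to/output/graph");
+ * }</pre>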
+ */
+public class ORCGraphDataset implements GraphDataset {
+ private final File datasetDir;
+
+ public ORCGraphDataset(String datasetPath) {
+ this(new File(datasetPath));
+ }
+
+ public ORCGraphDataset(File datasetDir) {
+ if (!datasetDir.exists()) {
+ throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist");
+ }
+ this.datasetDir = datasetDir;
+ }
+
+ /**
+ * Return the given table as a {@link SwhOrcTable}. The return value can be down-casted to the type
+ * of the specific table it represents.
+ */
+ public SwhOrcTable getTable(String tableName) {
+ File tableDir = new File(datasetDir, tableName);
+ if (!tableDir.exists()) {
+ return null;
+ }
+ switch (tableName) {
+ case "skipped_content":
+ return new SkippedContentOrcTable(tableDir);
+ case "content":
+ return new ContentOrcTable(tableDir);
+ case "directory":
+ return new DirectoryOrcTable(tableDir);
+ case "directory_entry":
+ return new DirectoryEntryOrcTable(tableDir);
+ case "revision":
+ return new RevisionOrcTable(tableDir);
+ case "revision_history":
+ return new RevisionHistoryOrcTable(tableDir);
+ case "release":
+ return new ReleaseOrcTable(tableDir);
+ case "snapshot_branch":
+ return new SnapshotBranchOrcTable(tableDir);
+ case "snapshot":
+ return new SnapshotOrcTable(tableDir);
+ case "origin_visit_status":
+ return new OriginVisitStatusOrcTable(tableDir);
+ case "origin_visit":
+ return new OriginVisitOrcTable(tableDir);
+ case "origin":
+ return new OriginOrcTable(tableDir);
+ default :
+ return null;
+ }
+ }
+
+ /** Return all the tables in this dataset as a map of {@link SwhOrcTable}. */
+ public Map<String, SwhOrcTable> allTables() {
+ HashMap<String, SwhOrcTable> tables = new HashMap<>();
+ File[] tableDirs = datasetDir.listFiles();
+ if (tableDirs == null) {
+ return tables;
+ }
+ for (File tableDir : tableDirs) {
+ SwhOrcTable table = getTable(tableDir.getName());
+ if (table != null) {
+ tables.put(tableDir.getName(), table);
+ }
+ }
+ return tables;
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ Map<String, SwhOrcTable> tables = allTables();
+ for (SwhOrcTable table : tables.values()) {
+ table.readEdges(nodeCb, edgeCb);
+ }
+ }
+
+ /**
+ * A class representing an ORC table, stored on disk as a set of ORC files all in the same
+ * directory.
+ */
+ public static class ORCTable {
+ private final File tableDir;
+
+ public ORCTable(File tableDir) {
+ if (!tableDir.exists()) {
+ throw new IllegalArgumentException("Table " + tableDir.getName() + " does not exist");
+ }
+ this.tableDir = tableDir;
+ }
+
+ public static ORCTable load(File tableDir) {
+ return new ORCTable(tableDir);
+ }
+
+ /**
+ * Utility function for byte columns. Return as a byte array the value of the given row in the
+ * column vector.
+ */
+ public static byte[] getBytesRow(BytesColumnVector columnVector, int row) {
+ return Arrays.copyOfRange(columnVector.vector[row], columnVector.start[row],
+ columnVector.start[row] + columnVector.length[row]);
+ }
+
+ interface ReadOrcBatchHandler {
+ void accept(VectorizedRowBatch batch, Map<String, Integer> columnMap) throws IOException;
+ }
+
+ /**
+ * Read the table, calling the given handler for each new batch of rows. Optionally, if
+ * {@code columns} is not null, only the columns present in this set are scanned, instead of the
+ * entire table.
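+ *
+ * <p>
+ * A minimal sketch of a batch handler (the column name depends on the table schema):
+ *
+ * <pre>{@code
+ * table.readOrcTable((batch, columnMap) -> {
+ *     BytesColumnVector ids = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ *     for (int row = 0; row < batch.size; row++) {
+ *         byte[] id = ORCTable.getBytesRow(ids, row);
+ *     }
+ * }, Set.of("id"));
+ * }</pre>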
+ */
+ public void readOrcTable(ReadOrcBatchHandler batchHandler, Set<String> columns) throws IOException {
+ File[] listing = tableDir.listFiles();
+ if (listing == null) {
+ throw new IOException("No files found in " + tableDir.getName());
+ }
+ for (File file : listing) {
+ readOrcFile(file.getPath(), batchHandler, columns);
+ }
+ }
+
+ private void readOrcFile(String path, ReadOrcBatchHandler batchHandler, Set<String> columns)
+ throws IOException {
+ try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(new Configuration()))) {
+ TypeDescription schema = reader.getSchema();
+
+ Reader.Options options = reader.options();
+ if (columns != null) {
+ options.include(createColumnsToRead(schema, columns));
+ }
+ Map<String, Integer> columnMap = getColumnMap(schema);
+
+ try (RecordReader records = reader.rows(options)) {
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch(4096);
+ while (records.nextBatch(batch)) {
+ batchHandler.accept(batch, columnMap);
+ }
+ }
+ }
+ }
+
+ private static Map<String, Integer> getColumnMap(TypeDescription schema) {
+ Map<String, Integer> columnMap = new HashMap<>();
+ List<String> fieldNames = schema.getFieldNames();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ columnMap.put(fieldNames.get(i), i);
+ }
+ return columnMap;
+ }
+
+ private static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns) {
+ boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
+ List<String> fieldNames = schema.getFieldNames();
+ List<TypeDescription> columnTypes = schema.getChildren();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (columns.contains(fieldNames.get(i))) {
+ System.out.println("Adding column " + fieldNames.get(i) + " with ID " + i + " to the read list");
+ TypeDescription type = columnTypes.get(i);
+ for (int id = type.getId(); id <= type.getMaximumId(); id++) {
+ columnsToRead[id] = true;
+ }
+ }
+ }
+ return columnsToRead;
+ }
+ }
+
+ /** Base class for SWH-specific ORC tables. */
+ public static class SwhOrcTable {
+ protected final ORCTable orcTable;
+
+ protected static final byte[] cntPrefix = "swh:1:cnt:".getBytes();
+ protected static byte[] dirPrefix = "swh:1:dir:".getBytes();
+ protected static byte[] revPrefix = "swh:1:rev:".getBytes();
+ protected static byte[] relPrefix = "swh:1:rel:".getBytes();
+ protected static byte[] snpPrefix = "swh:1:snp:".getBytes();
+ protected static byte[] oriPrefix = "swh:1:ori:".getBytes();
+
+ public SwhOrcTable(File tableDir) {
+ orcTable = new ORCTable(tableDir);
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ // No nodes or edges to read in the table by default.
+ }
+
+ protected static byte[] urlToOriginId(byte[] url) {
+ return DigestUtils.sha1Hex(url).getBytes();
+ }
+ }
+
+ public static class SkippedContentOrcTable extends SwhOrcTable {
+ public SkippedContentOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")];
+ for (int row = 0; row < batch.size; row++) {
+ nodeCb.onNode(ORCTable.getBytesRow(contentIdVector, row));
+ }
+ }, Set.of("sha1_git"));
+ }
+ }
+
+ public static class ContentOrcTable extends SwhOrcTable {
+ public ContentOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")];
+ for (int row = 0; row < batch.size; row++) {
+ nodeCb.onNode(ORCTable.getBytesRow(contentIdVector, row));
+ }
+ }, Set.of("sha1_git"));
+ }
+ }
+
+ public static class DirectoryOrcTable extends SwhOrcTable {
+ public DirectoryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ for (int row = 0; row < batch.size; row++) {
+ nodeCb.onNode(ORCTable.getBytesRow(directoryIdVector, row));
+ }
+ }, Set.of("id"));
+ }
+ }
+
+ public static class DirectoryEntryOrcTable extends SwhOrcTable {
+ public DirectoryEntryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ byte[] cntType = "file".getBytes();
+ byte[] dirType = "dir".getBytes();
+ byte[] revType = "rev".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector srcVector = (BytesColumnVector) batch.cols[columnMap.get("directory_id")];
+ BytesColumnVector dstVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("type")];
+ BytesColumnVector labelVector = (BytesColumnVector) batch.cols[columnMap.get("name")];
+ LongColumnVector permissionVector = (LongColumnVector) batch.cols[columnMap.get("perms")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] src = Bytes.concat(dirPrefix, ORCTable.getBytesRow(srcVector, row));
+ byte[] dst = Bytes.concat(targetPrefix, ORCTable.getBytesRow(dstVector, row));
+ byte[] label = Base64.getEncoder().encode(ORCTable.getBytesRow(labelVector, row));
+ long permission = permissionVector.vector[row];
+ edgeCb.onEdge(src, dst, label, permission);
+ }
+ }, Set.of("directory_id", "target", "type", "name", "perms"));
+ }
+ }
+
+ public static class RevisionOrcTable extends SwhOrcTable {
+ public RevisionOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("directory")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row));
+ byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row));
+ nodeCb.onNode(revisionId);
+ edgeCb.onEdge(revisionId, directoryId, null, -1);
+ }
+ }, Set.of("id", "directory"));
+ }
+ }
+
+ public static class RevisionHistoryOrcTable extends SwhOrcTable {
+ public RevisionHistoryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector parentIdVector = (BytesColumnVector) batch.cols[columnMap.get("parent_id")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] parentId = Bytes.concat(revPrefix, ORCTable.getBytesRow(parentIdVector, row));
+ byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row));
+ edgeCb.onEdge(parentId, revisionId, null, -1);
+ }
+ }, Set.of("id", "parent_id"));
+ }
+ }
+
+ public static class ReleaseOrcTable extends SwhOrcTable {
+ public ReleaseOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ System.out.println("Reading release edges");
+ byte[] cntType = "content".getBytes();
+ byte[] dirType = "directory".getBytes();
+ byte[] revType = "revision".getBytes();
+ byte[] relType = "release".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector releaseIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else if (Arrays.equals(targetType, relType)) {
+ targetPrefix = relPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] releaseId = Bytes.concat(relPrefix, ORCTable.getBytesRow(releaseIdVector, row));
+ byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row));
+ nodeCb.onNode(releaseId);
+ edgeCb.onEdge(releaseId, targetId, null, -1);
+ }
+ }, Set.of("id", "target", "target_type"));
+ }
+ }
+
+ public static class SnapshotOrcTable extends SwhOrcTable {
+ public SnapshotOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ for (int row = 0; row < batch.size; row++) {
+ nodeCb.onNode(ORCTable.getBytesRow(snapshotIdVector, row));
+ }
+ }, Set.of("id"));
+ }
+ }
+
+ public static class SnapshotBranchOrcTable extends SwhOrcTable {
+ public SnapshotBranchOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ byte[] cntType = "content".getBytes();
+ byte[] dirType = "directory".getBytes();
+ byte[] revType = "revision".getBytes();
+ byte[] relType = "release".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot_id")];
+ BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")];
+ BytesColumnVector branchNameVector = (BytesColumnVector) batch.cols[columnMap.get("name")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else if (Arrays.equals(targetType, relType)) {
+ targetPrefix = relPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row));
+ byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row));
+ byte[] branchName = Base64.getEncoder().encode(ORCTable.getBytesRow(branchNameVector, row));
+ nodeCb.onNode(snapshotId);
+ edgeCb.onEdge(snapshotId, targetId, branchName, -1);
+ }
+ }, Set.of("snapshot_id", "name", "target", "target_type"));
+ }
+ }
+
+ public static class OriginVisitStatusOrcTable extends SwhOrcTable {
+ public OriginVisitStatusOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("origin")];
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] originId = urlToOriginId(ORCTable.getBytesRow(originUrlVector, row));
+ byte[] snapshot_id = ORCTable.getBytesRow(snapshotIdVector, row);
+ if (snapshot_id.length == 0) {
+ continue;
+ }
+ edgeCb.onEdge(Bytes.concat(oriPrefix, originId), Bytes.concat(snpPrefix, snapshot_id), null, -1);
+ }
+ }, Set.of("origin", "snapshot"));
+ }
+ }
+
+ public static class OriginVisitOrcTable extends SwhOrcTable {
+ public OriginVisitOrcTable(File tableDir) {
+ super(tableDir);
+ }
+ }
+
+ public static class OriginOrcTable extends SwhOrcTable {
+ public OriginOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("url")];
+ for (int row = 0; row < batch.size; row++) {
+ nodeCb.onNode(Bytes.concat(oriPrefix, urlToOriginId(ORCTable.getBytesRow(originUrlVector, row))));
+ }
+ }, Set.of("url"));
+ }
+ }
+
+ /**
+ * Export an ORC graph to the CSV edge dataset format as two different files,
+ * <code>nodes.csv.zst</code> and <code>edges.csv.zst</code>.
+ */
+ public static void exportToCsvDataset(String orcDataset, String csvDatasetBasename) throws IOException {
+ ORCGraphDataset dataset = new ORCGraphDataset(orcDataset);
+ File nodesFile = new File(csvDatasetBasename + ".nodes.csv.zst");
+ File edgesFile = new File(csvDatasetBasename + ".edges.csv.zst");
+ BufferedOutputStream nodesOut = new BufferedOutputStream(new ZstdOutputStream(new FileOutputStream(nodesFile)));
+ BufferedOutputStream edgesOut = new BufferedOutputStream(new ZstdOutputStream(new FileOutputStream(edgesFile)));
+ dataset.readEdges((node) -> {
+ nodesOut.write(node);
+ nodesOut.write('\n');
+ }, (src, dst, label, perms) -> {
+ edgesOut.write(src);
+ edgesOut.write(' ');
+ edgesOut.write(dst);
+ if (label != null) {
+ edgesOut.write(' ');
+ edgesOut.write(label);
+ edgesOut.write(' ');
+ }
+ if (perms != -1) {
+ edgesOut.write(' ');
+ edgesOut.write(Long.toString(perms).getBytes());
+ }
+ edgesOut.write('\n');
+ });
+ }
+
+ /**
+ * Print all the edges of the graph to stdout. Can be piped to
+ * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph} to import the graph dataset and convert
+ * it to a {@link it.unimi.dsi.big.webgraph.BVGraph}.
+ */
+ public static void printSimpleEdges(String orcDataset) throws IOException {
+ ORCGraphDataset dataset = new ORCGraphDataset(orcDataset);
+ BufferedOutputStream out = new BufferedOutputStream(System.out);
+ dataset.readEdges((node) -> {
+ }, (src, dst, label, perms) -> {
+ out.write(src);
+ out.write(' ');
+ out.write(dst);
+ out.write('\n');
+ });
+ out.flush();
+ }
+
+ public static void main(String[] args) throws IOException {
+ printSimpleEdges(args[0]);
+ }
+}