nodeTypes = Arrays.asList(Node.Type.values());
types.addAll(nodeTypes);
} else {
types.add(Node.Type.fromStr(strFmtType));
}
return types;
}
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java
new file mode 100644
index 0000000..ee71713
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java
@@ -0,0 +1,185 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdInputStream;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * A graph dataset in (zstd-compressed) CSV format.
+ *
+ * This format does not contain any properties apart from the SWHIDs of the nodes, and optionally
+ * the labels of the edges and the permissions of the directory entries.
+ *
+ * The structure of the dataset is as follows: one directory per object type, each containing:
+ *
+ * <ul>
+ * <li>a number of files <code>*.nodes.csv.zst</code> containing the SWHIDs of the objects stored in
+ * the graph, one per line.</li>
+ * <li>a number of files <code>*.edges.csv.zst</code> containing the edges of the graph, one per
+ * line. The format of each edge is as follows:
+ * <code>SRC_SWHID DST_SWHID [BASE64_LABEL] [INT_PERMISSION]</code>.</li>
+ * </ul>
+ *
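+ * <p>
+ * As a purely illustrative sketch (the file path below is a made-up example), a compressed edge
+ * file can be streamed through a {@link GraphDataset.EdgeCallback}:
+ * </p>
+ *
+ * <pre>{@code
+ * CSVEdgeDataset.readEdgesCsvZst("/srv/dataset/directory/graph-part-1.edges.csv.zst",
+ *         (src, dst, label, perm) -> System.out.println(new String(src) + " -> " + new String(dst)));
+ * }</pre>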
+ */
+public class CSVEdgeDataset implements GraphDataset {
+ final static Logger logger = LoggerFactory.getLogger(CSVEdgeDataset.class);
+
+ final private File datasetDir;
+
+ public CSVEdgeDataset(String datasetPath) {
+ this(new File(datasetPath));
+ }
+
+ public CSVEdgeDataset(File datasetDir) {
+ if (!datasetDir.exists()) {
+ throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist");
+ }
+ this.datasetDir = datasetDir;
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ File[] allTables = datasetDir.listFiles();
+ if (allTables == null) {
+ return;
+ }
+ for (File tableFile : allTables) {
+ File[] allCsvFiles = tableFile.listFiles();
+ if (allCsvFiles == null) {
+ continue;
+ }
+ for (File csvFile : allCsvFiles) {
+ if (csvFile.getName().endsWith(".edges.csv.zst")) {
+ readEdgesCsvZst(csvFile.getPath(), edgeCb);
+ } else if (csvFile.getName().endsWith(".nodes.csv.zst")) {
+ readNodesCsvZst(csvFile.getPath(), nodeCb);
+ }
+ }
+ }
+ }
+
+ public static void readEdgesCsvZst(String csvZstPath, GraphDataset.EdgeCallback cb) throws IOException {
+ InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath)));
+ readEdgesCsv(csvInputStream, cb);
+ }
+
+ public static void readEdgesCsv(InputStream csvInputStream, GraphDataset.EdgeCallback cb) throws IOException {
+ FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream);
+
+ Charset charset = StandardCharsets.US_ASCII;
+ byte[] array = new byte[1024];
+ for (long line = 0;; line++) {
+ int start = 0, len;
+ while ((len = csvReader.readLine(array, start, array.length - start,
+ FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
+ start += len;
+ array = ByteArrays.grow(array, array.length + 1);
+ }
+ if (len == -1)
+ break; // EOF
+ final int lineLength = start + len;
+
+ // Skip whitespace at the start of the line.
+ int offset = 0;
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ continue;
+ }
+ if (array[0] == '#')
+ continue;
+
+ // Scan source id.
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ss = Arrays.copyOfRange(array, start, offset);
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ logger.error("Error at line " + line + ": no target");
+ continue;
+ }
+
+ // Scan target ID
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ts = Arrays.copyOfRange(array, start, offset);
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ // Scan label
+ byte[] ls = null;
+ if (offset < lineLength) {
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ ls = Arrays.copyOfRange(array, start, offset);
+ }
+
+ // Skip whitespace between identifiers.
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ // Scan permission
+ int permission = 0;
+ if (offset < lineLength) {
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ permission = Integer.parseInt(new String(array, start, offset - start, charset));
+ }
+
+ cb.onEdge(ss, ts, ls, permission);
+ }
+ }
+
+ public static void readNodesCsvZst(String csvZstPath, GraphDataset.NodeCallback cb) throws IOException {
+ InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath)));
+ readNodesCsv(csvInputStream, cb);
+ }
+
+ public static void readNodesCsv(InputStream csvInputStream, GraphDataset.NodeCallback cb) throws IOException {
+ FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream);
+
+ byte[] array = new byte[1024];
+ for (long line = 0;; line++) {
+ int start = 0, len;
+ while ((len = csvReader.readLine(array, start, array.length - start,
+ FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
+ start += len;
+ array = ByteArrays.grow(array, array.length + 1);
+ }
+ if (len == -1)
+ break; // EOF
+ final int lineLength = start + len;
+
+ // Skip whitespace at the start of the line.
+ int offset = 0;
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
+ offset++;
+ if (offset == lineLength) {
+ continue;
+ }
+ if (array[0] == '#')
+ continue;
+
+ // Scan source id.
+ start = offset;
+ while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
+ offset++;
+ final byte[] ss = Arrays.copyOfRange(array, start, offset);
+
+ cb.onNode(ss);
+ }
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
similarity index 96%
rename from java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
rename to java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
index 3e094e8..ef13166 100644
--- a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
@@ -1,51 +1,51 @@
-package org.softwareheritage.graph.utils;
+package org.softwareheritage.graph.compress;
import com.martiansoftware.jsap.*;
import it.unimi.dsi.Util;
import it.unimi.dsi.fastutil.io.BinIO;
import java.io.File;
import java.io.IOException;
/**
* CLI program used to compose two on-disk permutations.
*
* It takes two on-disk permutations as parameters, p1 and p2, and writes on disk (p1 o p2) at the
- * given location. This is useful for multi-step compression (e.g. Unordered -> BFS -> LLP), as it
+ * given location. This is useful for multi-step compression (e.g., Unordered -> BFS -> LLP), as it
* can be used to merge all the intermediate permutations.
*/
public class ComposePermutations {
private static JSAPResult parse_args(String[] args) {
JSAPResult config = null;
try {
SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{
new UnflaggedOption("firstPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED, "The first permutation"),
new UnflaggedOption("secondPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED,
"The second permutation"),
new UnflaggedOption("outputPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED,
"The output permutation"),});
config = jsap.parse(args);
if (jsap.messagePrinted()) {
System.exit(1);
}
} catch (JSAPException e) {
e.printStackTrace();
}
return config;
}
public static void main(String[] args) throws IOException, ClassNotFoundException {
JSAPResult config = parse_args(args);
String firstPermFilename = config.getString("firstPermutation");
String secondPermFilename = config.getString("secondPermutation");
String outputPermFilename = config.getString("outputPermutation");
long[][] firstPerm = BinIO.loadLongsBig(new File(firstPermFilename));
long[][] secondPerm = BinIO.loadLongsBig(new File(secondPermFilename));
long[][] outputPerm = Util.composePermutationsInPlace(firstPerm, secondPerm);
BinIO.storeLongs(outputPerm, outputPermFilename);
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
new file mode 100644
index 0000000..707b44f
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
@@ -0,0 +1,292 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import com.martiansoftware.jsap.*;
+import org.softwareheritage.graph.Node;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that
+ * are not stored as actual objects in the graph, but only referred to by the edges.
+ * Additionally, extract the set of all unique edge labels in the graph.
+ *
+ * <ul>
+ * <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a
+ * zst-compressed sorted list of SWHIDs, one per line.</li>
+ * <li>The set of edge labels is written in <code>${outputBasename}.labels.csv.zst</code>, as a
+ * zst-compressed sorted list of labels encoded in base64, one per line.</li>
+ * <li>The number of unique nodes referred to in the graph is written in a text file,
+ * <code>${outputBasename}.nodes.count.txt</code></li>
+ * <li>The number of unique edges referred to in the graph is written in a text file,
+ * <code>${outputBasename}.edges.count.txt</code></li>
+ * <li>The number of unique edge labels is written in a text file,
+ * <code>${outputBasename}.labels.count.txt</code></li>
+ * <li>Statistics on the number of nodes of each type are written in a text file,
+ * <code>${outputBasename}.nodes.stats.txt</code></li>
+ * <li>Statistics on the number of edges of each type are written in a text file,
+ * <code>${outputBasename}.edges.stats.txt</code></li>
+ * </ul>
+ *
+ * Rationale: Because the graph can contain holes, loose objects and dangling
+ * objects, some nodes that are referred to as destinations in the edge relationships might not
+ * actually be stored in the graph itself. However, to compress the graph using a graph compression
+ * library, it is necessary to have a list of all the nodes in the graph, including the
+ * ones that are simply referred to by the edges but not actually stored as concrete objects.
+ *
+ *
+ *
+ * This class reads the entire graph dataset, and uses <code>sort -u</code> to extract the set of
+ * all the unique nodes and unique labels that will be needed as an input for the compression
+ * process.
+ *
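+ * <p>
+ * A typical invocation might look like the following sketch (classpath setup is omitted and the
+ * paths are made-up examples):
+ * </p>
+ *
+ * <pre>
+ * java org.softwareheritage.graph.compress.ExtractNodes --format orc /srv/dataset /srv/extracted/graph
+ * </pre>
+ *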
+ */
+public class ExtractNodes {
+ private static JSAPResult parseArgs(String[] args) {
+ JSAPResult config = null;
+ try {
+ SimpleJSAP jsap = new SimpleJSAP(ExtractNodes.class.getName(), "", new Parameter[]{
+ new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the edges dataset"),
+ new UnflaggedOption("outputBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
+ "Basename of the output files"),
+
+ new FlaggedOption("format", JSAP.STRING_PARSER, "orc", JSAP.NOT_REQUIRED, 'f', "format",
+ "Format of the input dataset (orc, csv)"),
+ new FlaggedOption("sortBufferSize", JSAP.STRING_PARSER, "30%", JSAP.NOT_REQUIRED, 'S',
+ "sort-buffer-size", "Size of the memory buffer used by sort"),
+ new FlaggedOption("sortTmpDir", JSAP.STRING_PARSER, null, JSAP.NOT_REQUIRED, 'T',
+ "sort-temporary-directory", "Path to the temporary directory used by sort")});
+
+ config = jsap.parse(args);
+ if (jsap.messagePrinted()) {
+ System.exit(1);
+ }
+ } catch (JSAPException e) {
+ System.err.println("Usage error: " + e.getMessage());
+ System.exit(1);
+ }
+ return config;
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ JSAPResult parsedArgs = parseArgs(args);
+ String datasetPath = parsedArgs.getString("dataset");
+ String outputBasename = parsedArgs.getString("outputBasename");
+
+ String datasetFormat = parsedArgs.getString("format");
+ String sortBufferSize = parsedArgs.getString("sortBufferSize");
+ String sortTmpDir = parsedArgs.getString("sortTmpDir", null);
+
+ // Open edge dataset
+ GraphDataset dataset;
+ if (datasetFormat.equals("orc")) {
+ dataset = new ORCGraphDataset(datasetPath);
+ } else if (datasetFormat.equals("csv")) {
+ dataset = new CSVEdgeDataset(datasetPath);
+ } else {
+ throw new IllegalArgumentException("Unknown dataset format: " + datasetFormat);
+ }
+
+ // Spawn node sorting process
+ Process nodeSort = spawnSort(sortBufferSize, sortTmpDir);
+ BufferedOutputStream nodeSortStdin = new BufferedOutputStream(nodeSort.getOutputStream());
+ BufferedInputStream nodeSortStdout = new BufferedInputStream(nodeSort.getInputStream());
+ OutputStream nodesFileOutputStream = new ZstdOutputStream(
+ new BufferedOutputStream(new FileOutputStream(outputBasename + ".nodes.csv.zst")));
+ NodesOutputThread nodesOutputThread = new NodesOutputThread(nodeSortStdout, nodesFileOutputStream);
+ nodesOutputThread.start();
+
+ // Spawn label sorting process
+ Process labelSort = spawnSort(sortBufferSize, sortTmpDir);
+ BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream());
+ BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream());
+ OutputStream labelsFileOutputStream = new ZstdOutputStream(
+ new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst")));
+ LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream);
+ labelsOutputThread.start();
+
+ // Read the dataset and write the nodes and labels to the sorting processes
+ long[] edgeCount = {0};
+ long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length];
+ dataset.readEdges((node) -> {
+ nodeSortStdin.write(node);
+ nodeSortStdin.write('\n');
+ }, (src, dst, label, perm) -> {
+ nodeSortStdin.write(src);
+ nodeSortStdin.write('\n');
+ nodeSortStdin.write(dst);
+ nodeSortStdin.write('\n');
+ if (label != null) {
+ labelSortStdin.write(label);
+ labelSortStdin.write('\n');
+ }
+ edgeCount[0]++;
+ // Extract type of src and dst from their SWHID: swh:1:XXX
+ byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3);
+ byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3);
+ int srcType = Node.Type.byteNameToInt(srcTypeBytes);
+ int dstType = Node.Type.byteNameToInt(dstTypeBytes);
+ if (srcType != -1 && dstType != -1) {
+ edgeCountByType[srcType][dstType]++;
+ } else {
+ System.err
+ .println("Invalid edge type: " + new String(srcTypeBytes) + " -> " + new String(dstTypeBytes));
+ System.exit(1);
+ }
+ });
+
+ // Wait for sorting processes to finish
+ nodeSortStdin.close();
+ nodeSort.waitFor();
+ labelSortStdin.close();
+ labelSort.waitFor();
+
+ nodesOutputThread.join();
+ labelsOutputThread.join();
+
+ // Write node, edge and label counts/statistics
+ printEdgeCounts(outputBasename, edgeCount[0], edgeCountByType);
+ printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts());
+ printLabelCounts(outputBasename, labelsOutputThread.getLabelCount());
+ }
+
+ private static Process spawnSort(String sortBufferSize, String sortTmpDir) throws IOException {
+ ProcessBuilder sortProcessBuilder = new ProcessBuilder();
+ sortProcessBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
+ ArrayList<String> command = new ArrayList<>(List.of("sort", "-u", "--buffer-size", sortBufferSize));
+ if (sortTmpDir != null) {
+ command.add("--temporary-directory");
+ command.add(sortTmpDir);
+ }
+ sortProcessBuilder.command(command);
+ Map<String, String> env = sortProcessBuilder.environment();
+ env.put("LC_ALL", "C");
+ env.put("LC_COLLATE", "C");
+ env.put("LANG", "C");
+
+ return sortProcessBuilder.start();
+ }
+
+ private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt");
+ nodeCountWriter.println(edgeCount);
+ nodeCountWriter.close();
+
+ PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".edges.stats.txt");
+ TreeMap<String, Long> edgeTypeCountsMap = new TreeMap<>();
+ for (Node.Type src : Node.Type.values()) {
+ for (Node.Type dst : Node.Type.values()) {
+ long cnt = edgeTypeCounts[Node.Type.toInt(src)][Node.Type.toInt(dst)];
+ if (cnt > 0)
+ edgeTypeCountsMap.put(src.toString().toLowerCase() + ":" + dst.toString().toLowerCase(), cnt);
+ }
+ }
+ for (Map.Entry<String, Long> entry : edgeTypeCountsMap.entrySet()) {
+ nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue());
+ }
+ nodeTypesCountWriter.close();
+ }
+
+ private static void printNodeCounts(String basename, long nodeCount, long[] nodeTypeCounts) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".nodes.count.txt");
+ nodeCountWriter.println(nodeCount);
+ nodeCountWriter.close();
+
+ PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".nodes.stats.txt");
+ TreeMap<String, Long> nodeTypeCountsMap = new TreeMap<>();
+ for (Node.Type v : Node.Type.values()) {
+ nodeTypeCountsMap.put(v.toString().toLowerCase(), nodeTypeCounts[Node.Type.toInt(v)]);
+ }
+ for (Map.Entry<String, Long> entry : nodeTypeCountsMap.entrySet()) {
+ nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue());
+ }
+ nodeTypesCountWriter.close();
+ }
+
+ private static void printLabelCounts(String basename, long labelCount) throws IOException {
+ PrintWriter nodeCountWriter = new PrintWriter(basename + ".labels.count.txt");
+ nodeCountWriter.println(labelCount);
+ nodeCountWriter.close();
+ }
+
+ private static class NodesOutputThread extends Thread {
+ private final InputStream sortedNodesStream;
+ private final OutputStream nodesOutputStream;
+
+ private long nodeCount = 0;
+ private final long[] nodeTypeCounts = new long[Node.Type.values().length];
+
+ NodesOutputThread(InputStream sortedNodesStream, OutputStream nodesOutputStream) {
+ this.sortedNodesStream = sortedNodesStream;
+ this.nodesOutputStream = nodesOutputStream;
+ }
+
+ @Override
+ public void run() {
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(sortedNodesStream, StandardCharsets.UTF_8));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ nodesOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
+ nodesOutputStream.write('\n');
+ nodeCount++;
+ try {
+ Node.Type nodeType = Node.Type.fromStr(line.split(":")[2]);
+ nodeTypeCounts[Node.Type.toInt(nodeType)]++;
+ } catch (ArrayIndexOutOfBoundsException e) {
+ System.err.println("Error parsing SWHID: " + line);
+ System.exit(1);
+ }
+ }
+ nodesOutputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getNodeCount() {
+ return nodeCount;
+ }
+
+ public long[] getNodeTypeCounts() {
+ return nodeTypeCounts;
+ }
+ }
+
+ private static class LabelsOutputThread extends Thread {
+ private final InputStream sortedLabelsStream;
+ private final OutputStream labelsOutputStream;
+
+ private long labelCount = 0;
+
+ LabelsOutputThread(InputStream sortedLabelsStream, OutputStream labelsOutputStream) {
+ this.labelsOutputStream = labelsOutputStream;
+ this.sortedLabelsStream = sortedLabelsStream;
+ }
+
+ @Override
+ public void run() {
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(sortedLabelsStream, StandardCharsets.UTF_8));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ labelsOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
+ labelsOutputStream.write('\n');
+ labelCount++;
+ }
+ labelsOutputStream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getLabelCount() {
+ return labelCount;
+ }
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java
new file mode 100644
index 0000000..ada0319
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java
@@ -0,0 +1,44 @@
+package org.softwareheritage.graph.compress;
+
+import java.io.IOException;
+
+/**
+ * GraphDataset is a common interface to represent on-disk graph datasets in various formats,
+ * usually extracted from the SWH archive with the swh-dataset tool.
+ */
+public interface GraphDataset {
+ interface NodeCallback {
+ void onNode(byte[] node) throws IOException;
+ }
+
+ interface EdgeCallback {
+ void onEdge(byte[] src, byte[] dst, byte[] label, long permission) throws IOException;
+ }
+
+ /**
+ * Read the graph dataset and call the callback methods for each node and edge encountered.
+ *
+ *
+ * - The node callback is called for each object stored in the graph.
+ * - The edge callback is called for each relationship (between two nodes) stored in the
+ * graph.
+ *
+ *
+ *
+ * Note that because the graph can contain holes, loose objects and dangling objects, the edge
+ * callback may be called with parameters representing nodes that are not stored in the graph. This
+ * is because some nodes that are referred to as destinations in the dataset might not be present in
+ * the archive (e.g., a revision entry in a directory pointing to a revision that we have not
+ * crawled yet).
+ *
+ *
+ *
+ * In order to generate a complete set of all the nodes that are referred to in the graph
+ * dataset, see the {@link ExtractNodes} class.
+ *
+ *
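+ * <p>
+ * As a minimal sketch (assuming {@code dataset} is any {@code GraphDataset} implementation),
+ * counting nodes and edges can be done with two lambdas:
+ * </p>
+ *
+ * <pre>{@code
+ * long[] counts = {0, 0}; // {nodes, edges}
+ * dataset.readEdges(node -> counts[0]++, (src, dst, label, perm) -> counts[1]++);
+ * }</pre>
+ *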
+ * @param nodeCb callback for each node
+ * @param edgeCb callback for each edge
+ */
+ void readEdges(NodeCallback nodeCb, EdgeCallback edgeCb) throws IOException;
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
new file mode 100644
index 0000000..b16b435
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
@@ -0,0 +1,565 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import com.google.common.primitives.Bytes;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * A graph dataset in ORC format.
+ *
+ * This dataset format is a full export of the graph, including all the edge and node properties.
+ *
+ * For convenience purposes, this class also provides a main method to print all the edges of the
+ * graph, so that the output can be piped to
+ * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph}.
+ *
+ * Reading edges from ORC files using this class is about 2.5 times slower than reading them
+ * directly from a plaintext format.
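+ *
+ * <p>
+ * As an illustrative sketch (the paths are made-up examples), an ORC dataset can be converted to
+ * the CSV edge format with:
+ * </p>
+ *
+ * <pre>{@code
+ * ORCGraphDataset.exportToCsvDataset("/srv/dataset/orc", "/srv/export/graph");
+ * }</pre>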
+ */
+public class ORCGraphDataset implements GraphDataset {
+ final static Logger logger = LoggerFactory.getLogger(ORCGraphDataset.class);
+
+ final static public int ORC_BATCH_SIZE = 16 * 1024;
+
+ private final File datasetDir;
+
+ public ORCGraphDataset(String datasetPath) {
+ this(new File(datasetPath));
+ }
+
+ public ORCGraphDataset(File datasetDir) {
+ if (!datasetDir.exists()) {
+ throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist");
+ }
+ this.datasetDir = datasetDir;
+ }
+
+ /**
+ * Return the given table as a {@link SwhOrcTable}. The return value can be down-casted to the type
+ * of the specific table it represents.
+ */
+ public SwhOrcTable getTable(String tableName) {
+ File tableDir = new File(datasetDir, tableName);
+ if (!tableDir.exists()) {
+ return null;
+ }
+ switch (tableName) {
+ case "skipped_content":
+ return new SkippedContentOrcTable(tableDir);
+ case "content":
+ return new ContentOrcTable(tableDir);
+ case "directory":
+ return new DirectoryOrcTable(tableDir);
+ case "directory_entry":
+ return new DirectoryEntryOrcTable(tableDir);
+ case "revision":
+ return new RevisionOrcTable(tableDir);
+ case "revision_history":
+ return new RevisionHistoryOrcTable(tableDir);
+ case "release":
+ return new ReleaseOrcTable(tableDir);
+ case "snapshot_branch":
+ return new SnapshotBranchOrcTable(tableDir);
+ case "snapshot":
+ return new SnapshotOrcTable(tableDir);
+ case "origin_visit_status":
+ return new OriginVisitStatusOrcTable(tableDir);
+ case "origin_visit":
+ return new OriginVisitOrcTable(tableDir);
+ case "origin":
+ return new OriginOrcTable(tableDir);
+ default :
+ return null;
+ }
+ }
+
+ /** Return all the tables in this dataset as a map of {@link SwhOrcTable}. */
+ public Map<String, SwhOrcTable> allTables() {
+ HashMap<String, SwhOrcTable> tables = new HashMap<>();
+ File[] tableDirs = datasetDir.listFiles();
+ if (tableDirs == null) {
+ return tables;
+ }
+ for (File tableDir : tableDirs) {
+ SwhOrcTable table = getTable(tableDir.getName());
+ if (table != null) {
+ tables.put(tableDir.getName(), table);
+ }
+ }
+ return tables;
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ Map<String, SwhOrcTable> tables = allTables();
+ for (SwhOrcTable table : tables.values()) {
+ table.readEdges(nodeCb, edgeCb);
+ }
+ }
+
+ /**
+ * A class representing an ORC table, stored on disk as a set of ORC files all in the same
+ * directory.
+ */
+ public static class ORCTable {
+ private final File tableDir;
+
+ public ORCTable(File tableDir) {
+ if (!tableDir.exists()) {
+ throw new IllegalArgumentException("Table " + tableDir.getName() + " does not exist");
+ }
+ this.tableDir = tableDir;
+ }
+
+ public static ORCTable load(File tableDir) {
+ return new ORCTable(tableDir);
+ }
+
+ /**
+ * Utility function for byte columns. Return as a byte array the value of the given row in the
+ * column vector.
+ */
+ public static byte[] getBytesRow(BytesColumnVector columnVector, int row) {
+ if (columnVector.isRepeating) {
+ row = 0;
+ }
+ return Arrays.copyOfRange(columnVector.vector[row], columnVector.start[row],
+ columnVector.start[row] + columnVector.length[row]);
+ }
+
+ interface ReadOrcBatchHandler {
+ void accept(VectorizedRowBatch batch, Map<String, Integer> columnMap) throws IOException;
+ }
+
+ /**
+ * Read the table, calling the given handler for each new batch of rows. Optionally, if
+ * {@code columns} is not null, only the columns present in this set are scanned, instead of the
+ * entire table.
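+ *
+ * <p>
+ * For example, assuming {@code table} is an {@code ORCTable} and the column name is only
+ * illustrative, dumping the raw bytes of a single column could be sketched as:
+ * </p>
+ *
+ * <pre>{@code
+ * table.readOrcTable((batch, columnMap) -> {
+ *     BytesColumnVector ids = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ *     for (int row = 0; row < batch.size; row++) {
+ *         System.out.write(ORCTable.getBytesRow(ids, row));
+ *         System.out.println();
+ *     }
+ * }, Set.of("id"));
+ * }</pre>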
+ */
+ public void readOrcTable(ReadOrcBatchHandler batchHandler, Set<String> columns) throws IOException {
+ File[] listing = tableDir.listFiles();
+ if (listing == null) {
+ throw new IOException("No files found in " + tableDir.getName());
+ }
+ for (File file : listing) {
+ readOrcFile(file.getPath(), batchHandler, columns);
+ }
+ }
+
+ private void readOrcFile(String path, ReadOrcBatchHandler batchHandler, Set<String> columns)
+ throws IOException {
+ try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(new Configuration()))) {
+ TypeDescription schema = reader.getSchema();
+
+ Reader.Options options = reader.options();
+ if (columns != null) {
+ options.include(createColumnsToRead(schema, columns));
+ }
+ Map<String, Integer> columnMap = getColumnMap(schema);
+
+ try (RecordReader records = reader.rows(options)) {
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch(ORC_BATCH_SIZE);
+ while (records.nextBatch(batch)) {
+ batchHandler.accept(batch, columnMap);
+ }
+ }
+ }
+ }
+
+ private static Map<String, Integer> getColumnMap(TypeDescription schema) {
+ Map<String, Integer> columnMap = new HashMap<>();
+ List<String> fieldNames = schema.getFieldNames();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ columnMap.put(fieldNames.get(i), i);
+ }
+ return columnMap;
+ }
+
+ private static boolean[] createColumnsToRead(TypeDescription schema, Set<String> columns) {
+ boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1];
+ List<String> fieldNames = schema.getFieldNames();
+ List<TypeDescription> columnTypes = schema.getChildren();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (columns.contains(fieldNames.get(i))) {
+ logger.debug("Adding column " + fieldNames.get(i) + " with ID " + i + " to the read list");
+ TypeDescription type = columnTypes.get(i);
+ for (int id = type.getId(); id <= type.getMaximumId(); id++) {
+ columnsToRead[id] = true;
+ }
+ }
+ }
+ return columnsToRead;
+ }
+ }
+
+ /** Base class for SWH-specific ORC tables. */
+ public static class SwhOrcTable {
+ protected final ORCTable orcTable;
+
+ protected static final byte[] cntPrefix = "swh:1:cnt:".getBytes();
+ protected static final byte[] dirPrefix = "swh:1:dir:".getBytes();
+ protected static final byte[] revPrefix = "swh:1:rev:".getBytes();
+ protected static final byte[] relPrefix = "swh:1:rel:".getBytes();
+ protected static final byte[] snpPrefix = "swh:1:snp:".getBytes();
+ protected static final byte[] oriPrefix = "swh:1:ori:".getBytes();
+
+ public SwhOrcTable(File tableDir) {
+ orcTable = new ORCTable(tableDir);
+ }
+
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ // No nodes or edges to read in the table by default.
+ }
+
+ protected static byte[] urlToOriginId(byte[] url) {
+ return DigestUtils.sha1Hex(url).getBytes();
+ }
+ }
+
+ public static class SkippedContentOrcTable extends SwhOrcTable {
+ public SkippedContentOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] contentId = Bytes.concat(cntPrefix, ORCTable.getBytesRow(contentIdVector, row));
+ nodeCb.onNode(contentId);
+ }
+ }, Set.of("sha1_git"));
+ }
+ }
+
+ public static class ContentOrcTable extends SwhOrcTable {
+ public ContentOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] contentId = Bytes.concat(cntPrefix, ORCTable.getBytesRow(contentIdVector, row));
+ nodeCb.onNode(contentId);
+ }
+ }, Set.of("sha1_git"));
+ }
+ }
+
+ public static class DirectoryOrcTable extends SwhOrcTable {
+ public DirectoryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row));
+ nodeCb.onNode(directoryId);
+ }
+ }, Set.of("id"));
+ }
+ }
+
+ public static class DirectoryEntryOrcTable extends SwhOrcTable {
+ public DirectoryEntryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ byte[] cntType = "file".getBytes();
+ byte[] dirType = "dir".getBytes();
+ byte[] revType = "rev".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector srcVector = (BytesColumnVector) batch.cols[columnMap.get("directory_id")];
+ BytesColumnVector dstVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("type")];
+ BytesColumnVector labelVector = (BytesColumnVector) batch.cols[columnMap.get("name")];
+ LongColumnVector permissionVector = (LongColumnVector) batch.cols[columnMap.get("perms")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] src = Bytes.concat(dirPrefix, ORCTable.getBytesRow(srcVector, row));
+ byte[] dst = Bytes.concat(targetPrefix, ORCTable.getBytesRow(dstVector, row));
+ byte[] label = Base64.getEncoder().encode(ORCTable.getBytesRow(labelVector, row));
+ long permission = permissionVector.vector[row];
+ edgeCb.onEdge(src, dst, label, permission);
+ }
+ }, Set.of("directory_id", "target", "type", "name", "perms"));
+ }
+ }
+
+ public static class RevisionOrcTable extends SwhOrcTable {
+ public RevisionOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("directory")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row));
+ byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row));
+ nodeCb.onNode(revisionId);
+ edgeCb.onEdge(revisionId, directoryId, null, -1);
+ }
+ }, Set.of("id", "directory"));
+ }
+ }
+
+ public static class RevisionHistoryOrcTable extends SwhOrcTable {
+ public RevisionHistoryOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector parentIdVector = (BytesColumnVector) batch.cols[columnMap.get("parent_id")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] parentId = Bytes.concat(revPrefix, ORCTable.getBytesRow(parentIdVector, row));
+ byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row));
+ edgeCb.onEdge(parentId, revisionId, null, -1);
+ }
+ }, Set.of("id", "parent_id"));
+ }
+ }
+
+ public static class ReleaseOrcTable extends SwhOrcTable {
+ public ReleaseOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ byte[] cntType = "content".getBytes();
+ byte[] dirType = "directory".getBytes();
+ byte[] revType = "revision".getBytes();
+ byte[] relType = "release".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector releaseIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else if (Arrays.equals(targetType, relType)) {
+ targetPrefix = relPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] releaseId = Bytes.concat(relPrefix, ORCTable.getBytesRow(releaseIdVector, row));
+ byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row));
+ nodeCb.onNode(releaseId);
+ edgeCb.onEdge(releaseId, targetId, null, -1);
+ }
+ }, Set.of("id", "target", "target_type"));
+ }
+ }
+
+ public static class SnapshotOrcTable extends SwhOrcTable {
+ public SnapshotOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row));
+ nodeCb.onNode(snapshotId);
+ }
+ }, Set.of("id"));
+ }
+ }
+
+ public static class SnapshotBranchOrcTable extends SwhOrcTable {
+ public SnapshotBranchOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ byte[] cntType = "content".getBytes();
+ byte[] dirType = "directory".getBytes();
+ byte[] revType = "revision".getBytes();
+ byte[] relType = "release".getBytes();
+
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot_id")];
+ BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")];
+ BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")];
+ BytesColumnVector branchNameVector = (BytesColumnVector) batch.cols[columnMap.get("name")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row);
+ byte[] targetPrefix;
+ if (Arrays.equals(targetType, cntType)) {
+ targetPrefix = cntPrefix;
+ } else if (Arrays.equals(targetType, dirType)) {
+ targetPrefix = dirPrefix;
+ } else if (Arrays.equals(targetType, revType)) {
+ targetPrefix = revPrefix;
+ } else if (Arrays.equals(targetType, relType)) {
+ targetPrefix = relPrefix;
+ } else {
+ continue;
+ }
+
+ byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row));
+ byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row));
+ byte[] branchName = Base64.getEncoder().encode(ORCTable.getBytesRow(branchNameVector, row));
+ nodeCb.onNode(snapshotId);
+ edgeCb.onEdge(snapshotId, targetId, branchName, -1);
+ }
+ }, Set.of("snapshot_id", "name", "target", "target_type"));
+ }
+ }
+
+ public static class OriginVisitStatusOrcTable extends SwhOrcTable {
+ public OriginVisitStatusOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("origin")];
+ BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot")];
+
+ for (int row = 0; row < batch.size; row++) {
+ byte[] originId = urlToOriginId(ORCTable.getBytesRow(originUrlVector, row));
+ byte[] snapshot_id = ORCTable.getBytesRow(snapshotIdVector, row);
+ if (snapshot_id.length == 0) {
+ continue;
+ }
+ edgeCb.onEdge(Bytes.concat(oriPrefix, originId), Bytes.concat(snpPrefix, snapshot_id), null, -1);
+ }
+ }, Set.of("origin", "snapshot"));
+ }
+ }
+
+ public static class OriginVisitOrcTable extends SwhOrcTable {
+ public OriginVisitOrcTable(File tableDir) {
+ super(tableDir);
+ }
+ }
+
+ public static class OriginOrcTable extends SwhOrcTable {
+ public OriginOrcTable(File tableDir) {
+ super(tableDir);
+ }
+
+ @Override
+ public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException {
+ orcTable.readOrcTable((batch, columnMap) -> {
+ BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("url")];
+ for (int row = 0; row < batch.size; row++) {
+ byte[] originId = Bytes.concat(oriPrefix,
+ urlToOriginId(ORCTable.getBytesRow(originUrlVector, row)));
+ nodeCb.onNode(originId);
+ }
+ }, Set.of("url"));
+ }
+ }
+
+ /**
+ * Export an ORC graph to the CSV edge dataset format as two different files,
+ * <code>nodes.csv.zst</code> and <code>edges.csv.zst</code>.
+ */
+ public static void exportToCsvDataset(String orcDataset, String csvDatasetBasename) throws IOException {
+ ORCGraphDataset dataset = new ORCGraphDataset(orcDataset);
+ File nodesFile = new File(csvDatasetBasename + ".nodes.csv.zst");
+ File edgesFile = new File(csvDatasetBasename + ".edges.csv.zst");
+ FastBufferedOutputStream nodesOut = new FastBufferedOutputStream(
+ new ZstdOutputStream(new FileOutputStream(nodesFile)));
+ FastBufferedOutputStream edgesOut = new FastBufferedOutputStream(
+ new ZstdOutputStream(new FileOutputStream(edgesFile)));
+ dataset.readEdges((node) -> {
+ nodesOut.write(node);
+ nodesOut.write('\n');
+ }, (src, dst, label, perms) -> {
+ edgesOut.write(src);
+ edgesOut.write(' ');
+ edgesOut.write(dst);
+ if (label != null) {
+ edgesOut.write(' ');
+ edgesOut.write(label);
+ edgesOut.write(' ');
+ }
+ if (perms != -1) {
+ edgesOut.write(' ');
+ edgesOut.write(Long.toString(perms).getBytes());
+ }
+ edgesOut.write('\n');
+ });
+ }
+
+ /**
+ * Print all the edges of the graph to stdout. Can be piped to
+ * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph} to import the graph dataset and convert
+ * it to a {@link it.unimi.dsi.big.webgraph.BVGraph}.
+ */
+ public static void printSimpleEdges(String orcDataset) throws IOException {
+ ORCGraphDataset dataset = new ORCGraphDataset(orcDataset);
+ FastBufferedOutputStream out = new FastBufferedOutputStream(System.out);
+ dataset.readEdges((node) -> {
+ }, (src, dst, label, perms) -> {
+ out.write(src);
+ out.write(' ');
+ out.write(dst);
+ out.write('\n');
+ });
+ out.flush();
+ }
+
+ public static void main(String[] args) throws IOException {
+ printSimpleEdges(args[0]);
+ }
+}
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 24bb4b5..9e8c4a1 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,280 +1,280 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import subprocess
from typing import Dict, List, Set
from swh.graph.config import check_config_compress
class CompressionStep(Enum):
MPH = 1
BV = 2
BFS = 3
PERMUTE_BFS = 4
TRANSPOSE_BFS = 5
SIMPLIFY = 6
LLP = 7
PERMUTE_LLP = 8
OBL = 9
COMPOSE_ORDERS = 10
STATS = 11
TRANSPOSE = 12
TRANSPOSE_OBL = 13
MAPS = 14
CLEAN_TMP = 15
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
# Mapping from compression steps to shell commands implementing them. Commands
# will be executed by the shell, so be careful with meta characters. They are
# specified here as lists of tokens that will be joined together only for ease
# of line splitting. In commands, {tokens} will be interpolated with
# configuration values, see :func:`compress`.
STEP_ARGV: Dict[CompressionStep, List[str]] = {
CompressionStep.MPH: [
"{java}",
"it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"{out_dir}/{graph_name}.mph",
"<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )",
],
# use process substitution (and hence FIFO) above as MPH class load the
# entire file in memory when reading from stdin
CompressionStep.BV: [
"zstdcat",
"{in_dir}/{graph_name}.edges.csv.zst",
"|",
"cut -d' ' -f1,2",
"|",
"{java}",
"it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"--function",
"{out_dir}/{graph_name}.mph",
"{out_dir}/{graph_name}-base",
],
CompressionStep.BFS: [
"{java}",
"it.unimi.dsi.law.big.graph.BFS",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs.order",
],
CompressionStep.PERMUTE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.SIMPLIFY: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"simplify",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{out_dir}/{graph_name}-bfs-simplified",
],
CompressionStep.LLP: [
"{java}",
"it.unimi.dsi.law.big.graph.LayeredLabelPropagation",
"-g",
"{llp_gammas}",
"{out_dir}/{graph_name}-bfs-simplified",
"{out_dir}/{graph_name}-llp.order",
],
CompressionStep.PERMUTE_LLP: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-llp.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}",
],
CompressionStep.COMPOSE_ORDERS: [
"{java}",
- "org.softwareheritage.graph.utils.ComposePermutations",
+ "org.softwareheritage.graph.compress.ComposePermutations",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-llp.order",
"{out_dir}/{graph_name}.order",
],
CompressionStep.STATS: [
"{java}",
"it.unimi.dsi.big.webgraph.Stats",
"{out_dir}/{graph_name}",
],
CompressionStep.TRANSPOSE: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}-transposed",
],
CompressionStep.MAPS: [
"zstdcat",
"{in_dir}/{graph_name}.nodes.csv.zst",
"|",
"{java}",
"org.softwareheritage.graph.maps.NodeMapBuilder",
"{out_dir}/{graph_name}",
"{tmp_dir}",
],
CompressionStep.CLEAN_TMP: [
"rm",
"-rf",
"{out_dir}/{graph_name}-base.graph",
"{out_dir}/{graph_name}-base.offsets",
"{out_dir}/{graph_name}-base.properties",
"{out_dir}/{graph_name}-bfs-simplified.graph",
"{out_dir}/{graph_name}-bfs-simplified.offsets",
"{out_dir}/{graph_name}-bfs-simplified.properties",
"{out_dir}/{graph_name}-bfs-transposed.graph",
"{out_dir}/{graph_name}-bfs-transposed.offsets",
"{out_dir}/{graph_name}-bfs-transposed.properties",
"{out_dir}/{graph_name}-bfs.graph",
"{out_dir}/{graph_name}-bfs.offsets",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-bfs.properties",
"{out_dir}/{graph_name}-llp.order",
"{tmp_dir}",
],
}
def do_step(step, conf):
cmd = " ".join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
cmd_env["CLASSPATH"] = conf["classpath"]
logging.info(f"running: {cmd}")
process = subprocess.Popen(
["/bin/bash", "-c", cmd],
env=cmd_env,
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
with process.stdout as stdout:
for line in stdout:
logging.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError(
f"compression step {step} returned non-zero " f"exit code {rc}"
)
else:
return rc
def compress(
graph_name: str,
in_dir: Path,
out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {},
):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
- batch_size: batch size for `WebGraph transformations
`_;
defaults to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
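
    Example, running only the MPH and BV steps (the paths are hypothetical)::

        from pathlib import Path
        from swh.graph.webgraph import CompressionStep, compress

        compress(
            "graph",
            Path("/srv/dataset/edges"),
            Path("/srv/dataset/compressed"),
            steps={CompressionStep.MPH, CompressionStep.BV},
        )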
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logging.info(f"starting compression at {compression_start_time}")
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug(f"skipping compression step {step}")
continue
seq_no += 1
step_start_time = datetime.now()
logging.info(
f"starting compression step {step} "
f"({seq_no}/{len(steps)}) at {step_start_time}"
)
do_step(step, conf)
step_end_time = datetime.now()
step_duration = step_end_time - step_start_time
logging.info(
f"completed compression step {step} "
f"({seq_no}/{len(steps)}) "
f"at {step_end_time} in {step_duration}"
)
compression_end_time = datetime.now()
compression_duration = compression_end_time - compression_start_time
logging.info(f"completed compression in {compression_duration}")