diff --git a/java/pom.xml b/java/pom.xml --- a/java/pom.xml +++ b/java/pom.xml @@ -127,6 +127,26 @@ commons-codec 1.15 + + com.github.luben + zstd-jni + 1.5.1-1 + + + org.apache.orc + orc-core + 1.7.1 + + + org.apache.hadoop + hadoop-common + 3.3.1 + + + org.apache.hadoop + hadoop-client-runtime + 3.3.1 + diff --git a/java/src/main/java/org/softwareheritage/graph/Node.java b/java/src/main/java/org/softwareheritage/graph/Node.java --- a/java/src/main/java/org/softwareheritage/graph/Node.java +++ b/java/src/main/java/org/softwareheritage/graph/Node.java @@ -1,8 +1,6 @@ package org.softwareheritage.graph; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; /** * A node in the Software Heritage graph. @@ -92,6 +90,31 @@ return Node.Type.valueOf(strType.toUpperCase()); } + /** + * Converts byte array name to the int code of the corresponding SWH node type. Used for + * performance-critical deserialization. + * + * @param name node type represented as a byte array (e.g. b"cnt") + * @return the ordinal value of the corresponding {@link Node.Type} + * @see org.softwareheritage.graph.Node.Type + */ + public static int byteNameToInt(byte[] name) { + if (Arrays.equals(name, "cnt".getBytes())) { + return 0; + } else if (Arrays.equals(name, "dir".getBytes())) { + return 1; + } else if (Arrays.equals(name, "ori".getBytes())) { + return 2; + } else if (Arrays.equals(name, "rel".getBytes())) { + return 3; + } else if (Arrays.equals(name, "rev".getBytes())) { + return 4; + } else if (Arrays.equals(name, "snp".getBytes())) { + return 5; + } else + return -1; + } + /** * Parses SWH node type possible values from formatted string (see the * API syntax). diff --git a/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java new file mode 100644 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java @@ -0,0 +1,185 @@ +package org.softwareheritage.graph.compress; + +import com.github.luben.zstd.ZstdInputStream; +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import it.unimi.dsi.fastutil.io.FastBufferedInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * A graph dataset in (zstd-compressed) CSV format. + * + * This format does not contain any properties apart from the SWHIDs of the nodes, and optionally + * the labels of the edges and the permissions of the directory entries. 
+ * + * The structure of the dataset is as follows: one directory per object type, each containing: + * + * + * + */ +public class CSVEdgeDataset implements GraphDataset { + final static Logger logger = LoggerFactory.getLogger(CSVEdgeDataset.class); + + final private File datasetDir; + + public CSVEdgeDataset(String datasetPath) { + this(new File(datasetPath)); + } + + public CSVEdgeDataset(File datasetDir) { + if (!datasetDir.exists()) { + throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist"); + } + this.datasetDir = datasetDir; + } + + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + File[] allTables = datasetDir.listFiles(); + if (allTables == null) { + return; + } + for (File tableFile : allTables) { + File[] allCsvFiles = tableFile.listFiles(); + if (allCsvFiles == null) { + continue; + } + for (File csvFile : allCsvFiles) { + if (csvFile.getName().endsWith(".edges.csv.zst")) { + readEdgesCsvZst(csvFile.getPath(), edgeCb); + } else if (csvFile.getName().endsWith(".nodes.csv.zst")) { + readNodesCsvZst(csvFile.getPath(), nodeCb); + } + } + } + } + + public static void readEdgesCsvZst(String csvZstPath, GraphDataset.EdgeCallback cb) throws IOException { + InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath))); + readEdgesCsv(csvInputStream, cb); + } + + public static void readEdgesCsv(InputStream csvInputStream, GraphDataset.EdgeCallback cb) throws IOException { + FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream); + + Charset charset = StandardCharsets.US_ASCII; + byte[] array = new byte[1024]; + for (long line = 0;; line++) { + int start = 0, len; + while ((len = csvReader.readLine(array, start, array.length - start, + FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) { + start += len; + array = ByteArrays.grow(array, array.length + 1); + } + if (len == -1) + break; // EOF + final int lineLength = start + len; + + // Skip whitespace at the start of the line. + int offset = 0; + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') + offset++; + if (offset == lineLength) { + continue; + } + if (array[0] == '#') + continue; + + // Scan source id. + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) + offset++; + final byte[] ss = Arrays.copyOfRange(array, start, offset); + + // Skip whitespace between identifiers. + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') + offset++; + if (offset == lineLength) { + logger.error("Error at line " + line + ": no target"); + continue; + } + + // Scan target ID + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) + offset++; + final byte[] ts = Arrays.copyOfRange(array, start, offset); + + // Skip whitespace between identifiers. + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') + offset++; + // Scan label + byte[] ls = null; + if (offset < lineLength) { + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) + offset++; + ls = Arrays.copyOfRange(array, start, offset); + } + + // Skip whitespace between identifiers. 
+ while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') + offset++; + // Scan permission + int permission = 0; + if (offset < lineLength) { + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) + offset++; + permission = Integer.parseInt(new String(array, start, offset - start, charset)); + } + + cb.onEdge(ss, ts, ls, permission); + } + } + + public static void readNodesCsvZst(String csvZstPath, GraphDataset.NodeCallback cb) throws IOException { + InputStream csvInputStream = new ZstdInputStream(new BufferedInputStream(new FileInputStream(csvZstPath))); + readNodesCsv(csvInputStream, cb); + } + + public static void readNodesCsv(InputStream csvInputStream, GraphDataset.NodeCallback cb) throws IOException { + FastBufferedInputStream csvReader = new FastBufferedInputStream(csvInputStream); + + byte[] array = new byte[1024]; + for (long line = 0;; line++) { + int start = 0, len; + while ((len = csvReader.readLine(array, start, array.length - start, + FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) { + start += len; + array = ByteArrays.grow(array, array.length + 1); + } + if (len == -1) + break; // EOF + final int lineLength = start + len; + + // Skip whitespace at the start of the line. + int offset = 0; + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') + offset++; + if (offset == lineLength) { + continue; + } + if (array[0] == '#') + continue; + + // Scan source id. + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) + offset++; + final byte[] ss = Arrays.copyOfRange(array, start, offset); + + cb.onNode(ss); + } + } +} diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java rename from java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java rename to java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java --- a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java +++ b/java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java @@ -1,4 +1,4 @@ -package org.softwareheritage.graph.utils; +package org.softwareheritage.graph.compress; import com.martiansoftware.jsap.*; import it.unimi.dsi.Util; @@ -11,7 +11,7 @@ * CLI program used to compose two on-disk permutations. * * It takes two on-disk permutations as parameters, p1 and p2, and writes on disk (p1 o p2) at the - * given location. This is useful for multi-step compression (e.g. Unordered -> BFS -> LLP), as it + * given location. This is useful for multi-step compression (e.g., Unordered -> BFS -> LLP), as it * can be used to merge all the intermediate permutations. 
 */
 public class ComposePermutations {
diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
@@ -0,0 +1,292 @@
+package org.softwareheritage.graph.compress;
+
+import com.github.luben.zstd.ZstdOutputStream;
+import com.martiansoftware.jsap.*;
+import org.softwareheritage.graph.Node;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that
+ * are not stored as actual objects in the graph, but only referred to by the edges.
+ * Additionally, extract the set of all unique edge labels in the graph.
+ *
+ * <p>
+ * Rationale: Because the graph can contain holes, loose objects and dangling
+ * objects, some nodes that are referred to as destinations in the edge relationships might not
+ * actually be stored in the graph itself. However, to compress the graph using a graph compression
+ * library, it is necessary to have a list of all the nodes in the graph, including the
+ * ones that are simply referred to by the edges but not actually stored as concrete objects.
+ * </p>
+ *
+ * <p>
+ * This class reads the entire graph dataset, and uses sort -u to extract the set of
+ * all the unique nodes and unique labels that will be needed as an input for the compression
+ * process.
+ * </p>
+ */ +public class ExtractNodes { + private static JSAPResult parseArgs(String[] args) { + JSAPResult config = null; + try { + SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{ + new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the edges dataset"), + new UnflaggedOption("outputBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, + "Basename of the output files"), + + new FlaggedOption("format", JSAP.STRING_PARSER, "orc", JSAP.NOT_REQUIRED, 'f', "format", + "Format of the input dataset (orc, csv)"), + new FlaggedOption("sortBufferSize", JSAP.STRING_PARSER, "30%", JSAP.NOT_REQUIRED, 'S', + "sort-buffer-size", "Size of the memory buffer used by sort"), + new FlaggedOption("sortTmpDir", JSAP.STRING_PARSER, null, JSAP.NOT_REQUIRED, 'T', + "sort-temporary-directory", "Path to the temporary directory used by sort")}); + + config = jsap.parse(args); + if (jsap.messagePrinted()) { + System.exit(1); + } + } catch (JSAPException e) { + System.err.println("Usage error: " + e.getMessage()); + System.exit(1); + } + return config; + } + + public static void main(String[] args) throws IOException, InterruptedException { + JSAPResult parsedArgs = parseArgs(args); + String datasetPath = parsedArgs.getString("dataset"); + String outputBasename = parsedArgs.getString("outputBasename"); + + String datasetFormat = parsedArgs.getString("format"); + String sortBufferSize = parsedArgs.getString("sortBufferSize"); + String sortTmpDir = parsedArgs.getString("sortTmpDir", null); + + // Open edge dataset + GraphDataset dataset; + if (datasetFormat.equals("orc")) { + dataset = new ORCGraphDataset(datasetPath); + } else if (datasetFormat.equals("csv")) { + dataset = new CSVEdgeDataset(datasetPath); + } else { + throw new IllegalArgumentException("Unknown dataset format: " + datasetFormat); + } + + // Spawn node sorting process + Process nodeSort = spawnSort(sortBufferSize, sortTmpDir); + BufferedOutputStream nodeSortStdin = new BufferedOutputStream(nodeSort.getOutputStream()); + BufferedInputStream nodeSortStdout = new BufferedInputStream(nodeSort.getInputStream()); + OutputStream nodesFileOutputStream = new ZstdOutputStream( + new BufferedOutputStream(new FileOutputStream(outputBasename + ".nodes.csv.zst"))); + NodesOutputThread nodesOutputThread = new NodesOutputThread(nodeSortStdout, nodesFileOutputStream); + nodesOutputThread.start(); + + // Spawn label sorting process + Process labelSort = spawnSort(sortBufferSize, sortTmpDir); + BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream()); + BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream()); + OutputStream labelsFileOutputStream = new ZstdOutputStream( + new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst"))); + LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream); + labelsOutputThread.start(); + + // Read the dataset and write the nodes and labels to the sorting processes + long[] edgeCount = {0}; + long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length]; + dataset.readEdges((node) -> { + nodeSortStdin.write(node); + nodeSortStdin.write('\n'); + }, (src, dst, label, perm) -> { + nodeSortStdin.write(src); + nodeSortStdin.write('\n'); + nodeSortStdin.write(dst); + nodeSortStdin.write('\n'); + if (label != null) { + labelSortStdin.write(label); + labelSortStdin.write('\n'); + } + edgeCount[0]++; + // Extract type of src and 
dst from their SWHID: swh:1:XXX + byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3); + byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3); + int srcType = Node.Type.byteNameToInt(srcTypeBytes); + int dstType = Node.Type.byteNameToInt(dstTypeBytes); + if (srcType != -1 && dstType != -1) { + edgeCountByType[srcType][dstType]++; + } else { + System.err + .println("Invalid edge type: " + new String(srcTypeBytes) + " -> " + new String(dstTypeBytes)); + System.exit(1); + } + }); + + // Wait for sorting processes to finish + nodeSortStdin.close(); + nodeSort.waitFor(); + labelSortStdin.close(); + labelSort.waitFor(); + + nodesOutputThread.join(); + labelsOutputThread.join(); + + // Write node, edge and label counts/statistics + printEdgeCounts(outputBasename, edgeCount[0], edgeCountByType); + printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts()); + printLabelCounts(outputBasename, labelsOutputThread.getLabelCount()); + } + + private static Process spawnSort(String sortBufferSize, String sortTmpDir) throws IOException { + ProcessBuilder sortProcessBuilder = new ProcessBuilder(); + sortProcessBuilder.redirectError(ProcessBuilder.Redirect.INHERIT); + ArrayList command = new ArrayList<>(List.of("sort", "-u", "--buffer-size", sortBufferSize)); + if (sortTmpDir != null) { + command.add("--temporary-directory"); + command.add(sortTmpDir); + } + sortProcessBuilder.command(command); + Map env = sortProcessBuilder.environment(); + env.put("LC_ALL", "C"); + env.put("LC_COLLATE", "C"); + env.put("LANG", "C"); + + return sortProcessBuilder.start(); + } + + private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException { + PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt"); + nodeCountWriter.println(edgeCount); + nodeCountWriter.close(); + + PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".edges.stats.txt"); + TreeMap edgeTypeCountsMap = new TreeMap<>(); + for (Node.Type src : Node.Type.values()) { + for (Node.Type dst : Node.Type.values()) { + long cnt = edgeTypeCounts[Node.Type.toInt(src)][Node.Type.toInt(dst)]; + if (cnt > 0) + edgeTypeCountsMap.put(src.toString().toLowerCase() + ":" + dst.toString().toLowerCase(), cnt); + } + } + for (Map.Entry entry : edgeTypeCountsMap.entrySet()) { + nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue()); + } + nodeTypesCountWriter.close(); + } + + private static void printNodeCounts(String basename, long nodeCount, long[] nodeTypeCounts) throws IOException { + PrintWriter nodeCountWriter = new PrintWriter(basename + ".nodes.count.txt"); + nodeCountWriter.println(nodeCount); + nodeCountWriter.close(); + + PrintWriter nodeTypesCountWriter = new PrintWriter(basename + ".nodes.stats.txt"); + TreeMap nodeTypeCountsMap = new TreeMap<>(); + for (Node.Type v : Node.Type.values()) { + nodeTypeCountsMap.put(v.toString().toLowerCase(), nodeTypeCounts[Node.Type.toInt(v)]); + } + for (Map.Entry entry : nodeTypeCountsMap.entrySet()) { + nodeTypesCountWriter.println(entry.getKey() + " " + entry.getValue()); + } + nodeTypesCountWriter.close(); + } + + private static void printLabelCounts(String basename, long labelCount) throws IOException { + PrintWriter nodeCountWriter = new PrintWriter(basename + ".labels.count.txt"); + nodeCountWriter.println(labelCount); + nodeCountWriter.close(); + } + + private static class NodesOutputThread extends Thread { + private final InputStream sortedNodesStream; + private final 
OutputStream nodesOutputStream; + + private long nodeCount = 0; + private final long[] nodeTypeCounts = new long[Node.Type.values().length]; + + NodesOutputThread(InputStream sortedNodesStream, OutputStream nodesOutputStream) { + this.sortedNodesStream = sortedNodesStream; + this.nodesOutputStream = nodesOutputStream; + } + + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(sortedNodesStream, StandardCharsets.UTF_8)); + try { + String line; + while ((line = reader.readLine()) != null) { + nodesOutputStream.write(line.getBytes(StandardCharsets.UTF_8)); + nodesOutputStream.write('\n'); + nodeCount++; + try { + Node.Type nodeType = Node.Type.fromStr(line.split(":")[2]); + nodeTypeCounts[Node.Type.toInt(nodeType)]++; + } catch (ArrayIndexOutOfBoundsException e) { + System.err.println("Error parsing SWHID: " + line); + System.exit(1); + } + } + nodesOutputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public long getNodeCount() { + return nodeCount; + } + + public long[] getNodeTypeCounts() { + return nodeTypeCounts; + } + } + + private static class LabelsOutputThread extends Thread { + private final InputStream sortedLabelsStream; + private final OutputStream labelsOutputStream; + + private long labelCount = 0; + + LabelsOutputThread(InputStream sortedLabelsStream, OutputStream labelsOutputStream) { + this.labelsOutputStream = labelsOutputStream; + this.sortedLabelsStream = sortedLabelsStream; + } + + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(sortedLabelsStream, StandardCharsets.UTF_8)); + try { + String line; + while ((line = reader.readLine()) != null) { + labelsOutputStream.write(line.getBytes(StandardCharsets.UTF_8)); + labelsOutputStream.write('\n'); + labelCount++; + } + labelsOutputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public long getLabelCount() { + return labelCount; + } + } +} diff --git a/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java new file mode 100644 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java @@ -0,0 +1,44 @@ +package org.softwareheritage.graph.compress; + +import java.io.IOException; + +/** + * GraphDataset is a common interface to represent on-disk graph datasets in various formats, + * usually extracted from the SWH archive with the swh-dataset tool. + */ +public interface GraphDataset { + interface NodeCallback { + void onNode(byte[] node) throws IOException; + } + + interface EdgeCallback { + void onEdge(byte[] src, byte[] dst, byte[] label, long permission) throws IOException; + } + + /** + * Read the graph dataset and call the callback methods for each node and edge encountered. + * + *
    + *
+ * <ul>
+ * <li>The node callback is called for each object stored in the graph.</li>
+ * <li>The edge callback is called for each relationship (between two nodes) stored in the
+ * graph.</li>
+ * </ul>
+ *
+ * <p>
+ * Note that because the graph can contain holes, loose objects and dangling objects, the edge
+ * callback may be called with parameters representing nodes that are not stored in the graph. This
+ * is because some nodes that are referred to as destinations in the dataset might not be present in
+ * the archive (e.g., a revision entry in a directory pointing to a revision that we have not
+ * crawled yet).
+ * </p>
+ *
+ * <p>
+ * In order to generate a complete set of all the nodes that are referred to in the graph
+ * dataset, see the {@link ExtractNodes} class.
+ * </p>
+ * + * @param nodeCb callback for each node + * @param edgeCb callback for each edge + */ + void readEdges(NodeCallback nodeCb, EdgeCallback edgeCb) throws IOException; +} diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java new file mode 100644 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java @@ -0,0 +1,565 @@ +package org.softwareheritage.graph.compress; + +import com.github.luben.zstd.ZstdOutputStream; +import com.google.common.primitives.Bytes; +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; + +/** + * A graph dataset in ORC format. + * + * This format of dataset is a full export of the graph, including all the edge and node properties. + * + * For convenience purposes, this class also provides a main method to print all the edges of the + * graph, so that the output can be piped to + * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph}. + * + * Reading edges from ORC files using this class is about ~2.5 times slower than reading them + * directly from a plaintext format. + */ +public class ORCGraphDataset implements GraphDataset { + final static Logger logger = LoggerFactory.getLogger(ORCGraphDataset.class); + + final static public int ORC_BATCH_SIZE = 16 * 1024; + + private final File datasetDir; + + public ORCGraphDataset(String datasetPath) { + this(new File(datasetPath)); + } + + public ORCGraphDataset(File datasetDir) { + if (!datasetDir.exists()) { + throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist"); + } + this.datasetDir = datasetDir; + } + + /** + * Return the given table as a {@link SwhOrcTable}. The return value can be down-casted to the type + * of the specific table it represents. + */ + public SwhOrcTable getTable(String tableName) { + File tableDir = new File(datasetDir, tableName); + if (!tableDir.exists()) { + return null; + } + switch (tableName) { + case "skipped_content": + return new SkippedContentOrcTable(tableDir); + case "content": + return new ContentOrcTable(tableDir); + case "directory": + return new DirectoryOrcTable(tableDir); + case "directory_entry": + return new DirectoryEntryOrcTable(tableDir); + case "revision": + return new RevisionOrcTable(tableDir); + case "revision_history": + return new RevisionHistoryOrcTable(tableDir); + case "release": + return new ReleaseOrcTable(tableDir); + case "snapshot_branch": + return new SnapshotBranchOrcTable(tableDir); + case "snapshot": + return new SnapshotOrcTable(tableDir); + case "origin_visit_status": + return new OriginVisitStatusOrcTable(tableDir); + case "origin_visit": + return new OriginVisitOrcTable(tableDir); + case "origin": + return new OriginOrcTable(tableDir); + default : + return null; + } + } + + /** Return all the tables in this dataset as a map of {@link SwhOrcTable}. 
*/ + public Map allTables() { + HashMap tables = new HashMap<>(); + File[] tableDirs = datasetDir.listFiles(); + if (tableDirs == null) { + return tables; + } + for (File tableDir : tableDirs) { + SwhOrcTable table = getTable(tableDir.getName()); + if (table != null) { + tables.put(tableDir.getName(), table); + } + } + return tables; + } + + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + Map tables = allTables(); + for (SwhOrcTable table : tables.values()) { + table.readEdges(nodeCb, edgeCb); + } + } + + /** + * A class representing an ORC table, stored on disk as a set of ORC files all in the same + * directory. + */ + public static class ORCTable { + private final File tableDir; + + public ORCTable(File tableDir) { + if (!tableDir.exists()) { + throw new IllegalArgumentException("Table " + tableDir.getName() + " does not exist"); + } + this.tableDir = tableDir; + } + + public static ORCTable load(File tableDir) { + return new ORCTable(tableDir); + } + + /** + * Utility function for byte columns. Return as a byte array the value of the given row in the + * column vector. + */ + public static byte[] getBytesRow(BytesColumnVector columnVector, int row) { + if (columnVector.isRepeating) { + row = 0; + } + return Arrays.copyOfRange(columnVector.vector[row], columnVector.start[row], + columnVector.start[row] + columnVector.length[row]); + } + + interface ReadOrcBatchHandler { + void accept(VectorizedRowBatch batch, Map columnMap) throws IOException; + } + + /** + * Read the table, calling the given handler for each new batch of rows. Optionally, if columns is + * not null, will only scan the columns present in this set instead of the entire table. + */ + public void readOrcTable(ReadOrcBatchHandler batchHandler, Set columns) throws IOException { + File[] listing = tableDir.listFiles(); + if (listing == null) { + throw new IOException("No files found in " + tableDir.getName()); + } + for (File file : listing) { + readOrcFile(file.getPath(), batchHandler, columns); + } + } + + private void readOrcFile(String path, ReadOrcBatchHandler batchHandler, Set columns) + throws IOException { + try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(new Configuration()))) { + TypeDescription schema = reader.getSchema(); + + Reader.Options options = reader.options(); + if (columns != null) { + options.include(createColumnsToRead(schema, columns)); + } + Map columnMap = getColumnMap(schema); + + try (RecordReader records = reader.rows(options)) { + VectorizedRowBatch batch = reader.getSchema().createRowBatch(ORC_BATCH_SIZE); + while (records.nextBatch(batch)) { + batchHandler.accept(batch, columnMap); + } + } + } + } + + private static Map getColumnMap(TypeDescription schema) { + Map columnMap = new HashMap<>(); + List fieldNames = schema.getFieldNames(); + for (int i = 0; i < fieldNames.size(); i++) { + columnMap.put(fieldNames.get(i), i); + } + return columnMap; + } + + private static boolean[] createColumnsToRead(TypeDescription schema, Set columns) { + boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1]; + List fieldNames = schema.getFieldNames(); + List columnTypes = schema.getChildren(); + for (int i = 0; i < fieldNames.size(); i++) { + if (columns.contains(fieldNames.get(i))) { + logger.debug("Adding column " + fieldNames.get(i) + " with ID " + i + " to the read list"); + TypeDescription type = columnTypes.get(i); + for (int id = type.getId(); id <= type.getMaximumId(); id++) { + columnsToRead[id] = 
true; + } + } + } + return columnsToRead; + } + } + + /** Base class for SWH-specific ORC tables. */ + public static class SwhOrcTable { + protected final ORCTable orcTable; + + protected static final byte[] cntPrefix = "swh:1:cnt:".getBytes(); + protected static byte[] dirPrefix = "swh:1:dir:".getBytes(); + protected static byte[] revPrefix = "swh:1:rev:".getBytes(); + protected static byte[] relPrefix = "swh:1:rel:".getBytes(); + protected static byte[] snpPrefix = "swh:1:snp:".getBytes(); + protected static byte[] oriPrefix = "swh:1:ori:".getBytes(); + + public SwhOrcTable(File tableDir) { + orcTable = new ORCTable(tableDir); + } + + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + // No nodes or edges to read in the table by default. + } + + protected static byte[] urlToOriginId(byte[] url) { + return DigestUtils.sha1Hex(url).getBytes(); + } + } + + public static class SkippedContentOrcTable extends SwhOrcTable { + public SkippedContentOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")]; + for (int row = 0; row < batch.size; row++) { + byte[] contentId = Bytes.concat(cntPrefix, ORCTable.getBytesRow(contentIdVector, row)); + nodeCb.onNode(contentId); + } + }, Set.of("sha1_git")); + } + } + + public static class ContentOrcTable extends SwhOrcTable { + public ContentOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector contentIdVector = (BytesColumnVector) batch.cols[columnMap.get("sha1_git")]; + for (int row = 0; row < batch.size; row++) { + byte[] contentId = Bytes.concat(cntPrefix, ORCTable.getBytesRow(contentIdVector, row)); + nodeCb.onNode(contentId); + } + }, Set.of("sha1_git")); + } + } + + public static class DirectoryOrcTable extends SwhOrcTable { + public DirectoryOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; + for (int row = 0; row < batch.size; row++) { + byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row)); + nodeCb.onNode(directoryId); + } + }, Set.of("id")); + } + } + + public static class DirectoryEntryOrcTable extends SwhOrcTable { + public DirectoryEntryOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + byte[] cntType = "file".getBytes(); + byte[] dirType = "dir".getBytes(); + byte[] revType = "rev".getBytes(); + + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector srcVector = (BytesColumnVector) batch.cols[columnMap.get("directory_id")]; + BytesColumnVector dstVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; + BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("type")]; + BytesColumnVector labelVector = (BytesColumnVector) batch.cols[columnMap.get("name")]; + 
LongColumnVector permissionVector = (LongColumnVector) batch.cols[columnMap.get("perms")]; + + for (int row = 0; row < batch.size; row++) { + byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); + byte[] targetPrefix; + if (Arrays.equals(targetType, cntType)) { + targetPrefix = cntPrefix; + } else if (Arrays.equals(targetType, dirType)) { + targetPrefix = dirPrefix; + } else if (Arrays.equals(targetType, revType)) { + targetPrefix = revPrefix; + } else { + continue; + } + + byte[] src = Bytes.concat(dirPrefix, ORCTable.getBytesRow(srcVector, row)); + byte[] dst = Bytes.concat(targetPrefix, ORCTable.getBytesRow(dstVector, row)); + byte[] label = Base64.getEncoder().encode(ORCTable.getBytesRow(labelVector, row)); + long permission = permissionVector.vector[row]; + edgeCb.onEdge(src, dst, label, permission); + } + }, Set.of("directory_id", "target", "type", "name", "perms")); + } + } + + public static class RevisionOrcTable extends SwhOrcTable { + public RevisionOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; + BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("directory")]; + for (int row = 0; row < batch.size; row++) { + byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row)); + byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row)); + nodeCb.onNode(revisionId); + edgeCb.onEdge(revisionId, directoryId, null, -1); + } + }, Set.of("id", "directory")); + } + } + + public static class RevisionHistoryOrcTable extends SwhOrcTable { + public RevisionHistoryOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; + BytesColumnVector parentIdVector = (BytesColumnVector) batch.cols[columnMap.get("parent_id")]; + for (int row = 0; row < batch.size; row++) { + byte[] parentId = Bytes.concat(revPrefix, ORCTable.getBytesRow(parentIdVector, row)); + byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row)); + edgeCb.onEdge(parentId, revisionId, null, -1); + } + }, Set.of("id", "parent_id")); + } + } + + public static class ReleaseOrcTable extends SwhOrcTable { + public ReleaseOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + byte[] cntType = "content".getBytes(); + byte[] dirType = "directory".getBytes(); + byte[] revType = "revision".getBytes(); + byte[] relType = "release".getBytes(); + + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector releaseIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; + BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; + BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")]; + + for (int row = 0; row < batch.size; row++) { + byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); + + byte[] targetPrefix; + if (Arrays.equals(targetType, cntType)) { + targetPrefix = 
cntPrefix; + } else if (Arrays.equals(targetType, dirType)) { + targetPrefix = dirPrefix; + } else if (Arrays.equals(targetType, revType)) { + targetPrefix = revPrefix; + } else if (Arrays.equals(targetType, relType)) { + targetPrefix = relPrefix; + } else { + continue; + } + + byte[] releaseId = Bytes.concat(relPrefix, ORCTable.getBytesRow(releaseIdVector, row)); + byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row)); + nodeCb.onNode(releaseId); + edgeCb.onEdge(releaseId, targetId, null, -1); + } + }, Set.of("id", "target", "target_type")); + } + } + + public static class SnapshotOrcTable extends SwhOrcTable { + public SnapshotOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; + for (int row = 0; row < batch.size; row++) { + byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row)); + nodeCb.onNode(snapshotId); + } + }, Set.of("id")); + } + } + + public static class SnapshotBranchOrcTable extends SwhOrcTable { + public SnapshotBranchOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + byte[] cntType = "content".getBytes(); + byte[] dirType = "directory".getBytes(); + byte[] revType = "revision".getBytes(); + byte[] relType = "release".getBytes(); + + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot_id")]; + BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; + BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")]; + BytesColumnVector branchNameVector = (BytesColumnVector) batch.cols[columnMap.get("name")]; + + for (int row = 0; row < batch.size; row++) { + byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); + byte[] targetPrefix; + if (Arrays.equals(targetType, cntType)) { + targetPrefix = cntPrefix; + } else if (Arrays.equals(targetType, dirType)) { + targetPrefix = dirPrefix; + } else if (Arrays.equals(targetType, revType)) { + targetPrefix = revPrefix; + } else if (Arrays.equals(targetType, relType)) { + targetPrefix = relPrefix; + } else { + continue; + } + + byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row)); + byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row)); + byte[] branchName = Base64.getEncoder().encode(ORCTable.getBytesRow(branchNameVector, row)); + nodeCb.onNode(snapshotId); + edgeCb.onEdge(snapshotId, targetId, branchName, -1); + } + }, Set.of("snapshot_id", "name", "target", "target_type")); + } + } + + public static class OriginVisitStatusOrcTable extends SwhOrcTable { + public OriginVisitStatusOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("origin")]; + BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot")]; + + for (int row = 0; row < batch.size; row++) { + byte[] 
originId = urlToOriginId(ORCTable.getBytesRow(originUrlVector, row)); + byte[] snapshot_id = ORCTable.getBytesRow(snapshotIdVector, row); + if (snapshot_id.length == 0) { + continue; + } + edgeCb.onEdge(Bytes.concat(oriPrefix, originId), Bytes.concat(snpPrefix, snapshot_id), null, -1); + } + }, Set.of("origin", "snapshot")); + } + } + + public static class OriginVisitOrcTable extends SwhOrcTable { + public OriginVisitOrcTable(File tableDir) { + super(tableDir); + } + } + + public static class OriginOrcTable extends SwhOrcTable { + public OriginOrcTable(File tableDir) { + super(tableDir); + } + + @Override + public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { + orcTable.readOrcTable((batch, columnMap) -> { + BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("url")]; + for (int row = 0; row < batch.size; row++) { + byte[] originId = Bytes.concat(oriPrefix, + urlToOriginId(ORCTable.getBytesRow(originUrlVector, row))); + nodeCb.onNode(originId); + } + }, Set.of("url")); + } + } + + /** + * Export an ORC graph to the CSV edge dataset format as two different files, + * nodes.csv.zst and edges.csv.zst. + */ + public static void exportToCsvDataset(String orcDataset, String csvDatasetBasename) throws IOException { + ORCGraphDataset dataset = new ORCGraphDataset(orcDataset); + File nodesFile = new File(csvDatasetBasename + ".nodes.csv.zst"); + File edgesFile = new File(csvDatasetBasename + ".edges.csv.zst"); + FastBufferedOutputStream nodesOut = new FastBufferedOutputStream( + new ZstdOutputStream(new FileOutputStream(nodesFile))); + FastBufferedOutputStream edgesOut = new FastBufferedOutputStream( + new ZstdOutputStream(new FileOutputStream(edgesFile))); + dataset.readEdges((node) -> { + nodesOut.write(node); + nodesOut.write('\n'); + }, (src, dst, label, perms) -> { + edgesOut.write(src); + edgesOut.write(' '); + edgesOut.write(dst); + if (label != null) { + edgesOut.write(' '); + edgesOut.write(label); + edgesOut.write(' '); + } + if (perms != -1) { + edgesOut.write(' '); + edgesOut.write(Long.toString(perms).getBytes()); + } + edgesOut.write('\n'); + }); + } + + /** + * Print all the edges of the graph to stdout. Can be piped to + * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph} to import the graph dataset and convert + * it to a {@link it.unimi.dsi.big.webgraph.BVGraph}. + */ + public static void printSimpleEdges(String orcDataset) throws IOException { + ORCGraphDataset dataset = new ORCGraphDataset(orcDataset); + FastBufferedOutputStream out = new FastBufferedOutputStream(System.out); + dataset.readEdges((node) -> { + }, (src, dst, label, perms) -> { + out.write(src); + out.write(' '); + out.write(dst); + out.write('\n'); + }); + out.flush(); + } + + public static void main(String[] args) throws IOException { + printSimpleEdges(args[0]); + } +} diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -133,7 +133,7 @@ ], CompressionStep.COMPOSE_ORDERS: [ "{java}", - "org.softwareheritage.graph.utils.ComposePermutations", + "org.softwareheritage.graph.compress.ComposePermutations", "{out_dir}/{graph_name}-bfs.order", "{out_dir}/{graph_name}-llp.order", "{out_dir}/{graph_name}.order",
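
Usage sketch (not part of the patch; the class name and dataset path are illustrative placeholders): the GraphDataset interface introduced above is consumed by passing two lambdas to readEdges(), one invoked per stored object and one per edge. Only ORCGraphDataset, readEdges and the callback signatures come from the patch.

import org.softwareheritage.graph.compress.GraphDataset;
import org.softwareheritage.graph.compress.ORCGraphDataset;

import java.io.IOException;

public class CountDatasetObjects {
    public static void main(String[] args) throws IOException {
        GraphDataset dataset = new ORCGraphDataset(args[0]);
        long[] nodes = {0}; // single-element arrays so the lambdas can update the counters
        long[] edges = {0};
        dataset.readEdges(
                (node) -> nodes[0]++, // one call per object stored in the dataset
                (src, dst, label, perms) -> edges[0]++); // one call per relationship
        System.out.println(nodes[0] + " node records, " + edges[0] + " edges");
    }
}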
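
A minimal sketch of the plain-text edge format parsed by CSVEdgeDataset.readEdgesCsv(): one edge per line, source and target SWHIDs separated by whitespace, optionally followed by a base64-encoded label and an integer permission. The SWHIDs, label and permission below are made-up sample values.

import org.softwareheritage.graph.compress.CSVEdgeDataset;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CsvEdgeFormatExample {
    public static void main(String[] args) throws IOException {
        // Two sample edges: a bare "rev -> dir" edge, and a "dir -> cnt" edge carrying a
        // base64 entry name ("file.txt") and a permission value.
        String edges = String.join("\n",
                "swh:1:rev:0000000000000000000000000000000000000003 swh:1:dir:0000000000000000000000000000000000000002",
                "swh:1:dir:0000000000000000000000000000000000000002 swh:1:cnt:0000000000000000000000000000000000000001 ZmlsZS50eHQ= 33188");
        CSVEdgeDataset.readEdgesCsv(new ByteArrayInputStream(edges.getBytes(StandardCharsets.US_ASCII)),
                (src, dst, label, perms) -> System.out.println(new String(src) + " -> " + new String(dst)
                        + (label == null ? "" : " (label=" + new String(label) + ", perms=" + perms + ")")));
    }
}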
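
Finally, a short sketch of the new Node.Type.byteNameToInt() helper as ExtractNodes uses it when building the per-type edge matrix: the three-byte type name is sliced directly out of the raw SWHID bytes (offsets 6 to 9 of "swh:1:XXX:..."). The SWHID literal is a placeholder.

import org.softwareheritage.graph.Node;

import java.util.Arrays;

public class EdgeTypeLookupExample {
    public static void main(String[] args) {
        byte[] swhid = "swh:1:rev:0000000000000000000000000000000000000003".getBytes();
        // Bytes 6..8 hold the object type ("rev" here).
        int type = Node.Type.byteNameToInt(Arrays.copyOfRange(swhid, 6, 9));
        System.out.println(type); // prints 4, the code byteNameToInt assigns to "rev"
    }
}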