diff --git a/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java index 8fbc7de..9ba0e38 100644 --- a/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java +++ b/java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java @@ -1,701 +1,711 @@ package org.softwareheritage.graph.compress; import com.github.luben.zstd.ZstdOutputStream; import com.google.common.primitives.Bytes; import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.*; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; /** * A graph dataset in ORC format. * * This format of dataset is a full export of the graph, including all the edge and node properties. * * For convenience purposes, this class also provides a main method to print all the edges of the * graph, so that the output can be piped to * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph}. * * Reading edges from ORC files using this class is about ~2.5 times slower than reading them * directly from a plaintext format. */ public class ORCGraphDataset implements GraphDataset { final static Logger logger = LoggerFactory.getLogger(ORCGraphDataset.class); final static public int ORC_BATCH_SIZE = 16 * 1024; private File datasetDir; protected ORCGraphDataset() { } public ORCGraphDataset(String datasetPath) { this(new File(datasetPath)); } public ORCGraphDataset(File datasetDir) { if (!datasetDir.exists()) { throw new IllegalArgumentException("Dataset " + datasetDir.getName() + " does not exist"); } this.datasetDir = datasetDir; } /** * Return the given table as a {@link SwhOrcTable}. The return value can be down-casted to the type * of the specific table it represents. */ public SwhOrcTable getTable(String tableName) { File tableDir = new File(datasetDir, tableName); if (!tableDir.exists()) { return null; } switch (tableName) { case "skipped_content": return new SkippedContentOrcTable(tableDir); case "content": return new ContentOrcTable(tableDir); case "directory": return new DirectoryOrcTable(tableDir); case "directory_entry": return new DirectoryEntryOrcTable(tableDir); case "revision": return new RevisionOrcTable(tableDir); case "revision_history": return new RevisionHistoryOrcTable(tableDir); case "release": return new ReleaseOrcTable(tableDir); case "snapshot_branch": return new SnapshotBranchOrcTable(tableDir); case "snapshot": return new SnapshotOrcTable(tableDir); case "origin_visit_status": return new OriginVisitStatusOrcTable(tableDir); case "origin_visit": return new OriginVisitOrcTable(tableDir); case "origin": return new OriginOrcTable(tableDir); default : return null; } } /** Return all the tables in this dataset as a map of {@link SwhOrcTable}. 
*/ public Map allTables() { HashMap tables = new HashMap<>(); File[] tableDirs = datasetDir.listFiles(); if (tableDirs == null) { return tables; } for (File tableDir : tableDirs) { SwhOrcTable table = getTable(tableDir.getName()); if (table != null) { tables.put(tableDir.getName(), table); } } return tables; } public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { Map tables = allTables(); for (SwhOrcTable table : tables.values()) { table.readEdges(nodeCb, edgeCb); } } /** * A class representing an ORC table, stored on disk as a set of ORC files all in the same * directory. */ public static class ORCTable { private final File tableDir; public ORCTable(File tableDir) { if (!tableDir.exists()) { throw new IllegalArgumentException("Table " + tableDir.getName() + " does not exist"); } this.tableDir = tableDir; } public static ORCTable load(File tableDir) { return new ORCTable(tableDir); } /** * Utility function for byte columns. Return as a byte array the value of the given row in the * column vector. */ public static byte[] getBytesRow(BytesColumnVector columnVector, int row) { if (columnVector.isRepeating) { row = 0; } + if (columnVector.isNull[row]) { + return null; + } return Arrays.copyOfRange(columnVector.vector[row], columnVector.start[row], columnVector.start[row] + columnVector.length[row]); } /** * Utility function for long columns. Return as a long the value of the given row in the column * vector. */ - public static long getLongRow(LongColumnVector columnVector, int row) { + public static Long getLongRow(LongColumnVector columnVector, int row) { if (columnVector.isRepeating) { row = 0; } + if (columnVector.isNull[row]) { + return null; + } return columnVector.vector[row]; } interface ReadOrcBatchHandler { void accept(VectorizedRowBatch batch, Map columnMap) throws IOException; } /** * Read the table, calling the given handler for each new batch of rows. Optionally, if columns is * not null, will only scan the columns present in this set instead of the entire table. * * If this method is called from within a ForkJoinPool, the ORC table will be read in parallel using * that thread pool. Otherwise, the ORC files will be read sequentially. 
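 *
 * A minimal usage sketch (hypothetical caller, not part of this change), assuming the caller
 * wants the parallel behaviour and therefore submits the read to an explicit
 * {@link java.util.concurrent.ForkJoinPool}:
 *
 * <pre>{@code
 * ORCTable table = ORCTable.load(new File("/path/to/dataset/revision")); // hypothetical path
 * new ForkJoinPool(4).submit(() -> {
 *     try {
 *         table.readOrcTable((batch, columnMap) -> {
 *             // called once per batch of up to ORC_BATCH_SIZE rows
 *         }, Set.of("id", "directory"));
 *     } catch (IOException e) {
 *         throw new RuntimeException(e);
 *     }
 * }).join();
 * }</pre>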
*/ public void readOrcTable(ReadOrcBatchHandler batchHandler, Set columns) throws IOException { File[] listing = tableDir.listFiles(); if (listing == null) { throw new IOException("No files found in " + tableDir.getName()); } ForkJoinPool forkJoinPool = ForkJoinTask.getPool(); if (forkJoinPool == null) { // Sequential case for (File file : listing) { readOrcFile(file.getPath(), batchHandler, columns); } } else { // Parallel case ArrayList listingArray = new ArrayList<>(Arrays.asList(listing)); listingArray.parallelStream().forEach(file -> { try { readOrcFile(file.getPath(), batchHandler, columns); } catch (IOException e) { throw new RuntimeException(e); } }); } } private void readOrcFile(String path, ReadOrcBatchHandler batchHandler, Set columns) throws IOException { try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(new Configuration()))) { TypeDescription schema = reader.getSchema(); Reader.Options options = reader.options(); if (columns != null) { options.include(createColumnsToRead(schema, columns)); } Map columnMap = getColumnMap(schema); try (RecordReader records = reader.rows(options)) { VectorizedRowBatch batch = reader.getSchema().createRowBatch(ORC_BATCH_SIZE); while (records.nextBatch(batch)) { batchHandler.accept(batch, columnMap); } } } } private static Map getColumnMap(TypeDescription schema) { Map columnMap = new HashMap<>(); List fieldNames = schema.getFieldNames(); for (int i = 0; i < fieldNames.size(); i++) { columnMap.put(fieldNames.get(i), i); } return columnMap; } private static boolean[] createColumnsToRead(TypeDescription schema, Set columns) { boolean[] columnsToRead = new boolean[schema.getMaximumId() + 1]; List fieldNames = schema.getFieldNames(); List columnTypes = schema.getChildren(); for (int i = 0; i < fieldNames.size(); i++) { if (columns.contains(fieldNames.get(i))) { logger.debug("Adding column " + fieldNames.get(i) + " with ID " + i + " to the read list"); TypeDescription type = columnTypes.get(i); for (int id = type.getId(); id <= type.getMaximumId(); id++) { columnsToRead[id] = true; } } } return columnsToRead; } } /** Base class for SWH-specific ORC tables. */ public static class SwhOrcTable { protected ORCTable orcTable; protected static final byte[] cntPrefix = "swh:1:cnt:".getBytes(); protected static final byte[] dirPrefix = "swh:1:dir:".getBytes(); protected static final byte[] revPrefix = "swh:1:rev:".getBytes(); protected static final byte[] relPrefix = "swh:1:rel:".getBytes(); protected static final byte[] snpPrefix = "swh:1:snp:".getBytes(); protected static final byte[] oriPrefix = "swh:1:ori:".getBytes(); protected String getIdColumn() { return "id"; } protected byte[] getSwhidPrefix() { throw new UnsupportedOperationException(); } protected byte[] idToSwhid(byte[] id) { return Bytes.concat(getSwhidPrefix(), id); } protected SwhOrcTable() { } public SwhOrcTable(File tableDir) { orcTable = new ORCTable(tableDir); } public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { // No nodes or edges to read in the table by default. 
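// Tables that actually contain nodes or edges override this method.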
} protected static byte[] urlToOriginId(byte[] url) { return DigestUtils.sha1Hex(url).getBytes(); } public void readIdColumn(NodeCallback cb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector idVector = (BytesColumnVector) batch.cols[columnMap.get(getIdColumn())]; for (int row = 0; row < batch.size; row++) { byte[] id = idToSwhid(ORCTable.getBytesRow(idVector, row)); cb.onNode(id); } }, Set.of(getIdColumn())); } public void readLongColumn(String longColumn, LongCallback cb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector idVector = (BytesColumnVector) batch.cols[columnMap.get(getIdColumn())]; LongColumnVector dateVector = (LongColumnVector) batch.cols[columnMap.get(longColumn)]; for (int row = 0; row < batch.size; row++) { byte[] id = idToSwhid(ORCTable.getBytesRow(idVector, row)); - long date = ORCTable.getLongRow(dateVector, row); - cb.onLong(id, date); + Long date = ORCTable.getLongRow(dateVector, row); + if (date != null) { + cb.onLong(id, date); + } } }, Set.of(getIdColumn(), longColumn)); } public void readTimestampColumn(String dateColumn, String dateOffsetColumn, TimestampCallback cb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector idVector = (BytesColumnVector) batch.cols[columnMap.get(getIdColumn())]; TimestampColumnVector dateVector = (TimestampColumnVector) batch.cols[columnMap.get(dateColumn)]; LongColumnVector dateOffsetVector = (LongColumnVector) batch.cols[columnMap.get(dateOffsetColumn)]; for (int row = 0; row < batch.size; row++) { byte[] id = idToSwhid(ORCTable.getBytesRow(idVector, row)); long date = dateVector.getTimestampAsLong(row); // rounded to seconds - short dateOffset = (short) ORCTable.getLongRow(dateOffsetVector, row); - cb.onTimestamp(id, date, dateOffset); + Long dateOffset = ORCTable.getLongRow(dateOffsetVector, row); + if (dateOffset != null) { + cb.onTimestamp(id, date, dateOffset.shortValue()); + } } }, Set.of(getIdColumn(), dateColumn, dateOffsetColumn)); } public void readBytes64Column(String longColumn, BytesCallback cb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector idVector = (BytesColumnVector) batch.cols[columnMap.get(getIdColumn())]; BytesColumnVector valueVector = (BytesColumnVector) batch.cols[columnMap.get(longColumn)]; for (int row = 0; row < batch.size; row++) { byte[] id = idToSwhid(ORCTable.getBytesRow(idVector, row)); byte[] value = Base64.getEncoder().encode(ORCTable.getBytesRow(valueVector, row)); cb.onBytes(id, value); } }, Set.of(getIdColumn(), longColumn)); } } public static class SkippedContentOrcTable extends SwhOrcTable { public SkippedContentOrcTable(File tableDir) { super(tableDir); } @Override protected String getIdColumn() { return "sha1_git"; } @Override protected byte[] getSwhidPrefix() { return cntPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { readIdColumn(nodeCb); } } public static class ContentOrcTable extends SwhOrcTable { public ContentOrcTable(File tableDir) { super(tableDir); } @Override protected String getIdColumn() { return "sha1_git"; } @Override protected byte[] getSwhidPrefix() { return cntPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { readIdColumn(nodeCb); } } public static class DirectoryOrcTable extends SwhOrcTable { public DirectoryOrcTable(File tableDir) { super(tableDir); } @Override protected 
byte[] getSwhidPrefix() { return dirPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { readIdColumn(nodeCb); } } public static class DirectoryEntryOrcTable extends SwhOrcTable { public DirectoryEntryOrcTable(File tableDir) { super(tableDir); } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { byte[] cntType = "file".getBytes(); byte[] dirType = "dir".getBytes(); byte[] revType = "rev".getBytes(); orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector srcVector = (BytesColumnVector) batch.cols[columnMap.get("directory_id")]; BytesColumnVector dstVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("type")]; BytesColumnVector labelVector = (BytesColumnVector) batch.cols[columnMap.get("name")]; LongColumnVector permissionVector = (LongColumnVector) batch.cols[columnMap.get("perms")]; for (int row = 0; row < batch.size; row++) { byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); byte[] targetPrefix; if (Arrays.equals(targetType, cntType)) { targetPrefix = cntPrefix; } else if (Arrays.equals(targetType, dirType)) { targetPrefix = dirPrefix; } else if (Arrays.equals(targetType, revType)) { targetPrefix = revPrefix; } else { continue; } byte[] src = Bytes.concat(dirPrefix, ORCTable.getBytesRow(srcVector, row)); byte[] dst = Bytes.concat(targetPrefix, ORCTable.getBytesRow(dstVector, row)); byte[] label = Base64.getEncoder().encode(ORCTable.getBytesRow(labelVector, row)); - long permission = ORCTable.getLongRow(permissionVector, row); - edgeCb.onEdge(src, dst, label, (int) permission); + Long permission = ORCTable.getLongRow(permissionVector, row); + edgeCb.onEdge(src, dst, label, permission != null ? 
permission.intValue() : 0); } }, Set.of("directory_id", "target", "type", "name", "perms")); } } public static class RevisionOrcTable extends SwhOrcTable { public RevisionOrcTable(File tableDir) { super(tableDir); } @Override protected byte[] getSwhidPrefix() { return revPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; BytesColumnVector directoryIdVector = (BytesColumnVector) batch.cols[columnMap.get("directory")]; for (int row = 0; row < batch.size; row++) { byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row)); byte[] directoryId = Bytes.concat(dirPrefix, ORCTable.getBytesRow(directoryIdVector, row)); nodeCb.onNode(revisionId); edgeCb.onEdge(revisionId, directoryId, null, -1); } }, Set.of("id", "directory")); } } public static class RevisionHistoryOrcTable extends SwhOrcTable { public RevisionHistoryOrcTable(File tableDir) { super(tableDir); } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector revisionIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; BytesColumnVector parentIdVector = (BytesColumnVector) batch.cols[columnMap.get("parent_id")]; for (int row = 0; row < batch.size; row++) { byte[] parentId = Bytes.concat(revPrefix, ORCTable.getBytesRow(parentIdVector, row)); byte[] revisionId = Bytes.concat(revPrefix, ORCTable.getBytesRow(revisionIdVector, row)); edgeCb.onEdge(revisionId, parentId, null, -1); } }, Set.of("id", "parent_id")); } } public static class ReleaseOrcTable extends SwhOrcTable { public ReleaseOrcTable(File tableDir) { super(tableDir); } @Override protected byte[] getSwhidPrefix() { return relPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { byte[] cntType = "content".getBytes(); byte[] dirType = "directory".getBytes(); byte[] revType = "revision".getBytes(); byte[] relType = "release".getBytes(); orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector releaseIdVector = (BytesColumnVector) batch.cols[columnMap.get("id")]; BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")]; for (int row = 0; row < batch.size; row++) { byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); byte[] targetPrefix; if (Arrays.equals(targetType, cntType)) { targetPrefix = cntPrefix; } else if (Arrays.equals(targetType, dirType)) { targetPrefix = dirPrefix; } else if (Arrays.equals(targetType, revType)) { targetPrefix = revPrefix; } else if (Arrays.equals(targetType, relType)) { targetPrefix = relPrefix; } else { continue; } byte[] releaseId = Bytes.concat(relPrefix, ORCTable.getBytesRow(releaseIdVector, row)); byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row)); nodeCb.onNode(releaseId); edgeCb.onEdge(releaseId, targetId, null, -1); } }, Set.of("id", "target", "target_type")); } } public static class SnapshotOrcTable extends SwhOrcTable { public SnapshotOrcTable(File tableDir) { super(tableDir); } @Override protected byte[] getSwhidPrefix() { return snpPrefix; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, 
GraphDataset.EdgeCallback edgeCb) throws IOException { readIdColumn(nodeCb); } } public static class SnapshotBranchOrcTable extends SwhOrcTable { public SnapshotBranchOrcTable(File tableDir) { super(tableDir); } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { byte[] cntType = "content".getBytes(); byte[] dirType = "directory".getBytes(); byte[] revType = "revision".getBytes(); byte[] relType = "release".getBytes(); orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot_id")]; BytesColumnVector targetIdVector = (BytesColumnVector) batch.cols[columnMap.get("target")]; BytesColumnVector targetTypeVector = (BytesColumnVector) batch.cols[columnMap.get("target_type")]; BytesColumnVector branchNameVector = (BytesColumnVector) batch.cols[columnMap.get("name")]; for (int row = 0; row < batch.size; row++) { byte[] targetType = ORCTable.getBytesRow(targetTypeVector, row); byte[] targetPrefix; if (Arrays.equals(targetType, cntType)) { targetPrefix = cntPrefix; } else if (Arrays.equals(targetType, dirType)) { targetPrefix = dirPrefix; } else if (Arrays.equals(targetType, revType)) { targetPrefix = revPrefix; } else if (Arrays.equals(targetType, relType)) { targetPrefix = relPrefix; } else { continue; } byte[] snapshotId = Bytes.concat(snpPrefix, ORCTable.getBytesRow(snapshotIdVector, row)); byte[] targetId = Bytes.concat(targetPrefix, ORCTable.getBytesRow(targetIdVector, row)); byte[] branchName = Base64.getEncoder().encode(ORCTable.getBytesRow(branchNameVector, row)); nodeCb.onNode(snapshotId); edgeCb.onEdge(snapshotId, targetId, branchName, -1); } }, Set.of("snapshot_id", "name", "target", "target_type")); } } public static class OriginVisitStatusOrcTable extends SwhOrcTable { public OriginVisitStatusOrcTable(File tableDir) { super(tableDir); } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector originUrlVector = (BytesColumnVector) batch.cols[columnMap.get("origin")]; BytesColumnVector snapshotIdVector = (BytesColumnVector) batch.cols[columnMap.get("snapshot")]; for (int row = 0; row < batch.size; row++) { byte[] originId = urlToOriginId(ORCTable.getBytesRow(originUrlVector, row)); byte[] snapshot_id = ORCTable.getBytesRow(snapshotIdVector, row); - if (snapshot_id.length == 0) { + if (snapshot_id == null || snapshot_id.length == 0) { continue; } edgeCb.onEdge(Bytes.concat(oriPrefix, originId), Bytes.concat(snpPrefix, snapshot_id), null, -1); } }, Set.of("origin", "snapshot")); } } public static class OriginVisitOrcTable extends SwhOrcTable { public OriginVisitOrcTable(File tableDir) { super(tableDir); } } public static class OriginOrcTable extends SwhOrcTable { public OriginOrcTable(File tableDir) { super(tableDir); } @Override protected byte[] getSwhidPrefix() { return oriPrefix; } @Override protected byte[] idToSwhid(byte[] id) { return Bytes.concat(getSwhidPrefix(), urlToOriginId(id)); } @Override protected String getIdColumn() { return "url"; } @Override public void readEdges(GraphDataset.NodeCallback nodeCb, GraphDataset.EdgeCallback edgeCb) throws IOException { readIdColumn(nodeCb); } public void readURLs(BytesCallback cb) throws IOException { orcTable.readOrcTable((batch, columnMap) -> { BytesColumnVector urlVector = (BytesColumnVector) batch.cols[columnMap.get(getIdColumn())]; for (int row = 0; row < 
batch.size; row++) { byte[] id = idToSwhid(ORCTable.getBytesRow(urlVector, row)); byte[] url = Base64.getEncoder().encode(ORCTable.getBytesRow(urlVector, row)); cb.onBytes(id, url); } }, Set.of(getIdColumn())); } } /** * Export an ORC graph to the CSV edge dataset format as two different files, * nodes.csv.zst and edges.csv.zst. */ public static void exportToCsvDataset(String orcDataset, String csvDatasetBasename) throws IOException { ORCGraphDataset dataset = new ORCGraphDataset(orcDataset); File nodesFile = new File(csvDatasetBasename + ".nodes.csv.zst"); File edgesFile = new File(csvDatasetBasename + ".edges.csv.zst"); FastBufferedOutputStream nodesOut = new FastBufferedOutputStream( new ZstdOutputStream(new FileOutputStream(nodesFile))); FastBufferedOutputStream edgesOut = new FastBufferedOutputStream( new ZstdOutputStream(new FileOutputStream(edgesFile))); dataset.readEdges((node) -> { nodesOut.write(node); nodesOut.write('\n'); }, (src, dst, label, perms) -> { edgesOut.write(src); edgesOut.write(' '); edgesOut.write(dst); if (label != null) { edgesOut.write(' '); edgesOut.write(label); edgesOut.write(' '); } if (perms != -1) { edgesOut.write(' '); edgesOut.write(Long.toString(perms).getBytes()); } edgesOut.write('\n'); }); } /** * Print all the edges of the graph to stdout. Can be piped to * {@link it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph} to import the graph dataset and convert * it to a {@link it.unimi.dsi.big.webgraph.BVGraph}. */ public static void printSimpleEdges(String orcDataset) throws IOException { ORCGraphDataset dataset = new ORCGraphDataset(orcDataset); FastBufferedOutputStream out = new FastBufferedOutputStream(System.out); dataset.readEdges((node) -> { }, (src, dst, label, perms) -> { out.write(src); out.write(' '); out.write(dst); out.write('\n'); }); out.flush(); } public static void main(String[] args) throws IOException { printSimpleEdges(args[0]); } } diff --git a/java/src/main/java/org/softwareheritage/graph/compress/WriteNodeProperties.java b/java/src/main/java/org/softwareheritage/graph/compress/WriteNodeProperties.java index fed8608..e55d8a4 100644 --- a/java/src/main/java/org/softwareheritage/graph/compress/WriteNodeProperties.java +++ b/java/src/main/java/org/softwareheritage/graph/compress/WriteNodeProperties.java @@ -1,268 +1,273 @@ package org.softwareheritage.graph.compress; import com.martiansoftware.jsap.*; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.BigArrays; import it.unimi.dsi.fastutil.ints.IntBigArrays; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.shorts.ShortBigArrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.maps.NodeIdMap; import org.softwareheritage.graph.compress.ORCGraphDataset.*; import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** * This class is used to extract the node properties from the graph dataset, and write them to a set * of property files. * * Note: because the nodes are not sorted by type, we have an incentive to minimize the number of * "holes" in offset arrays. This is why many unrelated properties are cobbled together in the same * files (e.g. 
commit messages, tag messages and origin URLs are all in a "message" property file). * Once we migrate to a TypedImmutableGraph as the underlying storage of the graph, we can split all * the different properties in their own files. */ public class WriteNodeProperties { final static Logger logger = LoggerFactory.getLogger(WriteNodeProperties.class); private final ORCGraphDataset dataset; private final String graphBasename; private final NodeIdMap nodeIdMap; private final long numNodes; public WriteNodeProperties(String dataset, String graphBasename, NodeIdMap nodeIdMap) { this.dataset = new ORCGraphDataset(dataset); this.graphBasename = graphBasename; this.nodeIdMap = nodeIdMap; this.numNodes = nodeIdMap.size64(); } public static String[] PROPERTY_WRITERS = new String[]{"timestamps", "content_length", "content_is_skipped", "person_ids", "messages", "tag_names",}; private static JSAPResult parseArgs(String[] args) { JSAPResult config = null; try { SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{ new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the ORC graph dataset"), new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, "Basename of the output graph"), new FlaggedOption("properties", JSAP.STRING_PARSER, "*", JSAP.NOT_REQUIRED, 'p', "properties", "Properties to write, comma separated (default: all). Possible choices: " + String.join(",", PROPERTY_WRITERS)),}); config = jsap.parse(args); if (jsap.messagePrinted()) { System.exit(1); } } catch (JSAPException e) { System.err.println("Usage error: " + e.getMessage()); System.exit(1); } return config; } public static void main(String[] argv) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { JSAPResult args = parseArgs(argv); String dataset = args.getString("dataset"); String graphBasename = args.getString("graphBasename"); NodeIdMap nodeIdMap = new NodeIdMap(graphBasename); Set properties; if (args.getString("properties").equals("*")) { properties = Set.of(PROPERTY_WRITERS); } else { properties = new HashSet<>(Arrays.asList(args.getString("properties").split(","))); } WriteNodeProperties writer = new WriteNodeProperties(dataset, graphBasename, nodeIdMap); if (properties.contains("timestamps")) { writer.writeTimestamps(); } if (properties.contains("content_length")) { writer.writeContentLength(); } if (properties.contains("content_is_skipped")) { writer.writeContentIsSkipped(); } if (properties.contains("person_ids")) { writer.writePersonIds(); } if (properties.contains("messages")) { writer.writeMessages(); } if (properties.contains("tag_names")) { writer.writeTagNames(); } } public void writeContentLength() throws IOException { logger.info("Writing content lengths"); long[][] valueArray = LongBigArrays.newBigArray(numNodes); BigArrays.fill(valueArray, -1); for (String tableName : new String[]{"content", "skipped_content"}) { SwhOrcTable table = dataset.getTable(tableName); + if (table == null) { + continue; + } table.readLongColumn("length", (swhid, value) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(valueArray, id, value); }); } BinIO.storeLongs(valueArray, graphBasename + ".property.content.length.bin"); } public void writeContentIsSkipped() throws IOException { LongArrayBitVector isSkippedBitVector = LongArrayBitVector.ofLength(numNodes); SwhOrcTable table = dataset.getTable("skipped_content"); - table.readIdColumn((swhid) -> { - long id = nodeIdMap.getNodeId(swhid); - 
isSkippedBitVector.set(id); - }); + if (table != null) { + table.readIdColumn((swhid) -> { + long id = nodeIdMap.getNodeId(swhid); + isSkippedBitVector.set(id); + }); + } BinIO.storeObject(isSkippedBitVector, graphBasename + ".property.content.is_skipped.bin"); } public void writeTimestamps() throws IOException { logger.info("Writing author/committer timestamps for release + revision"); SwhOrcTable releaseTable = dataset.getTable("release"); SwhOrcTable revisionTable = dataset.getTable("revision"); long[][] timestampArray = LongBigArrays.newBigArray(numNodes); short[][] timestampOffsetArray = ShortBigArrays.newBigArray(numNodes); // Author timestamps BigArrays.fill(timestampArray, Long.MIN_VALUE); BigArrays.fill(timestampOffsetArray, Short.MIN_VALUE); releaseTable.readTimestampColumn("date", "date_offset", (swhid, date, dateOffset) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(timestampArray, id, date); BigArrays.set(timestampOffsetArray, id, dateOffset); }); revisionTable.readTimestampColumn("date", "date_offset", (swhid, date, dateOffset) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(timestampArray, id, date); BigArrays.set(timestampOffsetArray, id, dateOffset); }); BinIO.storeLongs(timestampArray, graphBasename + ".property.author_timestamp.bin"); BinIO.storeShorts(timestampOffsetArray, graphBasename + ".property.author_timestamp_offset.bin"); // Committer timestamps BigArrays.fill(timestampArray, Long.MIN_VALUE); BigArrays.fill(timestampOffsetArray, Short.MIN_VALUE); revisionTable.readTimestampColumn("committer_date", "committer_offset", (swhid, date, dateOffset) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(timestampArray, id, date); BigArrays.set(timestampOffsetArray, id, dateOffset); }); BinIO.storeLongs(timestampArray, graphBasename + ".property.committer_timestamp.bin"); BinIO.storeShorts(timestampOffsetArray, graphBasename + ".property.committer_timestamp_offset.bin"); } public void writePersonIds() throws IOException { logger.info("Writing author/committer IDs for release + revision"); Object2LongFunction personIdMap = NodeIdMap.loadMph(graphBasename + ".persons.mph"); SwhOrcTable releaseTable = dataset.getTable("release"); SwhOrcTable revisionTable = dataset.getTable("revision"); int[][] personArray = IntBigArrays.newBigArray(numNodes); // Author IDs BigArrays.fill(personArray, -1); releaseTable.readBytes64Column("author", (swhid, personBase64) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(personArray, id, (int) personIdMap.getLong(personBase64)); }); revisionTable.readBytes64Column("author", (swhid, personBase64) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(personArray, id, (int) personIdMap.getLong(personBase64)); }); BinIO.storeInts(personArray, graphBasename + ".property.author_id.bin"); // Committer IDs BigArrays.fill(personArray, -1); revisionTable.readBytes64Column("committer", (swhid, personBase64) -> { long id = nodeIdMap.getNodeId(swhid); BigArrays.set(personArray, id, (int) personIdMap.getLong(personBase64)); }); BinIO.storeInts(personArray, graphBasename + ".property.committer_id.bin"); } public void writeMessages() throws IOException { logger.info("Writing messages for release + revision, and URLs for origins"); long[][] messageOffsetArray = LongBigArrays.newBigArray(numNodes); BigArrays.fill(messageOffsetArray, -1); FastBufferedOutputStream messageStream = new FastBufferedOutputStream( new FileOutputStream(graphBasename + ".property.message.bin")); AtomicLong offset = new AtomicLong(0L); SwhOrcTable 
releaseTable = dataset.getTable("release"); releaseTable.readBytes64Column("message", (swhid, messageBase64) -> { long id = nodeIdMap.getNodeId(swhid); messageStream.write(messageBase64); messageStream.write('\n'); BigArrays.set(messageOffsetArray, id, offset.longValue()); offset.addAndGet(messageBase64.length + 1); }); SwhOrcTable revisionTable = dataset.getTable("revision"); revisionTable.readBytes64Column("message", (swhid, messageBase64) -> { long id = nodeIdMap.getNodeId(swhid); messageStream.write(messageBase64); messageStream.write('\n'); BigArrays.set(messageOffsetArray, id, offset.longValue()); offset.addAndGet(messageBase64.length + 1); }); OriginOrcTable originTable = (OriginOrcTable) dataset.getTable("origin"); originTable.readURLs((swhid, messageBase64) -> { long id = nodeIdMap.getNodeId(swhid); messageStream.write(messageBase64); messageStream.write('\n'); BigArrays.set(messageOffsetArray, id, offset.longValue()); offset.addAndGet(messageBase64.length + 1); }); // TODO: check which one is optimal in terms of memory/disk usage, EF vs mapped file BinIO.storeLongs(messageOffsetArray, graphBasename + ".property.message.offset.bin"); // EliasFanoLongBigList messageOffsetEF = new // EliasFanoLongBigList(LongBigArrayBigList.wrap(messageOffsetArray)); // BinIO.storeObject(messageOffsetEF, graphBasename + ".property.message.offset.bin"); messageStream.close(); } public void writeTagNames() throws IOException { logger.info("Writing tag names for release"); long[][] tagNameOffsetArray = LongBigArrays.newBigArray(numNodes); BigArrays.fill(tagNameOffsetArray, -1); FastBufferedOutputStream tagNameStream = new FastBufferedOutputStream( new FileOutputStream(graphBasename + ".property.tag_name.bin")); AtomicLong offset = new AtomicLong(0L); SwhOrcTable releaseTable = dataset.getTable("release"); releaseTable.readBytes64Column("name", (swhid, tagNameBase64) -> { long id = nodeIdMap.getNodeId(swhid); tagNameStream.write(tagNameBase64); tagNameStream.write('\n'); BigArrays.set(tagNameOffsetArray, id, offset.longValue()); offset.addAndGet(tagNameBase64.length + 1); }); BinIO.storeLongs(tagNameOffsetArray, graphBasename + ".property.tag_name.offset.bin"); // EliasFanoLongBigList tagNameOffsetEF = new // EliasFanoLongBigList(LongBigArrayBigList.wrap(tagNameOffsetArray)); // BinIO.storeObject(tagNameOffsetEF, graphBasename + ".property.tag_name.offset.bin"); tagNameStream.close(); } }
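Usage note (not part of the patch): the null-handling changes above are caller-visible. ORCTable.getBytesRow() and getLongRow() now return null for NULL rows, and per-column readers such as readLongColumn() skip those rows instead of invoking the callback with an undefined value; WriteNodeProperties likewise guards against getTable() returning null for an absent table directory. A minimal sketch of a caller relying on this behaviour (the class name and dataset path below are hypothetical):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.softwareheritage.graph.compress.ORCGraphDataset;
import org.softwareheritage.graph.compress.ORCGraphDataset.SwhOrcTable;

public class NullHandlingSketch {
    public static void main(String[] args) throws IOException {
        ORCGraphDataset dataset = new ORCGraphDataset("/path/to/orc/dataset");

        // getTable() returns null when the table directory is absent, which is why
        // WriteNodeProperties.writeContentLength() now checks for null before reading.
        SwhOrcTable contents = dataset.getTable("content");
        if (contents == null) {
            return;
        }

        // Rows whose "length" column is NULL are skipped entirely: the callback only
        // fires when ORCTable.getLongRow() returns a non-null value.
        AtomicLong rowsWithLength = new AtomicLong();
        contents.readLongColumn("length", (swhid, length) -> rowsWithLength.incrementAndGet());
        System.out.println("contents with a non-NULL length: " + rowsWithLength.get());
    }
}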