diff --git a/java/README.md b/java/README.md
index 623e98e..8d7edf7 100644
--- a/java/README.md
+++ b/java/README.md
@@ -1,51 +1,51 @@
Graph service - Java backend
============================
Server-side Java RPC API.
Build
-----
```bash
$ mvn compile assembly:single
```
Start RPC API
-------------
```bash
$ java -cp target/swh-graph-*.jar \
org.softwareheritage.graph.server.App \
<compressed_graph_path>
```
Default port is 5009 (use the `--port` option to change the port number). If you
need timings metadata sent back to the client in addition to the result, use the
`--timings` flag.
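For example, a hypothetical invocation combining these options (exact placement
of the flags relative to the positional graph path may vary):
```bash
$ java -cp target/swh-graph-*.jar \
    org.softwareheritage.graph.server.App \
    --port 5010 --timings \
    <compressed_graph_path>
```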
Tests
-----
Unit tests rely on test data already available in the Git repository
(under `src/swh/graph/tests/dataset/`), so you generally only need to run them
using Maven:
```bash
$ mvn test
```
In case you want to regenerate the test data:
```bash
# Graph compression
$ cd src/swh/graph/tests/dataset
$ ./generate_graph.sh
$ cd ../../../..
$ mvn compile assembly:single
# Dump mapping files
$ java -cp target/swh-graph-*.jar \
- org.softwareheritage.graph.maps.NodeMapBuilder \
+ org.softwareheritage.graph.compress.NodeMapBuilder \
src/swh/graph/tests/dataset/example.nodes.csv.gz \
src/swh/graph/tests/dataset/output/example
```
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/LabelMapBuilder.java b/java/src/main/java/org/softwareheritage/graph/compress/LabelMapBuilder.java
similarity index 99%
rename from java/src/main/java/org/softwareheritage/graph/maps/LabelMapBuilder.java
rename to java/src/main/java/org/softwareheritage/graph/compress/LabelMapBuilder.java
index 0ace4bd..8b824e1 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/LabelMapBuilder.java
+++ b/java/src/main/java/org/softwareheritage/graph/compress/LabelMapBuilder.java
@@ -1,527 +1,528 @@
-package org.softwareheritage.graph.maps;
+package org.softwareheritage.graph.compress;
import com.martiansoftware.jsap.*;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
import it.unimi.dsi.big.webgraph.labelling.ArcLabelledImmutableGraph;
import it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.io.OutputBitStream;
import it.unimi.dsi.logging.ProgressLogger;
import it.unimi.dsi.big.webgraph.BVGraph;
import it.unimi.dsi.big.webgraph.ImmutableGraph;
import it.unimi.dsi.big.webgraph.NodeIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.softwareheritage.graph.labels.DirEntry;
import org.softwareheritage.graph.labels.SwhLabel;
+import org.softwareheritage.graph.maps.NodeIdMap;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class LabelMapBuilder {
final static String SORT_BUFFER_SIZE = "40%";
final static Logger logger = LoggerFactory.getLogger(LabelMapBuilder.class);
String graphPath;
String outputGraphPath;
String debugPath;
String tmpDir;
ImmutableGraph graph;
long numNodes;
long numArcs;
NodeIdMap nodeIdMap;
Object2LongFunction<byte[]> filenameMph;
long numFilenames;
int totalLabelWidth;
public LabelMapBuilder(String graphPath, String debugPath, String outputGraphPath, String tmpDir)
throws IOException {
this.graphPath = graphPath;
if (outputGraphPath == null) {
this.outputGraphPath = graphPath;
} else {
this.outputGraphPath = outputGraphPath;
}
this.debugPath = debugPath;
this.tmpDir = tmpDir;
// Load the graph in offline mode to retrieve the number of nodes/edges,
// then immediately destroy it. XXX: not even needed?
// ImmutableGraph graphOffline = BVGraph.loadMapped(graphPath);
graph = BVGraph.loadMapped(graphPath);
numArcs = graph.numArcs();
numNodes = graph.numNodes();
nodeIdMap = new NodeIdMap(graphPath);
filenameMph = NodeIdMap.loadMph(graphPath + "-labels.mph");
numFilenames = getMPHSize(filenameMph);
totalLabelWidth = DirEntry.labelWidth(numFilenames);
}
private static JSAPResult parse_args(String[] args) {
JSAPResult config = null;
try {
SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "", new Parameter[]{
new FlaggedOption("graphPath", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, 'g', "graph",
"Basename of the compressed graph"),
new FlaggedOption("debugPath", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'd',
"debug-path", "Store the intermediate representation here for debug"),
new FlaggedOption("outputGraphPath", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'o',
"output-graph", "Basename of the output graph, same as --graph if not specified"),
new FlaggedOption("tmpDir", JSAP.STRING_PARSER, "tmp", JSAP.NOT_REQUIRED, 't', "tmp",
"Temporary directory path"),});
config = jsap.parse(args);
if (jsap.messagePrinted()) {
System.exit(1);
}
} catch (JSAPException e) {
e.printStackTrace();
}
return config;
}
public static void main(String[] args) throws IOException, InterruptedException {
JSAPResult config = parse_args(args);
String graphPath = config.getString("graphPath");
String outputGraphPath = config.getString("outputGraphPath");
String tmpDir = config.getString("tmpDir");
String debugPath = config.getString("debugPath");
LabelMapBuilder builder = new LabelMapBuilder(graphPath, debugPath, outputGraphPath, tmpDir);
logger.info("Loading graph and MPH functions...");
builder.computeLabelMap();
}
static long getMPHSize(Object2LongFunction<byte[]> mph) {
return (mph instanceof Size64) ? ((Size64) mph).size64() : mph.size();
}
void computeLabelMap() throws IOException, InterruptedException {
this.loadGraph();
// this.computeLabelMapSort();
this.computeLabelMapBsort();
}
void computeLabelMapSort() throws IOException {
// Pass the intermediate representation to sort(1) so that we see the labels in the order they will
// appear in the label file.
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sort", "-k1,1n", "-k2,2n", // Numerical sort
"--numeric-sort", "--buffer-size", SORT_BUFFER_SIZE, "--temporary-directory", tmpDir);
Process sort = processBuilder.start();
BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream());
// BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream());
FastBufferedInputStream sort_stdout = new FastBufferedInputStream(sort.getInputStream());
final FastBufferedInputStream fbis = new FastBufferedInputStream(System.in);
hashLabelStream(fbis, new EdgeLabelLineWriter() {
@Override
public void writeLine(long src, long dst, long filenameId, int permission) throws IOException {
sort_stdin.write((src + "\t" + dst + "\t" + filenameId + "\t" + permission + "\n")
.getBytes(StandardCharsets.US_ASCII));
}
});
sort_stdin.close();
EdgeLabelLineIterator mapLines = new TextualEdgeLabelLineIterator(sort_stdout);
writeLabels(mapLines);
logger.info("Done");
}
void computeLabelMapBsort() throws IOException, InterruptedException {
// Pass the intermediate representation to bsort(1) so that we see the labels in the order they will
// appear in the label file.
String tmpFile = tmpDir + "/labelsToSort.bin";
final FastBufferedInputStream fbis = new FastBufferedInputStream(System.in);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmpFile)));
// Number of bytes to represent a node.
final int nodeBytes = (Long.SIZE - Long.numberOfLeadingZeros(graph.numNodes())) / 8 + 1;
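// For example, a graph with ~20 billion nodes (< 2^35) has 29 leading zero bits,
// so nodeBytes = (64 - 29) / 8 + 1 = 5 bytes per node id instead of the full 8.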
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
logger.info("Writing labels to a packed binary files (node bytes: {})", nodeBytes);
hashLabelStream(fbis, new EdgeLabelLineWriter() {
@Override
public void writeLine(long src, long dst, long filenameId, int permission) throws IOException {
buffer.putLong(0, src);
out.write(buffer.array(), Long.BYTES - nodeBytes, nodeBytes);
buffer.putLong(0, dst);
out.write(buffer.array(), Long.BYTES - nodeBytes, nodeBytes);
out.writeLong(filenameId);
out.writeInt(permission);
}
});
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("/home/seirl/bsort/src/bsort", "-v", "-r",
String.valueOf(nodeBytes * 2 + Long.BYTES + Integer.BYTES), "-k", String.valueOf(nodeBytes * 2),
tmpFile);
Process sort = processBuilder.start();
sort.waitFor();
final DataInputStream sortedLabels = new DataInputStream(new BufferedInputStream(new FileInputStream(tmpFile)));
BinaryEdgeLabelLineIterator mapLines = new BinaryEdgeLabelLineIterator(sortedLabels, nodeBytes);
writeLabels(mapLines);
logger.info("Done");
}
void loadGraph() throws IOException {
}
void hashLabelStream(FastBufferedInputStream input, EdgeLabelLineWriter writer) throws IOException {
// Compute the intermediate representation and write it as:
// "<src node id> <dst node id> <label id> [<permission>]\n"
ProgressLogger plInter = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
plInter.itemsName = "edges";
plInter.expectedUpdates = this.numArcs;
plInter.start("Hashing the label stream");
var charset = StandardCharsets.US_ASCII;
byte[] array = new byte[1024];
for (long line = 0;; line++) {
int start = 0, len;
while ((len = input.readLine(array, start, array.length - start,
FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
start += len;
array = ByteArrays.grow(array, array.length + 1);
}
if (len == -1)
break; // EOF
final int lineLength = start + len;
// Skip whitespace at the start of the line.
int offset = 0;
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
if (offset == lineLength) {
continue;
}
if (array[0] == '#')
continue;
// Scan source id.
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
final byte[] ss = Arrays.copyOfRange(array, start, offset);
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
if (offset == lineLength) {
logger.error("Error at line " + line + ": no target");
continue;
}
// Scan target ID
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
final byte[] ts = Arrays.copyOfRange(array, start, offset);
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
if (offset == lineLength)
continue; // No label, skip
// Scan label
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
final byte[] ls = Arrays.copyOfRange(array, start, offset);
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
// Scan permission
int permission = 0;
if (offset < lineLength) {
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
permission = Integer.parseInt(new String(array, start, offset - start, charset));
}
// System.err.format("DEBUG: read %s %s %s %d\n", ss, ts, ls, permission);
long srcNode = nodeIdMap.getNodeId(ss);
long dstNode = nodeIdMap.getNodeId(ts);
long filenameId = filenameMph.getLong(ls);
writer.writeLine(srcNode, dstNode, filenameId, permission);
plInter.lightUpdate();
}
plInter.done();
}
void writeLabels(EdgeLabelLineIterator mapLines) throws IOException {
// Get the sorted output and write the labels and label offsets
ProgressLogger plLabels = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
plLabels.itemsName = "edges";
plLabels.expectedUpdates = this.numArcs;
plLabels.start("Writing the labels to the label file.");
FileWriter debugFile = null;
if (debugPath != null) {
debugFile = new FileWriter(debugPath);
}
OutputBitStream labels = new OutputBitStream(
new File(outputGraphPath + "-labelled" + BitStreamArcLabelledImmutableGraph.LABELS_EXTENSION));
OutputBitStream offsets = new OutputBitStream(
new File(outputGraphPath + "-labelled" + BitStreamArcLabelledImmutableGraph.LABEL_OFFSETS_EXTENSION));
offsets.writeGamma(0);
EdgeLabelLine line = new EdgeLabelLine(-1, -1, -1, -1);
NodeIterator it = graph.nodeIterator();
boolean started = false;
ArrayList<DirEntry> labelBuffer = new ArrayList<>(128);
while (it.hasNext()) {
long srcNode = it.nextLong();
int bits = 0;
LazyLongIterator s = it.successors();
long dstNode;
while ((dstNode = s.nextLong()) >= 0) {
while (line != null && line.srcNode <= srcNode && line.dstNode <= dstNode) {
if (line.srcNode == srcNode && line.dstNode == dstNode) {
labelBuffer.add(new DirEntry(line.filenameId, line.permission));
if (debugFile != null) {
debugFile.write(line.srcNode + " " + line.dstNode + " " + line.filenameId + " "
+ line.permission + "\n");
}
}
if (!mapLines.hasNext())
break;
line = mapLines.next();
if (!started) {
plLabels.start("Writing label map to file...");
started = true;
}
}
SwhLabel l = new SwhLabel("edgelabel", totalLabelWidth, labelBuffer.toArray(new DirEntry[0]));
labelBuffer.clear();
bits += l.toBitStream(labels, -1);
plLabels.lightUpdate();
}
offsets.writeGamma(bits);
}
labels.close();
offsets.close();
plLabels.done();
if (debugFile != null) {
debugFile.close();
}
PrintWriter pw = new PrintWriter(
new FileWriter((new File(outputGraphPath)).getName() + "-labelled.properties"));
pw.println(ImmutableGraph.GRAPHCLASS_PROPERTY_KEY + " = " + BitStreamArcLabelledImmutableGraph.class.getName());
pw.println(BitStreamArcLabelledImmutableGraph.LABELSPEC_PROPERTY_KEY + " = " + SwhLabel.class.getName()
+ "(DirEntry," + totalLabelWidth + ")");
pw.println(ArcLabelledImmutableGraph.UNDERLYINGGRAPH_PROPERTY_KEY + " = " + outputGraphPath);
pw.close();
}
public abstract static class EdgeLabelLineWriter {
public abstract void writeLine(long src, long dst, long filenameId, int permission) throws IOException;
}
public static class EdgeLabelLine {
public long srcNode;
public long dstNode;
public long filenameId;
public int permission;
public EdgeLabelLine(long labelSrcNode, long labelDstNode, long labelFilenameId, int labelPermission) {
this.srcNode = labelSrcNode;
this.dstNode = labelDstNode;
this.filenameId = labelFilenameId;
this.permission = labelPermission;
}
}
public abstract static class EdgeLabelLineIterator implements Iterator<EdgeLabelLine> {
@Override
public abstract boolean hasNext();
@Override
public abstract EdgeLabelLine next();
}
public static class ScannerEdgeLabelLineIterator extends EdgeLabelLineIterator {
private final Scanner scanner;
public ScannerEdgeLabelLineIterator(InputStream input) {
this.scanner = new Scanner(input);
}
@Override
public boolean hasNext() {
return this.scanner.hasNext();
}
@Override
public EdgeLabelLine next() {
String line = scanner.nextLine();
String[] parts = line.split("\\t");
return new EdgeLabelLine(Long.parseLong(parts[0]), Long.parseLong(parts[1]), Long.parseLong(parts[2]),
Integer.parseInt(parts[3]));
/*
* String line = scanner.nextLine(); long src = scanner.nextLong(); long dst = scanner.nextLong();
* long label = scanner.nextLong(); int permission = scanner.nextInt(); return new
* EdgeLabelLine(src, dst, label, permission);
*/
}
}
public static class TextualEdgeLabelLineIterator extends EdgeLabelLineIterator {
private final FastBufferedInputStream input;
private final Charset charset;
private byte[] array;
boolean finished;
public TextualEdgeLabelLineIterator(FastBufferedInputStream input) {
this.input = input;
this.finished = false;
this.charset = StandardCharsets.US_ASCII;
this.array = new byte[1024];
}
@Override
public boolean hasNext() {
return !this.finished;
}
@Override
public EdgeLabelLine next() {
int start = 0, len;
try {
while ((len = input.readLine(array, start, array.length - start,
FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) {
start += len;
array = ByteArrays.grow(array, array.length + 1);
}
} catch (IOException e) {
e.printStackTrace();
return null;
}
if (len == -1) {
finished = true;
return null;
}
final int lineLength = start + len;
int offset = 0;
// Scan source id.
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
long src = Long.parseLong(new String(array, start, offset - start, charset));
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
// Scan target ID
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
long dst = Long.parseLong(new String(array, start, offset - start, charset));
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
// Scan label
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
long label = Long.parseLong(new String(array, start, offset - start, charset));
// Skip whitespace between identifiers.
while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ')
offset++;
// Scan permission
int permission = 0;
if (offset < lineLength) {
start = offset;
while (offset < lineLength && (array[offset] < 0 || array[offset] > ' '))
offset++;
permission = Integer.parseInt(new String(array, start, offset - start, charset));
}
return new EdgeLabelLine(src, dst, label, permission);
}
}
public static class BinaryEdgeLabelLineIterator extends EdgeLabelLineIterator {
private final int nodeBytes;
DataInputStream stream;
boolean finished;
ByteBuffer buffer;
public BinaryEdgeLabelLineIterator(DataInputStream stream, int nodeBytes) {
this.stream = stream;
this.nodeBytes = nodeBytes;
this.buffer = ByteBuffer.allocate(Long.BYTES);
this.finished = false;
}
@Override
public boolean hasNext() {
return !finished;
}
@Override
public EdgeLabelLine next() {
try {
// long src = stream.readLong();
// long dst = stream.readLong();
stream.readFully(this.buffer.array(), Long.BYTES - nodeBytes, nodeBytes);
this.buffer.position(0);
long src = this.buffer.getLong();
this.buffer.clear();
stream.readFully(this.buffer.array(), Long.BYTES - nodeBytes, nodeBytes);
this.buffer.position(0);
long dst = this.buffer.getLong();
this.buffer.clear();
long label = stream.readLong();
int perm = stream.readInt();
return new EdgeLabelLine(src, dst, label, perm);
} catch (EOFException e) {
finished = true;
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
}
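For reference, a hypothetical invocation of the renamed class, using the JSAP flags declared in `parse_args`; the labelled edge stream (`<src> <dst> <label> [<permission>]` per line, as parsed by `hashLabelStream`) is read from stdin, and the input file name below is illustrative:
```bash
$ zstdcat example-labelled.edges.csv.zst | \
    java -cp target/swh-graph-*.jar \
    org.softwareheritage.graph.compress.LabelMapBuilder \
    --graph /srv/graph/example --tmp /srv/tmp
```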
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java b/java/src/main/java/org/softwareheritage/graph/compress/NodeMapBuilder.java
similarity index 98%
rename from java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
rename to java/src/main/java/org/softwareheritage/graph/compress/NodeMapBuilder.java
index 626c747..14b1150 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
+++ b/java/src/main/java/org/softwareheritage/graph/compress/NodeMapBuilder.java
@@ -1,191 +1,193 @@
-package org.softwareheritage.graph.maps;
+package org.softwareheritage.graph.compress;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.BigArrays;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigArrays;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.io.FastBufferedReader;
import it.unimi.dsi.io.LineIterator;
import it.unimi.dsi.logging.ProgressLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.SWHID;
+import org.softwareheritage.graph.maps.NodeIdMap;
+import org.softwareheritage.graph.maps.NodeTypesMap;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
/**
* Create maps needed at runtime by the graph service, in particular:
* <p>
* <ul>
* <li>SWHID → WebGraph long node id</li>
* <li>WebGraph long node id → SWHID (converse of the former)</li>
* <li>WebGraph long node id → SWH node type (enum)</li>
* </ul>
*
* @author The Software Heritage developers
*/
public class NodeMapBuilder {
final static String SORT_BUFFER_SIZE = "40%";
final static Logger logger = LoggerFactory.getLogger(NodeMapBuilder.class);
/**
* Main entrypoint.
*
* @param args command line arguments
*/
public static void main(String[] args) throws IOException {
if (args.length != 2) {
logger.error("Usage: COMPRESSED_GRAPH_BASE_NAME TEMP_DIR < NODES_CSV");
System.exit(1);
}
String graphPath = args[0];
String tmpDir = args[1];
logger.info("starting maps generation...");
precomputeNodeIdMap(graphPath, tmpDir);
logger.info("maps generation completed");
}
/**
* Computes and dumps on disk mapping files.
*
* @param graphPath path of the compressed graph
*/
static void precomputeNodeIdMap(String graphPath, String tmpDir) throws IOException {
ProgressLogger plSWHID2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
ProgressLogger plNode2SWHID = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
plSWHID2Node.itemsName = "Hashing swhid→node";
plNode2SWHID.itemsName = "Building map node→swhid";
// first half of SWHID->node mapping: SWHID -> WebGraph MPH (long)
Object2LongFunction<byte[]> mphMap = NodeIdMap.loadMph(graphPath + ".mph");
long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
plSWHID2Node.expectedUpdates = nbIds;
plNode2SWHID.expectedUpdates = nbIds;
// second half of SWHID->node mapping: WebGraph MPH (long) -> BFS order (long)
long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
logger.info("loading BFS order file...");
long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
logger.info("BFS order file loaded");
if (loaded != nbIds) {
logger.error("graph contains " + nbIds + " nodes, but read " + loaded);
System.exit(2);
}
/*
* Read on stdin a list of SWHIDs, hash them with MPH, then permute them according to the .order
* file
*/
FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII));
LineIterator swhidIterator = new LineIterator(buffer);
/*
* The WebGraph node id -> SWHID mapping can be obtained from the SWHID->node one by numerically
* sorting on node id and sequentially writing obtained SWHIDs to a binary map. Delegates the
* sorting job to /usr/bin/sort via pipes
*/
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sort", "--numeric-sort", "--key", "2", "--buffer-size", SORT_BUFFER_SIZE,
"--temporary-directory", tmpDir);
Process sort = processBuilder.start();
BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream());
BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream());
// for the binary format of nodeToSwhidMap, see Python module swh.graph.swhid:IntToSwhidMap
try (BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream(
new FileOutputStream(graphPath + NodeIdMap.NODE_TO_SWHID))) {
/*
* background handler for sort output, it will be fed SWHID/node pairs, and will itself fill
* nodeToSwhidMap as soon as data from sort is ready.
*/
SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToSwhidMap, plNode2SWHID);
outputHandler.start();
/*
* Type map from WebGraph node ID to SWH type. Used at runtime by pure Java graph traversals to
* efficiently check edge restrictions.
*/
final int nbBitsPerNodeType = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
plSWHID2Node.start("Hashing SWHIDs to fill sort input");
for (long iNode = 0; iNode < nbIds && swhidIterator.hasNext(); iNode++) {
String swhidStr = swhidIterator.next().toString();
SWHID swhid = new SWHID(swhidStr);
long mphId = mphMap.getLong(swhidStr.getBytes(StandardCharsets.US_ASCII));
long nodeId = BigArrays.get(bfsMap, mphId);
sort_stdin.write((swhidStr + "\t" + nodeId + "\n").getBytes(StandardCharsets.US_ASCII));
nodeTypesMap.set(nodeId, swhid.getType().ordinal());
plSWHID2Node.lightUpdate();
}
plSWHID2Node.done();
sort_stdin.close();
// write type map
logger.info("storing type map");
BinIO.storeObject(nodeTypesMap, graphPath + NodeTypesMap.NODE_TO_TYPE);
logger.info("type map stored");
// wait for nodeToSwhidMap filling
try {
logger.info("waiting for node2swhid map...");
int sortExitCode = sort.waitFor();
if (sortExitCode != 0) {
logger.error("sort returned non-zero exit code: " + sortExitCode);
System.exit(2);
}
outputHandler.join();
} catch (InterruptedException e) {
logger.error("processing of sort output failed with: " + e);
System.exit(2);
}
}
}
private static class SortOutputHandler extends Thread {
private final Scanner input;
private final OutputStream output;
private final ProgressLogger pl;
SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) {
this.input = new Scanner(input, StandardCharsets.US_ASCII);
this.output = output;
this.pl = pl;
}
public void run() {
boolean sortDone = false;
logger.info("node2swhid: waiting for sort output...");
while (input.hasNextLine()) {
if (!sortDone) {
sortDone = true;
this.pl.start("filling node2swhid map");
}
String line = input.nextLine(); // format: SWHID <TAB> NODE_ID
SWHID swhid = new SWHID(line.split("\\t")[0]); // get SWHID
try {
output.write(swhid.toBytes());
} catch (IOException e) {
logger.error("writing to node->SWHID map failed with: " + e);
}
this.pl.lightUpdate();
}
this.pl.done();
}
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
index 7db9c03..1b4c917 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
@@ -1,182 +1,183 @@
package org.softwareheritage.graph.maps;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.util.ByteBufferLongBigList;
import org.softwareheritage.graph.SWHID;
+import org.softwareheritage.graph.compress.NodeMapBuilder;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* Mapping between internal long node id and external SWHID.
* <p>
* The SWHID -> node mapping is obtained by hashing the SWHID with an MPH, then permuting it using
* an mmap()-ed .order file containing the graph permutation.
*
* The node -> SWHID reverse mapping is pre-computed and dumped on disk in the
* {@link NodeMapBuilder} class, then it is loaded here using mmap().
*
* @author The Software Heritage developers
* @see NodeMapBuilder
*/
public class NodeIdMap {
/** Fixed length of binary SWHID buffer */
public static final int SWHID_BIN_SIZE = 22;
/** File extension for the long node id to SWHID map */
public static final String NODE_TO_SWHID = ".node2swhid.bin";
/** Graph path and basename */
String graphPath;
/** mmap()-ed NODE_TO_SWHID file */
MapFile nodeToSwhMap;
/** Minimal perfect hash (MPH) function SWHID -> initial order */
Object2LongFunction<byte[]> mph;
/** mmap()-ed long list with the permutation initial order -> graph order */
LongBigList orderMap;
/** FileInputStream containing the permutation */
FileInputStream orderInputStream;
/**
* Constructor.
*
* @param graphPath full graph path
*/
public NodeIdMap(String graphPath) throws IOException {
this.graphPath = graphPath;
// node -> SWHID
this.nodeToSwhMap = new MapFile(graphPath + NODE_TO_SWHID, SWHID_BIN_SIZE);
// SWHID -> node
this.mph = loadMph(graphPath + ".mph");
this.orderInputStream = new FileInputStream(graphPath + ".order");
this.orderMap = ByteBufferLongBigList.map(orderInputStream.getChannel());
}
@SuppressWarnings("unchecked")
public static Object2LongFunction<byte[]> loadMph(String path) throws IOException {
Object obj;
try {
obj = BinIO.loadObject(path);
} catch (ClassNotFoundException e) {
throw new IOException(e.getMessage());
}
Object2LongFunction<byte[]> res = (Object2LongFunction<byte[]>) obj;
// Backward-compatibility for old maps parametrized with <String>.
// New maps should be parametrized with <byte[]>, which is faster.
try {
// Try to call it with bytes, will fail if it's a O2LF<String>.
res.getLong("42".getBytes(StandardCharsets.UTF_8));
} catch (ClassCastException e) {
class StringCompatibleByteFunction implements Object2LongFunction<byte[]>, Size64 {
private final Object2LongFunction<String> legacyFunction;
public StringCompatibleByteFunction(Object2LongFunction<String> legacyFunction) {
this.legacyFunction = legacyFunction;
}
@Override
public long getLong(Object o) {
byte[] bi = (byte[]) o;
return legacyFunction.getLong(new String(bi, StandardCharsets.UTF_8));
}
@Override
public int size() {
return legacyFunction.size();
}
@Override
public long size64() {
return (legacyFunction instanceof Size64)
? ((Size64) legacyFunction).size64()
: legacyFunction.size();
}
}
Object2LongFunction<String> mphLegacy = (Object2LongFunction<String>) obj;
return new StringCompatibleByteFunction(mphLegacy);
}
// End of backward-compatibility block
return res;
}
/**
* Converts byte-form SWHID to corresponding long node id. Low-level function, does not check if the
* SWHID is valid.
*
* @param swhid node represented as bytes
* @return corresponding node as a long id
*/
public long getNodeId(byte[] swhid) {
// 1. Hash the SWHID with the MPH to get its original ID
long origNodeId = mph.getLong(swhid);
// 2. Use the order permutation to get the position in the permuted graph
return this.orderMap.getLong(origNodeId);
}
/**
* Converts SWHID to corresponding long node id.
*
* @param swhid node represented as a {@link SWHID}
* @param checkExists if true, raise an error if the SWHID is not present in the graph; if false,
*        the check is skipped and invalid data is returned for non-existing SWHIDs.
* @return corresponding node as a long id
* @see SWHID
*/
public long getNodeId(SWHID swhid, boolean checkExists) {
// Convert the SWHID to bytes and call getNodeId()
long nodeId = getNodeId(swhid.toString().getBytes(StandardCharsets.US_ASCII));
// Check that the position effectively corresponds to a real node using the reverse map.
// This is necessary because the MPH makes no guarantees on whether the input SWHID is valid.
if (!checkExists || getSWHID(nodeId).equals(swhid)) {
return nodeId;
} else {
throw new IllegalArgumentException("Unknown SWHID: " + swhid);
}
}
public long getNodeId(SWHID swhid) {
return getNodeId(swhid, true);
}
/**
* Converts a node long id to corresponding SWHID.
*
* @param nodeId node as a long id
* @return corresponding node as a {@link SWHID}
* @see SWHID
*/
public SWHID getSWHID(long nodeId) {
/*
 * Each line in NODE_TO_SWHID is formatted as: swhid. The file is ordered by nodeId, meaning
 * node0's swhid is at line 0, hence we can read the nodeId-th line to get the corresponding
 * swhid.
 */
if (nodeId < 0 || nodeId >= nodeToSwhMap.size()) {
throw new IllegalArgumentException("Node id " + nodeId + " should be between 0 and " + nodeToSwhMap.size());
}
return SWHID.fromBytes(nodeToSwhMap.readAtLine(nodeId));
}
/**
* Closes the mapping files.
*/
public void close() throws IOException {
orderInputStream.close();
nodeToSwhMap.close();
}
}
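A minimal usage sketch of the `NodeIdMap` API above (the graph basename is hypothetical):
```java
import org.softwareheritage.graph.SWHID;
import org.softwareheritage.graph.maps.NodeIdMap;

// Sketch: round-trip between a SWHID and its node id (basename is hypothetical).
NodeIdMap nodeIdMap = new NodeIdMap("/srv/graph/example");
SWHID swhid = new SWHID("swh:1:rev:0000000000000000000000000000000000000003");
long nodeId = nodeIdMap.getNodeId(swhid, true); // checked against the reverse map
assert nodeIdMap.getSWHID(nodeId).equals(swhid);
nodeIdMap.close();
```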
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
index befe094..d3da61d 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
@@ -1,54 +1,55 @@
package org.softwareheritage.graph.maps;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigList;
import org.softwareheritage.graph.Node;
import java.io.IOException;
/**
* Mapping between long node id and SWH node type as described in the
* <a href="https://docs.softwareheritage.org/devel/swh-model/data-model.html">data model</a>.
* <p>
- * The type mapping is pre-computed and dumped on disk in the {@link NodeMapBuilder} class, then it
- * is loaded in-memory here using <a href="http://fastutil.di.unimi.it/">fastutil</a> LongBigList.
- * To be space-efficient, the mapping is stored as a bitmap using minimum number of bits per
+ * The type mapping is pre-computed and dumped on disk in the
+ * {@link org.softwareheritage.graph.compress.NodeMapBuilder} class, then it is loaded in-memory
+ * here using <a href="http://fastutil.di.unimi.it/">fastutil</a> LongBigList. To be
+ * space-efficient, the mapping is stored as a bitmap using minimum number of bits per
* {@link Node.Type}.
*
* @author The Software Heritage developers
*/
public class NodeTypesMap {
/** File extension for the long node id to node type map */
public static final String NODE_TO_TYPE = ".node2type.map";
/**
* Array storing for each node its type
*/
public LongBigList nodeTypesMap;
/**
* Constructor.
*
* @param graphPath path and basename of the compressed graph
*/
public NodeTypesMap(String graphPath) throws IOException {
try {
nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + NODE_TO_TYPE);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown class object: " + e);
}
}
/**
* Returns node type from a node long id.
*
* @param nodeId node as a long id
* @return corresponding {@link Node.Type} value
* @see org.softwareheritage.graph.Node.Type
*/
public Node.Type getType(long nodeId) {
long type = nodeTypesMap.getLong(nodeId);
return Node.Type.fromInt((int) type);
}
}
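A corresponding sketch for `NodeTypesMap` (basename again hypothetical):
```java
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.maps.NodeTypesMap;

// Sketch: look up the SWH node type of a node id.
NodeTypesMap typesMap = new NodeTypesMap("/srv/graph/example");
Node.Type type = typesMap.getType(42L); // decoded from the packed bitmap
```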
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 9e8c4a1..26b8e34 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,280 +1,280 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import subprocess
from typing import Dict, List, Set
from swh.graph.config import check_config_compress
class CompressionStep(Enum):
MPH = 1
BV = 2
BFS = 3
PERMUTE_BFS = 4
TRANSPOSE_BFS = 5
SIMPLIFY = 6
LLP = 7
PERMUTE_LLP = 8
OBL = 9
COMPOSE_ORDERS = 10
STATS = 11
TRANSPOSE = 12
TRANSPOSE_OBL = 13
MAPS = 14
CLEAN_TMP = 15
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
# Mapping from compression steps to shell commands implementing them. Commands
# will be executed by the shell, so be careful with meta characters. They are
# specified here as lists of tokens that will be joined together only for ease
# of line splitting. In commands, {tokens} will be interpolated with
# configuration values, see :func:`compress`.
STEP_ARGV: Dict[CompressionStep, List[str]] = {
CompressionStep.MPH: [
"{java}",
"it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"{out_dir}/{graph_name}.mph",
"<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )",
],
# use process substitution (and hence a FIFO) above, as the MPH class loads
# the entire file in memory when reading from stdin
CompressionStep.BV: [
"zstdcat",
"{in_dir}/{graph_name}.edges.csv.zst",
"|",
"cut -d' ' -f1,2",
"|",
"{java}",
"it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"--function",
"{out_dir}/{graph_name}.mph",
"{out_dir}/{graph_name}-base",
],
CompressionStep.BFS: [
"{java}",
"it.unimi.dsi.law.big.graph.BFS",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs.order",
],
CompressionStep.PERMUTE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-base",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_BFS: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.SIMPLIFY: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"simplify",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}-bfs-transposed",
"{out_dir}/{graph_name}-bfs-simplified",
],
CompressionStep.LLP: [
"{java}",
"it.unimi.dsi.law.big.graph.LayeredLabelPropagation",
"-g",
"{llp_gammas}",
"{out_dir}/{graph_name}-bfs-simplified",
"{out_dir}/{graph_name}-llp.order",
],
CompressionStep.PERMUTE_LLP: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
"{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-llp.order",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}",
],
CompressionStep.COMPOSE_ORDERS: [
"{java}",
"org.softwareheritage.graph.compress.ComposePermutations",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-llp.order",
"{out_dir}/{graph_name}.order",
],
CompressionStep.STATS: [
"{java}",
"it.unimi.dsi.big.webgraph.Stats",
"{out_dir}/{graph_name}",
],
CompressionStep.TRANSPOSE: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}-transposed",
],
CompressionStep.MAPS: [
"zstdcat",
"{in_dir}/{graph_name}.nodes.csv.zst",
"|",
"{java}",
- "org.softwareheritage.graph.maps.NodeMapBuilder",
+ "org.softwareheritage.graph.compress.NodeMapBuilder",
"{out_dir}/{graph_name}",
"{tmp_dir}",
],
CompressionStep.CLEAN_TMP: [
"rm",
"-rf",
"{out_dir}/{graph_name}-base.graph",
"{out_dir}/{graph_name}-base.offsets",
"{out_dir}/{graph_name}-base.properties",
"{out_dir}/{graph_name}-bfs-simplified.graph",
"{out_dir}/{graph_name}-bfs-simplified.offsets",
"{out_dir}/{graph_name}-bfs-simplified.properties",
"{out_dir}/{graph_name}-bfs-transposed.graph",
"{out_dir}/{graph_name}-bfs-transposed.offsets",
"{out_dir}/{graph_name}-bfs-transposed.properties",
"{out_dir}/{graph_name}-bfs.graph",
"{out_dir}/{graph_name}-bfs.offsets",
"{out_dir}/{graph_name}-bfs.order",
"{out_dir}/{graph_name}-bfs.properties",
"{out_dir}/{graph_name}-llp.order",
"{tmp_dir}",
],
}
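As a sketch of the token interpolation described above (configuration values are hypothetical; `do_step` below performs the real formatting):
```python
# Sketch: how one step's tokens become a shell command (values are hypothetical).
conf = {
    "java": "java",
    "tmp_dir": "/srv/tmp",
    "in_dir": "/srv/in",
    "out_dir": "/srv/out",
    "graph_name": "example",
}
cmd = " ".join(STEP_ARGV[CompressionStep.MPH]).format(**conf)
# java it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction --byte-array \
#   --temp-dir /srv/tmp /srv/out/example.mph <( zstdcat /srv/in/example.nodes.csv.zst )
```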
def do_step(step, conf):
cmd = " ".join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
cmd_env["CLASSPATH"] = conf["classpath"]
logging.info(f"running: {cmd}")
process = subprocess.Popen(
["/bin/bash", "-c", cmd],
env=cmd_env,
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
with process.stdout as stdout:
for line in stdout:
logging.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError(
f"compression step {step} returned non-zero " f"exit code {rc}"
)
else:
return rc
def compress(
graph_name: str,
in_dir: Path,
out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {},
):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
- batch_size: batch size for `WebGraph transformations
<http://webgraph.di.unimi.it/docs/it/unimi/dsi/webgraph/Transform.html>`_;
defaults to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logging.info(f"starting compression at {compression_start_time}")
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug(f"skipping compression step {step}")
continue
seq_no += 1
step_start_time = datetime.now()
logging.info(
f"starting compression step {step} "
f"({seq_no}/{len(steps)}) at {step_start_time}"
)
do_step(step, conf)
step_end_time = datetime.now()
step_duration = step_end_time - step_start_time
logging.info(
f"completed compression step {step} "
f"({seq_no}/{len(steps)}) "
f"at {step_end_time} in {step_duration}"
)
compression_end_time = datetime.now()
compression_duration = compression_end_time - compression_start_time
logging.info(f"completed compression in {compression_duration}")