diff --git a/java/pom.xml b/java/pom.xml
index 9322d6a..234b3b9 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,361 +1,420 @@
4.0.0
org.softwareheritage.graph
swh-graph
${git.closest.tag.name}
swh-graph
https://forge.softwareheritage.org/source/swh-graph/
UTF-8
11
+ 3.20.1
+ 1.46.0
ch.qos.logback
logback-classic
1.2.3
org.junit.jupiter
junit-jupiter-api
5.7.0
test
org.junit.vintage
junit-vintage-engine
5.7.0
junit
junit
4.12
org.junit.jupiter
junit-jupiter-engine
5.7.0
test
org.hamcrest
hamcrest
2.2
test
io.javalin
javalin
3.0.0
org.slf4j
slf4j-simple
1.7.26
com.fasterxml.jackson.core
jackson-databind
2.13.0
it.unimi.dsi
webgraph-big
3.6.7
it.unimi.dsi
fastutil
8.5.8
it.unimi.dsi
dsiutils
2.7.1
it.unimi.dsi
sux4j
5.3.1
it.unimi.dsi
law
2.7.2
org.apache.hadoop
hadoop-common
org.umlgraph
umlgraph
org.eclipse.jetty.aggregate
jetty-all
it.unimi.di
mg4j
it.unimi.di
mg4j-big
com.martiansoftware
jsap
2.1
net.sf.py4j
py4j
0.10.9.3
commons-codec
commons-codec
1.15
com.github.luben
zstd-jni
1.5.1-1
org.apache.orc
orc-core
1.7.1
org.apache.hadoop
hadoop-common
3.3.1
org.apache.hadoop
hadoop-client-runtime
3.3.1
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-services
+ ${grpc.version}
+
+
+ javax.annotation
+ javax.annotation-api
+ 1.3.2
+
maven-clean-plugin
3.1.0
maven-resources-plugin
3.0.2
maven-compiler-plugin
3.8.0
11
11
-verbose
-Xlint:all
maven-surefire-plugin
2.22.2
maven-failsafe-plugin
2.22.2
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
maven-site-plugin
3.7.1
maven-project-info-reports-plugin
3.0.0
maven-assembly-plugin
3.3.0
org.softwareheritage.graph.server.App
jar-with-dependencies
false
make-assembly
package
single
com.diffplug.spotless
spotless-maven-plugin
2.22.1
*.md
.gitignore
true
4
4.16.0
.coding-style.xml
pl.project13.maven
git-commit-id-plugin
3.0.1
get-the-git-infos
revision
initialize
true
true
true
true
v*
git.closest.tag.name
^v
true
maven-source-plugin
2.1.1
bundle-sources
package
jar-no-fork
test-jar-no-fork
org.apache.maven.plugins
maven-javadoc-plugin
3.3.1
resource-bundles
package
resource-bundle
test-resource-bundle
false
javadoc-jar
package
jar
true
it.unimi.dsi:webgraph-big:*
https://webgraph.di.unimi.it/docs-big/
https://dsiutils.di.unimi.it/docs/
https://fastutil.di.unimi.it/docs/
https://law.di.unimi.it/software/law-docs/
implSpec
a
Implementation Requirements:
implNote
a
Implementation Note:
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ compile-custom
+ test-compile
+ test-compile-custom
+
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
new file mode 100644
index 0000000..502e4ac
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
@@ -0,0 +1,157 @@
+package org.softwareheritage.graph.rpc;
+
+import com.martiansoftware.jsap.*;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.protobuf.services.ProtoReflectionService;
+import it.unimi.dsi.logging.ProgressLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.softwareheritage.graph.SwhBidirectionalGraph;
+import org.softwareheritage.graph.compress.LabelMapBuilder;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Server that manages startup/shutdown of a {@code Greeter} server.
+ */
+public class GraphServer {
+ private final static Logger logger = LoggerFactory.getLogger(GraphServer.class);
+
+ private final SwhBidirectionalGraph graph;
+ private final int port;
+ private final int threads;
+ private Server server;
+
+ public GraphServer(String graphBasename, int port, int threads) throws IOException {
+ this.graph = SwhBidirectionalGraph.loadLabelledMapped(graphBasename, new ProgressLogger(logger));
+ this.port = port;
+ this.threads = threads;
+ graph.loadContentLength();
+ graph.loadContentIsSkipped();
+ graph.loadPersonIds();
+ graph.loadAuthorTimestamps();
+ graph.loadCommitterTimestamps();
+ graph.loadMessages();
+ graph.loadTagNames();
+ graph.loadLabelNames();
+ }
+
+ private void start() throws IOException {
+ server = ServerBuilder.forPort(port).executor(Executors.newFixedThreadPool(threads))
+ .addService(new TraversalService(graph)).addService(ProtoReflectionService.newInstance()).build()
+ .start();
+ logger.info("Server started, listening on " + port);
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ GraphServer.this.stop();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ }
+ }));
+ }
+
+ private void stop() throws InterruptedException {
+ if (server != null) {
+ server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Await termination on the main thread since the grpc library uses daemon threads.
+ */
+ private void blockUntilShutdown() throws InterruptedException {
+ if (server != null) {
+ server.awaitTermination();
+ }
+ }
+
+ private static JSAPResult parseArgs(String[] args) {
+ JSAPResult config = null;
+ try {
+ SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "",
+ new Parameter[]{
+ new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port",
+ "The port on which the server should listen."),
+ new FlaggedOption("threads", JSAP.INTEGER_PARSER, "1", JSAP.NOT_REQUIRED, 't', "threads",
+ "The number of concurrent threads. 0 = number of cores."),
+ new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
+ "Basename of the output graph")});
+
+ config = jsap.parse(args);
+ if (jsap.messagePrinted()) {
+ System.exit(1);
+ }
+ } catch (JSAPException e) {
+ e.printStackTrace();
+ }
+ return config;
+ }
+
+ /**
+ * Main launches the server from the command line.
+ */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ JSAPResult config = parseArgs(args);
+ String graphBasename = config.getString("graphBasename");
+ int port = config.getInt("port");
+ int threads = config.getInt("threads");
+ if (threads == 0) {
+ threads = Runtime.getRuntime().availableProcessors();
+ }
+
+ final GraphServer server = new GraphServer(graphBasename, port, threads);
+ server.start();
+ server.blockUntilShutdown();
+ }
+
+ static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase {
+ SwhBidirectionalGraph graph;
+
+ public TraversalService(SwhBidirectionalGraph graph) {
+ this.graph = graph;
+ }
+
+ @Override
+ public void traverse(TraversalRequest request, StreamObserver responseObserver) {
+ SwhBidirectionalGraph g = graph.copy();
+ Traversal.simpleTraversal(g, request, responseObserver::onNext);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void countNodes(TraversalRequest request, StreamObserver responseObserver) {
+ AtomicInteger count = new AtomicInteger(0);
+ SwhBidirectionalGraph g = graph.copy();
+ TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
+ // Ignore return fields, just count nodes
+ .setReturnFields(NodeFields.getDefaultInstance()).build();
+ Traversal.simpleTraversal(g, fixedReq, (Node node) -> {
+ count.incrementAndGet();
+ });
+ CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void countEdges(TraversalRequest request, StreamObserver responseObserver) {
+ AtomicInteger count = new AtomicInteger(0);
+ SwhBidirectionalGraph g = graph.copy();
+ TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
+ // Force return empty successors to count the edges
+ .setReturnFields(NodeFields.newBuilder().setSuccessor(true).setSuccessorSwhid(false).build())
+ .build();
+ Traversal.simpleTraversal(g, fixedReq, (Node node) -> {
+ count.addAndGet(node.getSuccessorCount());
+ });
+ CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java
new file mode 100644
index 0000000..e046476
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java
@@ -0,0 +1,275 @@
+package org.softwareheritage.graph.rpc;
+
+import com.google.protobuf.ByteString;
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import it.unimi.dsi.big.webgraph.labelling.ArcLabelledNodeIterator;
+import it.unimi.dsi.big.webgraph.labelling.Label;
+import org.softwareheritage.graph.*;
+import org.softwareheritage.graph.labels.DirEntry;
+
+import java.util.*;
+
+public class Traversal {
+ private static LazyLongIterator filterSuccessors(SwhUnidirectionalGraph g, long nodeId, AllowedEdges allowedEdges) {
+ if (allowedEdges.restrictedTo == null) {
+ // All edges are allowed, bypass edge check
+ return g.successors(nodeId);
+ } else {
+ LazyLongIterator allSuccessors = g.successors(nodeId);
+ return new LazyLongIterator() {
+ @Override
+ public long nextLong() {
+ long neighbor;
+ while ((neighbor = allSuccessors.nextLong()) != -1) {
+ if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) {
+ return neighbor;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public long skip(final long n) {
+ long i = 0;
+ while (i < n && nextLong() != -1)
+ i++;
+ return i;
+ }
+ };
+ }
+ }
+
+ private static ArcLabelledNodeIterator.LabelledArcIterator filterLabelledSuccessors(SwhUnidirectionalGraph g,
+ long nodeId, AllowedEdges allowedEdges) {
+ if (allowedEdges.restrictedTo == null) {
+ // All edges are allowed, bypass edge check
+ return g.labelledSuccessors(nodeId);
+ } else {
+ ArcLabelledNodeIterator.LabelledArcIterator allSuccessors = g.labelledSuccessors(nodeId);
+ return new ArcLabelledNodeIterator.LabelledArcIterator() {
+ @Override
+ public Label label() {
+ return allSuccessors.label();
+ }
+
+ @Override
+ public long nextLong() {
+ long neighbor;
+ while ((neighbor = allSuccessors.nextLong()) != -1) {
+ if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) {
+ return neighbor;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public long skip(final long n) {
+ long i = 0;
+ while (i < n && nextLong() != -1)
+ i++;
+ return i;
+ }
+ };
+ }
+ }
+
+ private static class NodeFilterChecker {
+ private final SwhUnidirectionalGraph g;
+ private final NodeFilter filter;
+ private final AllowedNodes allowedNodes;
+
+ private NodeFilterChecker(SwhUnidirectionalGraph graph, NodeFilter filter) {
+ this.g = graph;
+ this.filter = filter;
+ this.allowedNodes = new AllowedNodes(filter.hasTypes() ? filter.getTypes() : "*");
+ }
+
+ public boolean allowed(long nodeId) {
+ if (filter == null) {
+ return true;
+ }
+ if (!this.allowedNodes.isAllowed(g.getNodeType(nodeId))) {
+ return false;
+ }
+
+ long outdegree = g.outdegree(nodeId);
+ if (filter.hasMinTraversalSuccessors() && outdegree < filter.getMinTraversalSuccessors()) {
+ return false;
+ }
+ if (filter.hasMaxTraversalSuccessors() && outdegree > filter.getMaxTraversalSuccessors()) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ public static SwhUnidirectionalGraph getDirectedGraph(SwhBidirectionalGraph g, TraversalRequest request) {
+ switch (request.getDirection()) {
+ case FORWARD:
+ return g.getForwardGraph();
+ case BACKWARD:
+ return g.getBackwardGraph();
+ case BOTH:
+ return new SwhUnidirectionalGraph(g.symmetrize(), g.getProperties());
+ }
+ throw new IllegalArgumentException("Unknown direction: " + request.getDirection());
+ }
+
+ public static void simpleTraversal(SwhBidirectionalGraph bidirectionalGraph, TraversalRequest request,
+ NodeObserver nodeObserver) {
+ SwhUnidirectionalGraph g = getDirectedGraph(bidirectionalGraph, request);
+ NodeFilterChecker nodeReturnChecker = new NodeFilterChecker(g, request.getReturnNodes());
+
+ AllowedEdges allowedEdges = new AllowedEdges(request.hasEdges() ? request.getEdges() : "*");
+
+ Queue queue = new ArrayDeque<>();
+ HashSet visited = new HashSet<>();
+ request.getSrcList().forEach(srcSwhid -> {
+ long srcNodeId = g.getNodeId(new SWHID(srcSwhid));
+ queue.add(srcNodeId);
+ visited.add(srcNodeId);
+ });
+ queue.add(-1L); // Depth sentinel
+
+ long edgesAccessed = 0;
+ long currentDepth = 0;
+ while (!queue.isEmpty()) {
+ long curr = queue.poll();
+ if (curr == -1L) {
+ ++currentDepth;
+ if (!queue.isEmpty()) {
+ queue.add(-1L);
+ }
+ continue;
+ }
+ if (request.hasMaxDepth() && currentDepth > request.getMaxDepth()) {
+ break;
+ }
+ edgesAccessed += g.outdegree(curr);
+ if (request.hasMaxEdges() && edgesAccessed >= request.getMaxEdges()) {
+ break;
+ }
+
+ Node.Builder nodeBuilder = null;
+ if (nodeReturnChecker.allowed(curr)) {
+ nodeBuilder = Node.newBuilder();
+ buildNodeProperties(g, request.getReturnFields(), nodeBuilder, curr);
+ }
+
+ ArcLabelledNodeIterator.LabelledArcIterator it = filterLabelledSuccessors(g, curr, allowedEdges);
+ for (long succ; (succ = it.nextLong()) != -1;) {
+ if (!visited.contains(succ)) {
+ queue.add(succ);
+ visited.add(succ);
+ }
+ buildSuccessorProperties(g, request.getReturnFields(), nodeBuilder, curr, succ, it.label());
+ }
+ if (nodeBuilder != null) {
+ nodeObserver.onNext(nodeBuilder.build());
+ }
+ }
+ }
+
+ private static void buildNodeProperties(SwhUnidirectionalGraph graph, NodeFields fields, Node.Builder nodeBuilder,
+ long node) {
+ if (fields == null || !fields.hasSwhid() || fields.getSwhid()) {
+ nodeBuilder.setSwhid(graph.getSWHID(node).toString());
+ }
+ if (fields == null) {
+ return;
+ }
+
+ switch (graph.getNodeType(node)) {
+ case CNT:
+ if (fields.hasCntLength()) {
+ nodeBuilder.setCntLength(graph.getContentLength(node));
+ }
+ if (fields.hasCntIsSkipped()) {
+ nodeBuilder.setCntIsSkipped(graph.isContentSkipped(node));
+ }
+ break;
+ case REV:
+ if (fields.getRevAuthor()) {
+ nodeBuilder.setRevAuthor(graph.getAuthorId(node));
+ }
+ if (fields.getRevCommitter()) {
+ nodeBuilder.setRevAuthor(graph.getCommitterId(node));
+ }
+ if (fields.getRevAuthorDate()) {
+ nodeBuilder.setRevAuthorDate(graph.getAuthorTimestamp(node));
+ }
+ if (fields.getRevAuthorDateOffset()) {
+ nodeBuilder.setRevAuthorDateOffset(graph.getAuthorTimestampOffset(node));
+ }
+ if (fields.getRevCommitterDate()) {
+ nodeBuilder.setRevCommitterDate(graph.getCommitterTimestamp(node));
+ }
+ if (fields.getRevCommitterDateOffset()) {
+ nodeBuilder.setRevCommitterDateOffset(graph.getCommitterTimestampOffset(node));
+ }
+ if (fields.getRevMessage()) {
+ byte[] msg = graph.getMessage(node);
+ if (msg != null) {
+ nodeBuilder.setRevMessage(ByteString.copyFrom(msg));
+ }
+ }
+ break;
+ case REL:
+ if (fields.getRelAuthor()) {
+ nodeBuilder.setRelAuthor(graph.getAuthorId(node));
+ }
+ if (fields.getRelAuthorDate()) {
+ nodeBuilder.setRelAuthorDate(graph.getAuthorTimestamp(node));
+ }
+ if (fields.getRelAuthorDateOffset()) {
+ nodeBuilder.setRelAuthorDateOffset(graph.getAuthorTimestampOffset(node));
+ }
+ if (fields.getRelName()) {
+ byte[] msg = graph.getTagName(node);
+ if (msg != null) {
+ nodeBuilder.setRelName(ByteString.copyFrom(msg));
+ }
+ }
+ if (fields.getRelMessage()) {
+ byte[] msg = graph.getMessage(node);
+ if (msg != null) {
+ nodeBuilder.setRelMessage(ByteString.copyFrom(msg));
+ }
+ }
+ break;
+ case ORI:
+ if (fields.getOriUrl()) {
+ String url = graph.getUrl(node);
+ if (url != null) {
+ nodeBuilder.setOriUrl(url);
+ }
+ }
+ }
+ }
+
+ private static void buildSuccessorProperties(SwhUnidirectionalGraph graph, NodeFields fields,
+ Node.Builder nodeBuilder, long src, long dst, Label label) {
+ if (nodeBuilder != null && fields != null && fields.getSuccessor()) {
+ Successor.Builder successorBuilder = Successor.newBuilder();
+ if (!fields.hasSuccessorSwhid() || fields.getSuccessorSwhid()) {
+ successorBuilder.setSwhid(graph.getSWHID(dst).toString());
+ }
+ if (fields.getSuccessorLabel()) {
+ DirEntry[] entries = (DirEntry[]) label.get();
+ for (DirEntry entry : entries) {
+ EdgeLabel.Builder builder = EdgeLabel.newBuilder();
+ builder.setName(ByteString.copyFrom(graph.getLabelName(entry.filenameId)));
+ builder.setPermission(entry.permission);
+ successorBuilder.addLabel(builder.build());
+ }
+ }
+ nodeBuilder.addSuccessor(successorBuilder.build());
+ }
+ }
+
+ public interface NodeObserver {
+ void onNext(Node nodeId);
+ }
+}
diff --git a/proto/swhgraph.proto b/proto/swhgraph.proto
new file mode 100644
index 0000000..0986234
--- /dev/null
+++ b/proto/swhgraph.proto
@@ -0,0 +1,102 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.softwareheritage.graph.rpc";
+option java_outer_classname = "GraphService";
+
+package swh.graph;
+
+service TraversalService {
+ rpc Traverse (TraversalRequest) returns (stream Node);
+ rpc CountNodes (TraversalRequest) returns (CountResponse);
+ rpc CountEdges (TraversalRequest) returns (CountResponse);
+}
+
+enum GraphDirection {
+ FORWARD = 0;
+ BACKWARD = 1;
+ BOTH = 2;
+}
+
+message TraversalRequest {
+ repeated string src = 1;
+
+ // Traversal options
+ optional GraphDirection direction = 2;
+ optional string edges = 3;
+ optional int64 max_edges = 4;
+ optional int64 max_depth = 5;
+ optional NodeFilter return_nodes = 6;
+ optional NodeFields return_fields = 7;
+}
+
+message NodeFilter {
+ optional string types = 1;
+ optional int64 min_traversal_successors = 2;
+ optional int64 max_traversal_successors = 3;
+}
+
+message NodeFields {
+ optional bool swhid = 1;
+
+ optional bool successor = 2;
+ optional bool successor_swhid = 3;
+ optional bool successor_label = 4;
+
+ optional bool cnt_length = 5;
+ optional bool cnt_is_skipped = 6;
+
+ optional bool rev_author = 7;
+ optional bool rev_author_date = 8;
+ optional bool rev_author_date_offset = 9;
+ optional bool rev_committer = 10;
+ optional bool rev_committer_date = 11;
+ optional bool rev_committer_date_offset = 12;
+ optional bool rev_message = 13;
+
+ optional bool rel_author = 14;
+ optional bool rel_author_date = 15;
+ optional bool rel_author_date_offset = 16;
+ optional bool rel_name = 17;
+ optional bool rel_message = 18;
+
+ optional bool ori_url = 19;
+}
+
+message Node {
+ string swhid = 1;
+ repeated Successor successor = 2;
+
+ optional int64 cnt_length = 3;
+ optional bool cnt_is_skipped = 4;
+
+ optional int64 rev_author = 5;
+ optional int64 rev_author_date = 6;
+ optional int32 rev_author_date_offset = 7;
+ optional int64 rev_committer = 8;
+ optional int64 rev_committer_date = 9;
+ optional int32 rev_committer_date_offset = 10;
+ optional bytes rev_message = 11;
+
+ optional int64 rel_author = 12;
+ optional int64 rel_author_date = 13;
+ optional int32 rel_author_date_offset = 14;
+ optional bytes rel_name = 15;
+ optional bytes rel_message = 16;
+
+ optional string ori_url = 17;
+}
+
+message Successor {
+ optional string swhid = 1;
+ repeated EdgeLabel label = 2;
+}
+
+message EdgeLabel {
+ bytes name = 1;
+ int32 permission = 2;
+}
+
+message CountResponse {
+ int64 count = 1;
+}