diff --git a/java/pom.xml b/java/pom.xml
index 0fa2200..405ce93 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,403 +1,403 @@
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>webgraph-big</artifactId>
-     <version>3.6.7</version>
+     <version>3.7.0</version>
    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
      <version>8.5.8</version>
    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>dsiutils</artifactId>
-     <version>2.7.1</version>
+     <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>sux4j</artifactId>
      <version>5.3.1</version>
    </dependency>
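
The webgraph-big 3.7.0 bump above is what lets GraphServer.loadGraph() below drop its TODO and switch from loadLabelled() to loadLabelledMapped(). A minimal sketch of that difference, assuming only the swh-graph Java API as it is used in this file; the LoadDemo class name and the command-line handling are illustrative, not part of the change:

import it.unimi.dsi.logging.ProgressLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.softwareheritage.graph.SwhBidirectionalGraph;

public class LoadDemo {
    private static final Logger logger = LoggerFactory.getLogger(LoadDemo.class);

    public static void main(String[] args) throws Exception {
        String basename = args[0];
        ProgressLogger pl = new ProgressLogger(logger);
        // Previously: load the labelled graph eagerly.
        // SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelled(basename, pl);
        // Now: memory-map the labelled graph instead, so startup does not have to pull
        // the whole label data into RAM (assumption based on the method name and the
        // webgraph-big pull request cited in the TODO removed below).
        SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelledMapped(basename, pl);
        System.out.println(g.numNodes() + " nodes, " + g.numArcs() + " arcs");
    }
}
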
diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
index b7d19d9..64acfba 100644
--- a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
+++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
@@ -1,294 +1,293 @@
package org.softwareheritage.graph.rpc;
import com.google.protobuf.FieldMask;
import com.martiansoftware.jsap.*;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import io.grpc.stub.StreamObserver;
import io.grpc.protobuf.services.ProtoReflectionService;
import it.unimi.dsi.logging.ProgressLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.softwareheritage.graph.SWHID;
import org.softwareheritage.graph.SwhBidirectionalGraph;
import org.softwareheritage.graph.compress.LabelMapBuilder;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Server that manages startup/shutdown of the gRPC {@code TraversalService} server.
*/
public class GraphServer {
private final static Logger logger = LoggerFactory.getLogger(GraphServer.class);
private final SwhBidirectionalGraph graph;
private final int port;
private final int threads;
private Server server;
/**
* @param graphBasename the basename of the SWH graph to load
* @param port the port on which the GRPC server will listen
* @param threads the number of threads to use in the server threadpool
*/
public GraphServer(String graphBasename, int port, int threads) throws IOException {
this.graph = loadGraph(graphBasename);
this.port = port;
this.threads = threads;
}
/** Load a graph and all its properties. */
public static SwhBidirectionalGraph loadGraph(String basename) throws IOException {
- // TODO: use loadLabelledMapped() when https://github.com/vigna/webgraph-big/pull/5 is merged
- SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelled(basename, new ProgressLogger(logger));
+ SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelledMapped(basename, new ProgressLogger(logger));
g.loadContentLength();
g.loadContentIsSkipped();
g.loadPersonIds();
g.loadAuthorTimestamps();
g.loadCommitterTimestamps();
g.loadMessages();
g.loadTagNames();
g.loadLabelNames();
return g;
}
/** Start the RPC server. */
private void start() throws IOException {
server = NettyServerBuilder.forPort(port).withChildOption(ChannelOption.SO_REUSEADDR, true)
.executor(Executors.newFixedThreadPool(threads)).addService(new TraversalService(graph))
.addService(ProtoReflectionService.newInstance()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
GraphServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}));
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
private static JSAPResult parseArgs(String[] args) {
JSAPResult config = null;
try {
SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "",
new Parameter[]{
new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port",
"The port on which the server should listen."),
new FlaggedOption("threads", JSAP.INTEGER_PARSER, "0", JSAP.NOT_REQUIRED, 't', "threads",
"The number of concurrent threads. 0 = number of cores."),
new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
"Basename of the output graph")});
config = jsap.parse(args);
if (jsap.messagePrinted()) {
System.exit(1);
}
} catch (JSAPException e) {
e.printStackTrace();
}
return config;
}
/** Main launches the server from the command line. */
public static void main(String[] args) throws IOException, InterruptedException {
JSAPResult config = parseArgs(args);
String graphBasename = config.getString("graphBasename");
int port = config.getInt("port");
int threads = config.getInt("threads");
if (threads == 0) {
threads = Runtime.getRuntime().availableProcessors();
}
final GraphServer server = new GraphServer(graphBasename, port, threads);
server.start();
server.blockUntilShutdown();
}
/** Implementation of the Traversal service, which contains all the graph querying endpoints. */
static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase {
SwhBidirectionalGraph graph;
public TraversalService(SwhBidirectionalGraph graph) {
this.graph = graph;
}
/** Return various statistics on the overall graph. */
@Override
public void stats(StatsRequest request, StreamObserver<StatsResponse> responseObserver) {
StatsResponse.Builder response = StatsResponse.newBuilder();
response.setNumNodes(graph.numNodes());
response.setNumEdges(graph.numArcs());
Properties properties = new Properties();
try {
properties.load(new FileInputStream(graph.getPath() + ".properties"));
properties.load(new FileInputStream(graph.getPath() + ".stats"));
} catch (IOException e) {
throw new RuntimeException(e);
}
response.setCompressionRatio(Double.parseDouble(properties.getProperty("compratio")));
response.setBitsPerNode(Double.parseDouble(properties.getProperty("bitspernode")));
response.setBitsPerEdge(Double.parseDouble(properties.getProperty("bitsperlink")));
response.setAvgLocality(Double.parseDouble(properties.getProperty("avglocality")));
response.setIndegreeMin(Long.parseLong(properties.getProperty("minindegree")));
response.setIndegreeMax(Long.parseLong(properties.getProperty("maxindegree")));
response.setIndegreeAvg(Double.parseDouble(properties.getProperty("avgindegree")));
response.setOutdegreeMin(Long.parseLong(properties.getProperty("minoutdegree")));
response.setOutdegreeMax(Long.parseLong(properties.getProperty("maxoutdegree")));
response.setOutdegreeAvg(Double.parseDouble(properties.getProperty("avgoutdegree")));
responseObserver.onNext(response.build());
responseObserver.onCompleted();
}
/** Return a single node and its properties. */
@Override
public void getNode(GetNodeRequest request, StreamObserver<Node> responseObserver) {
SwhBidirectionalGraph g = graph.copy();
long nodeId;
try {
nodeId = g.getNodeId(new SWHID(request.getSwhid()));
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
Node.Builder builder = Node.newBuilder();
NodePropertyBuilder.buildNodeProperties(g.getForwardGraph(), request.hasMask() ? request.getMask() : null,
builder, nodeId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
/** Perform a BFS traversal from a set of source nodes and stream the nodes encountered. */
@Override
public void traverse(TraversalRequest request, StreamObserver<Node> responseObserver) {
SwhBidirectionalGraph g = graph.copy();
Traversal.SimpleTraversal t;
try {
t = new Traversal.SimpleTraversal(g, request, responseObserver::onNext);
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
t.visit();
responseObserver.onCompleted();
}
/**
* Find the shortest path between a set of source nodes and a node that matches given criteria,
* using a BFS.
*/
@Override
public void findPathTo(FindPathToRequest request, StreamObserver<Path> responseObserver) {
SwhBidirectionalGraph g = graph.copy();
Traversal.FindPathTo t;
try {
t = new Traversal.FindPathTo(g, request);
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
t.visit();
Path path = t.getPath();
if (path == null) {
responseObserver.onError(Status.NOT_FOUND.asException());
} else {
responseObserver.onNext(path);
responseObserver.onCompleted();
}
}
/**
* Find the shortest path between a set of source nodes and a set of destination nodes using a
* bidirectional BFS.
*/
@Override
public void findPathBetween(FindPathBetweenRequest request, StreamObserver<Path> responseObserver) {
SwhBidirectionalGraph g = graph.copy();
Traversal.FindPathBetween t;
try {
t = new Traversal.FindPathBetween(g, request);
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
t.visit();
Path path = t.getPath();
if (path == null) {
responseObserver.onError(Status.NOT_FOUND.asException());
} else {
responseObserver.onNext(path);
responseObserver.onCompleted();
}
}
/** Return the number of nodes traversed by a BFS traversal. */
@Override
public void countNodes(TraversalRequest request, StreamObserver<CountResponse> responseObserver) {
AtomicLong count = new AtomicLong(0);
SwhBidirectionalGraph g = graph.copy();
TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
// Ignore return fields, just count nodes
.setMask(FieldMask.getDefaultInstance()).build();
Traversal.SimpleTraversal t;
try {
t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.incrementAndGet());
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
t.visit();
CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
/** Return the number of edges traversed by a BFS traversal. */
@Override
public void countEdges(TraversalRequest request, StreamObserver<CountResponse> responseObserver) {
AtomicLong count = new AtomicLong(0);
SwhBidirectionalGraph g = graph.copy();
TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
// Request only the number of successors of each node, which is enough to count edges
.setMask(FieldMask.newBuilder().addPaths("num_successors").build()).build();
Traversal.SimpleTraversal t;
try {
t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.addAndGet(n.getNumSuccessors()));
} catch (IllegalArgumentException e) {
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
return;
}
t.visit();
CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
}
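
Once the server above is running (on port 50091 by default, per parseArgs()), any gRPC client can call the Traversal service; the server also registers ProtoReflectionService, so generic tools can discover it. A minimal blocking-stub sketch, assuming the stub classes generated from the swh-graph protobuf definitions (TraversalServiceGrpc, StatsRequest, StatsResponse) live in org.softwareheritage.graph.rpc as GraphServer's usage suggests; the StatsClient class name and the localhost address are illustrative:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.softwareheritage.graph.rpc.StatsRequest;
import org.softwareheritage.graph.rpc.StatsResponse;
import org.softwareheritage.graph.rpc.TraversalServiceGrpc;

public class StatsClient {
    public static void main(String[] args) throws Exception {
        // Connect to a locally running GraphServer on its default port, without TLS.
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50091)
                .usePlaintext()
                .build();
        try {
            TraversalServiceGrpc.TraversalServiceBlockingStub stub =
                    TraversalServiceGrpc.newBlockingStub(channel);
            // stats() is a unary RPC: one (empty) StatsRequest in, one StatsResponse out.
            StatsResponse stats = stub.stats(StatsRequest.getDefaultInstance());
            System.out.println("nodes=" + stats.getNumNodes() + " edges=" + stats.getNumEdges());
        } finally {
            channel.shutdown();
        }
    }
}
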