diff --git a/java/pom.xml b/java/pom.xml
index 0fa2200..405ce93 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,403 +1,403 @@
 [unchanged: project coordinates, properties (Java 11, protobuf 3.21.1, grpc 1.47.0) and the dependencies preceding the it.unimi.dsi block]
         <dependency>
             <groupId>it.unimi.dsi</groupId>
             <artifactId>webgraph-big</artifactId>
-            <version>3.6.7</version>
+            <version>3.7.0</version>
         </dependency>
         <dependency>
             <groupId>it.unimi.dsi</groupId>
             <artifactId>fastutil</artifactId>
             <version>8.5.8</version>
         </dependency>
         <dependency>
             <groupId>it.unimi.dsi</groupId>
             <artifactId>dsiutils</artifactId>
-            <version>2.7.1</version>
+            <version>2.7.2</version>
         </dependency>
 [unchanged: remaining dependencies and the build plugin configuration]
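The webgraph-big bump to 3.7.0 (together with dsiutils 2.7.2) is what enables the loader change in GraphServer.java below: the removed TODO was waiting for https://github.com/vigna/webgraph-big/pull/5, and with it available the server switches from loadLabelled() to loadLabelledMapped(). The sketch below is not part of the change; it only contrasts the two call sites using the SwhBidirectionalGraph API already visible in this diff, with the basename argument as a placeholder and the memory-mapping semantics of the "Mapped" loader stated as the expected behaviour rather than verified here.

import it.unimi.dsi.logging.ProgressLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.softwareheritage.graph.SwhBidirectionalGraph;

import java.io.IOException;

public class LoadExample {
    private final static Logger logger = LoggerFactory.getLogger(LoadExample.class);

    public static void main(String[] args) throws IOException {
        String basename = args[0]; // placeholder: basename of a compressed swh-graph dataset on disk

        // Before this change the server called loadLabelled(), which loads the
        // labelled graph fully into memory:
        // SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelled(basename, new ProgressLogger(logger));

        // After this change it calls loadLabelledMapped(), which memory-maps the
        // underlying files instead of loading them onto the heap:
        SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelledMapped(basename, new ProgressLogger(logger));
        System.out.println("nodes: " + g.numNodes() + ", arcs: " + g.numArcs());
    }
}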
diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
index b7d19d9..64acfba 100644
--- a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
+++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
@@ -1,294 +1,293 @@
 package org.softwareheritage.graph.rpc;
 
 import com.google.protobuf.FieldMask;
 import com.martiansoftware.jsap.*;
 import io.grpc.Server;
 import io.grpc.Status;
 import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.stub.StreamObserver;
 import io.grpc.protobuf.services.ProtoReflectionService;
 import it.unimi.dsi.logging.ProgressLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.softwareheritage.graph.SWHID;
 import org.softwareheritage.graph.SwhBidirectionalGraph;
 import org.softwareheritage.graph.compress.LabelMapBuilder;
 
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Server that manages startup/shutdown of a {@code Greeter} server.
  */
 public class GraphServer {
     private final static Logger logger = LoggerFactory.getLogger(GraphServer.class);
 
     private final SwhBidirectionalGraph graph;
     private final int port;
     private final int threads;
     private Server server;
 
     /**
      * @param graphBasename the basename of the SWH graph to load
      * @param port the port on which the GRPC server will listen
      * @param threads the number of threads to use in the server threadpool
      */
     public GraphServer(String graphBasename, int port, int threads) throws IOException {
         this.graph = loadGraph(graphBasename);
         this.port = port;
         this.threads = threads;
     }
 
     /** Load a graph and all its properties. */
     public static SwhBidirectionalGraph loadGraph(String basename) throws IOException {
-        // TODO: use loadLabelledMapped() when https://github.com/vigna/webgraph-big/pull/5 is merged
-        SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelled(basename, new ProgressLogger(logger));
+        SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelledMapped(basename, new ProgressLogger(logger));
         g.loadContentLength();
         g.loadContentIsSkipped();
         g.loadPersonIds();
         g.loadAuthorTimestamps();
         g.loadCommitterTimestamps();
         g.loadMessages();
         g.loadTagNames();
         g.loadLabelNames();
         return g;
     }
 
     /** Start the RPC server. */
     private void start() throws IOException {
         server = NettyServerBuilder.forPort(port).withChildOption(ChannelOption.SO_REUSEADDR, true)
                 .executor(Executors.newFixedThreadPool(threads)).addService(new TraversalService(graph))
                 .addService(ProtoReflectionService.newInstance()).build().start();
         logger.info("Server started, listening on " + port);
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
                 GraphServer.this.stop();
             } catch (InterruptedException e) {
                 e.printStackTrace(System.err);
             }
         }));
     }
 
     private void stop() throws InterruptedException {
         if (server != null) {
             server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
         }
     }
 
     /**
      * Await termination on the main thread since the grpc library uses daemon threads.
      */
     private void blockUntilShutdown() throws InterruptedException {
         if (server != null) {
             server.awaitTermination();
         }
     }
 
     private static JSAPResult parseArgs(String[] args) {
         JSAPResult config = null;
         try {
             SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "",
                     new Parameter[]{
                             new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port",
                                     "The port on which the server should listen."),
                             new FlaggedOption("threads", JSAP.INTEGER_PARSER, "0", JSAP.NOT_REQUIRED, 't', "threads",
                                     "The number of concurrent threads. 0 = number of cores."),
                             new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
                                     "Basename of the output graph")});
 
             config = jsap.parse(args);
             if (jsap.messagePrinted()) {
                 System.exit(1);
             }
         } catch (JSAPException e) {
             e.printStackTrace();
         }
         return config;
     }
 
     /** Main launches the server from the command line. */
     public static void main(String[] args) throws IOException, InterruptedException {
         JSAPResult config = parseArgs(args);
         String graphBasename = config.getString("graphBasename");
         int port = config.getInt("port");
         int threads = config.getInt("threads");
         if (threads == 0) {
             threads = Runtime.getRuntime().availableProcessors();
         }
 
         final GraphServer server = new GraphServer(graphBasename, port, threads);
         server.start();
         server.blockUntilShutdown();
     }
 
     /** Implementation of the Traversal service, which contains all the graph querying endpoints. */
     static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase {
         SwhBidirectionalGraph graph;
 
         public TraversalService(SwhBidirectionalGraph graph) {
             this.graph = graph;
         }
 
         /** Return various statistics on the overall graph. */
         @Override
         public void stats(StatsRequest request, StreamObserver<StatsResponse> responseObserver) {
             StatsResponse.Builder response = StatsResponse.newBuilder();
             response.setNumNodes(graph.numNodes());
             response.setNumEdges(graph.numArcs());
 
             Properties properties = new Properties();
             try {
                 properties.load(new FileInputStream(graph.getPath() + ".properties"));
                 properties.load(new FileInputStream(graph.getPath() + ".stats"));
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
             response.setCompressionRatio(Double.parseDouble(properties.getProperty("compratio")));
             response.setBitsPerNode(Double.parseDouble(properties.getProperty("bitspernode")));
             response.setBitsPerEdge(Double.parseDouble(properties.getProperty("bitsperlink")));
             response.setAvgLocality(Double.parseDouble(properties.getProperty("avglocality")));
             response.setIndegreeMin(Long.parseLong(properties.getProperty("minindegree")));
             response.setIndegreeMax(Long.parseLong(properties.getProperty("maxindegree")));
             response.setIndegreeAvg(Double.parseDouble(properties.getProperty("avgindegree")));
             response.setOutdegreeMin(Long.parseLong(properties.getProperty("minoutdegree")));
             response.setOutdegreeMax(Long.parseLong(properties.getProperty("maxoutdegree")));
             response.setOutdegreeAvg(Double.parseDouble(properties.getProperty("avgoutdegree")));
             responseObserver.onNext(response.build());
             responseObserver.onCompleted();
         }
 
         /** Return a single node and its properties. */
         @Override
         public void getNode(GetNodeRequest request, StreamObserver<Node> responseObserver) {
             SwhBidirectionalGraph g = graph.copy();
             long nodeId;
             try {
                 nodeId = g.getNodeId(new SWHID(request.getSwhid()));
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             Node.Builder builder = Node.newBuilder();
             NodePropertyBuilder.buildNodeProperties(g.getForwardGraph(), request.hasMask() ? request.getMask() : null,
                     builder, nodeId);
             responseObserver.onNext(builder.build());
             responseObserver.onCompleted();
         }
 
         /** Perform a BFS traversal from a set of source nodes and stream the nodes encountered. */
         @Override
         public void traverse(TraversalRequest request, StreamObserver<Node> responseObserver) {
             SwhBidirectionalGraph g = graph.copy();
             Traversal.SimpleTraversal t;
             try {
                 t = new Traversal.SimpleTraversal(g, request, responseObserver::onNext);
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             t.visit();
             responseObserver.onCompleted();
         }
 
         /**
          * Find the shortest path between a set of source nodes and a node that matches a given criteria
          * using a BFS.
          */
         @Override
         public void findPathTo(FindPathToRequest request, StreamObserver<Path> responseObserver) {
             SwhBidirectionalGraph g = graph.copy();
             Traversal.FindPathTo t;
             try {
                 t = new Traversal.FindPathTo(g, request);
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             t.visit();
             Path path = t.getPath();
             if (path == null) {
                 responseObserver.onError(Status.NOT_FOUND.asException());
             } else {
                 responseObserver.onNext(path);
                 responseObserver.onCompleted();
             }
         }
 
         /**
          * Find the shortest path between a set of source nodes and a set of destination nodes using a
          * bidirectional BFS.
          */
         @Override
         public void findPathBetween(FindPathBetweenRequest request, StreamObserver<Path> responseObserver) {
             SwhBidirectionalGraph g = graph.copy();
             Traversal.FindPathBetween t;
             try {
                 t = new Traversal.FindPathBetween(g, request);
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             t.visit();
             Path path = t.getPath();
             if (path == null) {
                 responseObserver.onError(Status.NOT_FOUND.asException());
             } else {
                 responseObserver.onNext(path);
                 responseObserver.onCompleted();
             }
         }
 
         /** Return the number of nodes traversed by a BFS traversal. */
         @Override
         public void countNodes(TraversalRequest request, StreamObserver<CountResponse> responseObserver) {
             AtomicLong count = new AtomicLong(0);
             SwhBidirectionalGraph g = graph.copy();
             TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
                     // Ignore return fields, just count nodes
                     .setMask(FieldMask.getDefaultInstance()).build();
             Traversal.SimpleTraversal t;
             try {
                 t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.incrementAndGet());
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             t.visit();
             CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
             responseObserver.onNext(response);
             responseObserver.onCompleted();
         }
 
         /** Return the number of edges traversed by a BFS traversal. */
         @Override
         public void countEdges(TraversalRequest request, StreamObserver<CountResponse> responseObserver) {
             AtomicLong count = new AtomicLong(0);
             SwhBidirectionalGraph g = graph.copy();
             TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
                     // Force return empty successors to count the edges
                     .setMask(FieldMask.newBuilder().addPaths("num_successors").build()).build();
             Traversal.SimpleTraversal t;
             try {
                 t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.addAndGet(n.getNumSuccessors()));
             } catch (IllegalArgumentException e) {
                 responseObserver
                         .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
                 return;
             }
             t.visit();
             CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
             responseObserver.onNext(response);
             responseObserver.onCompleted();
         }
     }
 }
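For context, a minimal client sketch for the service registered above, not part of the change itself. It assumes the TraversalServiceGrpc stub that the protobuf-maven-plugin configured in pom.xml generates into the org.softwareheritage.graph.rpc package, the default port 50091 from parseArgs(), and a plaintext channel (the NettyServerBuilder above configures no TLS); the class and method names other than those visible in this diff follow standard grpc-java code generation conventions.

package org.softwareheritage.graph.rpc; // same package as the generated stubs, so they need no extra imports

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class StatsClientExample {
    public static void main(String[] args) {
        // Plaintext channel to the default port used by GraphServer.parseArgs().
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50091).usePlaintext().build();
        try {
            // Blocking stub generated from the TraversalService definition.
            TraversalServiceGrpc.TraversalServiceBlockingStub stub = TraversalServiceGrpc.newBlockingStub(channel);
            // Stats takes no parameters; the response carries the fields set in
            // TraversalService.stats() above (numNodes, numEdges, degree statistics, ...).
            StatsResponse stats = stub.stats(StatsRequest.getDefaultInstance());
            System.out.println("nodes=" + stats.getNumNodes() + " edges=" + stats.getNumEdges());
        } finally {
            channel.shutdownNow();
        }
    }
}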