diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
index 502e4ac..6c1fcbc 100644
--- a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
+++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
@@ -1,157 +1,232 @@
 package org.softwareheritage.graph.rpc;
 
 import com.martiansoftware.jsap.*;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import io.grpc.protobuf.services.ProtoReflectionService;
 import it.unimi.dsi.logging.ProgressLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.softwareheritage.graph.SWHID;
 import org.softwareheritage.graph.SwhBidirectionalGraph;
 import org.softwareheritage.graph.compress.LabelMapBuilder;
 
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Server that manages startup/shutdown of a {@code Greeter} server.
+ * Server that manages startup/shutdown of the gRPC server exposing {@code TraversalService}.
  */
 public class GraphServer {
     private final static Logger logger = LoggerFactory.getLogger(GraphServer.class);
 
     private final SwhBidirectionalGraph graph;
     private final int port;
     private final int threads;
     private Server server;
 
     public GraphServer(String graphBasename, int port, int threads) throws IOException {
         this.graph = SwhBidirectionalGraph.loadLabelledMapped(graphBasename, new ProgressLogger(logger));
         this.port = port;
         this.threads = threads;
 
         graph.loadContentLength();
         graph.loadContentIsSkipped();
         graph.loadPersonIds();
         graph.loadAuthorTimestamps();
         graph.loadCommitterTimestamps();
         graph.loadMessages();
         graph.loadTagNames();
         graph.loadLabelNames();
     }
 
     private void start() throws IOException {
         server = ServerBuilder.forPort(port).executor(Executors.newFixedThreadPool(threads))
                 .addService(new TraversalService(graph)).addService(ProtoReflectionService.newInstance()).build()
                 .start();
         logger.info("Server started, listening on " + port);
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
                 GraphServer.this.stop();
             } catch (InterruptedException e) {
                 e.printStackTrace(System.err);
             }
         }));
     }
 
     private void stop() throws InterruptedException {
         if (server != null) {
             server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
         }
     }
 
     /**
      * Await termination on the main thread since the grpc library uses daemon threads.
      */
     private void blockUntilShutdown() throws InterruptedException {
         if (server != null) {
             server.awaitTermination();
         }
     }
 
     private static JSAPResult parseArgs(String[] args) {
         JSAPResult config = null;
         try {
-            SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "", new Parameter[]{
+            SimpleJSAP jsap = new SimpleJSAP(GraphServer.class.getName(), "", new Parameter[]{
                     new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port",
                             "The port on which the server should listen."),
                     new FlaggedOption("threads", JSAP.INTEGER_PARSER, "1", JSAP.NOT_REQUIRED, 't', "threads",
                             "The number of concurrent threads. 0 = number of cores."),
                     new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED,
                             "Basename of the output graph")});
 
             config = jsap.parse(args);
             if (jsap.messagePrinted()) {
                 System.exit(1);
             }
         } catch (JSAPException e) {
             e.printStackTrace();
         }
         return config;
     }
 
     /**
      * Main launches the server from the command line.
      */
     public static void main(String[] args) throws IOException, InterruptedException {
         JSAPResult config = parseArgs(args);
         String graphBasename = config.getString("graphBasename");
         int port = config.getInt("port");
         int threads = config.getInt("threads");
         if (threads == 0) {
             threads = Runtime.getRuntime().availableProcessors();
         }
 
         final GraphServer server = new GraphServer(graphBasename, port, threads);
         server.start();
         server.blockUntilShutdown();
     }
 
     static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase {
         SwhBidirectionalGraph graph;
 
         public TraversalService(SwhBidirectionalGraph graph) {
             this.graph = graph;
         }
 
+        /**
+         * Checks that the SWHID given in the request resolves to a node of the loaded graph.
+         * <p>
+         * Replies with an empty {@code CheckSwhidResponse} on success and fails the call with
+         * {@code INVALID_ARGUMENT} when the lookup rejects the SWHID.
+         */
+        @Override
+        public void checkSwhid(CheckSwhidRequest request, StreamObserver<CheckSwhidResponse> responseObserver) {
+            try {
+                graph.getNodeId(new SWHID(request.getSwhid()));
+            } catch (IllegalArgumentException e) {
+                // NOTE(review): assumes malformed or unknown SWHIDs raise IllegalArgumentException; confirm
+                responseObserver.onError(
+                        Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asRuntimeException());
+                return;
+            }
+            responseObserver.onNext(CheckSwhidResponse.getDefaultInstance());
+            responseObserver.onCompleted();
+        }
+
+        /**
+         * Reports the node and edge counts of the loaded graph together with the statistics stored in the
+         * {@code .properties} and {@code .stats} files found at the graph path.
+         * <p>
+         * The call fails with {@code INTERNAL} when those files cannot be read or lack a usable value.
+         */
+        @Override
+        public void stats(StatsRequest request, StreamObserver<StatsResponse> responseObserver) {
+            StatsResponse.Builder response = StatsResponse.newBuilder();
+            response.setNumNodes(graph.numNodes());
+            response.setNumEdges(graph.numArcs());
+
+            Properties properties = new Properties();
+            // try-with-resources so that both files are closed even when loading or parsing fails
+            try (FileInputStream graphProperties = new FileInputStream(graph.getPath() + ".properties");
+                    FileInputStream graphStats = new FileInputStream(graph.getPath() + ".stats")) {
+                properties.load(graphProperties);
+                properties.load(graphStats);
+                response.setCompression(Double.parseDouble(requireProperty(properties, "compratio")));
+                response.setBitsPerNode(Double.parseDouble(requireProperty(properties, "bitspernode")));
+                response.setBitsPerEdge(Double.parseDouble(requireProperty(properties, "bitsperlink")));
+                response.setAvgLocality(Double.parseDouble(requireProperty(properties, "avglocality")));
+                response.setIndegreeMin(Long.parseLong(requireProperty(properties, "minindegree")));
+                response.setIndegreeMax(Long.parseLong(requireProperty(properties, "maxindegree")));
+                response.setIndegreeAvg(Double.parseDouble(requireProperty(properties, "avgindegree")));
+                response.setOutdegreeMin(Long.parseLong(requireProperty(properties, "minoutdegree")));
+                response.setOutdegreeMax(Long.parseLong(requireProperty(properties, "maxoutdegree")));
+                response.setOutdegreeAvg(Double.parseDouble(requireProperty(properties, "avgoutdegree")));
+            } catch (IOException | IllegalStateException | NumberFormatException e) {
+                // Report the failure through the observer instead of throwing out of the handler
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Cannot read graph statistics: " + e.getMessage()).withCause(e)
+                        .asRuntimeException());
+                return;
+            }
+            responseObserver.onNext(response.build());
+            responseObserver.onCompleted();
+        }
+
+        /** Returns the value stored under {@code key}, or throws naming the key when it is missing. */
+        private static String requireProperty(Properties properties, String key) {
+            String value = properties.getProperty(key);
+            if (value == null) {
+                throw new IllegalStateException("missing property '" + key + "'");
+            }
+            return value;
+        }
+
         @Override
         public void traverse(TraversalRequest request, StreamObserver responseObserver) {
             SwhBidirectionalGraph g = graph.copy();
             Traversal.simpleTraversal(g, request, responseObserver::onNext);
             responseObserver.onCompleted();
         }
 
         @Override
         public void countNodes(TraversalRequest request, StreamObserver responseObserver) {
-            AtomicInteger count = new AtomicInteger(0);
+            // CountResponse.count is int64: accumulate in a long so large traversals cannot wrap
+            AtomicLong count = new AtomicLong(0);
             SwhBidirectionalGraph g = graph.copy();
             TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
                     // Ignore return fields, just count nodes
                     .setReturnFields(NodeFields.getDefaultInstance()).build();
             Traversal.simpleTraversal(g, fixedReq, (Node node) -> {
                 count.incrementAndGet();
             });
             CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
             responseObserver.onNext(response);
             responseObserver.onCompleted();
         }
 
         @Override
         public void countEdges(TraversalRequest request, StreamObserver responseObserver) {
-            AtomicInteger count = new AtomicInteger(0);
+            // CountResponse.count is int64: accumulate in a long so large traversals cannot wrap
+            AtomicLong count = new AtomicLong(0);
             SwhBidirectionalGraph g = graph.copy();
             TraversalRequest fixedReq = TraversalRequest.newBuilder(request)
                     // Force return empty successors to count the edges
                     .setReturnFields(NodeFields.newBuilder().setSuccessor(true).setSuccessorSwhid(false).build())
                     .build();
             Traversal.simpleTraversal(g, fixedReq, (Node node) -> {
                 count.addAndGet(node.getSuccessorCount());
             });
             CountResponse response = CountResponse.newBuilder().setCount(count.get()).build();
             responseObserver.onNext(response);
             responseObserver.onCompleted();
         }
     }
 }
diff --git a/proto/swhgraph.proto b/proto/swhgraph.proto
index 0986234..957de23 100644
--- a/proto/swhgraph.proto
+++ b/proto/swhgraph.proto
@@ -1,102 +1,138 @@
 syntax = "proto3";
 
 option java_multiple_files = true;
 option java_package = "org.softwareheritage.graph.rpc";
 option java_outer_classname = "GraphService";
 
 package swh.graph;
 
 service TraversalService {
     rpc Traverse (TraversalRequest) returns (stream Node);
     rpc CountNodes (TraversalRequest) returns (CountResponse);
     rpc CountEdges (TraversalRequest) returns (CountResponse);
+    // Returns size and compression statistics about the served graph.
+    rpc Stats (StatsRequest) returns (StatsResponse);
+    // Checks that a SWHID resolves to a node of the served graph.
+    rpc CheckSwhid (CheckSwhidRequest) returns (CheckSwhidResponse);
 }
 
 enum GraphDirection {
     FORWARD = 0;
     BACKWARD = 1;
     BOTH = 2;
 }
 
 message TraversalRequest {
     repeated string src = 1;
 
     // Traversal options
     optional GraphDirection direction = 2;
     optional string edges = 3;
     optional int64 max_edges = 4;
     optional int64 max_depth = 5;
     optional NodeFilter return_nodes = 6;
     optional NodeFields return_fields = 7;
 }
 
 message NodeFilter {
     optional string types = 1;
     optional int64 min_traversal_successors = 2;
     optional int64 max_traversal_successors = 3;
 }
 
 message NodeFields {
     optional bool swhid = 1;
 
     optional bool successor = 2;
     optional bool successor_swhid = 3;
     optional bool successor_label = 4;
 
     optional bool cnt_length = 5;
     optional bool cnt_is_skipped = 6;
 
     optional bool rev_author = 7;
     optional bool rev_author_date = 8;
     optional bool rev_author_date_offset = 9;
     optional bool rev_committer = 10;
     optional bool rev_committer_date = 11;
     optional bool rev_committer_date_offset = 12;
     optional bool rev_message = 13;
 
     optional bool rel_author = 14;
     optional bool rel_author_date = 15;
     optional bool rel_author_date_offset = 16;
     optional bool rel_name = 17;
     optional bool rel_message = 18;
 
     optional bool ori_url = 19;
 }
 
 message Node {
     string swhid = 1;
     repeated Successor successor = 2;
 
     optional int64 cnt_length = 3;
     optional bool cnt_is_skipped = 4;
 
     optional int64 rev_author = 5;
     optional int64 rev_author_date = 6;
     optional int32 rev_author_date_offset = 7;
     optional int64 rev_committer = 8;
     optional int64 rev_committer_date = 9;
     optional int32 rev_committer_date_offset = 10;
     optional bytes rev_message = 11;
 
     optional int64 rel_author = 12;
     optional int64 rel_author_date = 13;
     optional int32 rel_author_date_offset = 14;
     optional bytes rel_name = 15;
     optional bytes rel_message = 16;
 
     optional string ori_url = 17;
 }
 
 message Successor {
     optional string swhid = 1;
     repeated EdgeLabel label = 2;
 }
 
 message EdgeLabel {
     bytes name = 1;
     int32 permission = 2;
 }
 
 message CountResponse {
     int64 count = 1;
 }
+
+// Request for the Stats RPC; it carries no parameters.
+message StatsRequest {
+}
+
+// Statistics about the served graph, as returned by the Stats RPC.
+message StatsResponse {
+    int64 num_nodes = 1;
+    int64 num_edges = 2;
+
+    double compression = 3;
+    double bits_per_node = 4;
+    double bits_per_edge = 5;
+    double avg_locality = 6;
+
+    int64 indegree_min = 7;
+    int64 indegree_max = 8;
+    double indegree_avg = 9;
+    int64 outdegree_min = 10;
+    int64 outdegree_max = 11;
+    double outdegree_avg = 12;
+}
+
+// Request for the CheckSwhid RPC.
+message CheckSwhidRequest {
+    // SWHID to look up in the graph.
+    string swhid = 1;
+}
+
+// Empty on purpose: a successful reply carries no payload.
+message CheckSwhidResponse {
+}