diff --git a/java/pom.xml b/java/pom.xml index 9322d6a..234b3b9 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -1,361 +1,420 @@ 4.0.0 org.softwareheritage.graph swh-graph ${git.closest.tag.name} swh-graph https://forge.softwareheritage.org/source/swh-graph/ UTF-8 11 + 3.20.1 + 1.46.0 ch.qos.logback logback-classic 1.2.3 org.junit.jupiter junit-jupiter-api 5.7.0 test org.junit.vintage junit-vintage-engine 5.7.0 junit junit 4.12 org.junit.jupiter junit-jupiter-engine 5.7.0 test org.hamcrest hamcrest 2.2 test io.javalin javalin 3.0.0 org.slf4j slf4j-simple 1.7.26 com.fasterxml.jackson.core jackson-databind 2.13.0 it.unimi.dsi webgraph-big 3.6.7 it.unimi.dsi fastutil 8.5.8 it.unimi.dsi dsiutils 2.7.1 it.unimi.dsi sux4j 5.3.1 it.unimi.dsi law 2.7.2 org.apache.hadoop hadoop-common org.umlgraph umlgraph org.eclipse.jetty.aggregate jetty-all it.unimi.di mg4j it.unimi.di mg4j-big com.martiansoftware jsap 2.1 net.sf.py4j py4j 0.10.9.3 commons-codec commons-codec 1.15 com.github.luben zstd-jni 1.5.1-1 org.apache.orc orc-core 1.7.1 org.apache.hadoop hadoop-common 3.3.1 org.apache.hadoop hadoop-client-runtime 3.3.1 + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-services + ${grpc.version} + + + javax.annotation + javax.annotation-api + 1.3.2 + maven-clean-plugin 3.1.0 maven-resources-plugin 3.0.2 maven-compiler-plugin 3.8.0 11 11 -verbose -Xlint:all maven-surefire-plugin 2.22.2 maven-failsafe-plugin 2.22.2 maven-jar-plugin 3.0.2 maven-install-plugin 2.5.2 maven-deploy-plugin 2.8.2 maven-site-plugin 3.7.1 maven-project-info-reports-plugin 3.0.0 maven-assembly-plugin 3.3.0 org.softwareheritage.graph.server.App jar-with-dependencies false make-assembly package single com.diffplug.spotless spotless-maven-plugin 2.22.1 *.md .gitignore true 4 4.16.0 .coding-style.xml pl.project13.maven git-commit-id-plugin 3.0.1 get-the-git-infos revision initialize true true true true v* git.closest.tag.name ^v true maven-source-plugin 2.1.1 bundle-sources package jar-no-fork test-jar-no-fork org.apache.maven.plugins maven-javadoc-plugin 3.3.1 resource-bundles package resource-bundle test-resource-bundle false javadoc-jar package jar true it.unimi.dsi:webgraph-big:* https://webgraph.di.unimi.it/docs-big/ https://dsiutils.di.unimi.it/docs/ https://fastutil.di.unimi.it/docs/ https://law.di.unimi.it/software/law-docs/ implSpec a Implementation Requirements: implNote a Implementation Note: + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + test-compile + test-compile-custom + + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java new file mode 100644 index 0000000..502e4ac --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java @@ -0,0 +1,157 @@ +package org.softwareheritage.graph.rpc; + +import com.martiansoftware.jsap.*; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.protobuf.services.ProtoReflectionService; +import it.unimi.dsi.logging.ProgressLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.softwareheritage.graph.SwhBidirectionalGraph; +import org.softwareheritage.graph.compress.LabelMapBuilder; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + */ +public class GraphServer { + private final static Logger logger = LoggerFactory.getLogger(GraphServer.class); + + private final SwhBidirectionalGraph graph; + private final int port; + private final int threads; + private Server server; + + public GraphServer(String graphBasename, int port, int threads) throws IOException { + this.graph = SwhBidirectionalGraph.loadLabelledMapped(graphBasename, new ProgressLogger(logger)); + this.port = port; + this.threads = threads; + graph.loadContentLength(); + graph.loadContentIsSkipped(); + graph.loadPersonIds(); + graph.loadAuthorTimestamps(); + graph.loadCommitterTimestamps(); + graph.loadMessages(); + graph.loadTagNames(); + graph.loadLabelNames(); + } + + private void start() throws IOException { + server = ServerBuilder.forPort(port).executor(Executors.newFixedThreadPool(threads)) + .addService(new TraversalService(graph)).addService(ProtoReflectionService.newInstance()).build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + GraphServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + })); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + private static JSAPResult parseArgs(String[] args) { + JSAPResult config = null; + try { + SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "", + new Parameter[]{ + new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port", + "The port on which the server should listen."), + new FlaggedOption("threads", JSAP.INTEGER_PARSER, "1", JSAP.NOT_REQUIRED, 't', "threads", + "The number of concurrent threads. 0 = number of cores."), + new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, + "Basename of the output graph")}); + + config = jsap.parse(args); + if (jsap.messagePrinted()) { + System.exit(1); + } + } catch (JSAPException e) { + e.printStackTrace(); + } + return config; + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + JSAPResult config = parseArgs(args); + String graphBasename = config.getString("graphBasename"); + int port = config.getInt("port"); + int threads = config.getInt("threads"); + if (threads == 0) { + threads = Runtime.getRuntime().availableProcessors(); + } + + final GraphServer server = new GraphServer(graphBasename, port, threads); + server.start(); + server.blockUntilShutdown(); + } + + static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase { + SwhBidirectionalGraph graph; + + public TraversalService(SwhBidirectionalGraph graph) { + this.graph = graph; + } + + @Override + public void traverse(TraversalRequest request, StreamObserver responseObserver) { + SwhBidirectionalGraph g = graph.copy(); + Traversal.simpleTraversal(g, request, responseObserver::onNext); + responseObserver.onCompleted(); + } + + @Override + public void countNodes(TraversalRequest request, StreamObserver responseObserver) { + AtomicInteger count = new AtomicInteger(0); + SwhBidirectionalGraph g = graph.copy(); + TraversalRequest fixedReq = TraversalRequest.newBuilder(request) + // Ignore return fields, just count nodes + .setReturnFields(NodeFields.getDefaultInstance()).build(); + Traversal.simpleTraversal(g, fixedReq, (Node node) -> { + count.incrementAndGet(); + }); + CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void countEdges(TraversalRequest request, StreamObserver responseObserver) { + AtomicInteger count = new AtomicInteger(0); + SwhBidirectionalGraph g = graph.copy(); + TraversalRequest fixedReq = TraversalRequest.newBuilder(request) + // Force return empty successors to count the edges + .setReturnFields(NodeFields.newBuilder().setSuccessor(true).setSuccessorSwhid(false).build()) + .build(); + Traversal.simpleTraversal(g, fixedReq, (Node node) -> { + count.addAndGet(node.getSuccessorCount()); + }); + CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + } +} diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java new file mode 100644 index 0000000..e046476 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java @@ -0,0 +1,275 @@ +package org.softwareheritage.graph.rpc; + +import com.google.protobuf.ByteString; +import it.unimi.dsi.big.webgraph.LazyLongIterator; +import it.unimi.dsi.big.webgraph.labelling.ArcLabelledNodeIterator; +import it.unimi.dsi.big.webgraph.labelling.Label; +import org.softwareheritage.graph.*; +import org.softwareheritage.graph.labels.DirEntry; + +import java.util.*; + +public class Traversal { + private static LazyLongIterator filterSuccessors(SwhUnidirectionalGraph g, long nodeId, AllowedEdges allowedEdges) { + if (allowedEdges.restrictedTo == null) { + // All edges are allowed, bypass edge check + return g.successors(nodeId); + } else { + LazyLongIterator allSuccessors = g.successors(nodeId); + return new LazyLongIterator() { + @Override + public long nextLong() { + long neighbor; + while ((neighbor = allSuccessors.nextLong()) != -1) { + if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) { + return neighbor; + } + } + return -1; + } + + @Override + public long skip(final long n) { + long i = 0; + while (i < n && nextLong() != -1) + i++; + return i; + } + }; + } + } + + private static ArcLabelledNodeIterator.LabelledArcIterator filterLabelledSuccessors(SwhUnidirectionalGraph g, + long nodeId, AllowedEdges allowedEdges) { + if (allowedEdges.restrictedTo == null) { + // All edges are allowed, bypass edge check + return g.labelledSuccessors(nodeId); + } else { + ArcLabelledNodeIterator.LabelledArcIterator allSuccessors = g.labelledSuccessors(nodeId); + return new ArcLabelledNodeIterator.LabelledArcIterator() { + @Override + public Label label() { + return allSuccessors.label(); + } + + @Override + public long nextLong() { + long neighbor; + while ((neighbor = allSuccessors.nextLong()) != -1) { + if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) { + return neighbor; + } + } + return -1; + } + + @Override + public long skip(final long n) { + long i = 0; + while (i < n && nextLong() != -1) + i++; + return i; + } + }; + } + } + + private static class NodeFilterChecker { + private final SwhUnidirectionalGraph g; + private final NodeFilter filter; + private final AllowedNodes allowedNodes; + + private NodeFilterChecker(SwhUnidirectionalGraph graph, NodeFilter filter) { + this.g = graph; + this.filter = filter; + this.allowedNodes = new AllowedNodes(filter.hasTypes() ? filter.getTypes() : "*"); + } + + public boolean allowed(long nodeId) { + if (filter == null) { + return true; + } + if (!this.allowedNodes.isAllowed(g.getNodeType(nodeId))) { + return false; + } + + long outdegree = g.outdegree(nodeId); + if (filter.hasMinTraversalSuccessors() && outdegree < filter.getMinTraversalSuccessors()) { + return false; + } + if (filter.hasMaxTraversalSuccessors() && outdegree > filter.getMaxTraversalSuccessors()) { + return false; + } + + return true; + } + } + + public static SwhUnidirectionalGraph getDirectedGraph(SwhBidirectionalGraph g, TraversalRequest request) { + switch (request.getDirection()) { + case FORWARD: + return g.getForwardGraph(); + case BACKWARD: + return g.getBackwardGraph(); + case BOTH: + return new SwhUnidirectionalGraph(g.symmetrize(), g.getProperties()); + } + throw new IllegalArgumentException("Unknown direction: " + request.getDirection()); + } + + public static void simpleTraversal(SwhBidirectionalGraph bidirectionalGraph, TraversalRequest request, + NodeObserver nodeObserver) { + SwhUnidirectionalGraph g = getDirectedGraph(bidirectionalGraph, request); + NodeFilterChecker nodeReturnChecker = new NodeFilterChecker(g, request.getReturnNodes()); + + AllowedEdges allowedEdges = new AllowedEdges(request.hasEdges() ? request.getEdges() : "*"); + + Queue queue = new ArrayDeque<>(); + HashSet visited = new HashSet<>(); + request.getSrcList().forEach(srcSwhid -> { + long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); + queue.add(srcNodeId); + visited.add(srcNodeId); + }); + queue.add(-1L); // Depth sentinel + + long edgesAccessed = 0; + long currentDepth = 0; + while (!queue.isEmpty()) { + long curr = queue.poll(); + if (curr == -1L) { + ++currentDepth; + if (!queue.isEmpty()) { + queue.add(-1L); + } + continue; + } + if (request.hasMaxDepth() && currentDepth > request.getMaxDepth()) { + break; + } + edgesAccessed += g.outdegree(curr); + if (request.hasMaxEdges() && edgesAccessed >= request.getMaxEdges()) { + break; + } + + Node.Builder nodeBuilder = null; + if (nodeReturnChecker.allowed(curr)) { + nodeBuilder = Node.newBuilder(); + buildNodeProperties(g, request.getReturnFields(), nodeBuilder, curr); + } + + ArcLabelledNodeIterator.LabelledArcIterator it = filterLabelledSuccessors(g, curr, allowedEdges); + for (long succ; (succ = it.nextLong()) != -1;) { + if (!visited.contains(succ)) { + queue.add(succ); + visited.add(succ); + } + buildSuccessorProperties(g, request.getReturnFields(), nodeBuilder, curr, succ, it.label()); + } + if (nodeBuilder != null) { + nodeObserver.onNext(nodeBuilder.build()); + } + } + } + + private static void buildNodeProperties(SwhUnidirectionalGraph graph, NodeFields fields, Node.Builder nodeBuilder, + long node) { + if (fields == null || !fields.hasSwhid() || fields.getSwhid()) { + nodeBuilder.setSwhid(graph.getSWHID(node).toString()); + } + if (fields == null) { + return; + } + + switch (graph.getNodeType(node)) { + case CNT: + if (fields.hasCntLength()) { + nodeBuilder.setCntLength(graph.getContentLength(node)); + } + if (fields.hasCntIsSkipped()) { + nodeBuilder.setCntIsSkipped(graph.isContentSkipped(node)); + } + break; + case REV: + if (fields.getRevAuthor()) { + nodeBuilder.setRevAuthor(graph.getAuthorId(node)); + } + if (fields.getRevCommitter()) { + nodeBuilder.setRevAuthor(graph.getCommitterId(node)); + } + if (fields.getRevAuthorDate()) { + nodeBuilder.setRevAuthorDate(graph.getAuthorTimestamp(node)); + } + if (fields.getRevAuthorDateOffset()) { + nodeBuilder.setRevAuthorDateOffset(graph.getAuthorTimestampOffset(node)); + } + if (fields.getRevCommitterDate()) { + nodeBuilder.setRevCommitterDate(graph.getCommitterTimestamp(node)); + } + if (fields.getRevCommitterDateOffset()) { + nodeBuilder.setRevCommitterDateOffset(graph.getCommitterTimestampOffset(node)); + } + if (fields.getRevMessage()) { + byte[] msg = graph.getMessage(node); + if (msg != null) { + nodeBuilder.setRevMessage(ByteString.copyFrom(msg)); + } + } + break; + case REL: + if (fields.getRelAuthor()) { + nodeBuilder.setRelAuthor(graph.getAuthorId(node)); + } + if (fields.getRelAuthorDate()) { + nodeBuilder.setRelAuthorDate(graph.getAuthorTimestamp(node)); + } + if (fields.getRelAuthorDateOffset()) { + nodeBuilder.setRelAuthorDateOffset(graph.getAuthorTimestampOffset(node)); + } + if (fields.getRelName()) { + byte[] msg = graph.getTagName(node); + if (msg != null) { + nodeBuilder.setRelName(ByteString.copyFrom(msg)); + } + } + if (fields.getRelMessage()) { + byte[] msg = graph.getMessage(node); + if (msg != null) { + nodeBuilder.setRelMessage(ByteString.copyFrom(msg)); + } + } + break; + case ORI: + if (fields.getOriUrl()) { + String url = graph.getUrl(node); + if (url != null) { + nodeBuilder.setOriUrl(url); + } + } + } + } + + private static void buildSuccessorProperties(SwhUnidirectionalGraph graph, NodeFields fields, + Node.Builder nodeBuilder, long src, long dst, Label label) { + if (nodeBuilder != null && fields != null && fields.getSuccessor()) { + Successor.Builder successorBuilder = Successor.newBuilder(); + if (!fields.hasSuccessorSwhid() || fields.getSuccessorSwhid()) { + successorBuilder.setSwhid(graph.getSWHID(dst).toString()); + } + if (fields.getSuccessorLabel()) { + DirEntry[] entries = (DirEntry[]) label.get(); + for (DirEntry entry : entries) { + EdgeLabel.Builder builder = EdgeLabel.newBuilder(); + builder.setName(ByteString.copyFrom(graph.getLabelName(entry.filenameId))); + builder.setPermission(entry.permission); + successorBuilder.addLabel(builder.build()); + } + } + nodeBuilder.addSuccessor(successorBuilder.build()); + } + } + + public interface NodeObserver { + void onNext(Node nodeId); + } +} diff --git a/proto/swhgraph.proto b/proto/swhgraph.proto new file mode 100644 index 0000000..0986234 --- /dev/null +++ b/proto/swhgraph.proto @@ -0,0 +1,102 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.softwareheritage.graph.rpc"; +option java_outer_classname = "GraphService"; + +package swh.graph; + +service TraversalService { + rpc Traverse (TraversalRequest) returns (stream Node); + rpc CountNodes (TraversalRequest) returns (CountResponse); + rpc CountEdges (TraversalRequest) returns (CountResponse); +} + +enum GraphDirection { + FORWARD = 0; + BACKWARD = 1; + BOTH = 2; +} + +message TraversalRequest { + repeated string src = 1; + + // Traversal options + optional GraphDirection direction = 2; + optional string edges = 3; + optional int64 max_edges = 4; + optional int64 max_depth = 5; + optional NodeFilter return_nodes = 6; + optional NodeFields return_fields = 7; +} + +message NodeFilter { + optional string types = 1; + optional int64 min_traversal_successors = 2; + optional int64 max_traversal_successors = 3; +} + +message NodeFields { + optional bool swhid = 1; + + optional bool successor = 2; + optional bool successor_swhid = 3; + optional bool successor_label = 4; + + optional bool cnt_length = 5; + optional bool cnt_is_skipped = 6; + + optional bool rev_author = 7; + optional bool rev_author_date = 8; + optional bool rev_author_date_offset = 9; + optional bool rev_committer = 10; + optional bool rev_committer_date = 11; + optional bool rev_committer_date_offset = 12; + optional bool rev_message = 13; + + optional bool rel_author = 14; + optional bool rel_author_date = 15; + optional bool rel_author_date_offset = 16; + optional bool rel_name = 17; + optional bool rel_message = 18; + + optional bool ori_url = 19; +} + +message Node { + string swhid = 1; + repeated Successor successor = 2; + + optional int64 cnt_length = 3; + optional bool cnt_is_skipped = 4; + + optional int64 rev_author = 5; + optional int64 rev_author_date = 6; + optional int32 rev_author_date_offset = 7; + optional int64 rev_committer = 8; + optional int64 rev_committer_date = 9; + optional int32 rev_committer_date_offset = 10; + optional bytes rev_message = 11; + + optional int64 rel_author = 12; + optional int64 rel_author_date = 13; + optional int32 rel_author_date_offset = 14; + optional bytes rel_name = 15; + optional bytes rel_message = 16; + + optional string ori_url = 17; +} + +message Successor { + optional string swhid = 1; + repeated EdgeLabel label = 2; +} + +message EdgeLabel { + bytes name = 1; + int32 permission = 2; +} + +message CountResponse { + int64 count = 1; +}