diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java index 3fa4359..a7c0ed7 100644 --- a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java +++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java @@ -1,274 +1,293 @@ package org.softwareheritage.graph.rpc; import com.google.protobuf.FieldMask; import com.martiansoftware.jsap.*; import io.grpc.Server; import io.grpc.Status; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.ChannelOption; import io.grpc.stub.StreamObserver; import io.grpc.protobuf.services.ProtoReflectionService; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.SWHID; import org.softwareheritage.graph.SwhBidirectionalGraph; import org.softwareheritage.graph.compress.LabelMapBuilder; import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * Server that manages startup/shutdown of a {@code Greeter} server. */ public class GraphServer { private final static Logger logger = LoggerFactory.getLogger(GraphServer.class); private final SwhBidirectionalGraph graph; private final int port; private final int threads; private Server server; + /** + * @param graphBasename the basename of the SWH graph to load + * @param port the port on which the GRPC server will listen + * @param threads the number of threads to use in the server threadpool + */ public GraphServer(String graphBasename, int port, int threads) throws IOException { this.graph = loadGraph(graphBasename); this.port = port; this.threads = threads; } + /** Load a graph and all its properties. */ public static SwhBidirectionalGraph loadGraph(String basename) throws IOException { // TODO: use loadLabelledMapped() when https://github.com/vigna/webgraph-big/pull/5 is merged SwhBidirectionalGraph g = SwhBidirectionalGraph.loadLabelled(basename, new ProgressLogger(logger)); g.loadContentLength(); g.loadContentIsSkipped(); g.loadPersonIds(); g.loadAuthorTimestamps(); g.loadCommitterTimestamps(); g.loadMessages(); g.loadTagNames(); g.loadLabelNames(); return g; } + /** Start the RPC server. */ private void start() throws IOException { server = NettyServerBuilder.forPort(port).withChildOption(ChannelOption.SO_REUSEADDR, true) .executor(Executors.newFixedThreadPool(threads)).addService(new TraversalService(graph)) .addService(ProtoReflectionService.newInstance()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { GraphServer.this.stop(); } catch (InterruptedException e) { e.printStackTrace(System.err); } })); } private void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } } /** * Await termination on the main thread since the grpc library uses daemon threads. */ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } private static JSAPResult parseArgs(String[] args) { JSAPResult config = null; try { SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "", new Parameter[]{ new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port", "The port on which the server should listen."), new FlaggedOption("threads", JSAP.INTEGER_PARSER, "1", JSAP.NOT_REQUIRED, 't', "threads", "The number of concurrent threads. 0 = number of cores."), new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, "Basename of the output graph")}); config = jsap.parse(args); if (jsap.messagePrinted()) { System.exit(1); } } catch (JSAPException e) { e.printStackTrace(); } return config; } - /** - * Main launches the server from the command line. - */ + /** Main launches the server from the command line. */ public static void main(String[] args) throws IOException, InterruptedException { JSAPResult config = parseArgs(args); String graphBasename = config.getString("graphBasename"); int port = config.getInt("port"); int threads = config.getInt("threads"); if (threads == 0) { threads = Runtime.getRuntime().availableProcessors(); } final GraphServer server = new GraphServer(graphBasename, port, threads); server.start(); server.blockUntilShutdown(); } + /** Implementation of the Traversal service, which contains all the graph querying endpoints. */ static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase { SwhBidirectionalGraph graph; public TraversalService(SwhBidirectionalGraph graph) { this.graph = graph; } + /** Return various statistics on the overall graph. */ @Override public void stats(StatsRequest request, StreamObserver responseObserver) { StatsResponse.Builder response = StatsResponse.newBuilder(); response.setNumNodes(graph.numNodes()); response.setNumEdges(graph.numArcs()); Properties properties = new Properties(); try { properties.load(new FileInputStream(graph.getPath() + ".properties")); properties.load(new FileInputStream(graph.getPath() + ".stats")); } catch (IOException e) { throw new RuntimeException(e); } response.setCompressionRatio(Double.parseDouble(properties.getProperty("compratio"))); response.setBitsPerNode(Double.parseDouble(properties.getProperty("bitspernode"))); response.setBitsPerEdge(Double.parseDouble(properties.getProperty("bitsperlink"))); response.setAvgLocality(Double.parseDouble(properties.getProperty("avglocality"))); response.setIndegreeMin(Long.parseLong(properties.getProperty("minindegree"))); response.setIndegreeMax(Long.parseLong(properties.getProperty("maxindegree"))); response.setIndegreeAvg(Double.parseDouble(properties.getProperty("avgindegree"))); response.setOutdegreeMin(Long.parseLong(properties.getProperty("minoutdegree"))); response.setOutdegreeMax(Long.parseLong(properties.getProperty("maxoutdegree"))); response.setOutdegreeAvg(Double.parseDouble(properties.getProperty("avgoutdegree"))); responseObserver.onNext(response.build()); responseObserver.onCompleted(); } + /** Return a single node and its properties. */ @Override public void getNode(GetNodeRequest request, StreamObserver responseObserver) { long nodeId; try { nodeId = graph.getNodeId(new SWHID(request.getSwhid())); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } Node.Builder builder = Node.newBuilder(); NodePropertyBuilder.buildNodeProperties(graph.getForwardGraph(), request.hasMask() ? request.getMask() : null, builder, nodeId); responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } + /** Perform a BFS traversal from a set of source nodes and stream the nodes encountered. */ @Override public void traverse(TraversalRequest request, StreamObserver responseObserver) { SwhBidirectionalGraph g = graph.copy(); Traversal.SimpleTraversal t; try { t = new Traversal.SimpleTraversal(g, request, responseObserver::onNext); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } t.visit(); responseObserver.onCompleted(); } + /** + * Find the shortest path between a set of source nodes and a node that matches a given criteria + * using a BFS. + */ @Override public void findPathTo(FindPathToRequest request, StreamObserver responseObserver) { SwhBidirectionalGraph g = graph.copy(); Traversal.FindPathTo t; try { t = new Traversal.FindPathTo(g, request); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } t.visit(); Path path = t.getPath(); if (path == null) { responseObserver.onError(Status.NOT_FOUND.asException()); } else { responseObserver.onNext(path); responseObserver.onCompleted(); } } + /** + * Find the shortest path between a set of source nodes and a set of destination nodes using a + * bidirectional BFS. + */ @Override public void findPathBetween(FindPathBetweenRequest request, StreamObserver responseObserver) { SwhBidirectionalGraph g = graph.copy(); Traversal.FindPathBetween t; try { t = new Traversal.FindPathBetween(g, request); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } t.visit(); Path path = t.getPath(); if (path == null) { responseObserver.onError(Status.NOT_FOUND.asException()); } else { responseObserver.onNext(path); responseObserver.onCompleted(); } } + /** Return the number of nodes traversed by a BFS traversal. */ @Override public void countNodes(TraversalRequest request, StreamObserver responseObserver) { AtomicLong count = new AtomicLong(0); SwhBidirectionalGraph g = graph.copy(); TraversalRequest fixedReq = TraversalRequest.newBuilder(request) // Ignore return fields, just count nodes .setMask(FieldMask.getDefaultInstance()).build(); Traversal.SimpleTraversal t; try { t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.incrementAndGet()); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } t.visit(); CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } + /** Return the number of edges traversed by a BFS traversal. */ @Override public void countEdges(TraversalRequest request, StreamObserver responseObserver) { AtomicLong count = new AtomicLong(0); SwhBidirectionalGraph g = graph.copy(); TraversalRequest fixedReq = TraversalRequest.newBuilder(request) // Force return empty successors to count the edges .setMask(FieldMask.newBuilder().addPaths("num_successors").build()).build(); Traversal.SimpleTraversal t; try { t = new Traversal.SimpleTraversal(g, fixedReq, n -> count.addAndGet(n.getNumSuccessors())); } catch (IllegalArgumentException e) { responseObserver .onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); return; } t.visit(); CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } } } diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/NodePropertyBuilder.java b/java/src/main/java/org/softwareheritage/graph/rpc/NodePropertyBuilder.java index 7906fb7..1cedeb9 100644 --- a/java/src/main/java/org/softwareheritage/graph/rpc/NodePropertyBuilder.java +++ b/java/src/main/java/org/softwareheritage/graph/rpc/NodePropertyBuilder.java @@ -1,198 +1,210 @@ /* * Copyright (c) 2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ package org.softwareheritage.graph.rpc; import com.google.protobuf.ByteString; import com.google.protobuf.FieldMask; import com.google.protobuf.util.FieldMaskUtil; import it.unimi.dsi.big.webgraph.labelling.Label; import org.softwareheritage.graph.SwhUnidirectionalGraph; import org.softwareheritage.graph.labels.DirEntry; import java.util.*; -/** NodePropertyBuilder is a helper class. */ +/** + * NodePropertyBuilder is a helper class to enrich {@link Node} messages with node and edge + * properties. It is used by {@link GraphServer.TraversalService} to build the response messages or + * streams. Because property access is disk-based and slow, particular care is taken to avoid + * loading unnecessary properties. We use a FieldMask object to check which properties are requested + * by the client, and only load these. + */ public class NodePropertyBuilder { /** * NodeDataMask caches a FieldMask into a more efficient representation (booleans). This avoids the * need of parsing the FieldMask for each node in the stream. */ public static class NodeDataMask { public boolean swhid; public boolean successor; public boolean successorSwhid; public boolean successorLabel; public boolean numSuccessors; public boolean cntLength; public boolean cntIsSkipped; public boolean revAuthor; public boolean revAuthorDate; public boolean revAuthorDateOffset; public boolean revCommitter; public boolean revCommitterDate; public boolean revCommitterDateOffset; public boolean revMessage; public boolean relAuthor; public boolean relAuthorDate; public boolean relAuthorDateOffset; public boolean relName; public boolean relMessage; public boolean oriUrl; public NodeDataMask(FieldMask mask) { Set allowedFields = null; if (mask != null) { mask = FieldMaskUtil.normalize(mask); allowedFields = new HashSet<>(mask.getPathsList()); } this.swhid = allowedFields == null || allowedFields.contains("swhid"); this.successorSwhid = allowedFields == null || allowedFields.contains("successor") || allowedFields.contains("successor.swhid"); this.successorLabel = allowedFields == null || allowedFields.contains("successor") || allowedFields.contains("successor.label"); this.successor = this.successorSwhid || this.successorLabel; this.numSuccessors = allowedFields == null || allowedFields.contains("num_successors"); this.cntLength = allowedFields == null || allowedFields.contains("cnt.length"); this.cntIsSkipped = allowedFields == null || allowedFields.contains("cnt.is_skipped"); this.revAuthor = allowedFields == null || allowedFields.contains("rev.author"); this.revAuthorDate = allowedFields == null || allowedFields.contains("rev.author_date"); this.revAuthorDateOffset = allowedFields == null || allowedFields.contains("rev.author_date_offset"); this.revCommitter = allowedFields == null || allowedFields.contains("rev.committer"); this.revCommitterDate = allowedFields == null || allowedFields.contains("rev.committer_date"); this.revCommitterDateOffset = allowedFields == null || allowedFields.contains("rev.committer_date_offset"); this.revMessage = allowedFields == null || allowedFields.contains("rev.message"); this.relAuthor = allowedFields == null || allowedFields.contains("rel.author"); this.relAuthorDate = allowedFields == null || allowedFields.contains("rel.author_date"); this.relAuthorDateOffset = allowedFields == null || allowedFields.contains("rel.author_date_offset"); this.relName = allowedFields == null || allowedFields.contains("rel.name"); this.relMessage = allowedFields == null || allowedFields.contains("rel.message"); this.oriUrl = allowedFields == null || allowedFields.contains("ori.url"); } } + /** Enrich a Node message with node properties requested in the NodeDataMask. */ public static void buildNodeProperties(SwhUnidirectionalGraph graph, NodeDataMask mask, Node.Builder nodeBuilder, long node) { if (mask.swhid) { nodeBuilder.setSwhid(graph.getSWHID(node).toString()); } switch (graph.getNodeType(node)) { case CNT: ContentData.Builder cntBuilder = ContentData.newBuilder(); if (mask.cntLength) { cntBuilder.setLength(graph.getContentLength(node)); } if (mask.cntIsSkipped) { cntBuilder.setIsSkipped(graph.isContentSkipped(node)); } nodeBuilder.setCnt(cntBuilder.build()); break; case REV: RevisionData.Builder revBuilder = RevisionData.newBuilder(); if (mask.revAuthor) { revBuilder.setAuthor(graph.getAuthorId(node)); } if (mask.revAuthorDate) { revBuilder.setAuthorDate(graph.getAuthorTimestamp(node)); } if (mask.revAuthorDateOffset) { revBuilder.setAuthorDateOffset(graph.getAuthorTimestampOffset(node)); } if (mask.revCommitter) { revBuilder.setCommitter(graph.getCommitterId(node)); } if (mask.revCommitterDate) { revBuilder.setCommitterDate(graph.getCommitterTimestamp(node)); } if (mask.revCommitterDateOffset) { revBuilder.setCommitterDateOffset(graph.getCommitterTimestampOffset(node)); } if (mask.revMessage) { byte[] msg = graph.getMessage(node); if (msg != null) { revBuilder.setMessage(ByteString.copyFrom(msg)); } } nodeBuilder.setRev(revBuilder.build()); break; case REL: ReleaseData.Builder relBuilder = ReleaseData.newBuilder(); if (mask.relAuthor) { relBuilder.setAuthor(graph.getAuthorId(node)); } if (mask.relAuthorDate) { relBuilder.setAuthorDate(graph.getAuthorTimestamp(node)); } if (mask.relAuthorDateOffset) { relBuilder.setAuthorDateOffset(graph.getAuthorTimestampOffset(node)); } if (mask.relName) { byte[] msg = graph.getMessage(node); if (msg != null) { relBuilder.setMessage(ByteString.copyFrom(msg)); } } if (mask.relMessage) { byte[] msg = graph.getMessage(node); if (msg != null) { relBuilder.setMessage(ByteString.copyFrom(msg)); } } nodeBuilder.setRel(relBuilder.build()); break; case ORI: OriginData.Builder oriBuilder = OriginData.newBuilder(); if (mask.oriUrl) { String url = graph.getUrl(node); if (url != null) { oriBuilder.setUrl(url); } } nodeBuilder.setOri(oriBuilder.build()); } } + /** Enrich a Node message with node properties requested in the FieldMask. */ public static void buildNodeProperties(SwhUnidirectionalGraph graph, FieldMask mask, Node.Builder nodeBuilder, long node) { NodeDataMask nodeMask = new NodeDataMask(mask); buildNodeProperties(graph, nodeMask, nodeBuilder, node); } + /** + * Enrich a Node message with edge properties requested in the NodeDataMask, for a specific edge. + */ public static void buildSuccessorProperties(SwhUnidirectionalGraph graph, NodeDataMask mask, Node.Builder nodeBuilder, long src, long dst, Label label) { if (nodeBuilder != null) { Successor.Builder successorBuilder = Successor.newBuilder(); if (mask.successorSwhid) { successorBuilder.setSwhid(graph.getSWHID(dst).toString()); } if (mask.successorLabel) { DirEntry[] entries = (DirEntry[]) label.get(); for (DirEntry entry : entries) { EdgeLabel.Builder builder = EdgeLabel.newBuilder(); builder.setName(ByteString.copyFrom(graph.getLabelName(entry.filenameId))); builder.setPermission(entry.permission); successorBuilder.addLabel(builder.build()); } } Successor successor = successorBuilder.build(); if (successor != Successor.getDefaultInstance()) { nodeBuilder.addSuccessor(successor); } if (mask.numSuccessors) { nodeBuilder.setNumSuccessors(nodeBuilder.getNumSuccessors() + 1); } } } + /** Enrich a Node message with edge properties requested in the FieldMask, for a specific edge. */ public static void buildSuccessorProperties(SwhUnidirectionalGraph graph, FieldMask mask, Node.Builder nodeBuilder, long src, long dst, Label label) { NodeDataMask nodeMask = new NodeDataMask(mask); buildSuccessorProperties(graph, nodeMask, nodeBuilder, src, dst, label); } } diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java index ba8ed40..5b5bf8e 100644 --- a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java +++ b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java @@ -1,429 +1,526 @@ package org.softwareheritage.graph.rpc; import it.unimi.dsi.big.webgraph.labelling.ArcLabelledNodeIterator; import it.unimi.dsi.big.webgraph.labelling.Label; import org.softwareheritage.graph.*; import java.util.*; +/** Traversal contains all the algorithms used for graph traversals */ public class Traversal { + /** + * Wrapper around g.successors(), only follows edges that are allowed by the given + * {@link AllowedEdges} object. + */ private static ArcLabelledNodeIterator.LabelledArcIterator filterLabelledSuccessors(SwhUnidirectionalGraph g, long nodeId, AllowedEdges allowedEdges) { if (allowedEdges.restrictedTo == null) { // All edges are allowed, bypass edge check return g.labelledSuccessors(nodeId); } else { ArcLabelledNodeIterator.LabelledArcIterator allSuccessors = g.labelledSuccessors(nodeId); return new ArcLabelledNodeIterator.LabelledArcIterator() { @Override public Label label() { return allSuccessors.label(); } @Override public long nextLong() { long neighbor; while ((neighbor = allSuccessors.nextLong()) != -1) { if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) { return neighbor; } } return -1; } @Override public long skip(final long n) { long i = 0; while (i < n && nextLong() != -1) i++; return i; } }; } } + /** Helper class to check that a given node is "valid" for some given {@link NodeFilter} */ private static class NodeFilterChecker { private final SwhUnidirectionalGraph g; private final NodeFilter filter; private final AllowedNodes allowedNodes; private NodeFilterChecker(SwhUnidirectionalGraph graph, NodeFilter filter) { this.g = graph; this.filter = filter; this.allowedNodes = new AllowedNodes(filter.hasTypes() ? filter.getTypes() : "*"); } public boolean allowed(long nodeId) { if (filter == null) { return true; } if (!this.allowedNodes.isAllowed(g.getNodeType(nodeId))) { return false; } return true; } } + /** Returns the unidirectional graph from a bidirectional graph and a {@link GraphDirection}. */ public static SwhUnidirectionalGraph getDirectedGraph(SwhBidirectionalGraph g, GraphDirection direction) { switch (direction) { case FORWARD: return g.getForwardGraph(); case BACKWARD: return g.getBackwardGraph(); /* * TODO: add support for BOTH case BOTH: return new SwhUnidirectionalGraph(g.symmetrize(), * g.getProperties()); */ default : throw new IllegalArgumentException("Unknown direction: " + direction); } } + /** Returns the opposite of a given {@link GraphDirection} (equivalent to a graph transposition). */ public static GraphDirection reverseDirection(GraphDirection direction) { switch (direction) { case FORWARD: return GraphDirection.BACKWARD; case BACKWARD: return GraphDirection.FORWARD; /* * TODO: add support for BOTH case BOTH: return GraphDirection.BOTH; */ default : throw new IllegalArgumentException("Unknown direction: " + direction); } } + /** Dummy exception to short-circuit and interrupt a graph traversal. */ static class StopTraversalException extends RuntimeException { } + /** Generic BFS traversal algorithm. */ static class BFSVisitor { + /** The graph to traverse. */ protected final SwhUnidirectionalGraph g; + /** Depth of the node currently being visited */ protected long depth = 0; + /** + * Number of traversal successors (i.e., successors that will be considered by the traversal) of the + * node currently being visited + */ protected long traversalSuccessors = 0; + /** Number of edges accessed since the beginning of the traversal */ protected long edgesAccessed = 0; + /** + * Map from a node ID to its parent node ID. The key set can be used as the set of all visited + * nodes. + */ protected HashMap parents = new HashMap<>(); + /** Queue of nodes to visit (also called "frontier", "open set", "wavefront" etc.) */ protected ArrayDeque queue = new ArrayDeque<>(); + /** If > 0, the maximum depth of the traversal. */ private long maxDepth = -1; + /** If > 0, the maximum number of edges to traverse. */ private long maxEdges = -1; BFSVisitor(SwhUnidirectionalGraph g) { this.g = g; } + /** Add a new source node to the initial queue. */ public void addSource(long nodeId) { queue.add(nodeId); parents.put(nodeId, -1L); } + /** Set the maximum depth of the traversal. */ public void setMaxDepth(long depth) { maxDepth = depth; } + /** Set the maximum number of edges to traverse. */ public void setMaxEdges(long edges) { maxEdges = edges; } + /** Setup the visit counters and depth sentinel. */ public void visitSetup() { edgesAccessed = 0; depth = 0; queue.add(-1L); // depth sentinel } + /** Perform the visit */ public void visit() { visitSetup(); while (!queue.isEmpty()) { visitStep(); } } + /** Single "step" of a visit. Advance the frontier of exactly one node. */ public void visitStep() { try { assert !queue.isEmpty(); long curr = queue.poll(); if (curr == -1L) { ++depth; if (!queue.isEmpty()) { queue.add(-1L); visitStep(); } return; } if (maxDepth >= 0 && depth > maxDepth) { throw new StopTraversalException(); } edgesAccessed += g.outdegree(curr); if (maxEdges >= 0 && edgesAccessed > maxEdges) { throw new StopTraversalException(); } visitNode(curr); } catch (StopTraversalException e) { // Traversal is over, clear the to-do queue. queue.clear(); } } + /** + * Get the successors of a node. Override this function if you want to filter which successors are + * considered during the traversal. + */ protected ArcLabelledNodeIterator.LabelledArcIterator getSuccessors(long nodeId) { return g.labelledSuccessors(nodeId); } + /** Visit a node. Override to do additional processing on the node. */ protected void visitNode(long node) { ArcLabelledNodeIterator.LabelledArcIterator it = getSuccessors(node); traversalSuccessors = 0; for (long succ; (succ = it.nextLong()) != -1;) { traversalSuccessors++; visitEdge(node, succ, it.label()); } } + /** Visit an edge. Override to do additional processing on the edge. */ protected void visitEdge(long src, long dst, Label label) { if (!parents.containsKey(dst)) { queue.add(dst); parents.put(dst, src); } } } + /** + * SimpleTraversal is used by the Traverse endpoint. It extends BFSVisitor with additional + * processing, notably related to graph properties and filters. + */ static class SimpleTraversal extends BFSVisitor { private final NodeFilterChecker nodeReturnChecker; private final AllowedEdges allowedEdges; private final TraversalRequest request; private final NodePropertyBuilder.NodeDataMask nodeDataMask; private final NodeObserver nodeObserver; private Node.Builder nodeBuilder; SimpleTraversal(SwhBidirectionalGraph bidirectionalGraph, TraversalRequest request, NodeObserver nodeObserver) { super(getDirectedGraph(bidirectionalGraph, request.getDirection())); this.request = request; this.nodeObserver = nodeObserver; this.nodeReturnChecker = new NodeFilterChecker(g, request.getReturnNodes()); this.nodeDataMask = new NodePropertyBuilder.NodeDataMask(request.hasMask() ? request.getMask() : null); this.allowedEdges = new AllowedEdges(request.hasEdges() ? request.getEdges() : "*"); request.getSrcList().forEach(srcSwhid -> { long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); addSource(srcNodeId); }); if (request.hasMaxDepth()) { setMaxDepth(request.getMaxDepth()); } if (request.hasMaxEdges()) { setMaxEdges(request.getMaxEdges()); } } @Override protected ArcLabelledNodeIterator.LabelledArcIterator getSuccessors(long nodeId) { return filterLabelledSuccessors(g, nodeId, allowedEdges); } @Override public void visitNode(long node) { nodeBuilder = null; if (nodeReturnChecker.allowed(node) && (!request.hasMinDepth() || depth >= request.getMinDepth())) { nodeBuilder = Node.newBuilder(); NodePropertyBuilder.buildNodeProperties(g, nodeDataMask, nodeBuilder, node); } super.visitNode(node); if (request.getReturnNodes().hasMinTraversalSuccessors() && traversalSuccessors < request.getReturnNodes().getMinTraversalSuccessors() || request.getReturnNodes().hasMaxTraversalSuccessors() && traversalSuccessors > request.getReturnNodes().getMaxTraversalSuccessors()) { nodeBuilder = null; } if (nodeBuilder != null) { nodeObserver.onNext(nodeBuilder.build()); } } @Override protected void visitEdge(long src, long dst, Label label) { super.visitEdge(src, dst, label); NodePropertyBuilder.buildSuccessorProperties(g, nodeDataMask, nodeBuilder, src, dst, label); } } + /** + * FindPathTo searches for a path from a source node to a node matching a given criteria It extends + * BFSVisitor with additional processing, and makes the traversal stop as soon as a node matching + * the given criteria is found. + */ static class FindPathTo extends BFSVisitor { private final AllowedEdges allowedEdges; private final FindPathToRequest request; private final NodePropertyBuilder.NodeDataMask nodeDataMask; private final NodeFilterChecker targetChecker; private Long targetNode = null; FindPathTo(SwhBidirectionalGraph bidirectionalGraph, FindPathToRequest request) { super(getDirectedGraph(bidirectionalGraph, request.getDirection())); this.request = request; this.targetChecker = new NodeFilterChecker(g, request.getTarget()); this.nodeDataMask = new NodePropertyBuilder.NodeDataMask(request.hasMask() ? request.getMask() : null); this.allowedEdges = new AllowedEdges(request.hasEdges() ? request.getEdges() : "*"); if (request.hasMaxDepth()) { setMaxDepth(request.getMaxDepth()); } if (request.hasMaxEdges()) { setMaxEdges(request.getMaxEdges()); } request.getSrcList().forEach(srcSwhid -> { long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); addSource(srcNodeId); }); } @Override protected ArcLabelledNodeIterator.LabelledArcIterator getSuccessors(long nodeId) { return filterLabelledSuccessors(g, nodeId, allowedEdges); } @Override public void visitNode(long node) { if (targetChecker.allowed(node)) { targetNode = node; throw new StopTraversalException(); } super.visitNode(node); } + /** + * Once the visit has been performed and a matching node has been found, return the shortest path + * from the source set to that node. To do so, we need to backtrack the parents of the node until we + * find one of the source nodes (whose parent is -1). + */ public Path getPath() { if (targetNode == null) { - return null; + return null; // No path found. } - Path.Builder pathBuilder = Path.newBuilder(); + + /* Backtrack from targetNode to a source node */ long curNode = targetNode; ArrayList path = new ArrayList<>(); while (curNode != -1) { path.add(curNode); curNode = parents.get(curNode); } Collections.reverse(path); + + /* Enrich path with node properties */ + Path.Builder pathBuilder = Path.newBuilder(); for (long nodeId : path) { Node.Builder nodeBuilder = Node.newBuilder(); NodePropertyBuilder.buildNodeProperties(g, nodeDataMask, nodeBuilder, nodeId); pathBuilder.addNode(nodeBuilder.build()); } return pathBuilder.build(); } } + /** + * FindPathBetween searches for a shortest path between a set of source nodes and a set of + * destination nodes. + * + * It does so by performing a *bidirectional breadth-first search*, i.e., two parallel breadth-first + * searches, one from the source set ("src-BFS") and one from the destination set ("dst-BFS"), until + * both searches find a common node that joins their visited sets. This node is called the "midpoint + * node". The path returned is the path src -> ... -> midpoint -> ... -> dst, which is always a + * shortest path between src and dst. + * + * The graph direction of both BFS can be configured separately. By default, the dst-BFS will use + * the graph in the opposite direction than the src-BFS (if direction = FORWARD, by default + * direction_reverse = BACKWARD, and vice-versa). The default behavior is thus to search for a + * shortest path between two nodes in a given direction. However, one can also specify FORWARD or + * BACKWARD for *both* the src-BFS and the dst-BFS. This will search for a common descendant or a + * common ancestor between the two sets, respectively. These will be the midpoints of the returned + * path. + */ static class FindPathBetween extends BFSVisitor { private final FindPathBetweenRequest request; private final NodePropertyBuilder.NodeDataMask nodeDataMask; private final AllowedEdges allowedEdgesSrc; private final AllowedEdges allowedEdgesDst; private final BFSVisitor srcVisitor; private final BFSVisitor dstVisitor; private Long middleNode = null; FindPathBetween(SwhBidirectionalGraph bidirectionalGraph, FindPathBetweenRequest request) { super(getDirectedGraph(bidirectionalGraph, request.getDirection())); this.request = request; this.nodeDataMask = new NodePropertyBuilder.NodeDataMask(request.hasMask() ? request.getMask() : null); GraphDirection direction = request.getDirection(); + // if direction_reverse is not specified, use the opposite direction of direction GraphDirection directionReverse = request.hasDirectionReverse() ? request.getDirectionReverse() : reverseDirection(request.getDirection()); SwhUnidirectionalGraph srcGraph = getDirectedGraph(bidirectionalGraph, direction); SwhUnidirectionalGraph dstGraph = getDirectedGraph(bidirectionalGraph, directionReverse); + this.allowedEdgesSrc = new AllowedEdges(request.hasEdges() ? request.getEdges() : "*"); + /* + * If edges_reverse is not specified: - If `edges` is not specified either, defaults to "*" - If + * direction == direction_reverse, defaults to `edges` - If direction != direction_reverse, defaults + * to the reverse of `edges` (e.g. "rev:dir" becomes "dir:rev"). + */ this.allowedEdgesDst = request.hasEdgesReverse() ? new AllowedEdges(request.getEdgesReverse()) : (request.hasEdges() ? (direction == directionReverse ? new AllowedEdges(request.getEdges()) : new AllowedEdges(request.getEdges()).reverse()) : new AllowedEdges("*")); + /* + * Source sub-visitor. Aborts as soon as it finds a node already visited by the destination + * sub-visitor. + */ this.srcVisitor = new BFSVisitor(srcGraph) { @Override protected ArcLabelledNodeIterator.LabelledArcIterator getSuccessors(long nodeId) { return filterLabelledSuccessors(g, nodeId, allowedEdgesSrc); } @Override public void visitNode(long node) { if (dstVisitor.parents.containsKey(node)) { middleNode = node; throw new StopTraversalException(); } super.visitNode(node); } }; + + /* + * Destination sub-visitor. Aborts as soon as it finds a node already visited by the source + * sub-visitor. + */ this.dstVisitor = new BFSVisitor(dstGraph) { @Override protected ArcLabelledNodeIterator.LabelledArcIterator getSuccessors(long nodeId) { return filterLabelledSuccessors(g, nodeId, allowedEdgesDst); } @Override public void visitNode(long node) { if (srcVisitor.parents.containsKey(node)) { middleNode = node; throw new StopTraversalException(); } super.visitNode(node); } }; if (request.hasMaxDepth()) { this.srcVisitor.setMaxDepth(request.getMaxDepth()); this.dstVisitor.setMaxDepth(request.getMaxDepth()); } if (request.hasMaxEdges()) { this.srcVisitor.setMaxEdges(request.getMaxEdges()); this.dstVisitor.setMaxEdges(request.getMaxEdges()); } request.getSrcList().forEach(srcSwhid -> { long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); srcVisitor.addSource(srcNodeId); }); request.getDstList().forEach(srcSwhid -> { long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); dstVisitor.addSource(srcNodeId); }); } @Override public void visit() { + /* + * Bidirectional BFS: maintain two sub-visitors, and alternately run a visit step in each of them. + */ srcVisitor.visitSetup(); dstVisitor.visitSetup(); while (!srcVisitor.queue.isEmpty() || !dstVisitor.queue.isEmpty()) { if (!srcVisitor.queue.isEmpty()) { srcVisitor.visitStep(); } if (!dstVisitor.queue.isEmpty()) { dstVisitor.visitStep(); } } } public Path getPath() { if (middleNode == null) { - return null; + return null; // No path found. } Path.Builder pathBuilder = Path.newBuilder(); ArrayList path = new ArrayList<>(); + + /* First section of the path: src -> midpoint */ long curNode = middleNode; while (curNode != -1) { path.add(curNode); curNode = srcVisitor.parents.get(curNode); } pathBuilder.setMidpointIndex(path.size() - 1); Collections.reverse(path); + + /* Second section of the path: midpoint -> dst */ curNode = dstVisitor.parents.get(middleNode); while (curNode != -1) { path.add(curNode); curNode = dstVisitor.parents.get(curNode); } + + /* Enrich path with node properties */ for (long nodeId : path) { Node.Builder nodeBuilder = Node.newBuilder(); NodePropertyBuilder.buildNodeProperties(g, nodeDataMask, nodeBuilder, nodeId); pathBuilder.addNode(nodeBuilder.build()); } return pathBuilder.build(); } } public interface NodeObserver { void onNext(Node nodeId); } }