diff --git a/docs/api.rst b/docs/api.rst index 917b571..0fade31 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,227 +1,261 @@ Graph REST API ============== Terminology ----------- This API uses the following notions: - **Node**: a node in the `Software Heritage graph `_, represented by a `persistent identifier `_ (abbreviated as *SWH PID*, or simply *PID*). - **Node type**: the 3-letter specifier from the node PID (``cnt``, ``dir``, ``rel``, ``rev``, ``snp``), or ``*`` for all node types. - **Edge type**: a comma-separated list of ``src:dst`` strings where ``src`` and ``dst`` are node types, or ``*`` for all edge types. Examples ~~~~~~~~ - ``swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2`` the PID of a node of type content containing the full text of the GPL3 license. - ``swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35`` the PID of a node of type revision corresponding to the commit in Linux that merged the 'x86/urgent' branch on 31 December 2017. - ``"dir:dir,dir:cnt"`` node types allowing edges from directories to directories nodes, or directories to contents nodes. - ``"rev:rev,dir:*"`` node types allowing edges from revisions to revisions nodes, or from directories nodes. - ``"*:rel"`` node types allowing all edges to releases. Leaves ------ .. http:get:: /graph/leaves/:src Performs a graph traversal and returns the leaves of the subgraph rooted at the specified source node. :param string src: source node specified as a SWH PID :query string edges: edges types the traversal can follow; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found .. sourcecode:: http HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked swh:1:cnt:669ac7c32292798644b21dbb5a0dc657125f444d swh:1:cnt:da4cb28febe66172a9fdf1a235525ae6c00cde1d", ... Neighbors --------- .. http:get:: /graph/neighbors/:src Returns node direct neighbors (linked with exactly one edge) in the graph. :param string src: source node specified as a SWH PID :query string edges: edges types allowed to be listed as neighbors; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found .. sourcecode:: http HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2 swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505 ... Walk ---- .. http:get:: /graph/walk/:src/:dst Performs a graph traversal and returns the first found path from source to destination (final destination node included). :param string src: starting node specified as a SWH PID :param string dst: destination node, either as a node PID or a node type. The traversal will stop at the first node encountered matching the desired destination. :query string edges: edges types the traversal can follow; default to ``"*"`` :query string traversal: traversal algorithm; can be either ``dfs`` or ``bfs``, default to ``dfs`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found .. sourcecode:: http HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35 swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd swh:1:rev:cea92e843e40452c08ba313abc39f59efbb4c29c swh:1:rev:8d517bdfb57154b8a11d7f1682ecc0f79abf8e02 ... + +.. http:get:: /graph/randomwalk/:src/:dst + + Performs a graph *random* traversal, i.e., picking one random successor + node at each hop, from source to destination (final destination node + included). + + :param string src: starting node specified as a SWH PID + :param string dst: destination node, either as a node PID or a node type. + The traversal will stop at the first node encountered matching the + desired destination. + + :query string edges: edges types the traversal can follow; default to + ``"*"`` + :query string direction: direction in which graph edges will be followed; + can be either ``forward`` or ``backward``, default to ``forward`` + + :statuscode 200: success + :statuscode 400: invalid query string provided + :statuscode 404: starting node cannot be found + + .. sourcecode:: http + + HTTP/1.1 200 OK + Content-Type: text/plain + Transfer-Encoding: chunked + + swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35 + swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd + swh:1:rev:cea92e843e40452c08ba313abc39f59efbb4c29c + swh:1:rev:8d517bdfb57154b8a11d7f1682ecc0f79abf8e02 + ... + + Visit ----- .. http:get:: /graph/visit/nodes/:src .. http:get:: /graph/visit/paths/:src Performs a graph traversal and returns explored nodes or paths (in the order of the traversal). :param string src: starting node specified as a SWH PID :query string edges: edges types the traversal can follow; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found .. sourcecode:: http GET /graph/visit/nodes/... HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35 swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd ... swh:1:rev:a31e58e129f73ab5b04016330b13ed51fde7a961 ... .. sourcecode:: http GET /graph/visit/paths/... HTTP/1.1 200 OK Content-Type: application/x-ndjson Transfer-Encoding: chunked ["swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35", "swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd", ...] ["swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35", "swh:1:rev:a31e58e129f73ab5b04016330b13ed51fde7a961", ...] ... Counting results ---------------- The following method variants, with trailing `/count` added, behave like the already discussed methods but, instead of returning results, return the *amount* of results that would have been returned: .. http:get:: /graph/leaves/count/:src .. http:get:: /graph/neighbors/count/:src .. http:get:: /graph/visit/nodes/count/:src Stats ----- .. http:get:: /graph/stats Returns statistics on the compressed graph. :statuscode 200: success .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "counts": { "nodes": 16222788, "edges": 9907464 }, "ratios": { "compression": 0.367, "bits_per_node": 5.846, "bits_per_edge": 9.573, "avg_locality": 270.369 }, "indegree": { "min": 0, "max": 12382, "avg": 0.6107127825377487 }, "outdegree": { "min": 0, "max": 1, "avg": 0.6107127825377487 } } diff --git a/java/src/main/java/org/softwareheritage/graph/Entry.java b/java/src/main/java/org/softwareheritage/graph/Entry.java index 4bfec63..63050ee 100644 --- a/java/src/main/java/org/softwareheritage/graph/Entry.java +++ b/java/src/main/java/org/softwareheritage/graph/Entry.java @@ -1,163 +1,184 @@ package org.softwareheritage.graph; import java.util.ArrayList; import java.io.DataOutputStream; import java.io.FileOutputStream; import java.io.IOException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; import org.softwareheritage.graph.algo.NodeIdConsumer; import org.softwareheritage.graph.algo.Stats; import org.softwareheritage.graph.algo.Traversal; public class Entry { private Graph graph; private final long PATH_SEPARATOR_ID = -1; public void load_graph(String graphBasename) throws IOException { System.err.println("Loading graph " + graphBasename + " ..."); this.graph = new Graph(graphBasename); System.err.println("Graph loaded."); } public Graph get_graph() { return graph.copy(); } public String stats() { try { Stats stats = new Stats(graph.getPath()); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return objectMapper.writeValueAsString(stats); } catch (IOException e) { throw new RuntimeException("Cannot read stats: " + e); } } private interface NodeCountVisitor { void accept(long nodeId, NodeIdConsumer consumer); } private int count_visitor(NodeCountVisitor f, long srcNodeId) { int count[] = { 0 }; f.accept(srcNodeId, (node) -> { count[0]++; }); return count[0]; } public int count_leaves(String direction, String edgesFmt, long srcNodeId) { Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); return count_visitor(t::leavesVisitor, srcNodeId); } public int count_neighbors(String direction, String edgesFmt, long srcNodeId) { Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); return count_visitor(t::neighborsVisitor, srcNodeId); } public int count_visit_nodes(String direction, String edgesFmt, long srcNodeId) { Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); return count_visitor(t::visitNodesVisitor, srcNodeId); } public QueryHandler get_handler(String clientFIFO) { return new QueryHandler(this.graph.copy(), clientFIFO); } public class QueryHandler { Graph graph; DataOutputStream out; String clientFIFO; public QueryHandler(Graph graph, String clientFIFO) { this.graph = graph; this.clientFIFO = clientFIFO; this.out = null; } public void writeNode(long nodeId) { try { out.writeLong(nodeId); } catch (IOException e) { throw new RuntimeException("Cannot write response to client: " + e); } } public void writePath(ArrayList path) { for (Long nodeId : path) { writeNode(nodeId); } writeNode(PATH_SEPARATOR_ID); } public void open() { try { FileOutputStream file = new FileOutputStream(this.clientFIFO); this.out = new DataOutputStream(file); } catch (IOException e) { throw new RuntimeException("Cannot open client FIFO: " + e); } } public void close() { try { out.close(); } catch (IOException e) { throw new RuntimeException("Cannot write response to client: " + e); } } public void leaves(String direction, String edgesFmt, long srcNodeId) { open(); Traversal t = new Traversal(this.graph, direction, edgesFmt); t.leavesVisitor(srcNodeId, this::writeNode); close(); } public void neighbors(String direction, String edgesFmt, long srcNodeId) { open(); Traversal t = new Traversal(this.graph, direction, edgesFmt); t.neighborsVisitor(srcNodeId, this::writeNode); close(); } public void visit_nodes(String direction, String edgesFmt, long srcNodeId) { open(); Traversal t = new Traversal(this.graph, direction, edgesFmt); t.visitNodesVisitor(srcNodeId, this::writeNode); close(); } public void visit_paths(String direction, String edgesFmt, long srcNodeId) { open(); Traversal t = new Traversal(this.graph, direction, edgesFmt); t.visitPathsVisitor(srcNodeId, this::writePath); close(); } public void walk(String direction, String edgesFmt, String algorithm, long srcNodeId, long dstNodeId) { open(); Traversal t = new Traversal(this.graph, direction, edgesFmt); for (Long nodeId : t.walk(srcNodeId, dstNodeId, algorithm)) { writeNode(nodeId); } close(); } public void walk_type(String direction, String edgesFmt, String algorithm, long srcNodeId, String dst) { open(); Node.Type dstType = Node.Type.fromStr(dst); Traversal t = new Traversal(this.graph, direction, edgesFmt); for (Long nodeId : t.walk(srcNodeId, dstType, algorithm)) { writeNode(nodeId); } close(); } + + public void random_walk(String direction, String edgesFmt, int retries, + long srcNodeId, long dstNodeId) { + open(); + Traversal t = new Traversal(this.graph, direction, edgesFmt); + for (Long nodeId : t.randomWalk(srcNodeId, dstNodeId, retries)) { + writeNode(nodeId); + } + close(); + } + + public void random_walk_type(String direction, String edgesFmt, int retries, + long srcNodeId, String dst) { + open(); + Node.Type dstType = Node.Type.fromStr(dst); + Traversal t = new Traversal(this.graph, direction, edgesFmt); + for (Long nodeId : t.randomWalk(srcNodeId, dstType, retries)) { + writeNode(nodeId); + } + close(); + } } } diff --git a/java/src/main/java/org/softwareheritage/graph/algo/Traversal.java b/java/src/main/java/org/softwareheritage/graph/algo/Traversal.java index 8636549..f9e2554 100644 --- a/java/src/main/java/org/softwareheritage/graph/algo/Traversal.java +++ b/java/src/main/java/org/softwareheritage/graph/algo/Traversal.java @@ -1,361 +1,457 @@ package org.softwareheritage.graph.algo; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.Random; import java.util.Stack; import it.unimi.dsi.bits.LongArrayBitVector; import org.softwareheritage.graph.AllowedEdges; import org.softwareheritage.graph.Endpoint; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Neighbors; import org.softwareheritage.graph.Node; /** * Traversal algorithms on the compressed graph. *

* Internal implementation of the traversal API endpoints. These methods only input/output internal * long ids, which are converted in the {@link Endpoint} higher-level class to Software Heritage * PID. * * @author The Software Heritage developers * @see org.softwareheritage.graph.Endpoint */ public class Traversal { /** Graph used in the traversal */ Graph graph; /** Boolean to specify the use of the transposed graph */ boolean useTransposed; /** Graph edge restriction */ AllowedEdges edges; /** Hash set storing if we have visited a node */ HashSet visited; /** Hash map storing parent node id for each nodes during a traversal */ Map parentNode; /** Number of edges accessed during traversal */ long nbEdgesAccessed; + /** random number generator, for random walks */ + Random rng; + /** * Constructor. * * @param graph graph used in the traversal * @param direction a string (either "forward" or "backward") specifying edge orientation * @param edgesFmt a formatted string describing allowed edges */ public Traversal(Graph graph, String direction, String edgesFmt) { if (!direction.matches("forward|backward")) { throw new IllegalArgumentException("Unknown traversal direction: " + direction); } this.graph = graph; this.useTransposed = (direction.equals("backward")); this.edges = new AllowedEdges(graph, edgesFmt); long nbNodes = graph.getNbNodes(); this.visited = new HashSet<>(); this.parentNode = new HashMap<>(); this.nbEdgesAccessed = 0; + this.rng = new Random(); } /** * Returns number of accessed edges during traversal. * * @return number of edges accessed in last traversal */ public long getNbEdgesAccessed() { return nbEdgesAccessed; } /** * Returns number of accessed nodes during traversal. * * @return number of nodes accessed in last traversal */ public long getNbNodesAccessed() { return this.visited.size(); } /** * Push version of {@link leaves}: will fire passed callback for each leaf. */ public void leavesVisitor(long srcNodeId, NodeIdConsumer cb) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); long neighborsCnt = 0; nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { neighborsCnt++; if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); } } if (neighborsCnt == 0) { cb.accept(currentNodeId); } } } /** * Returns the leaves of a subgraph rooted at the specified source node. * * @param srcNodeId source node * @return list of node ids corresponding to the leaves */ public ArrayList leaves(long srcNodeId) { ArrayList nodeIds = new ArrayList(); leavesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); return nodeIds; } /** * Push version of {@link neighbors}: will fire passed callback on each * neighbor. */ public void neighborsVisitor(long srcNodeId, NodeIdConsumer cb) { this.nbEdgesAccessed = graph.degree(srcNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, srcNodeId)) { cb.accept(neighborNodeId); } } /** * Returns node direct neighbors (linked with exactly one edge). * * @param srcNodeId source node * @return list of node ids corresponding to the neighbors */ public ArrayList neighbors(long srcNodeId) { ArrayList nodeIds = new ArrayList(); neighborsVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); return nodeIds; } /** * Push version of {@link visitNodes}: will fire passed callback on each * visited node. */ public void visitNodesVisitor(long srcNodeId, NodeIdConsumer cb) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); cb.accept(currentNodeId); nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); } } } } /** * Performs a graph traversal and returns explored nodes. * * @param srcNodeId source node * @return list of explored node ids */ public ArrayList visitNodes(long srcNodeId) { ArrayList nodeIds = new ArrayList(); visitNodesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); return nodeIds; } /** * Push version of {@link visitPaths}: will fire passed callback on each * discovered (complete) path. */ public void visitPathsVisitor(long srcNodeId, PathConsumer cb) { Stack currentPath = new Stack(); this.nbEdgesAccessed = 0; visitPathsInternalVisitor(srcNodeId, currentPath, cb); } /** * Performs a graph traversal and returns explored paths. * * @param srcNodeId source node * @return list of explored paths (represented as a list of node ids) */ public ArrayList> visitPaths(long srcNodeId) { ArrayList> paths = new ArrayList<>(); visitPathsVisitor(srcNodeId, (path) -> paths.add(path)); return paths; } private void visitPathsInternalVisitor(long currentNodeId, Stack currentPath, PathConsumer cb) { currentPath.push(currentNodeId); long visitedNeighbors = 0; nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { visitPathsInternalVisitor(neighborNodeId, currentPath, cb); visitedNeighbors++; } if (visitedNeighbors == 0) { ArrayList path = new ArrayList(); for (long nodeId : currentPath) { path.add(nodeId); } cb.accept(path); } currentPath.pop(); } /** - * Performs a graph traversal and returns the first found path from source to destination. + * Performs a graph traversal with backtracking, and returns the first + * found path from source to destination. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return found path as a list of node ids */ - public ArrayList walk(long srcNodeId, T dst, String algorithm) { + public ArrayList walk(long srcNodeId, T dst, String visitOrder) { long dstNodeId = -1; - if (algorithm.equals("dfs")) { - dstNodeId = walkInternalDfs(srcNodeId, dst); - } else if (algorithm.equals("bfs")) { - dstNodeId = walkInternalBfs(srcNodeId, dst); + if (visitOrder.equals("dfs")) { + dstNodeId = walkInternalDFS(srcNodeId, dst); + } else if (visitOrder.equals("bfs")) { + dstNodeId = walkInternalBFS(srcNodeId, dst); } else { - throw new IllegalArgumentException("Unknown traversal algorithm: " + algorithm); + throw new IllegalArgumentException("Unknown visit order: " + visitOrder); } if (dstNodeId == -1) { - throw new IllegalArgumentException("Unable to find destination point: " + dst); + throw new IllegalArgumentException("Cannot find destination: " + dst); } ArrayList nodeIds = backtracking(srcNodeId, dstNodeId); return nodeIds; } + /** + * Performs a random walk (picking a random successor at each step) from + * source to destination. + * + * @param srcNodeId source node + * @param dst destination (either a node or a node type) + * @return found path as a list of node ids or an empty path to indicate + * that no suitable path have been found + */ + public ArrayList randomWalk(long srcNodeId, T dst) { + return randomWalk(srcNodeId, dst, 0); + } + + /** + * Performs a stubborn random walk (picking a random successor at each + * step) from source to destination. The walk is "stubborn" in the sense + * that it will not give up the first time if a satisfying target node is + * found, but it will retry up to a limited amount of times. + * + * @param srcNodeId source node + * @param dst destination (either a node or a node type) + * @param retries number of times to retry; 0 means no retries (single walk) + * @return found path as a list of node ids or an empty path to indicate + * that no suitable path have been found + */ + public ArrayList randomWalk(long srcNodeId, T dst, int retries) { + ArrayList path = new ArrayList(); + this.nbEdgesAccessed = 0; + long curNodeId = srcNodeId; + boolean found; + + if (retries < 0) { + throw new IllegalArgumentException("Negative number of retries given: " + retries); + } + + while (true) { + long nbNeighbors = graph.degree(curNodeId, useTransposed); + if (nbNeighbors == 0) { + found = false; + break; + } + Neighbors neighbors = new Neighbors(graph, useTransposed, edges, curNodeId); + Iterator successors = neighbors.iterator(); + + curNodeId = randomPick(successors, nbNeighbors); + path.add(curNodeId); + + if (isDstNode(curNodeId, dst)) { + found = true; + break; + } + } + + if (found) { + return path; + } else if (retries > 0) { // try again + return randomWalk(srcNodeId, dst, retries - 1); + } else { // not found and no retries left + path.clear(); + return path; + } + } + + /** + * Randomly choose an element from an iterator + * + * @param elements iterator over selection domain + * @param lenght total length of elements iterated upon + * @return randomly chosen element + */ + private T randomPick(Iterator elements, long length) { + long elementsToSkip = Math.round(rng.nextFloat() * (length - 1)); + long skippedElements = -1; + T e; + + while (elements.hasNext()) { + e = elements.next(); + skippedElements++; + this.nbEdgesAccessed++; + if (skippedElements < elementsToSkip) { + continue; + } else { + return e; + } + } + + throw new IllegalStateException("Skipped past all available elements"); + } + /** * Internal DFS function of {@link #walk}. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return final destination node or -1 if no path found */ - private long walkInternalDfs(long srcNodeId, T dst) { + private long walkInternalDFS(long srcNodeId, T dst) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); if (isDstNode(currentNodeId, dst)) { return currentNodeId; } nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); parentNode.put(neighborNodeId, currentNodeId); } } } return -1; } /** * Internal BFS function of {@link #walk}. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return final destination node or -1 if no path found */ - private long walkInternalBfs(long srcNodeId, T dst) { + private long walkInternalBFS(long srcNodeId, T dst) { Queue queue = new LinkedList(); this.nbEdgesAccessed = 0; queue.add(srcNodeId); visited.add(srcNodeId); while (!queue.isEmpty()) { long currentNodeId = queue.poll(); if (isDstNode(currentNodeId, dst)) { return currentNodeId; } nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { queue.add(neighborNodeId); visited.add(neighborNodeId); parentNode.put(neighborNodeId, currentNodeId); } } } return -1; } /** * Internal function of {@link #walk} to check if a node corresponds to the destination. * * @param nodeId current node * @param dst destination (either a node or a node type) * @return true if the node is a destination, or false otherwise */ private boolean isDstNode(long nodeId, T dst) { if (dst instanceof Long) { long dstNodeId = (Long) dst; return nodeId == dstNodeId; } else if (dst instanceof Node.Type) { Node.Type dstType = (Node.Type) dst; return graph.getNodeType(nodeId) == dstType; } else { return false; } } /** * Internal backtracking function of {@link #walk}. * * @param srcNodeId source node * @param dstNodeId destination node * @return the found path, as a list of node ids */ private ArrayList backtracking(long srcNodeId, long dstNodeId) { ArrayList path = new ArrayList(); long currentNodeId = dstNodeId; while (currentNodeId != srcNodeId) { path.add(currentNodeId); currentNodeId = parentNode.get(currentNodeId); } path.add(srcNodeId); Collections.reverse(path); return path; } } diff --git a/swh/graph/backend.py b/swh/graph/backend.py index 0f96a40..3fdcb5c 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,211 +1,221 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import logging import os import pathlib import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.pid import NodeToPidMap, PidToNodeMap from swh.model.identifiers import PID_TYPES BUF_SIZE = 64*1024 BIN_FMT = '>q' # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 NODE2PID_EXT = 'node2pid.bin' PID2NODE_EXT = 'pid2node.bin' def find_graph_jar(): """find swh-graph.jar, containing the Java part of swh-graph look both in development directories and installed data (for in-production deployments who fecthed the JAR from pypi) """ swh_graph_root = pathlib.Path(__file__).parents[2] try_paths = [ swh_graph_root / 'java/target/', pathlib.Path(sys.prefix) / 'share/swh-graph/', pathlib.Path(sys.prefix) / 'local/share/swh-graph/', ] for path in try_paths: glob = list(path.glob('swh-graph-*.jar')) if glob: if len(glob) > 1: logging.warn('found multiple swh-graph JARs, ' 'arbitrarily picking one') logging.info('using swh-graph JAR: {0}'.format(glob[0])) return str(glob[0]) raise RuntimeError('swh-graph JAR not found. Have you run `make java`?') def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path): self.gateway = None self.entry = None self.graph_path = graph_path def __enter__(self): # TODO: make all of that configurable with sane defaults java_opts = [ '-Xmx200G', '-server', '-XX:PretenureSizeThreshold=512M', '-XX:MaxNewSize=4G', '-XX:+UseLargePages', '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB', '-XX:+ResizeTLAB', ] self.gateway = JavaGateway.launch_gateway( java_path=None, javaopts=java_opts, classpath=find_graph_jar(), die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT) self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() def count(self, ttype, direction, edges_fmt, src): method = getattr(self.entry, 'count_' + ttype) return method(direction, edges_fmt, src) async def simple_traversal(self, ttype, direction, edges_fmt, src): assert ttype in ('leaves', 'neighbors', 'visit_nodes') method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): if dst in PID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id + async def random_walk(self, direction, edges_fmt, retries, src, dst): + if dst in PID_TYPES: + it = self.stream_proxy.random_walk_type(direction, edges_fmt, + retries, src, dst) + else: + it = self.stream_proxy.random_walk(direction, edges_fmt, retries, + src, dst) + async for node_id in it: # TODO return 404 if path is empty + yield node_id + async def visit_paths(self, direction, edges_fmt, src): path = [] async for node in self.stream_proxy.visit_paths( direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() open_thread = loop.run_in_executor(None, open, fname, 'rb') # Since the open() call on the FIFO is blocking until it is also opened # on the Java side, we await it with a timeout in case there is an # exception that prevents the write-side open(). with (await asyncio.wait_for(open_thread, timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: # If the read-side open() timeouts, an exception on the # Java side probably happened that prevented the # write-side open(). We propagate this exception here if # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task return java_call_iterator diff --git a/swh/graph/client.py b/swh/graph/client.py index 8254fe7..475d91c 100644 --- a/swh/graph/client.py +++ b/swh/graph/client.py @@ -1,106 +1,114 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from swh.core.api import RPCClient class GraphAPIError(Exception): """Graph API Error""" def __str__(self): return ('An unexpected error occurred in the Graph backend: {}' .format(self.args)) class RemoteGraphClient(RPCClient): """Client to the Software Heritage Graph.""" def __init__(self, url, timeout=None): super().__init__( api_exception=GraphAPIError, url=url, timeout=timeout) def raw_verb_lines(self, verb, endpoint, **kwargs): response = self.raw_verb(verb, endpoint, stream=True, **kwargs) for line in response.iter_lines(): yield line.decode().lstrip('\n') def get_lines(self, endpoint, **kwargs): yield from self.raw_verb_lines('get', endpoint, **kwargs) # Web API endpoints def stats(self): return self.get('stats') def leaves(self, src, edges="*", direction="forward"): return self.get_lines( 'leaves/{}'.format(src), params={ 'edges': edges, 'direction': direction }) def neighbors(self, src, edges="*", direction="forward"): return self.get_lines( 'neighbors/{}'.format(src), params={ 'edges': edges, 'direction': direction }) def visit_nodes(self, src, edges="*", direction="forward"): return self.get_lines( 'visit/nodes/{}'.format(src), params={ 'edges': edges, 'direction': direction }) def visit_paths(self, src, edges="*", direction="forward"): def decode_path_wrapper(it): for e in it: yield json.loads(e) return decode_path_wrapper( self.get_lines( 'visit/paths/{}'.format(src), params={ 'edges': edges, 'direction': direction })) def walk(self, src, dst, edges="*", traversal="dfs", direction="forward"): return self.get_lines( 'walk/{}/{}'.format(src, dst), params={ 'edges': edges, 'traversal': traversal, 'direction': direction }) + def random_walk(self, src, dst, edges="*", direction="forward"): + return self.get_lines( + 'randomwalk/{}/{}'.format(src, dst), + params={ + 'edges': edges, + 'direction': direction + }) + def count_leaves(self, src, edges="*", direction="forward"): return self.get( 'leaves/count/{}'.format(src), params={ 'edges': edges, 'direction': direction }) def count_neighbors(self, src, edges="*", direction="forward"): return self.get( 'neighbors/count/{}'.format(src), params={ 'edges': edges, 'direction': direction }) def count_visit_nodes(self, src, edges="*", direction="forward"): return self.get( 'visit/nodes/count/{}'.format(src), params={ 'edges': edges, 'direction': direction }) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index e27efe6..44a33e0 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,203 +1,216 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ import asyncio import json import aiohttp.web from swh.core.api.asynchronous import RPCServerApp from swh.model.identifiers import PID_TYPES from swh.model.exceptions import ValidationError try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore +# maximum number of retries for random walks +RANDOM_RETRIES = 5 # TODO make this configurable via rpc-serve configuration + + @asynccontextmanager async def stream_response(request, content_type='text/plain', *args, **kwargs): response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = content_type await response.prepare(request) yield response await response.write_eof() async def index(request): return aiohttp.web.Response( content_type='text/html', body=""" Software Heritage storage server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""") async def stats(request): stats = request.app['backend'].stats() return aiohttp.web.Response(body=stats, content_type='application/json') def get_direction(request): """validate HTTP query parameter `direction`""" s = request.query.get('direction', 'forward') if s not in ('forward', 'backward'): raise aiohttp.web.HTTPBadRequest(body=f'invalid direction: {s}') return s def get_edges(request): """validate HTTP query parameter `edges`, i.e., edge restrictions""" s = request.query.get('edges', '*') if any([node_type != '*' and node_type not in PID_TYPES for edge in s.split(':') for node_type in edge.split(',', maxsplit=1)]): raise aiohttp.web.HTTPBadRequest(body=f'invalid edge restriction: {s}') return s def get_traversal(request): """validate HTTP query parameter `traversal`, i.e., visit order""" s = request.query.get('traversal', 'dfs') if s not in ('bfs', 'dfs'): raise aiohttp.web.HTTPBadRequest(body=f'invalid traversal order: {s}') return s def node_of_pid(pid, backend): """lookup a PID in a pid2node map, failing in an HTTP-nice way if needed""" try: return backend.pid2node[pid] except KeyError: raise aiohttp.web.HTTPNotFound(body=f'PID not found: {pid}') except ValidationError: raise aiohttp.web.HTTPBadRequest(body=f'malformed PID: {pid}') def pid_of_node(node, backend): """lookup a node in a node2pid map, failing in an HTTP-nice way if needed """ try: return backend.node2pid[node] except KeyError: raise aiohttp.web.HTTPInternalServerError( body=f'reverse lookup failed for node id: {node}') def get_simple_traversal_handler(ttype): async def simple_traversal(request): backend = request.app['backend'] src = request.match_info['src'] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) async with stream_response(request) as response: async for res_node in backend.simple_traversal( ttype, direction, edges, src_node ): res_pid = pid_of_node(res_node, backend) await response.write('{}\n'.format(res_pid).encode()) return response return simple_traversal -async def walk(request): - backend = request.app['backend'] +def get_walk_handler(random=False): + async def walk(request): + backend = request.app['backend'] - src = request.match_info['src'] - dst = request.match_info['dst'] - edges = get_edges(request) - direction = get_direction(request) - algo = get_traversal(request) + src = request.match_info['src'] + dst = request.match_info['dst'] + edges = get_edges(request) + direction = get_direction(request) + algo = get_traversal(request) - src_node = node_of_pid(src, backend) - if dst not in PID_TYPES: - dst = node_of_pid(dst, backend) - async with stream_response(request) as response: - async for res_node in backend.walk( - direction, edges, algo, src_node, dst - ): - res_pid = pid_of_node(res_node, backend) - await response.write('{}\n'.format(res_pid).encode()) - return response + src_node = node_of_pid(src, backend) + if dst not in PID_TYPES: + dst = node_of_pid(dst, backend) + async with stream_response(request) as response: + if random: + it = backend.random_walk(direction, edges, RANDOM_RETRIES, + src_node, dst) + else: + it = backend.walk(direction, edges, algo, src_node, dst) + async for res_node in it: + res_pid = pid_of_node(res_node, backend) + await response.write('{}\n'.format(res_pid).encode()) + return response + + return walk async def visit_paths(request): backend = request.app['backend'] src = request.match_info['src'] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) it = backend.visit_paths(direction, edges, src_node) async with stream_response(request, content_type='application/x-ndjson') \ as response: async for res_path in it: res_path_pid = [pid_of_node(n, backend) for n in res_path] line = json.dumps(res_path_pid) await response.write('{}\n'.format(line).encode()) return response def get_count_handler(ttype): async def count(request): loop = asyncio.get_event_loop() backend = request.app['backend'] src = request.match_info['src'] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) cnt = await loop.run_in_executor( None, backend.count, ttype, direction, edges, src_node) return aiohttp.web.Response(body=str(cnt), content_type='application/json') return count def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.router.add_get('/', index) app.router.add_get('/graph/stats', stats) app.router.add_get('/graph/leaves/{src}', get_simple_traversal_handler('leaves')) app.router.add_get('/graph/neighbors/{src}', get_simple_traversal_handler('neighbors')) app.router.add_get('/graph/visit/nodes/{src}', get_simple_traversal_handler('visit_nodes')) app.router.add_get('/graph/visit/paths/{src}', visit_paths) - app.router.add_get('/graph/walk/{src}/{dst}', walk) + app.router.add_get('/graph/walk/{src}/{dst}', + get_walk_handler(random=False)) + app.router.add_get('/graph/randomwalk/{src}/{dst}', + get_walk_handler(random=True)) app.router.add_get('/graph/neighbors/count/{src}', get_count_handler('neighbors')) app.router.add_get('/graph/leaves/count/{src}', get_count_handler('leaves')) app.router.add_get('/graph/visit/nodes/count/{src}', get_count_handler('visit_nodes')) app['backend'] = backend return app diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 598641c..b4c0d45 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,122 +1,135 @@ def test_stats(graph_client): stats = graph_client.stats() assert set(stats.keys()) == {'counts', 'ratios', 'indegree', 'outdegree'} assert set(stats['counts'].keys()) == {'nodes', 'edges'} assert set(stats['ratios'].keys()) == {'compression', 'bits_per_node', 'bits_per_edge', 'avg_locality'} assert set(stats['indegree'].keys()) == {'min', 'max', 'avg'} assert set(stats['outdegree'].keys()) == {'min', 'max', 'avg'} assert stats['counts']['nodes'] == 21 assert stats['counts']['edges'] == 23 assert isinstance(stats['ratios']['compression'], float) assert isinstance(stats['ratios']['bits_per_node'], float) assert isinstance(stats['ratios']['bits_per_edge'], float) assert isinstance(stats['ratios']['avg_locality'], float) assert stats['indegree']['min'] == 0 assert stats['indegree']['max'] == 3 assert isinstance(stats['indegree']['avg'], float) assert stats['outdegree']['min'] == 0 assert stats['outdegree']['max'] == 3 assert isinstance(stats['outdegree']['avg'], float) def test_leaves(graph_client): actual = list(graph_client.leaves( 'swh:1:ori:0000000000000000000000000000000000000021' )) expected = [ 'swh:1:cnt:0000000000000000000000000000000000000001', 'swh:1:cnt:0000000000000000000000000000000000000004', 'swh:1:cnt:0000000000000000000000000000000000000005', 'swh:1:cnt:0000000000000000000000000000000000000007' ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list(graph_client.neighbors( 'swh:1:rev:0000000000000000000000000000000000000009', direction='backward' )) expected = [ 'swh:1:snp:0000000000000000000000000000000000000020', 'swh:1:rel:0000000000000000000000000000000000000010', 'swh:1:rev:0000000000000000000000000000000000000013' ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list(graph_client.visit_nodes( 'swh:1:rel:0000000000000000000000000000000000000010', edges='rel:rev,rev:rev' )) expected = [ 'swh:1:rel:0000000000000000000000000000000000000010', 'swh:1:rev:0000000000000000000000000000000000000009', 'swh:1:rev:0000000000000000000000000000000000000003' ] assert set(actual) == set(expected) def test_visit_paths(graph_client): actual = list(graph_client.visit_paths( 'swh:1:snp:0000000000000000000000000000000000000020', edges='snp:*,rev:*')) actual = [tuple(path) for path in actual] expected = [ ( 'swh:1:snp:0000000000000000000000000000000000000020', 'swh:1:rev:0000000000000000000000000000000000000009', 'swh:1:rev:0000000000000000000000000000000000000003', 'swh:1:dir:0000000000000000000000000000000000000002' ), ( 'swh:1:snp:0000000000000000000000000000000000000020', 'swh:1:rev:0000000000000000000000000000000000000009', 'swh:1:dir:0000000000000000000000000000000000000008' ), ( 'swh:1:snp:0000000000000000000000000000000000000020', 'swh:1:rel:0000000000000000000000000000000000000010' ) ] assert set(actual) == set(expected) def test_walk(graph_client): actual = list(graph_client.walk( 'swh:1:dir:0000000000000000000000000000000000000016', 'rel', edges='dir:dir,dir:rev,rev:*', direction='backward', traversal='bfs' )) expected = [ 'swh:1:dir:0000000000000000000000000000000000000016', 'swh:1:dir:0000000000000000000000000000000000000017', 'swh:1:rev:0000000000000000000000000000000000000018', 'swh:1:rel:0000000000000000000000000000000000000019' ] assert set(actual) == set(expected) +def test_random_walk(graph_client): + """as the walk is random, we test a visit from a cnt node to the only origin in + the dataset, and only check the final node of the path (i.e., the origin) + + """ + actual = list(graph_client.random_walk( + 'swh:1:cnt:0000000000000000000000000000000000000001', 'ori', + direction='backward', + )) + expected_root = 'swh:1:ori:0000000000000000000000000000000000000021' + assert actual[-1] == expected_root + + def test_count(graph_client): actual = graph_client.count_leaves( 'swh:1:ori:0000000000000000000000000000000000000021' ) assert actual == 4 actual = graph_client.count_visit_nodes( 'swh:1:rel:0000000000000000000000000000000000000010', edges='rel:rev,rev:rev' ) assert actual == 3 actual = graph_client.count_neighbors( 'swh:1:rev:0000000000000000000000000000000000000009', direction='backward' ) assert actual == 3