diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index afb480a..1817a76 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,50 +1,52 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.1.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - repo: https://gitlab.com/pycqa/flake8 rev: 4.0.1 hooks: - id: flake8 additional_dependencies: [flake8-bugbear==22.3.23] - repo: https://github.com/codespell-project/codespell rev: v2.1.0 hooks: - id: codespell name: Check source code spelling args: ["-L te,wth,alledges"] stages: [commit] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort - repo: https://github.com/python/black rev: 22.3.0 hooks: - id: black - repo: local hooks: - id: java-coding-style name: java style entry: mvn args: ["-f", "java/pom.xml", "spotless:apply"] pass_filenames: false language: system + +exclude: ^swh/graph/rpc/ diff --git a/Makefile.local b/Makefile.local index 034d1c7..1181cea 100644 --- a/Makefile.local +++ b/Makefile.local @@ -1,14 +1,17 @@ POM_PATH=java/pom.xml java: mvn -f $(POM_PATH) compile assembly:single java-doc: mvn -f $(POM_PATH) javadoc:javadoc java-%: mvn -f $(POM_PATH) $* +protoc: + python -m grpc_tools.protoc -I. --python_out=. --mypy_out=. --grpc_python_out=. 
swh/graph/rpc/*.proto + clean-java: java-clean .PHONY: java clean-java diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java index 4aecc8b..57a18e1 100644 --- a/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java +++ b/java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java @@ -1,195 +1,203 @@ package org.softwareheritage.graph.rpc; import com.martiansoftware.jsap.*; import io.grpc.Server; -import io.grpc.ServerBuilder; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; import io.grpc.stub.StreamObserver; import io.grpc.protobuf.services.ProtoReflectionService; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.SWHID; import org.softwareheritage.graph.SwhBidirectionalGraph; import org.softwareheritage.graph.compress.LabelMapBuilder; import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Server that manages startup/shutdown of a {@code Greeter} server. 
*/ public class GraphServer { private final static Logger logger = LoggerFactory.getLogger(GraphServer.class); private final SwhBidirectionalGraph graph; private final int port; private final int threads; private Server server; public GraphServer(String graphBasename, int port, int threads) throws IOException { // TODO: use loadLabelledMapped() when https://github.com/vigna/webgraph-big/pull/5 is merged this.graph = SwhBidirectionalGraph.loadLabelled(graphBasename, new ProgressLogger(logger)); this.port = port; this.threads = threads; graph.loadContentLength(); graph.loadContentIsSkipped(); graph.loadPersonIds(); graph.loadAuthorTimestamps(); graph.loadCommitterTimestamps(); graph.loadMessages(); graph.loadTagNames(); graph.loadLabelNames(); } private void start() throws IOException { - server = ServerBuilder.forPort(port).executor(Executors.newFixedThreadPool(threads)) - .addService(new TraversalService(graph)).addService(ProtoReflectionService.newInstance()).build() - .start(); + server = NettyServerBuilder.forPort(port).withChildOption(ChannelOption.SO_REUSEADDR, true) + .executor(Executors.newFixedThreadPool(threads)).addService(new TraversalService(graph)) + .addService(ProtoReflectionService.newInstance()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { GraphServer.this.stop(); } catch (InterruptedException e) { e.printStackTrace(System.err); } })); } private void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } } /** * Await termination on the main thread since the grpc library uses daemon threads. 
*/ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } private static JSAPResult parseArgs(String[] args) { JSAPResult config = null; try { SimpleJSAP jsap = new SimpleJSAP(LabelMapBuilder.class.getName(), "", new Parameter[]{ new FlaggedOption("port", JSAP.INTEGER_PARSER, "50091", JSAP.NOT_REQUIRED, 'p', "port", "The port on which the server should listen."), new FlaggedOption("threads", JSAP.INTEGER_PARSER, "1", JSAP.NOT_REQUIRED, 't', "threads", "The number of concurrent threads. 0 = number of cores."), new UnflaggedOption("graphBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, "Basename of the output graph")}); config = jsap.parse(args); if (jsap.messagePrinted()) { System.exit(1); } } catch (JSAPException e) { e.printStackTrace(); } return config; } /** * Main launches the server from the command line. */ public static void main(String[] args) throws IOException, InterruptedException { JSAPResult config = parseArgs(args); String graphBasename = config.getString("graphBasename"); int port = config.getInt("port"); int threads = config.getInt("threads"); if (threads == 0) { threads = Runtime.getRuntime().availableProcessors(); } final GraphServer server = new GraphServer(graphBasename, port, threads); server.start(); server.blockUntilShutdown(); } static class TraversalService extends TraversalServiceGrpc.TraversalServiceImplBase { SwhBidirectionalGraph graph; public TraversalService(SwhBidirectionalGraph graph) { this.graph = graph; } @Override public void checkSwhid(CheckSwhidRequest request, StreamObserver responseObserver) { - graph.getNodeId(new SWHID(request.getSwhid())); - responseObserver.onNext(CheckSwhidResponse.getDefaultInstance()); + boolean exists = true; + CheckSwhidResponse.Builder builder = CheckSwhidResponse.newBuilder().setExists(true); + try { + graph.getNodeId(new SWHID(request.getSwhid())); + } catch (IllegalArgumentException e) { + builder.setExists(false); + 
builder.setDetails(e.getMessage()); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } @Override public void stats(StatsRequest request, StreamObserver responseObserver) { StatsResponse.Builder response = StatsResponse.newBuilder(); response.setNumNodes(graph.numNodes()); response.setNumEdges(graph.numArcs()); Properties properties = new Properties(); try { properties.load(new FileInputStream(graph.getPath() + ".properties")); properties.load(new FileInputStream(graph.getPath() + ".stats")); } catch (IOException e) { throw new RuntimeException(e); } response.setCompression(Double.parseDouble(properties.getProperty("compratio"))); response.setBitsPerNode(Double.parseDouble(properties.getProperty("bitspernode"))); response.setBitsPerEdge(Double.parseDouble(properties.getProperty("bitsperlink"))); response.setAvgLocality(Double.parseDouble(properties.getProperty("avglocality"))); response.setIndegreeMin(Long.parseLong(properties.getProperty("minindegree"))); response.setIndegreeMax(Long.parseLong(properties.getProperty("maxindegree"))); response.setIndegreeAvg(Double.parseDouble(properties.getProperty("avgindegree"))); response.setOutdegreeMin(Long.parseLong(properties.getProperty("minoutdegree"))); response.setOutdegreeMax(Long.parseLong(properties.getProperty("maxoutdegree"))); response.setOutdegreeAvg(Double.parseDouble(properties.getProperty("avgoutdegree"))); responseObserver.onNext(response.build()); responseObserver.onCompleted(); } @Override public void traverse(TraversalRequest request, StreamObserver responseObserver) { SwhBidirectionalGraph g = graph.copy(); Traversal.simpleTraversal(g, request, responseObserver::onNext); responseObserver.onCompleted(); } @Override public void countNodes(TraversalRequest request, StreamObserver responseObserver) { AtomicInteger count = new AtomicInteger(0); SwhBidirectionalGraph g = graph.copy(); TraversalRequest fixedReq = TraversalRequest.newBuilder(request) // Ignore return fields, just 
count nodes .setReturnFields(NodeFields.getDefaultInstance()).build(); Traversal.simpleTraversal(g, fixedReq, (Node node) -> { count.incrementAndGet(); }); CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } @Override public void countEdges(TraversalRequest request, StreamObserver responseObserver) { AtomicInteger count = new AtomicInteger(0); SwhBidirectionalGraph g = graph.copy(); TraversalRequest fixedReq = TraversalRequest.newBuilder(request) // Force return empty successors to count the edges .setReturnFields(NodeFields.newBuilder().setSuccessor(true).setSuccessorSwhid(false).build()) .build(); Traversal.simpleTraversal(g, fixedReq, (Node node) -> { count.addAndGet(node.getSuccessorCount()); }); CountResponse response = CountResponse.newBuilder().setCount(count.get()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } } } diff --git a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java index e046476..e696d06 100644 --- a/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java +++ b/java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java @@ -1,275 +1,275 @@ package org.softwareheritage.graph.rpc; import com.google.protobuf.ByteString; import it.unimi.dsi.big.webgraph.LazyLongIterator; import it.unimi.dsi.big.webgraph.labelling.ArcLabelledNodeIterator; import it.unimi.dsi.big.webgraph.labelling.Label; import org.softwareheritage.graph.*; import org.softwareheritage.graph.labels.DirEntry; import java.util.*; public class Traversal { private static LazyLongIterator filterSuccessors(SwhUnidirectionalGraph g, long nodeId, AllowedEdges allowedEdges) { if (allowedEdges.restrictedTo == null) { // All edges are allowed, bypass edge check return g.successors(nodeId); } else { LazyLongIterator allSuccessors = g.successors(nodeId); return new 
LazyLongIterator() { @Override public long nextLong() { long neighbor; while ((neighbor = allSuccessors.nextLong()) != -1) { if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) { return neighbor; } } return -1; } @Override public long skip(final long n) { long i = 0; while (i < n && nextLong() != -1) i++; return i; } }; } } private static ArcLabelledNodeIterator.LabelledArcIterator filterLabelledSuccessors(SwhUnidirectionalGraph g, long nodeId, AllowedEdges allowedEdges) { if (allowedEdges.restrictedTo == null) { // All edges are allowed, bypass edge check return g.labelledSuccessors(nodeId); } else { ArcLabelledNodeIterator.LabelledArcIterator allSuccessors = g.labelledSuccessors(nodeId); return new ArcLabelledNodeIterator.LabelledArcIterator() { @Override public Label label() { return allSuccessors.label(); } @Override public long nextLong() { long neighbor; while ((neighbor = allSuccessors.nextLong()) != -1) { if (allowedEdges.isAllowed(g.getNodeType(nodeId), g.getNodeType(neighbor))) { return neighbor; } } return -1; } @Override public long skip(final long n) { long i = 0; while (i < n && nextLong() != -1) i++; return i; } }; } } private static class NodeFilterChecker { private final SwhUnidirectionalGraph g; private final NodeFilter filter; private final AllowedNodes allowedNodes; private NodeFilterChecker(SwhUnidirectionalGraph graph, NodeFilter filter) { this.g = graph; this.filter = filter; this.allowedNodes = new AllowedNodes(filter.hasTypes() ? 
filter.getTypes() : "*"); } public boolean allowed(long nodeId) { if (filter == null) { return true; } if (!this.allowedNodes.isAllowed(g.getNodeType(nodeId))) { return false; } long outdegree = g.outdegree(nodeId); if (filter.hasMinTraversalSuccessors() && outdegree < filter.getMinTraversalSuccessors()) { return false; } if (filter.hasMaxTraversalSuccessors() && outdegree > filter.getMaxTraversalSuccessors()) { return false; } return true; } } public static SwhUnidirectionalGraph getDirectedGraph(SwhBidirectionalGraph g, TraversalRequest request) { switch (request.getDirection()) { case FORWARD: return g.getForwardGraph(); case BACKWARD: return g.getBackwardGraph(); case BOTH: return new SwhUnidirectionalGraph(g.symmetrize(), g.getProperties()); } throw new IllegalArgumentException("Unknown direction: " + request.getDirection()); } public static void simpleTraversal(SwhBidirectionalGraph bidirectionalGraph, TraversalRequest request, NodeObserver nodeObserver) { SwhUnidirectionalGraph g = getDirectedGraph(bidirectionalGraph, request); NodeFilterChecker nodeReturnChecker = new NodeFilterChecker(g, request.getReturnNodes()); AllowedEdges allowedEdges = new AllowedEdges(request.hasEdges() ? 
request.getEdges() : "*"); Queue queue = new ArrayDeque<>(); HashSet visited = new HashSet<>(); request.getSrcList().forEach(srcSwhid -> { long srcNodeId = g.getNodeId(new SWHID(srcSwhid)); queue.add(srcNodeId); visited.add(srcNodeId); }); queue.add(-1L); // Depth sentinel long edgesAccessed = 0; long currentDepth = 0; while (!queue.isEmpty()) { long curr = queue.poll(); if (curr == -1L) { ++currentDepth; if (!queue.isEmpty()) { queue.add(-1L); } continue; } if (request.hasMaxDepth() && currentDepth > request.getMaxDepth()) { break; } edgesAccessed += g.outdegree(curr); if (request.hasMaxEdges() && edgesAccessed >= request.getMaxEdges()) { break; } Node.Builder nodeBuilder = null; - if (nodeReturnChecker.allowed(curr)) { + if (nodeReturnChecker.allowed(curr) && (!request.hasMinDepth() || currentDepth >= request.getMinDepth())) { nodeBuilder = Node.newBuilder(); buildNodeProperties(g, request.getReturnFields(), nodeBuilder, curr); } ArcLabelledNodeIterator.LabelledArcIterator it = filterLabelledSuccessors(g, curr, allowedEdges); for (long succ; (succ = it.nextLong()) != -1;) { if (!visited.contains(succ)) { queue.add(succ); visited.add(succ); } buildSuccessorProperties(g, request.getReturnFields(), nodeBuilder, curr, succ, it.label()); } if (nodeBuilder != null) { nodeObserver.onNext(nodeBuilder.build()); } } } private static void buildNodeProperties(SwhUnidirectionalGraph graph, NodeFields fields, Node.Builder nodeBuilder, long node) { if (fields == null || !fields.hasSwhid() || fields.getSwhid()) { nodeBuilder.setSwhid(graph.getSWHID(node).toString()); } if (fields == null) { return; } switch (graph.getNodeType(node)) { case CNT: if (fields.hasCntLength()) { nodeBuilder.setCntLength(graph.getContentLength(node)); } if (fields.hasCntIsSkipped()) { nodeBuilder.setCntIsSkipped(graph.isContentSkipped(node)); } break; case REV: if (fields.getRevAuthor()) { nodeBuilder.setRevAuthor(graph.getAuthorId(node)); } if (fields.getRevCommitter()) { 
nodeBuilder.setRevAuthor(graph.getCommitterId(node)); } if (fields.getRevAuthorDate()) { nodeBuilder.setRevAuthorDate(graph.getAuthorTimestamp(node)); } if (fields.getRevAuthorDateOffset()) { nodeBuilder.setRevAuthorDateOffset(graph.getAuthorTimestampOffset(node)); } if (fields.getRevCommitterDate()) { nodeBuilder.setRevCommitterDate(graph.getCommitterTimestamp(node)); } if (fields.getRevCommitterDateOffset()) { nodeBuilder.setRevCommitterDateOffset(graph.getCommitterTimestampOffset(node)); } if (fields.getRevMessage()) { byte[] msg = graph.getMessage(node); if (msg != null) { nodeBuilder.setRevMessage(ByteString.copyFrom(msg)); } } break; case REL: if (fields.getRelAuthor()) { nodeBuilder.setRelAuthor(graph.getAuthorId(node)); } if (fields.getRelAuthorDate()) { nodeBuilder.setRelAuthorDate(graph.getAuthorTimestamp(node)); } if (fields.getRelAuthorDateOffset()) { nodeBuilder.setRelAuthorDateOffset(graph.getAuthorTimestampOffset(node)); } if (fields.getRelName()) { byte[] msg = graph.getTagName(node); if (msg != null) { nodeBuilder.setRelName(ByteString.copyFrom(msg)); } } if (fields.getRelMessage()) { byte[] msg = graph.getMessage(node); if (msg != null) { nodeBuilder.setRelMessage(ByteString.copyFrom(msg)); } } break; case ORI: if (fields.getOriUrl()) { String url = graph.getUrl(node); if (url != null) { nodeBuilder.setOriUrl(url); } } } } private static void buildSuccessorProperties(SwhUnidirectionalGraph graph, NodeFields fields, Node.Builder nodeBuilder, long src, long dst, Label label) { if (nodeBuilder != null && fields != null && fields.getSuccessor()) { Successor.Builder successorBuilder = Successor.newBuilder(); if (!fields.hasSuccessorSwhid() || fields.getSuccessorSwhid()) { successorBuilder.setSwhid(graph.getSWHID(dst).toString()); } if (fields.getSuccessorLabel()) { DirEntry[] entries = (DirEntry[]) label.get(); for (DirEntry entry : entries) { EdgeLabel.Builder builder = EdgeLabel.newBuilder(); 
builder.setName(ByteString.copyFrom(graph.getLabelName(entry.filenameId))); builder.setPermission(entry.permission); successorBuilder.addLabel(builder.build()); } } nodeBuilder.addSuccessor(successorBuilder.build()); } } public interface NodeObserver { void onNext(Node nodeId); } } diff --git a/proto/swhgraph.proto b/proto/swhgraph.proto index 957de23..db88a97 100644 --- a/proto/swhgraph.proto +++ b/proto/swhgraph.proto @@ -1,131 +1,134 @@ syntax = "proto3"; option java_multiple_files = true; option java_package = "org.softwareheritage.graph.rpc"; option java_outer_classname = "GraphService"; package swh.graph; service TraversalService { rpc Traverse (TraversalRequest) returns (stream Node); rpc CountNodes (TraversalRequest) returns (CountResponse); rpc CountEdges (TraversalRequest) returns (CountResponse); rpc Stats (StatsRequest) returns (StatsResponse); rpc CheckSwhid (CheckSwhidRequest) returns (CheckSwhidResponse); } enum GraphDirection { FORWARD = 0; BACKWARD = 1; BOTH = 2; } message TraversalRequest { repeated string src = 1; // Traversal options optional GraphDirection direction = 2; optional string edges = 3; optional int64 max_edges = 4; - optional int64 max_depth = 5; - optional NodeFilter return_nodes = 6; - optional NodeFields return_fields = 7; + optional int64 min_depth = 5; + optional int64 max_depth = 6; + optional NodeFilter return_nodes = 7; + optional NodeFields return_fields = 8; } message NodeFilter { optional string types = 1; optional int64 min_traversal_successors = 2; optional int64 max_traversal_successors = 3; } message NodeFields { optional bool swhid = 1; optional bool successor = 2; optional bool successor_swhid = 3; optional bool successor_label = 4; optional bool cnt_length = 5; optional bool cnt_is_skipped = 6; optional bool rev_author = 7; optional bool rev_author_date = 8; optional bool rev_author_date_offset = 9; optional bool rev_committer = 10; optional bool rev_committer_date = 11; optional bool rev_committer_date_offset = 
12; optional bool rev_message = 13; optional bool rel_author = 14; optional bool rel_author_date = 15; optional bool rel_author_date_offset = 16; optional bool rel_name = 17; optional bool rel_message = 18; optional bool ori_url = 19; } message Node { string swhid = 1; repeated Successor successor = 2; optional int64 cnt_length = 3; optional bool cnt_is_skipped = 4; optional int64 rev_author = 5; optional int64 rev_author_date = 6; optional int32 rev_author_date_offset = 7; optional int64 rev_committer = 8; optional int64 rev_committer_date = 9; optional int32 rev_committer_date_offset = 10; optional bytes rev_message = 11; optional int64 rel_author = 12; optional int64 rel_author_date = 13; optional int32 rel_author_date_offset = 14; optional bytes rel_name = 15; optional bytes rel_message = 16; optional string ori_url = 17; } message Successor { optional string swhid = 1; repeated EdgeLabel label = 2; } message EdgeLabel { bytes name = 1; int32 permission = 2; } message CountResponse { int64 count = 1; } message StatsRequest { } message StatsResponse { int64 num_nodes = 1; int64 num_edges = 2; double compression = 3; double bits_per_node = 4; double bits_per_edge = 5; double avg_locality = 6; int64 indegree_min = 7; int64 indegree_max = 8; double indegree_avg = 9; int64 outdegree_min = 10; int64 outdegree_max = 11; double outdegree_avg = 12; } message CheckSwhidRequest { string swhid = 1; } message CheckSwhidResponse { + bool exists = 1; + string details = 2; } diff --git a/pyproject.toml b/pyproject.toml index 69b8f4d..8c8af87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,16 @@ [tool.black] target-version = ['py37'] +extend-exclude = ''' +/( + | swh/graph/rpc +)/ +''' [tool.isort] multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 use_parentheses = true ensure_newline_before_comments = true line_length = 88 force_sort_within_sections = true diff --git a/requirements-test.txt b/requirements-test.txt index 183b542..8bf4285 
100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,7 @@ pytest pytest-asyncio types-click types-pyyaml types-requests +types-protobuf diff --git a/requirements.txt b/requirements.txt index c038073..3983067 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ aiohttp click py4j psutil +grpcio-tools +mypy-protobuf diff --git a/swh/graph/naive_client.py b/swh/graph/naive_client.py index 3665ff3..bf4733b 100644 --- a/swh/graph/naive_client.py +++ b/swh/graph/naive_client.py @@ -1,399 +1,395 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import inspect import re import statistics from typing import ( Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union, ) from swh.model.swhids import CoreSWHID, ExtendedSWHID, ValidationError from .client import GraphArgumentException _NODE_TYPES = "ori|snp|rel|rev|dir|cnt" NODES_RE = re.compile(rf"(\*|{_NODE_TYPES})") EDGES_RE = re.compile(rf"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})") T = TypeVar("T", bound=Callable) SWHIDlike = Union[CoreSWHID, ExtendedSWHID, str] def check_arguments(f: T) -> T: """Decorator for generic argument checking for methods of NaiveClient. Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format.""" signature = inspect.signature(f) @functools.wraps(f) def newf(*args, **kwargs): __tracebackhide__ = True # for pytest try: bound_args = signature.bind(*args, **kwargs) except TypeError as e: # rethrow the exception from here so pytest doesn't flood the terminal # with signature.bind's call stack. 
raise TypeError(*e.args) from None self = bound_args.arguments["self"] src = bound_args.arguments.get("src") if src: self._check_swhid(src) edges = bound_args.arguments.get("edges") if edges: if edges != "*" and not EDGES_RE.match(edges): raise GraphArgumentException(f"invalid edge restriction: {edges}") return_types = bound_args.arguments.get("return_types") if return_types: if not NODES_RE.match(return_types): raise GraphArgumentException( f"invalid return_types restriction: {return_types}" ) return f(*args, **kwargs) return newf # type: ignore def filter_node_types(node_types: str, nodes: Iterable[str]) -> Iterator[str]: if node_types == "*": yield from nodes else: prefixes = tuple(f"swh:1:{type_}:" for type_ in node_types.split(",")) for node in nodes: if node.startswith(prefixes): yield node class NaiveClient: """An alternative implementation of :class:`swh.graph.backend.Backend`, written in pure-python and meant for simulating it in other components' test cases; constructed from a list of nodes and (directed) edges, both represented as SWHIDs. It is NOT meant to be efficient in any way; only to be a very simple implementation that provides the same behavior. >>> nodes = [ ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ] >>> edges = [ ... ( ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... ), ... ( ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ), ... 
] >>> c = NaiveClient(nodes=nodes, edges=edges) >>> list(c.leaves("swh:1:rev:1111111111111111111111111111111111111111")) ['swh:1:rev:3333333333333333333333333333333333333333'] """ def __init__( self, *, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.graph = Graph(nodes, edges) def _check_swhid(self, swhid): try: ExtendedSWHID.from_string(swhid) except ValidationError as e: raise GraphArgumentException(*e.args) from None if swhid not in self.graph.nodes: raise GraphArgumentException(f"SWHID not found: {swhid}") def stats(self) -> Dict: return { - "counts": { - "nodes": len(self.graph.nodes), - "edges": sum(map(len, self.graph.forward_edges.values())), - }, - "ratios": { - "compression": 1.0, - "bits_per_edge": 100.0, - "bits_per_node": 100.0, - "avg_locality": 0.0, - }, - "indegree": { - "min": min(map(len, self.graph.backward_edges.values())), - "max": max(map(len, self.graph.backward_edges.values())), - "avg": statistics.mean(map(len, self.graph.backward_edges.values())), - }, - "outdegree": { - "min": min(map(len, self.graph.forward_edges.values())), - "max": max(map(len, self.graph.forward_edges.values())), - "avg": statistics.mean(map(len, self.graph.forward_edges.values())), - }, + "num_nodes": len(self.graph.nodes), + "num_edges": sum(map(len, self.graph.forward_edges.values())), + "compression": 1.0, + "bits_per_edge": 100.0, + "bits_per_node": 100.0, + "avg_locality": 0.0, + "indegree_min": min(map(len, self.graph.backward_edges.values())), + "indegree_max": max(map(len, self.graph.backward_edges.values())), + "indegree_avg": statistics.mean( + map(len, self.graph.backward_edges.values()) + ), + "outdegree_min": min(map(len, self.graph.forward_edges.values())), + "outdegree_max": max(map(len, self.graph.forward_edges.values())), + "outdegree_avg": statistics.mean( + map(len, self.graph.forward_edges.values()) + ), } @check_arguments def leaves( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, 
return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, [ node for node in self.graph.get_subgraph(src, edges, direction) if not self.graph.get_filtered_neighbors(node, edges, direction) ], ) @check_arguments def neighbors( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_filtered_neighbors(src, edges, direction) ) @check_arguments def visit_nodes( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_subgraph(src, edges, direction) ) @check_arguments def visit_edges( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[Tuple[str, str]]: if max_edges == 0: max_edges = None # type: ignore else: max_edges -= 1 yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges] @check_arguments def visit_paths( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[List[str]]: # TODO: max_edges for path in self.graph.iter_paths_dfs(direction, edges, src): if path[-1] in self.leaves(src, edges, direction): yield list(path) @check_arguments def walk( self, src: str, dst: str, edges: str = "*", traversal: str = "dfs", direction: str = "forward", limit: Optional[int] = None, ) -> Iterator[str]: # TODO: implement algo="bfs" # TODO: limit match_path: Callable[[str], bool] if ":" in dst: match_path = dst.__eq__ self._check_swhid(dst) else: match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa for path in self.graph.iter_paths_dfs(direction, edges, src): if match_path(path[-1]): if not limit: # 0 or None yield from path elif limit > 0: yield from path[0:limit] else: yield from path[limit:] @check_arguments def random_walk( 
self, src: str, dst: str, edges: str = "*", direction: str = "forward", limit: Optional[int] = None, ): # TODO: limit yield from self.walk(src, dst, edges, "dfs", direction, limit) @check_arguments def count_leaves( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(list(self.leaves(src, edges, direction))) @check_arguments def count_neighbors( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_filtered_neighbors(src, edges, direction)) @check_arguments def count_visit_nodes( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_subgraph(src, edges, direction)) class Graph: def __init__( self, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.nodes = [str(node) for node in nodes] self.forward_edges: Dict[str, List[str]] = {} self.backward_edges: Dict[str, List[str]] = {} for node in nodes: self.forward_edges[str(node)] = [] self.backward_edges[str(node)] = [] for (src, dst) in edges: self.forward_edges[str(src)].append(str(dst)) self.backward_edges[str(dst)].append(str(src)) def get_filtered_neighbors( self, src: str, edges_fmt: str, direction: str, ) -> Set[str]: if direction == "forward": edges = self.forward_edges elif direction == "backward": edges = self.backward_edges else: raise GraphArgumentException(f"invalid direction: {direction}") neighbors = edges.get(src, []) if edges_fmt == "*": return set(neighbors) else: filtered_neighbors: Set[str] = set() for edges_fmt_item in edges_fmt.split(","): (src_fmt, dst_fmt) = edges_fmt_item.split(":") if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): continue if dst_fmt == "*": filtered_neighbors.update(neighbors) else: prefix = f"swh:1:{dst_fmt}:" filtered_neighbors.update( n for n in neighbors if n.startswith(prefix) ) return filtered_neighbors def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]: seen = set() to_visit = {src} while 
to_visit: node = to_visit.pop() seen.add(node) neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction)) new_nodes = neighbors - seen to_visit.update(new_nodes) return seen def iter_paths_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, ...]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): yield path + (node,) def iter_edges_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, str]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): if len(path) > 0: yield (path[-1], node) class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): self.graph = graph self.direction = direction self.edges_fmt = edges_fmt self.seen: Set[str] = set() self.src = src def more_work(self) -> bool: raise NotImplementedError() def pop(self) -> Tuple[Tuple[str, ...], str]: raise NotImplementedError() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: raise NotImplementedError() def __next__(self) -> Tuple[Tuple[str, ...], str]: # Stores (path, next_node) if not self.more_work(): raise StopIteration() (path, node) = self.pop() new_path = path + (node,) if node not in self.seen: neighbors = self.graph.get_filtered_neighbors( node, self.edges_fmt, self.direction ) # We want to visit the first neighbor first, and to_visit is a stack; # so we need to reversed() the list of neighbors to get it on top # of the stack. 
for neighbor in reversed(list(neighbors)): self.push(new_path, neighbor) self.seen.add(node) return (path, node) class DfsSubgraphIterator(SubgraphIterator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)] def more_work(self) -> bool: return bool(self.to_visit) def pop(self) -> Tuple[Tuple[str, ...], str]: return self.to_visit.pop() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: self.to_visit.append((new_path, neighbor)) diff --git a/swh/graph/rpc/swhgraph.proto b/swh/graph/rpc/swhgraph.proto new file mode 120000 index 0000000..ad5cf50 --- /dev/null +++ b/swh/graph/rpc/swhgraph.proto @@ -0,0 +1 @@ +../../../proto/swhgraph.proto \ No newline at end of file diff --git a/swh/graph/rpc/swhgraph_pb2.py b/swh/graph/rpc/swhgraph_pb2.py new file mode 100644 index 0000000..2c77b08 --- /dev/null +++ b/swh/graph/rpc/swhgraph_pb2.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: swh/graph/rpc/swhgraph.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1cswh/graph/rpc/swhgraph.proto\x12\tswh.graph\"\xf8\x02\n\x10TraversalRequest\x12\x0b\n\x03src\x18\x01 \x03(\t\x12\x31\n\tdirection\x18\x02 \x01(\x0e\x32\x19.swh.graph.GraphDirectionH\x00\x88\x01\x01\x12\x12\n\x05\x65\x64ges\x18\x03 \x01(\tH\x01\x88\x01\x01\x12\x16\n\tmax_edges\x18\x04 \x01(\x03H\x02\x88\x01\x01\x12\x16\n\tmin_depth\x18\x05 \x01(\x03H\x03\x88\x01\x01\x12\x16\n\tmax_depth\x18\x06 \x01(\x03H\x04\x88\x01\x01\x12\x30\n\x0creturn_nodes\x18\x07 \x01(\x0b\x32\x15.swh.graph.NodeFilterH\x05\x88\x01\x01\x12\x31\n\rreturn_fields\x18\x08 \x01(\x0b\x32\x15.swh.graph.NodeFieldsH\x06\x88\x01\x01\x42\x0c\n\n_directionB\x08\n\x06_edgesB\x0c\n\n_max_edgesB\x0c\n\n_min_depthB\x0c\n\n_max_depthB\x0f\n\r_return_nodesB\x10\n\x0e_return_fields\"\xb2\x01\n\nNodeFilter\x12\x12\n\x05types\x18\x01 \x01(\tH\x00\x88\x01\x01\x12%\n\x18min_traversal_successors\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12%\n\x18max_traversal_successors\x18\x03 \x01(\x03H\x02\x88\x01\x01\x42\x08\n\x06_typesB\x1b\n\x19_min_traversal_successorsB\x1b\n\x19_max_traversal_successors\"\x86\x07\n\nNodeFields\x12\x12\n\x05swhid\x18\x01 \x01(\x08H\x00\x88\x01\x01\x12\x16\n\tsuccessor\x18\x02 \x01(\x08H\x01\x88\x01\x01\x12\x1c\n\x0fsuccessor_swhid\x18\x03 \x01(\x08H\x02\x88\x01\x01\x12\x1c\n\x0fsuccessor_label\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x17\n\ncnt_length\x18\x05 \x01(\x08H\x04\x88\x01\x01\x12\x1b\n\x0e\x63nt_is_skipped\x18\x06 
\x01(\x08H\x05\x88\x01\x01\x12\x17\n\nrev_author\x18\x07 \x01(\x08H\x06\x88\x01\x01\x12\x1c\n\x0frev_author_date\x18\x08 \x01(\x08H\x07\x88\x01\x01\x12#\n\x16rev_author_date_offset\x18\t \x01(\x08H\x08\x88\x01\x01\x12\x1a\n\rrev_committer\x18\n \x01(\x08H\t\x88\x01\x01\x12\x1f\n\x12rev_committer_date\x18\x0b \x01(\x08H\n\x88\x01\x01\x12&\n\x19rev_committer_date_offset\x18\x0c \x01(\x08H\x0b\x88\x01\x01\x12\x18\n\x0brev_message\x18\r \x01(\x08H\x0c\x88\x01\x01\x12\x17\n\nrel_author\x18\x0e \x01(\x08H\r\x88\x01\x01\x12\x1c\n\x0frel_author_date\x18\x0f \x01(\x08H\x0e\x88\x01\x01\x12#\n\x16rel_author_date_offset\x18\x10 \x01(\x08H\x0f\x88\x01\x01\x12\x15\n\x08rel_name\x18\x11 \x01(\x08H\x10\x88\x01\x01\x12\x18\n\x0brel_message\x18\x12 \x01(\x08H\x11\x88\x01\x01\x12\x14\n\x07ori_url\x18\x13 \x01(\x08H\x12\x88\x01\x01\x42\x08\n\x06_swhidB\x0c\n\n_successorB\x12\n\x10_successor_swhidB\x12\n\x10_successor_labelB\r\n\x0b_cnt_lengthB\x11\n\x0f_cnt_is_skippedB\r\n\x0b_rev_authorB\x12\n\x10_rev_author_dateB\x19\n\x17_rev_author_date_offsetB\x10\n\x0e_rev_committerB\x15\n\x13_rev_committer_dateB\x1c\n\x1a_rev_committer_date_offsetB\x0e\n\x0c_rev_messageB\r\n\x0b_rel_authorB\x12\n\x10_rel_author_dateB\x19\n\x17_rel_author_date_offsetB\x0b\n\t_rel_nameB\x0e\n\x0c_rel_messageB\n\n\x08_ori_url\"\x90\x06\n\x04Node\x12\r\n\x05swhid\x18\x01 \x01(\t\x12\'\n\tsuccessor\x18\x02 \x03(\x0b\x32\x14.swh.graph.Successor\x12\x17\n\ncnt_length\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x1b\n\x0e\x63nt_is_skipped\x18\x04 \x01(\x08H\x01\x88\x01\x01\x12\x17\n\nrev_author\x18\x05 \x01(\x03H\x02\x88\x01\x01\x12\x1c\n\x0frev_author_date\x18\x06 \x01(\x03H\x03\x88\x01\x01\x12#\n\x16rev_author_date_offset\x18\x07 \x01(\x05H\x04\x88\x01\x01\x12\x1a\n\rrev_committer\x18\x08 \x01(\x03H\x05\x88\x01\x01\x12\x1f\n\x12rev_committer_date\x18\t \x01(\x03H\x06\x88\x01\x01\x12&\n\x19rev_committer_date_offset\x18\n \x01(\x05H\x07\x88\x01\x01\x12\x18\n\x0brev_message\x18\x0b 
\x01(\x0cH\x08\x88\x01\x01\x12\x17\n\nrel_author\x18\x0c \x01(\x03H\t\x88\x01\x01\x12\x1c\n\x0frel_author_date\x18\r \x01(\x03H\n\x88\x01\x01\x12#\n\x16rel_author_date_offset\x18\x0e \x01(\x05H\x0b\x88\x01\x01\x12\x15\n\x08rel_name\x18\x0f \x01(\x0cH\x0c\x88\x01\x01\x12\x18\n\x0brel_message\x18\x10 \x01(\x0cH\r\x88\x01\x01\x12\x14\n\x07ori_url\x18\x11 \x01(\tH\x0e\x88\x01\x01\x42\r\n\x0b_cnt_lengthB\x11\n\x0f_cnt_is_skippedB\r\n\x0b_rev_authorB\x12\n\x10_rev_author_dateB\x19\n\x17_rev_author_date_offsetB\x10\n\x0e_rev_committerB\x15\n\x13_rev_committer_dateB\x1c\n\x1a_rev_committer_date_offsetB\x0e\n\x0c_rev_messageB\r\n\x0b_rel_authorB\x12\n\x10_rel_author_dateB\x19\n\x17_rel_author_date_offsetB\x0b\n\t_rel_nameB\x0e\n\x0c_rel_messageB\n\n\x08_ori_url\"N\n\tSuccessor\x12\x12\n\x05swhid\x18\x01 \x01(\tH\x00\x88\x01\x01\x12#\n\x05label\x18\x02 \x03(\x0b\x32\x14.swh.graph.EdgeLabelB\x08\n\x06_swhid\"-\n\tEdgeLabel\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x12\n\npermission\x18\x02 \x01(\x05\"\x1e\n\rCountResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x03\"\x0e\n\x0cStatsRequest\"\x95\x02\n\rStatsResponse\x12\x11\n\tnum_nodes\x18\x01 \x01(\x03\x12\x11\n\tnum_edges\x18\x02 \x01(\x03\x12\x13\n\x0b\x63ompression\x18\x03 \x01(\x01\x12\x15\n\rbits_per_node\x18\x04 \x01(\x01\x12\x15\n\rbits_per_edge\x18\x05 \x01(\x01\x12\x14\n\x0c\x61vg_locality\x18\x06 \x01(\x01\x12\x14\n\x0cindegree_min\x18\x07 \x01(\x03\x12\x14\n\x0cindegree_max\x18\x08 \x01(\x03\x12\x14\n\x0cindegree_avg\x18\t \x01(\x01\x12\x15\n\routdegree_min\x18\n \x01(\x03\x12\x15\n\routdegree_max\x18\x0b \x01(\x03\x12\x15\n\routdegree_avg\x18\x0c \x01(\x01\"\"\n\x11\x43heckSwhidRequest\x12\r\n\x05swhid\x18\x01 \x01(\t\"5\n\x12\x43heckSwhidResponse\x12\x0e\n\x06\x65xists\x18\x01 \x01(\x08\x12\x0f\n\x07\x64\x65tails\x18\x02 
\x01(\t*5\n\x0eGraphDirection\x12\x0b\n\x07\x46ORWARD\x10\x00\x12\x0c\n\x08\x42\x41\x43KWARD\x10\x01\x12\x08\n\x04\x42OTH\x10\x02\x32\xdf\x02\n\x10TraversalService\x12:\n\x08Traverse\x12\x1b.swh.graph.TraversalRequest\x1a\x0f.swh.graph.Node0\x01\x12\x43\n\nCountNodes\x12\x1b.swh.graph.TraversalRequest\x1a\x18.swh.graph.CountResponse\x12\x43\n\nCountEdges\x12\x1b.swh.graph.TraversalRequest\x1a\x18.swh.graph.CountResponse\x12:\n\x05Stats\x12\x17.swh.graph.StatsRequest\x1a\x18.swh.graph.StatsResponse\x12I\n\nCheckSwhid\x12\x1c.swh.graph.CheckSwhidRequest\x1a\x1d.swh.graph.CheckSwhidResponseB0\n\x1eorg.softwareheritage.graph.rpcB\x0cGraphServiceP\x01\x62\x06proto3') + +_GRAPHDIRECTION = DESCRIPTOR.enum_types_by_name['GraphDirection'] +GraphDirection = enum_type_wrapper.EnumTypeWrapper(_GRAPHDIRECTION) +FORWARD = 0 +BACKWARD = 1 +BOTH = 2 + + +_TRAVERSALREQUEST = DESCRIPTOR.message_types_by_name['TraversalRequest'] +_NODEFILTER = DESCRIPTOR.message_types_by_name['NodeFilter'] +_NODEFIELDS = DESCRIPTOR.message_types_by_name['NodeFields'] +_NODE = DESCRIPTOR.message_types_by_name['Node'] +_SUCCESSOR = DESCRIPTOR.message_types_by_name['Successor'] +_EDGELABEL = DESCRIPTOR.message_types_by_name['EdgeLabel'] +_COUNTRESPONSE = DESCRIPTOR.message_types_by_name['CountResponse'] +_STATSREQUEST = DESCRIPTOR.message_types_by_name['StatsRequest'] +_STATSRESPONSE = DESCRIPTOR.message_types_by_name['StatsResponse'] +_CHECKSWHIDREQUEST = DESCRIPTOR.message_types_by_name['CheckSwhidRequest'] +_CHECKSWHIDRESPONSE = DESCRIPTOR.message_types_by_name['CheckSwhidResponse'] +TraversalRequest = _reflection.GeneratedProtocolMessageType('TraversalRequest', (_message.Message,), { + 'DESCRIPTOR' : _TRAVERSALREQUEST, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.TraversalRequest) + }) +_sym_db.RegisterMessage(TraversalRequest) + +NodeFilter = _reflection.GeneratedProtocolMessageType('NodeFilter', (_message.Message,), { + 'DESCRIPTOR' : 
_NODEFILTER, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.NodeFilter) + }) +_sym_db.RegisterMessage(NodeFilter) + +NodeFields = _reflection.GeneratedProtocolMessageType('NodeFields', (_message.Message,), { + 'DESCRIPTOR' : _NODEFIELDS, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.NodeFields) + }) +_sym_db.RegisterMessage(NodeFields) + +Node = _reflection.GeneratedProtocolMessageType('Node', (_message.Message,), { + 'DESCRIPTOR' : _NODE, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.Node) + }) +_sym_db.RegisterMessage(Node) + +Successor = _reflection.GeneratedProtocolMessageType('Successor', (_message.Message,), { + 'DESCRIPTOR' : _SUCCESSOR, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.Successor) + }) +_sym_db.RegisterMessage(Successor) + +EdgeLabel = _reflection.GeneratedProtocolMessageType('EdgeLabel', (_message.Message,), { + 'DESCRIPTOR' : _EDGELABEL, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.EdgeLabel) + }) +_sym_db.RegisterMessage(EdgeLabel) + +CountResponse = _reflection.GeneratedProtocolMessageType('CountResponse', (_message.Message,), { + 'DESCRIPTOR' : _COUNTRESPONSE, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.CountResponse) + }) +_sym_db.RegisterMessage(CountResponse) + +StatsRequest = _reflection.GeneratedProtocolMessageType('StatsRequest', (_message.Message,), { + 'DESCRIPTOR' : _STATSREQUEST, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.StatsRequest) + }) +_sym_db.RegisterMessage(StatsRequest) + +StatsResponse = _reflection.GeneratedProtocolMessageType('StatsResponse', (_message.Message,), { + 'DESCRIPTOR' : _STATSRESPONSE, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # 
@@protoc_insertion_point(class_scope:swh.graph.StatsResponse) + }) +_sym_db.RegisterMessage(StatsResponse) + +CheckSwhidRequest = _reflection.GeneratedProtocolMessageType('CheckSwhidRequest', (_message.Message,), { + 'DESCRIPTOR' : _CHECKSWHIDREQUEST, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.CheckSwhidRequest) + }) +_sym_db.RegisterMessage(CheckSwhidRequest) + +CheckSwhidResponse = _reflection.GeneratedProtocolMessageType('CheckSwhidResponse', (_message.Message,), { + 'DESCRIPTOR' : _CHECKSWHIDRESPONSE, + '__module__' : 'swh.graph.rpc.swhgraph_pb2' + # @@protoc_insertion_point(class_scope:swh.graph.CheckSwhidResponse) + }) +_sym_db.RegisterMessage(CheckSwhidResponse) + +_TRAVERSALSERVICE = DESCRIPTOR.services_by_name['TraversalService'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'\n\036org.softwareheritage.graph.rpcB\014GraphServiceP\001' + _GRAPHDIRECTION._serialized_start=2841 + _GRAPHDIRECTION._serialized_end=2894 + _TRAVERSALREQUEST._serialized_start=44 + _TRAVERSALREQUEST._serialized_end=420 + _NODEFILTER._serialized_start=423 + _NODEFILTER._serialized_end=601 + _NODEFIELDS._serialized_start=604 + _NODEFIELDS._serialized_end=1506 + _NODE._serialized_start=1509 + _NODE._serialized_end=2293 + _SUCCESSOR._serialized_start=2295 + _SUCCESSOR._serialized_end=2373 + _EDGELABEL._serialized_start=2375 + _EDGELABEL._serialized_end=2420 + _COUNTRESPONSE._serialized_start=2422 + _COUNTRESPONSE._serialized_end=2452 + _STATSREQUEST._serialized_start=2454 + _STATSREQUEST._serialized_end=2468 + _STATSRESPONSE._serialized_start=2471 + _STATSRESPONSE._serialized_end=2748 + _CHECKSWHIDREQUEST._serialized_start=2750 + _CHECKSWHIDREQUEST._serialized_end=2784 + _CHECKSWHIDRESPONSE._serialized_start=2786 + _CHECKSWHIDRESPONSE._serialized_end=2839 + _TRAVERSALSERVICE._serialized_start=2897 + _TRAVERSALSERVICE._serialized_end=3248 +# 
@@protoc_insertion_point(module_scope) diff --git a/swh/graph/rpc/swhgraph_pb2.pyi b/swh/graph/rpc/swhgraph_pb2.pyi new file mode 100644 index 0000000..c41ba12 --- /dev/null +++ b/swh/graph/rpc/swhgraph_pb2.pyi @@ -0,0 +1,418 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" +import builtins +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import typing +import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +class _GraphDirection: + ValueType = typing.NewType('ValueType', builtins.int) + V: typing_extensions.TypeAlias = ValueType +class _GraphDirectionEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_GraphDirection.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + FORWARD: _GraphDirection.ValueType # 0 + BACKWARD: _GraphDirection.ValueType # 1 + BOTH: _GraphDirection.ValueType # 2 +class GraphDirection(_GraphDirection, metaclass=_GraphDirectionEnumTypeWrapper): + pass + +FORWARD: GraphDirection.ValueType # 0 +BACKWARD: GraphDirection.ValueType # 1 +BOTH: GraphDirection.ValueType # 2 +global___GraphDirection = GraphDirection + + +class TraversalRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SRC_FIELD_NUMBER: builtins.int + DIRECTION_FIELD_NUMBER: builtins.int + EDGES_FIELD_NUMBER: builtins.int + MAX_EDGES_FIELD_NUMBER: builtins.int + MIN_DEPTH_FIELD_NUMBER: builtins.int + MAX_DEPTH_FIELD_NUMBER: builtins.int + RETURN_NODES_FIELD_NUMBER: builtins.int + RETURN_FIELDS_FIELD_NUMBER: builtins.int + @property + def src(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[typing.Text]: ... 
+ direction: global___GraphDirection.ValueType + """Traversal options""" + + edges: typing.Text + max_edges: builtins.int + min_depth: builtins.int + max_depth: builtins.int + @property + def return_nodes(self) -> global___NodeFilter: ... + @property + def return_fields(self) -> global___NodeFields: ... + def __init__(self, + *, + src: typing.Optional[typing.Iterable[typing.Text]] = ..., + direction: typing.Optional[global___GraphDirection.ValueType] = ..., + edges: typing.Optional[typing.Text] = ..., + max_edges: typing.Optional[builtins.int] = ..., + min_depth: typing.Optional[builtins.int] = ..., + max_depth: typing.Optional[builtins.int] = ..., + return_nodes: typing.Optional[global___NodeFilter] = ..., + return_fields: typing.Optional[global___NodeFields] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_direction",b"_direction","_edges",b"_edges","_max_depth",b"_max_depth","_max_edges",b"_max_edges","_min_depth",b"_min_depth","_return_fields",b"_return_fields","_return_nodes",b"_return_nodes","direction",b"direction","edges",b"edges","max_depth",b"max_depth","max_edges",b"max_edges","min_depth",b"min_depth","return_fields",b"return_fields","return_nodes",b"return_nodes"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_direction",b"_direction","_edges",b"_edges","_max_depth",b"_max_depth","_max_edges",b"_max_edges","_min_depth",b"_min_depth","_return_fields",b"_return_fields","_return_nodes",b"_return_nodes","direction",b"direction","edges",b"edges","max_depth",b"max_depth","max_edges",b"max_edges","min_depth",b"min_depth","return_fields",b"return_fields","return_nodes",b"return_nodes","src",b"src"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_direction",b"_direction"]) -> typing.Optional[typing_extensions.Literal["direction"]]: ... 
+ @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_edges",b"_edges"]) -> typing.Optional[typing_extensions.Literal["edges"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_max_depth",b"_max_depth"]) -> typing.Optional[typing_extensions.Literal["max_depth"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_max_edges",b"_max_edges"]) -> typing.Optional[typing_extensions.Literal["max_edges"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_min_depth",b"_min_depth"]) -> typing.Optional[typing_extensions.Literal["min_depth"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_return_fields",b"_return_fields"]) -> typing.Optional[typing_extensions.Literal["return_fields"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_return_nodes",b"_return_nodes"]) -> typing.Optional[typing_extensions.Literal["return_nodes"]]: ... +global___TraversalRequest = TraversalRequest + +class NodeFilter(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + TYPES_FIELD_NUMBER: builtins.int + MIN_TRAVERSAL_SUCCESSORS_FIELD_NUMBER: builtins.int + MAX_TRAVERSAL_SUCCESSORS_FIELD_NUMBER: builtins.int + types: typing.Text + min_traversal_successors: builtins.int + max_traversal_successors: builtins.int + def __init__(self, + *, + types: typing.Optional[typing.Text] = ..., + min_traversal_successors: typing.Optional[builtins.int] = ..., + max_traversal_successors: typing.Optional[builtins.int] = ..., + ) -> None: ... 
+ def HasField(self, field_name: typing_extensions.Literal["_max_traversal_successors",b"_max_traversal_successors","_min_traversal_successors",b"_min_traversal_successors","_types",b"_types","max_traversal_successors",b"max_traversal_successors","min_traversal_successors",b"min_traversal_successors","types",b"types"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_max_traversal_successors",b"_max_traversal_successors","_min_traversal_successors",b"_min_traversal_successors","_types",b"_types","max_traversal_successors",b"max_traversal_successors","min_traversal_successors",b"min_traversal_successors","types",b"types"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_max_traversal_successors",b"_max_traversal_successors"]) -> typing.Optional[typing_extensions.Literal["max_traversal_successors"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_min_traversal_successors",b"_min_traversal_successors"]) -> typing.Optional[typing_extensions.Literal["min_traversal_successors"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_types",b"_types"]) -> typing.Optional[typing_extensions.Literal["types"]]: ... 
+global___NodeFilter = NodeFilter + +class NodeFields(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SWHID_FIELD_NUMBER: builtins.int + SUCCESSOR_FIELD_NUMBER: builtins.int + SUCCESSOR_SWHID_FIELD_NUMBER: builtins.int + SUCCESSOR_LABEL_FIELD_NUMBER: builtins.int + CNT_LENGTH_FIELD_NUMBER: builtins.int + CNT_IS_SKIPPED_FIELD_NUMBER: builtins.int + REV_AUTHOR_FIELD_NUMBER: builtins.int + REV_AUTHOR_DATE_FIELD_NUMBER: builtins.int + REV_AUTHOR_DATE_OFFSET_FIELD_NUMBER: builtins.int + REV_COMMITTER_FIELD_NUMBER: builtins.int + REV_COMMITTER_DATE_FIELD_NUMBER: builtins.int + REV_COMMITTER_DATE_OFFSET_FIELD_NUMBER: builtins.int + REV_MESSAGE_FIELD_NUMBER: builtins.int + REL_AUTHOR_FIELD_NUMBER: builtins.int + REL_AUTHOR_DATE_FIELD_NUMBER: builtins.int + REL_AUTHOR_DATE_OFFSET_FIELD_NUMBER: builtins.int + REL_NAME_FIELD_NUMBER: builtins.int + REL_MESSAGE_FIELD_NUMBER: builtins.int + ORI_URL_FIELD_NUMBER: builtins.int + swhid: builtins.bool + successor: builtins.bool + successor_swhid: builtins.bool + successor_label: builtins.bool + cnt_length: builtins.bool + cnt_is_skipped: builtins.bool + rev_author: builtins.bool + rev_author_date: builtins.bool + rev_author_date_offset: builtins.bool + rev_committer: builtins.bool + rev_committer_date: builtins.bool + rev_committer_date_offset: builtins.bool + rev_message: builtins.bool + rel_author: builtins.bool + rel_author_date: builtins.bool + rel_author_date_offset: builtins.bool + rel_name: builtins.bool + rel_message: builtins.bool + ori_url: builtins.bool + def __init__(self, + *, + swhid: typing.Optional[builtins.bool] = ..., + successor: typing.Optional[builtins.bool] = ..., + successor_swhid: typing.Optional[builtins.bool] = ..., + successor_label: typing.Optional[builtins.bool] = ..., + cnt_length: typing.Optional[builtins.bool] = ..., + cnt_is_skipped: typing.Optional[builtins.bool] = ..., + rev_author: typing.Optional[builtins.bool] = ..., + rev_author_date: 
typing.Optional[builtins.bool] = ..., + rev_author_date_offset: typing.Optional[builtins.bool] = ..., + rev_committer: typing.Optional[builtins.bool] = ..., + rev_committer_date: typing.Optional[builtins.bool] = ..., + rev_committer_date_offset: typing.Optional[builtins.bool] = ..., + rev_message: typing.Optional[builtins.bool] = ..., + rel_author: typing.Optional[builtins.bool] = ..., + rel_author_date: typing.Optional[builtins.bool] = ..., + rel_author_date_offset: typing.Optional[builtins.bool] = ..., + rel_name: typing.Optional[builtins.bool] = ..., + rel_message: typing.Optional[builtins.bool] = ..., + ori_url: typing.Optional[builtins.bool] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped","_cnt_length",b"_cnt_length","_ori_url",b"_ori_url","_rel_author",b"_rel_author","_rel_author_date",b"_rel_author_date","_rel_author_date_offset",b"_rel_author_date_offset","_rel_message",b"_rel_message","_rel_name",b"_rel_name","_rev_author",b"_rev_author","_rev_author_date",b"_rev_author_date","_rev_author_date_offset",b"_rev_author_date_offset","_rev_committer",b"_rev_committer","_rev_committer_date",b"_rev_committer_date","_rev_committer_date_offset",b"_rev_committer_date_offset","_rev_message",b"_rev_message","_successor",b"_successor","_successor_label",b"_successor_label","_successor_swhid",b"_successor_swhid","_swhid",b"_swhid","cnt_is_skipped",b"cnt_is_skipped","cnt_length",b"cnt_length","ori_url",b"ori_url","rel_author",b"rel_author","rel_author_date",b"rel_author_date","rel_author_date_offset",b"rel_author_date_offset","rel_message",b"rel_message","rel_name",b"rel_name","rev_author",b"rev_author","rev_author_date",b"rev_author_date","rev_author_date_offset",b"rev_author_date_offset","rev_committer",b"rev_committer","rev_committer_date",b"rev_committer_date","rev_committer_date_offset",b"rev_committer_date_offset","rev_message",b"rev_message","successor",b"successor","successor_label",b"successor
_label","successor_swhid",b"successor_swhid","swhid",b"swhid"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped","_cnt_length",b"_cnt_length","_ori_url",b"_ori_url","_rel_author",b"_rel_author","_rel_author_date",b"_rel_author_date","_rel_author_date_offset",b"_rel_author_date_offset","_rel_message",b"_rel_message","_rel_name",b"_rel_name","_rev_author",b"_rev_author","_rev_author_date",b"_rev_author_date","_rev_author_date_offset",b"_rev_author_date_offset","_rev_committer",b"_rev_committer","_rev_committer_date",b"_rev_committer_date","_rev_committer_date_offset",b"_rev_committer_date_offset","_rev_message",b"_rev_message","_successor",b"_successor","_successor_label",b"_successor_label","_successor_swhid",b"_successor_swhid","_swhid",b"_swhid","cnt_is_skipped",b"cnt_is_skipped","cnt_length",b"cnt_length","ori_url",b"ori_url","rel_author",b"rel_author","rel_author_date",b"rel_author_date","rel_author_date_offset",b"rel_author_date_offset","rel_message",b"rel_message","rel_name",b"rel_name","rev_author",b"rev_author","rev_author_date",b"rev_author_date","rev_author_date_offset",b"rev_author_date_offset","rev_committer",b"rev_committer","rev_committer_date",b"rev_committer_date","rev_committer_date_offset",b"rev_committer_date_offset","rev_message",b"rev_message","successor",b"successor","successor_label",b"successor_label","successor_swhid",b"successor_swhid","swhid",b"swhid"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped"]) -> typing.Optional[typing_extensions.Literal["cnt_is_skipped"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_cnt_length",b"_cnt_length"]) -> typing.Optional[typing_extensions.Literal["cnt_length"]]: ... 
+ @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_ori_url",b"_ori_url"]) -> typing.Optional[typing_extensions.Literal["ori_url"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author",b"_rel_author"]) -> typing.Optional[typing_extensions.Literal["rel_author"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author_date",b"_rel_author_date"]) -> typing.Optional[typing_extensions.Literal["rel_author_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author_date_offset",b"_rel_author_date_offset"]) -> typing.Optional[typing_extensions.Literal["rel_author_date_offset"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_message",b"_rel_message"]) -> typing.Optional[typing_extensions.Literal["rel_message"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_name",b"_rel_name"]) -> typing.Optional[typing_extensions.Literal["rel_name"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author",b"_rev_author"]) -> typing.Optional[typing_extensions.Literal["rev_author"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author_date",b"_rev_author_date"]) -> typing.Optional[typing_extensions.Literal["rev_author_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author_date_offset",b"_rev_author_date_offset"]) -> typing.Optional[typing_extensions.Literal["rev_author_date_offset"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer",b"_rev_committer"]) -> typing.Optional[typing_extensions.Literal["rev_committer"]]: ... 
+ @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer_date",b"_rev_committer_date"]) -> typing.Optional[typing_extensions.Literal["rev_committer_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer_date_offset",b"_rev_committer_date_offset"]) -> typing.Optional[typing_extensions.Literal["rev_committer_date_offset"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_message",b"_rev_message"]) -> typing.Optional[typing_extensions.Literal["rev_message"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_successor",b"_successor"]) -> typing.Optional[typing_extensions.Literal["successor"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_successor_label",b"_successor_label"]) -> typing.Optional[typing_extensions.Literal["successor_label"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_successor_swhid",b"_successor_swhid"]) -> typing.Optional[typing_extensions.Literal["successor_swhid"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_swhid",b"_swhid"]) -> typing.Optional[typing_extensions.Literal["swhid"]]: ... 
+global___NodeFields = NodeFields + +class Node(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SWHID_FIELD_NUMBER: builtins.int + SUCCESSOR_FIELD_NUMBER: builtins.int + CNT_LENGTH_FIELD_NUMBER: builtins.int + CNT_IS_SKIPPED_FIELD_NUMBER: builtins.int + REV_AUTHOR_FIELD_NUMBER: builtins.int + REV_AUTHOR_DATE_FIELD_NUMBER: builtins.int + REV_AUTHOR_DATE_OFFSET_FIELD_NUMBER: builtins.int + REV_COMMITTER_FIELD_NUMBER: builtins.int + REV_COMMITTER_DATE_FIELD_NUMBER: builtins.int + REV_COMMITTER_DATE_OFFSET_FIELD_NUMBER: builtins.int + REV_MESSAGE_FIELD_NUMBER: builtins.int + REL_AUTHOR_FIELD_NUMBER: builtins.int + REL_AUTHOR_DATE_FIELD_NUMBER: builtins.int + REL_AUTHOR_DATE_OFFSET_FIELD_NUMBER: builtins.int + REL_NAME_FIELD_NUMBER: builtins.int + REL_MESSAGE_FIELD_NUMBER: builtins.int + ORI_URL_FIELD_NUMBER: builtins.int + swhid: typing.Text + @property + def successor(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Successor]: ... 
+ cnt_length: builtins.int + cnt_is_skipped: builtins.bool + rev_author: builtins.int + rev_author_date: builtins.int + rev_author_date_offset: builtins.int + rev_committer: builtins.int + rev_committer_date: builtins.int + rev_committer_date_offset: builtins.int + rev_message: builtins.bytes + rel_author: builtins.int + rel_author_date: builtins.int + rel_author_date_offset: builtins.int + rel_name: builtins.bytes + rel_message: builtins.bytes + ori_url: typing.Text + def __init__(self, + *, + swhid: typing.Text = ..., + successor: typing.Optional[typing.Iterable[global___Successor]] = ..., + cnt_length: typing.Optional[builtins.int] = ..., + cnt_is_skipped: typing.Optional[builtins.bool] = ..., + rev_author: typing.Optional[builtins.int] = ..., + rev_author_date: typing.Optional[builtins.int] = ..., + rev_author_date_offset: typing.Optional[builtins.int] = ..., + rev_committer: typing.Optional[builtins.int] = ..., + rev_committer_date: typing.Optional[builtins.int] = ..., + rev_committer_date_offset: typing.Optional[builtins.int] = ..., + rev_message: typing.Optional[builtins.bytes] = ..., + rel_author: typing.Optional[builtins.int] = ..., + rel_author_date: typing.Optional[builtins.int] = ..., + rel_author_date_offset: typing.Optional[builtins.int] = ..., + rel_name: typing.Optional[builtins.bytes] = ..., + rel_message: typing.Optional[builtins.bytes] = ..., + ori_url: typing.Optional[typing.Text] = ..., + ) -> None: ... 
+ def HasField(self, field_name: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped","_cnt_length",b"_cnt_length","_ori_url",b"_ori_url","_rel_author",b"_rel_author","_rel_author_date",b"_rel_author_date","_rel_author_date_offset",b"_rel_author_date_offset","_rel_message",b"_rel_message","_rel_name",b"_rel_name","_rev_author",b"_rev_author","_rev_author_date",b"_rev_author_date","_rev_author_date_offset",b"_rev_author_date_offset","_rev_committer",b"_rev_committer","_rev_committer_date",b"_rev_committer_date","_rev_committer_date_offset",b"_rev_committer_date_offset","_rev_message",b"_rev_message","cnt_is_skipped",b"cnt_is_skipped","cnt_length",b"cnt_length","ori_url",b"ori_url","rel_author",b"rel_author","rel_author_date",b"rel_author_date","rel_author_date_offset",b"rel_author_date_offset","rel_message",b"rel_message","rel_name",b"rel_name","rev_author",b"rev_author","rev_author_date",b"rev_author_date","rev_author_date_offset",b"rev_author_date_offset","rev_committer",b"rev_committer","rev_committer_date",b"rev_committer_date","rev_committer_date_offset",b"rev_committer_date_offset","rev_message",b"rev_message"]) -> builtins.bool: ... 
+ def ClearField(self, field_name: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped","_cnt_length",b"_cnt_length","_ori_url",b"_ori_url","_rel_author",b"_rel_author","_rel_author_date",b"_rel_author_date","_rel_author_date_offset",b"_rel_author_date_offset","_rel_message",b"_rel_message","_rel_name",b"_rel_name","_rev_author",b"_rev_author","_rev_author_date",b"_rev_author_date","_rev_author_date_offset",b"_rev_author_date_offset","_rev_committer",b"_rev_committer","_rev_committer_date",b"_rev_committer_date","_rev_committer_date_offset",b"_rev_committer_date_offset","_rev_message",b"_rev_message","cnt_is_skipped",b"cnt_is_skipped","cnt_length",b"cnt_length","ori_url",b"ori_url","rel_author",b"rel_author","rel_author_date",b"rel_author_date","rel_author_date_offset",b"rel_author_date_offset","rel_message",b"rel_message","rel_name",b"rel_name","rev_author",b"rev_author","rev_author_date",b"rev_author_date","rev_author_date_offset",b"rev_author_date_offset","rev_committer",b"rev_committer","rev_committer_date",b"rev_committer_date","rev_committer_date_offset",b"rev_committer_date_offset","rev_message",b"rev_message","successor",b"successor","swhid",b"swhid"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_cnt_is_skipped",b"_cnt_is_skipped"]) -> typing.Optional[typing_extensions.Literal["cnt_is_skipped"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_cnt_length",b"_cnt_length"]) -> typing.Optional[typing_extensions.Literal["cnt_length"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_ori_url",b"_ori_url"]) -> typing.Optional[typing_extensions.Literal["ori_url"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author",b"_rel_author"]) -> typing.Optional[typing_extensions.Literal["rel_author"]]: ... 
+ @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author_date",b"_rel_author_date"]) -> typing.Optional[typing_extensions.Literal["rel_author_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_author_date_offset",b"_rel_author_date_offset"]) -> typing.Optional[typing_extensions.Literal["rel_author_date_offset"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_message",b"_rel_message"]) -> typing.Optional[typing_extensions.Literal["rel_message"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rel_name",b"_rel_name"]) -> typing.Optional[typing_extensions.Literal["rel_name"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author",b"_rev_author"]) -> typing.Optional[typing_extensions.Literal["rev_author"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author_date",b"_rev_author_date"]) -> typing.Optional[typing_extensions.Literal["rev_author_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_author_date_offset",b"_rev_author_date_offset"]) -> typing.Optional[typing_extensions.Literal["rev_author_date_offset"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer",b"_rev_committer"]) -> typing.Optional[typing_extensions.Literal["rev_committer"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer_date",b"_rev_committer_date"]) -> typing.Optional[typing_extensions.Literal["rev_committer_date"]]: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_committer_date_offset",b"_rev_committer_date_offset"]) -> typing.Optional[typing_extensions.Literal["rev_committer_date_offset"]]: ... 
+ @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_rev_message",b"_rev_message"]) -> typing.Optional[typing_extensions.Literal["rev_message"]]: ... +global___Node = Node + +class Successor(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SWHID_FIELD_NUMBER: builtins.int + LABEL_FIELD_NUMBER: builtins.int + swhid: typing.Text + @property + def label(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EdgeLabel]: ... + def __init__(self, + *, + swhid: typing.Optional[typing.Text] = ..., + label: typing.Optional[typing.Iterable[global___EdgeLabel]] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_swhid",b"_swhid","swhid",b"swhid"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_swhid",b"_swhid","label",b"label","swhid",b"swhid"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_swhid",b"_swhid"]) -> typing.Optional[typing_extensions.Literal["swhid"]]: ... +global___Successor = Successor + +class EdgeLabel(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + NAME_FIELD_NUMBER: builtins.int + PERMISSION_FIELD_NUMBER: builtins.int + name: builtins.bytes + permission: builtins.int + def __init__(self, + *, + name: builtins.bytes = ..., + permission: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["name",b"name","permission",b"permission"]) -> None: ... +global___EdgeLabel = EdgeLabel + +class CountResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + COUNT_FIELD_NUMBER: builtins.int + count: builtins.int + def __init__(self, + *, + count: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["count",b"count"]) -> None: ... 
+global___CountResponse = CountResponse + +class StatsRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + def __init__(self, + ) -> None: ... +global___StatsRequest = StatsRequest + +class StatsResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + NUM_NODES_FIELD_NUMBER: builtins.int + NUM_EDGES_FIELD_NUMBER: builtins.int + COMPRESSION_FIELD_NUMBER: builtins.int + BITS_PER_NODE_FIELD_NUMBER: builtins.int + BITS_PER_EDGE_FIELD_NUMBER: builtins.int + AVG_LOCALITY_FIELD_NUMBER: builtins.int + INDEGREE_MIN_FIELD_NUMBER: builtins.int + INDEGREE_MAX_FIELD_NUMBER: builtins.int + INDEGREE_AVG_FIELD_NUMBER: builtins.int + OUTDEGREE_MIN_FIELD_NUMBER: builtins.int + OUTDEGREE_MAX_FIELD_NUMBER: builtins.int + OUTDEGREE_AVG_FIELD_NUMBER: builtins.int + num_nodes: builtins.int + num_edges: builtins.int + compression: builtins.float + bits_per_node: builtins.float + bits_per_edge: builtins.float + avg_locality: builtins.float + indegree_min: builtins.int + indegree_max: builtins.int + indegree_avg: builtins.float + outdegree_min: builtins.int + outdegree_max: builtins.int + outdegree_avg: builtins.float + def __init__(self, + *, + num_nodes: builtins.int = ..., + num_edges: builtins.int = ..., + compression: builtins.float = ..., + bits_per_node: builtins.float = ..., + bits_per_edge: builtins.float = ..., + avg_locality: builtins.float = ..., + indegree_min: builtins.int = ..., + indegree_max: builtins.int = ..., + indegree_avg: builtins.float = ..., + outdegree_min: builtins.int = ..., + outdegree_max: builtins.int = ..., + outdegree_avg: builtins.float = ..., + ) -> None: ... 
+ def ClearField(self, field_name: typing_extensions.Literal["avg_locality",b"avg_locality","bits_per_edge",b"bits_per_edge","bits_per_node",b"bits_per_node","compression",b"compression","indegree_avg",b"indegree_avg","indegree_max",b"indegree_max","indegree_min",b"indegree_min","num_edges",b"num_edges","num_nodes",b"num_nodes","outdegree_avg",b"outdegree_avg","outdegree_max",b"outdegree_max","outdegree_min",b"outdegree_min"]) -> None: ... +global___StatsResponse = StatsResponse + +class CheckSwhidRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SWHID_FIELD_NUMBER: builtins.int + swhid: typing.Text + def __init__(self, + *, + swhid: typing.Text = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["swhid",b"swhid"]) -> None: ... +global___CheckSwhidRequest = CheckSwhidRequest + +class CheckSwhidResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + EXISTS_FIELD_NUMBER: builtins.int + DETAILS_FIELD_NUMBER: builtins.int + exists: builtins.bool + details: typing.Text + def __init__(self, + *, + exists: builtins.bool = ..., + details: typing.Text = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["details",b"details","exists",b"exists"]) -> None: ... +global___CheckSwhidResponse = CheckSwhidResponse diff --git a/swh/graph/rpc/swhgraph_pb2_grpc.py b/swh/graph/rpc/swhgraph_pb2_grpc.py new file mode 100644 index 0000000..b906c52 --- /dev/null +++ b/swh/graph/rpc/swhgraph_pb2_grpc.py @@ -0,0 +1,198 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from swh.graph.rpc import swhgraph_pb2 as swh_dot_graph_dot_rpc_dot_swhgraph__pb2 + + +class TraversalServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. 
+ + Args: + channel: A grpc.Channel. + """ + self.Traverse = channel.unary_stream( + '/swh.graph.TraversalService/Traverse', + request_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + response_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.Node.FromString, + ) + self.CountNodes = channel.unary_unary( + '/swh.graph.TraversalService/CountNodes', + request_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + response_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.FromString, + ) + self.CountEdges = channel.unary_unary( + '/swh.graph.TraversalService/CountEdges', + request_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + response_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.FromString, + ) + self.Stats = channel.unary_unary( + '/swh.graph.TraversalService/Stats', + request_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsRequest.SerializeToString, + response_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsResponse.FromString, + ) + self.CheckSwhid = channel.unary_unary( + '/swh.graph.TraversalService/CheckSwhid', + request_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidRequest.SerializeToString, + response_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidResponse.FromString, + ) + + +class TraversalServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Traverse(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CountNodes(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise 
NotImplementedError('Method not implemented!') + + def CountEdges(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Stats(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CheckSwhid(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_TraversalServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Traverse': grpc.unary_stream_rpc_method_handler( + servicer.Traverse, + request_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.FromString, + response_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.Node.SerializeToString, + ), + 'CountNodes': grpc.unary_unary_rpc_method_handler( + servicer.CountNodes, + request_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.FromString, + response_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.SerializeToString, + ), + 'CountEdges': grpc.unary_unary_rpc_method_handler( + servicer.CountEdges, + request_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.FromString, + response_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.SerializeToString, + ), + 'Stats': grpc.unary_unary_rpc_method_handler( + servicer.Stats, + request_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsRequest.FromString, + response_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsResponse.SerializeToString, + ), + 'CheckSwhid': 
grpc.unary_unary_rpc_method_handler( + servicer.CheckSwhid, + request_deserializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidRequest.FromString, + response_serializer=swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'swh.graph.TraversalService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class TraversalService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Traverse(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/swh.graph.TraversalService/Traverse', + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.Node.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def CountNodes(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/swh.graph.TraversalService/CountNodes', + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def CountEdges(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, 
'/swh.graph.TraversalService/CountEdges', + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.TraversalRequest.SerializeToString, + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CountResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Stats(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/swh.graph.TraversalService/Stats', + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsRequest.SerializeToString, + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.StatsResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def CheckSwhid(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/swh.graph.TraversalService/CheckSwhid', + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidRequest.SerializeToString, + swh_dot_graph_dot_rpc_dot_swhgraph__pb2.CheckSwhidResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index 767a44c..57ed7b9 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,371 +1,363 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two 
languages. """ -import asyncio -from collections import deque +import json import os +import subprocess from typing import Optional +import aiohttp.test_utils import aiohttp.web +from google.protobuf import json_format +import grpc from swh.core.api.asynchronous import RPCServerApp from swh.core.config import read as config_read -from swh.graph.backend import Backend +from swh.graph.config import check_config +from swh.graph.rpc.swhgraph_pb2 import ( + CheckSwhidRequest, + NodeFields, + NodeFilter, + StatsRequest, + TraversalRequest, +) +from swh.graph.rpc.swhgraph_pb2_grpc import TraversalServiceStub from swh.model.swhids import EXTENDED_SWHID_TYPES try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration class GraphServerApp(RPCServerApp): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.on_startup.append(self._start_gateway) - self.on_shutdown.append(self._stop_gateway) + self.on_startup.append(self._start) + self.on_shutdown.append(self._stop) @staticmethod - async def _start_gateway(app): - # Equivalent to entering `with app["backend"]:` - app["backend"].start_gateway() + async def _start(app): + app["channel"] = grpc.aio.insecure_channel(app["rpc_url"]) + await app["channel"].__aenter__() + app["rpc_client"] = TraversalServiceStub(app["channel"]) + await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True) @staticmethod - async def _stop_gateway(app): - # Equivalent to exiting `with app["backend"]:` with no error - app["backend"].stop_gateway() + async def _stop(app): + await app["channel"].__aexit__(None, None, None) + if app.get("local_server"): + app["local_server"].terminate() async def index(request): return aiohttp.web.Response( content_type="text/html", body=""" Software Heritage graph 
server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.backend = self.request.app["backend"] + self.rpc_client: TraversalServiceStub = self.request.app["rpc_client"] def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") - return s + return s.upper() def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}") return s def get_return_types(self): """Validate HTTP query parameter 'return types', i.e, a set of types which we will filter the query results with""" s = self.request.query.get("return_types", "*") if any( node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for node_type in s.split(",") ): raise aiohttp.web.HTTPBadRequest( text=f"invalid type for filtering res: {s}" ) # if the user puts a star, # then we filter nothing, we don't need the other information if "*" in s: return "*" else: return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}") def get_max_edges(self): """Validate HTTP query parameter 'max_edges', i.e., the limit of the number 
of edges that can be visited""" s = self.request.query.get("max_edges", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") - def check_swhid(self, swhid): + async def check_swhid(self, swhid): """Validate that the given SWHID exists in the graph""" - try: - self.backend.check_swhid(swhid) - except (NameError, ValueError) as e: - raise aiohttp.web.HTTPBadRequest(text=str(e)) + r = await self.rpc_client.CheckSwhid(CheckSwhidRequest(swhid=swhid)) + if not r.exists: + raise aiohttp.web.HTTPBadRequest(text=str(r.details)) class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: self._buf = [] try: await self.stream_response() finally: await self._flush_buffer() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. 
""" raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" self._buf.append(line) if len(self._buf) > 100: await self._flush_buffer() async def _flush_buffer(self): await self.response_stream.write("\n".join(self._buf).encode() + b"\n") self._buf = [] class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): - stats = self.backend.stats() - return aiohttp.web.Response(body=stats, content_type="application/json") + res = await self.rpc_client.Stats(StatsRequest()) + stats = json_format.MessageToDict( + res, including_default_value_fields=True, preserving_proto_field_name=True + ) + # Int64 fields are serialized as strings by default. + for descriptor in res.DESCRIPTOR.fields: + if descriptor.type == descriptor.TYPE_INT64: + try: + stats[descriptor.name] = int(stats[descriptor.name]) + except KeyError: + pass + json_body = json.dumps(stats, indent=4, sort_keys=True) + return aiohttp.web.Response(body=json_body, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" - simple_traversal_type: Optional[str] = None - async def prepare_response(self): - self.src = self.request.match_info["src"] - self.edges = self.get_edges() - self.direction = self.get_direction() - self.max_edges = self.get_max_edges() - self.return_types = self.get_return_types() - self.check_swhid(self.src) + src = self.request.match_info["src"] + self.traversal_request = TraversalRequest( + src=[src], + edges=self.get_edges(), + direction=self.get_direction(), + return_nodes=NodeFilter(types=self.get_return_types()), + return_fields=NodeFields(), + ) + if self.get_max_edges(): + self.traversal_request.max_edges = self.get_max_edges() + await self.check_swhid(src) + self.configure_request() + + def configure_request(self): + pass async def stream_response(self): - async for res_line in self.backend.traversal( - self.simple_traversal_type, - 
self.direction, - self.edges, - self.src, - self.max_edges, - self.return_types, - ): - await self.stream_line(res_line) + async for node in self.rpc_client.Traverse(self.traversal_request): + await self.stream_line(node.swhid) class LeavesView(SimpleTraversalView): - simple_traversal_type = "leaves" + def configure_request(self): + self.traversal_request.return_nodes.max_traversal_successors = 0 class NeighborsView(SimpleTraversalView): - simple_traversal_type = "neighbors" + def configure_request(self): + self.traversal_request.min_depth = 1 + self.traversal_request.max_depth = 1 class VisitNodesView(SimpleTraversalView): - simple_traversal_type = "visit_nodes" + pass class VisitEdgesView(SimpleTraversalView): - simple_traversal_type = "visit_edges" - - -class WalkView(StreamingGraphView): - async def prepare_response(self): - self.src = self.request.match_info["src"] - self.dst = self.request.match_info["dst"] - - self.edges = self.get_edges() - self.direction = self.get_direction() - self.algo = self.get_traversal() - self.limit = self.get_limit() - self.max_edges = self.get_max_edges() - self.return_types = self.get_return_types() - - self.check_swhid(self.src) - if self.dst not in EXTENDED_SWHID_TYPES: - self.check_swhid(self.dst) - - async def get_walk_iterator(self): - return self.backend.traversal( - "walk", - self.direction, - self.edges, - self.algo, - self.src, - self.dst, - self.max_edges, - self.return_types, - ) + def configure_request(self): + self.traversal_request.return_fields.successor = True async def stream_response(self): - it = self.get_walk_iterator() - if self.limit < 0: - queue = deque(maxlen=-self.limit) - async for res_swhid in it: - queue.append(res_swhid) - while queue: - await self.stream_line(queue.popleft()) - else: - count = 0 - async for res_swhid in it: - if self.limit == 0 or count < self.limit: - await self.stream_line(res_swhid) - count += 1 - else: - break - - -class RandomWalkView(WalkView): - def get_walk_iterator(self): - 
return self.backend.traversal( - "random_walk", - self.direction, - self.edges, - RANDOM_RETRIES, - self.src, - self.dst, - self.max_edges, - self.return_types, - ) + async for node in self.rpc_client.Traverse(self.traversal_request): + for succ in node.successor: + await self.stream_line(node.swhid + " " + succ.swhid) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): - self.src = self.request.match_info["src"] - self.check_swhid(self.src) - - self.edges = self.get_edges() - self.direction = self.get_direction() - self.max_edges = self.get_max_edges() - - loop = asyncio.get_event_loop() - cnt = await loop.run_in_executor( - None, - self.backend.count, - self.count_type, - self.direction, - self.edges, - self.src, - self.max_edges, + src = self.request.match_info["src"] + self.traversal_request = TraversalRequest( + src=[src], + edges=self.get_edges(), + direction=self.get_direction(), + return_nodes=NodeFilter(types=self.get_return_types()), + return_fields=NodeFields(), + ) + if self.get_max_edges(): + self.traversal_request.max_edges = self.get_max_edges() + self.configure_request() + res = await self.rpc_client.CountNodes(self.traversal_request) + return aiohttp.web.Response( + body=str(res.count), content_type="application/json" ) - return aiohttp.web.Response(body=str(cnt), content_type="application/json") + + def configure_request(self): + pass class CountNeighborsView(CountView): - count_type = "neighbors" + def configure_request(self): + self.traversal_request.min_depth = 1 + self.traversal_request.max_depth = 1 class CountLeavesView(CountView): - count_type = "leaves" + def configure_request(self): + self.traversal_request.return_nodes.max_traversal_successors = 0 class CountVisitNodesView(CountView): - count_type = "visit_nodes" + pass + + +def spawn_java_rpc_server(config, port=None): + if port is None: + port = aiohttp.test_utils.unused_port() + config = check_config(config or {}) + 
cmd = [ + "java", + "-cp", + config["classpath"], + *config["java_tool_options"].split(), + "org.softwareheritage.graph.rpc.GraphServer", + "--port", + str(port), + config["graph"]["path"], + ] + server = subprocess.Popen(cmd) + rpc_url = f"localhost:{port}" + return server, rpc_url + + +def make_app(config=None, rpc_url=None, **kwargs): + app = GraphServerApp(**kwargs) + if rpc_url is None: + app["local_server"], rpc_url = spawn_java_rpc_server(config) -def make_app(config=None, backend=None, **kwargs): - if (config is None) == (backend is None): - raise ValueError("make_app() expects exactly one of 'config' or 'backend'") - if backend is None: - backend = Backend(graph_path=config["graph"]["path"], config=config["graph"]) - app = GraphServerApp(**kwargs) app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), - # temporarily disabled in wait of a proper fix for T1969 - # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) - aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) - app["backend"] = backend + app["rpc_url"] = rpc_url return app def make_app_from_configfile(): """Load configuration and then build application to run""" config_file = os.environ.get("SWH_CONFIG_FILENAME") config = config_read(config_file) return make_app(config=config) diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py index 09038e7..6554d27 100644 --- a/swh/graph/tests/conftest.py +++ b/swh/graph/tests/conftest.py @@ -1,71 +1,70 @@ # Copyright 
(C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import multiprocessing from pathlib import Path import subprocess from aiohttp.test_utils import TestClient, TestServer, loop_context import pytest from swh.graph.client import RemoteGraphClient from swh.graph.naive_client import NaiveClient SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0] TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/compressed/example" class GraphServerProcess(multiprocessing.Process): def __init__(self, q, *args, **kwargs): self.q = q super().__init__(*args, **kwargs) def run(self): # Lazy import to allow debian packaging - from swh.graph.backend import Backend from swh.graph.server.app import make_app try: - backend = Backend(graph_path=str(TEST_GRAPH_PATH)) + config = {"graph": {"path": TEST_GRAPH_PATH}} with loop_context() as loop: - app = make_app(backend=backend, debug=True) + app = make_app(config=config, debug=True) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) url = client.make_url("/graph/") self.q.put(url) loop.run_forever() except Exception as e: self.q.put(e) @pytest.fixture(scope="module", params=["remote", "naive"]) def graph_client(request): if request.param == "remote": queue = multiprocessing.Queue() server = GraphServerProcess(queue) server.start() res = queue.get() if isinstance(res, Exception): raise res yield RemoteGraphClient(str(res)) server.terminate() else: def zstdcat(*files): p = subprocess.run(["zstdcat", *files], stdout=subprocess.PIPE) return p.stdout.decode() edges_dataset = SWH_GRAPH_TESTS_ROOT / "dataset/edges" edge_files = edges_dataset.glob("*/*.edges.csv.zst") node_files = edges_dataset.glob("*/*.nodes.csv.zst") nodes = set(zstdcat(*node_files).strip().split("\n")) edge_lines = [line.split() for line in 
zstdcat(*edge_files).strip().split("\n")] edges = [(src, dst) for src, dst, *_ in edge_lines] for src, dst in edges: nodes.add(src) nodes.add(dst) yield NaiveClient(nodes=list(nodes), edges=edges) diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 454c0c5..5eaea60 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,384 +1,373 @@ import hashlib import pytest from pytest import raises from swh.core.api import RemoteException from swh.graph.client import GraphArgumentException TEST_ORIGIN_ID = "swh:1:ori:{}".format( hashlib.sha1(b"https://example.com/swh/graph").hexdigest() ) def test_stats(graph_client): stats = graph_client.stats() - - assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"} - - assert set(stats["counts"].keys()) == {"nodes", "edges"} - assert set(stats["ratios"].keys()) == { - "compression", - "bits_per_node", - "bits_per_edge", - "avg_locality", - } - assert set(stats["indegree"].keys()) == {"min", "max", "avg"} - assert set(stats["outdegree"].keys()) == {"min", "max", "avg"} - - assert stats["counts"]["nodes"] == 21 - assert stats["counts"]["edges"] == 23 - assert isinstance(stats["ratios"]["compression"], float) - assert isinstance(stats["ratios"]["bits_per_node"], float) - assert isinstance(stats["ratios"]["bits_per_edge"], float) - assert isinstance(stats["ratios"]["avg_locality"], float) - assert stats["indegree"]["min"] == 0 - assert stats["indegree"]["max"] == 3 - assert isinstance(stats["indegree"]["avg"], float) - assert stats["outdegree"]["min"] == 0 - assert stats["outdegree"]["max"] == 3 - assert isinstance(stats["outdegree"]["avg"], float) + assert stats["num_nodes"] == 21 + assert stats["num_edges"] == 23 + assert isinstance(stats["compression"], float) + assert isinstance(stats["bits_per_node"], float) + assert isinstance(stats["bits_per_edge"], float) + assert isinstance(stats["avg_locality"], float) + assert stats["indegree_min"] == 
0 + assert stats["indegree_max"] == 3 + assert isinstance(stats["indegree_avg"], float) + assert stats["outdegree_min"] == 0 + assert stats["outdegree_max"] == 3 + assert isinstance(stats["outdegree_avg"], float) def test_leaves(graph_client): actual = list(graph_client.leaves(TEST_ORIGIN_ID)) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_nodes_filtered(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="dir", ) ) expected = [ "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ] assert set(actual) == set(expected) def test_visit_nodes_filtered_star(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="*", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", 
"swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_limited(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", max_edges=4, edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] # As there are four valid answers (up to reordering), we cannot check for # equality. Instead, we check the client returned all edges but one. 
assert set(actual).issubset(set(expected)) assert len(actual) == 3 def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( "swh:1:rev:0000000000000000000000000000000000000009", edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = 
["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) +@pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_type(graph_client): """as the walk is random, we test a visit from a cnt node to a release reachable from every single path in the backward graph, and only check the final node of the path (i.e., the release) """ args = ("swh:1:cnt:0000000000000000000000000000000000000015", "rel") kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no release directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 +@pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_node(graph_client): """Same as test_random_walk_dst_is_type, but we target the specific release node instead of a type """ args = ( "swh:1:cnt:0000000000000000000000000000000000000015", "swh:1:rel:0000000000000000000000000000000000000019", ) kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = 
kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves(TEST_ORIGIN_ID) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): with raises(GraphArgumentException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:rel:00ffffffff000000000000000000000000000010")) if exc_info.value.response: assert exc_info.value.response.status_code == 404 with raises(GraphArgumentException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:rel:00ffffffff00000000zzzzzzz000000000000010") ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed edge specification list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters 
only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400