diff --git a/swh/graph/http_client.py b/swh/graph/http_client.py index aa66108..b204d73 100644 --- a/swh/graph/http_client.py +++ b/swh/graph/http_client.py @@ -1,156 +1,167 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from swh.core.api import RPCClient class GraphAPIError(Exception): """Graph API Error""" def __str__(self): return """An unexpected error occurred in the Graph backend: {}""".format( self.args ) class GraphArgumentException(Exception): def __init__(self, *args, response=None): super().__init__(*args) self.response = response class RemoteGraphClient(RPCClient): """Client to the Software Heritage Graph.""" def __init__(self, url, timeout=None): super().__init__(api_exception=GraphAPIError, url=url, timeout=timeout) def raw_verb_lines(self, verb, endpoint, **kwargs): response = self.raw_verb(verb, endpoint, stream=True, **kwargs) self.raise_for_status(response) for line in response.iter_lines(): yield line.decode().lstrip("\n") def get_lines(self, endpoint, **kwargs): yield from self.raw_verb_lines("get", endpoint, **kwargs) def raise_for_status(self, response) -> None: if response.status_code // 100 == 4: raise GraphArgumentException( response.content.decode("ascii"), response=response ) super().raise_for_status(response) # Web API endpoints def stats(self): return self.get("stats") def leaves( - self, src, edges="*", direction="forward", max_edges=0, return_types="*" + self, + src, + edges="*", + direction="forward", + max_edges=0, + return_types="*", + max_matching_nodes=0, ): return self.get_lines( "leaves/{}".format(src), params={ "edges": edges, "direction": direction, "max_edges": max_edges, "return_types": return_types, + "max_matching_nodes": max_matching_nodes, }, ) def neighbors( self, src, edges="*", direction="forward", max_edges=0, return_types="*" ): return self.get_lines( "neighbors/{}".format(src), params={ "edges": edges, "direction": direction, "max_edges": max_edges, "return_types": return_types, }, ) def visit_nodes( self, src, edges="*", direction="forward", max_edges=0, return_types="*" ): return self.get_lines( "visit/nodes/{}".format(src), params={ "edges": edges, "direction": direction, "max_edges": max_edges, "return_types": return_types, }, ) def visit_edges(self, src, edges="*", direction="forward", max_edges=0): for edge in self.get_lines( "visit/edges/{}".format(src), params={"edges": edges, "direction": direction, "max_edges": max_edges}, ): yield tuple(edge.split()) def visit_paths(self, src, edges="*", direction="forward", max_edges=0): def decode_path_wrapper(it): for e in it: yield json.loads(e) return decode_path_wrapper( self.get_lines( "visit/paths/{}".format(src), params={"edges": edges, "direction": direction, "max_edges": max_edges}, ) ) def walk( self, src, dst, edges="*", traversal="dfs", direction="forward", limit=None ): endpoint = "walk/{}/{}" return self.get_lines( endpoint.format(src, dst), params={ "edges": edges, "traversal": traversal, "direction": direction, "limit": limit, }, ) def random_walk( self, src, dst, edges="*", direction="forward", limit=None, return_types="*" ): endpoint = "randomwalk/{}/{}" return self.get_lines( endpoint.format(src, dst), params={ "edges": edges, "direction": direction, "limit": limit, "return_types": return_types, }, ) - def count_leaves(self, src, edges="*", direction="forward"): + def count_leaves(self, src, edges="*", direction="forward", max_matching_nodes=0): return self.get( "leaves/count/{}".format(src), - params={"edges": edges, "direction": direction}, + params={ + "edges": edges, + "direction": direction, + "max_matching_nodes": max_matching_nodes, + }, ) def count_neighbors(self, src, edges="*", direction="forward"): return self.get( "neighbors/count/{}".format(src), params={"edges": edges, "direction": direction}, ) def count_visit_nodes(self, src, edges="*", direction="forward"): return self.get( "visit/nodes/count/{}".format(src), params={"edges": edges, "direction": direction}, ) diff --git a/swh/graph/http_naive_client.py b/swh/graph/http_naive_client.py index a94efe8..43f0088 100644 --- a/swh/graph/http_naive_client.py +++ b/swh/graph/http_naive_client.py @@ -1,395 +1,412 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import inspect +import itertools import re import statistics from typing import ( Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union, ) from swh.model.swhids import CoreSWHID, ExtendedSWHID, ValidationError from .http_client import GraphArgumentException _NODE_TYPES = "ori|snp|rel|rev|dir|cnt" NODES_RE = re.compile(rf"(\*|{_NODE_TYPES})") EDGES_RE = re.compile(rf"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})") T = TypeVar("T", bound=Callable) SWHIDlike = Union[CoreSWHID, ExtendedSWHID, str] def check_arguments(f: T) -> T: """Decorator for generic argument checking for methods of NaiveClient. Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format.""" signature = inspect.signature(f) @functools.wraps(f) def newf(*args, **kwargs): __tracebackhide__ = True # for pytest try: bound_args = signature.bind(*args, **kwargs) except TypeError as e: # rethrow the exception from here so pytest doesn't flood the terminal # with signature.bind's call stack. raise TypeError(*e.args) from None self = bound_args.arguments["self"] src = bound_args.arguments.get("src") if src: self._check_swhid(src) edges = bound_args.arguments.get("edges") if edges: if edges != "*" and not EDGES_RE.match(edges): raise GraphArgumentException(f"invalid edge restriction: {edges}") return_types = bound_args.arguments.get("return_types") if return_types: if not NODES_RE.match(return_types): raise GraphArgumentException( f"invalid return_types restriction: {return_types}" ) return f(*args, **kwargs) return newf # type: ignore def filter_node_types(node_types: str, nodes: Iterable[str]) -> Iterator[str]: if node_types == "*": yield from nodes else: prefixes = tuple(f"swh:1:{type_}:" for type_ in node_types.split(",")) for node in nodes: if node.startswith(prefixes): yield node class NaiveClient: """An alternative implementation of the graph server, written in pure-python and meant for simulating it in other components' test cases; constructed from a list of nodes and (directed) edges, both represented as SWHIDs. It is NOT meant to be efficient in any way; only to be a very simple implementation that provides the same behavior. >>> nodes = [ ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ] >>> edges = [ ... ( ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... ), ... ( ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ), ... ] >>> c = NaiveClient(nodes=nodes, edges=edges) >>> list(c.leaves("swh:1:rev:1111111111111111111111111111111111111111")) ['swh:1:rev:3333333333333333333333333333333333333333'] """ def __init__( self, *, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.graph = Graph(nodes, edges) def _check_swhid(self, swhid): try: ExtendedSWHID.from_string(swhid) except ValidationError as e: raise GraphArgumentException(*e.args) from None if swhid not in self.graph.nodes: raise GraphArgumentException(f"SWHID not found: {swhid}") def stats(self) -> Dict: return { "num_nodes": len(self.graph.nodes), "num_edges": sum(map(len, self.graph.forward_edges.values())), "compression_ratio": 1.0, "bits_per_edge": 100.0, "bits_per_node": 100.0, "avg_locality": 0.0, "indegree_min": min(map(len, self.graph.backward_edges.values())), "indegree_max": max(map(len, self.graph.backward_edges.values())), "indegree_avg": statistics.mean( map(len, self.graph.backward_edges.values()) ), "outdegree_min": min(map(len, self.graph.forward_edges.values())), "outdegree_max": max(map(len, self.graph.forward_edges.values())), "outdegree_avg": statistics.mean( map(len, self.graph.forward_edges.values()) ), } @check_arguments def leaves( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", + max_matching_nodes: int = 0, ) -> Iterator[str]: # TODO: max_edges - yield from filter_node_types( + leaves = filter_node_types( return_types, [ node for node in self.graph.get_subgraph(src, edges, direction) if not self.graph.get_filtered_neighbors(node, edges, direction) ], ) + if max_matching_nodes > 0: + leaves = itertools.islice(leaves, max_matching_nodes) + + return leaves + @check_arguments def neighbors( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_filtered_neighbors(src, edges, direction) ) @check_arguments def visit_nodes( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_subgraph(src, edges, direction) ) @check_arguments def visit_edges( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[Tuple[str, str]]: if max_edges == 0: max_edges = None # type: ignore else: max_edges -= 1 yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges] @check_arguments def visit_paths( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[List[str]]: # TODO: max_edges for path in self.graph.iter_paths_dfs(direction, edges, src): if path[-1] in self.leaves(src, edges, direction): yield list(path) @check_arguments def walk( self, src: str, dst: str, edges: str = "*", traversal: str = "dfs", direction: str = "forward", limit: Optional[int] = None, ) -> Iterator[str]: # TODO: implement algo="bfs" # TODO: limit match_path: Callable[[str], bool] if ":" in dst: match_path = dst.__eq__ self._check_swhid(dst) else: match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa for path in self.graph.iter_paths_dfs(direction, edges, src): if match_path(path[-1]): if not limit: # 0 or None yield from path elif limit > 0: yield from path[0:limit] else: yield from path[limit:] @check_arguments def random_walk( self, src: str, dst: str, edges: str = "*", direction: str = "forward", limit: Optional[int] = None, ): # TODO: limit yield from self.walk(src, dst, edges, "dfs", direction, limit) @check_arguments def count_leaves( - self, src: str, edges: str = "*", direction: str = "forward" + self, + src: str, + edges: str = "*", + direction: str = "forward", + max_matching_nodes: int = 0, ) -> int: - return len(list(self.leaves(src, edges, direction))) + return len( + list( + self.leaves( + src, edges, direction, max_matching_nodes=max_matching_nodes + ) + ) + ) @check_arguments def count_neighbors( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_filtered_neighbors(src, edges, direction)) @check_arguments def count_visit_nodes( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_subgraph(src, edges, direction)) class Graph: def __init__( self, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.nodes = [str(node) for node in nodes] self.forward_edges: Dict[str, List[str]] = {} self.backward_edges: Dict[str, List[str]] = {} for node in nodes: self.forward_edges[str(node)] = [] self.backward_edges[str(node)] = [] for (src, dst) in edges: self.forward_edges[str(src)].append(str(dst)) self.backward_edges[str(dst)].append(str(src)) def get_filtered_neighbors( self, src: str, edges_fmt: str, direction: str, ) -> Set[str]: if direction == "forward": edges = self.forward_edges elif direction == "backward": edges = self.backward_edges else: raise GraphArgumentException(f"invalid direction: {direction}") neighbors = edges.get(src, []) if edges_fmt == "*": return set(neighbors) else: filtered_neighbors: Set[str] = set() for edges_fmt_item in edges_fmt.split(","): (src_fmt, dst_fmt) = edges_fmt_item.split(":") if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): continue if dst_fmt == "*": filtered_neighbors.update(neighbors) else: prefix = f"swh:1:{dst_fmt}:" filtered_neighbors.update( n for n in neighbors if n.startswith(prefix) ) return filtered_neighbors def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]: seen = set() to_visit = {src} while to_visit: node = to_visit.pop() seen.add(node) neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction)) new_nodes = neighbors - seen to_visit.update(new_nodes) return seen def iter_paths_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, ...]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): yield path + (node,) def iter_edges_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, str]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): if len(path) > 0: yield (path[-1], node) class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): self.graph = graph self.direction = direction self.edges_fmt = edges_fmt self.seen: Set[str] = set() self.src = src def more_work(self) -> bool: raise NotImplementedError() def pop(self) -> Tuple[Tuple[str, ...], str]: raise NotImplementedError() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: raise NotImplementedError() def __next__(self) -> Tuple[Tuple[str, ...], str]: # Stores (path, next_node) if not self.more_work(): raise StopIteration() (path, node) = self.pop() new_path = path + (node,) if node not in self.seen: neighbors = self.graph.get_filtered_neighbors( node, self.edges_fmt, self.direction ) # We want to visit the first neighbor first, and to_visit is a stack; # so we need to reversed() the list of neighbors to get it on top # of the stack. for neighbor in reversed(list(neighbors)): self.push(new_path, neighbor) self.seen.add(node) return (path, node) class DfsSubgraphIterator(SubgraphIterator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)] def more_work(self) -> bool: return bool(self.to_visit) def pop(self) -> Tuple[Tuple[str, ...], str]: return self.to_visit.pop() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: self.to_visit.append((new_path, neighbor)) diff --git a/swh/graph/http_rpc_server.py b/swh/graph/http_rpc_server.py index 658a4ea..fc617f0 100644 --- a/swh/graph/http_rpc_server.py +++ b/swh/graph/http_rpc_server.py @@ -1,415 +1,419 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ import json import logging import os from typing import Optional import aiohttp.test_utils import aiohttp.web from google.protobuf import json_format from google.protobuf.field_mask_pb2 import FieldMask import grpc from swh.core.api.asynchronous import RPCServerApp from swh.core.config import read as config_read from swh.graph.grpc.swhgraph_pb2 import ( GetNodeRequest, NodeFilter, StatsRequest, TraversalRequest, ) from swh.graph.grpc.swhgraph_pb2_grpc import TraversalServiceStub from swh.graph.grpc_server import spawn_java_grpc_server, stop_java_grpc_server from swh.model.swhids import EXTENDED_SWHID_TYPES try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration logger = logging.getLogger(__name__) async def _aiorpcerror_middleware(app, handler): async def middleware_handler(request): try: return await handler(request) except grpc.aio.AioRpcError as e: # The default error handler of the RPC framework tries to serialize this # with msgpack; which for some unknown reason causes it to raise # ValueError("recursion limit exceeded") with a lot of context, causing # Sentry to be overflowed with gigabytes of logs (160KB per event, with # potentially hundreds of thousands of events per day). # Instead, we simply serialize the exception to a string. # https://sentry.softwareheritage.org/share/issue/d6d4db971e4b47728a6c1dd06cb9b8a5/ raise aiohttp.web.HTTPServiceUnavailable(text=str(e)) return middleware_handler class GraphServerApp(RPCServerApp): def __init__(self, *args, middlewares=(), **kwargs): middlewares = (_aiorpcerror_middleware,) + middlewares super().__init__(*args, middlewares=middlewares, **kwargs) self.on_startup.append(self._start) self.on_shutdown.append(self._stop) @staticmethod async def _start(app): app["channel"] = grpc.aio.insecure_channel(app["rpc_url"]) await app["channel"].__aenter__() app["rpc_client"] = TraversalServiceStub(app["channel"]) await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True) @staticmethod async def _stop(app): await app["channel"].__aexit__(None, None, None) if app.get("local_server"): stop_java_grpc_server(app["local_server"]) async def index(request): return aiohttp.web.Response( content_type="text/html", body=""" Software Heritage graph server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rpc_client: TraversalServiceStub = self.request.app["rpc_client"] def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") return s.upper() def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}") return s def get_return_types(self): """Validate HTTP query parameter 'return types', i.e, a set of types which we will filter the query results with""" s = self.request.query.get("return_types", "*") if any( node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for node_type in s.split(",") ): raise aiohttp.web.HTTPBadRequest( text=f"invalid type for filtering res: {s}" ) # if the user puts a star, # then we filter nothing, we don't need the other information if "*" in s: return "*" else: return s - def get_limit(self): - """Validate HTTP query parameter `limit`, i.e., number of results""" - s = self.request.query.get("limit", "0") + def get_max_matching_nodes(self): + """Validate HTTP query parameter `max_matching_nodes`, i.e., number of results""" + s = self.request.query.get("max_matching_nodes", "0") try: return int(s) except ValueError: - raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}") + raise aiohttp.web.HTTPBadRequest( + text=f"invalid max_matching_nodes value: {s}" + ) def get_max_edges(self): """Validate HTTP query parameter 'max_edges', i.e., the limit of the number of edges that can be visited""" s = self.request.query.get("max_edges", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") async def check_swhid(self, swhid): """Validate that the given SWHID exists in the graph""" try: await self.rpc_client.GetNode( GetNodeRequest(swhid=swhid, mask=FieldMask(paths=["swhid"])) ) except grpc.aio.AioRpcError as e: if e.code() == grpc.StatusCode.INVALID_ARGUMENT: raise aiohttp.web.HTTPBadRequest(text=str(e.details())) class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: self._buf = [] try: await self.stream_response() finally: await self._flush_buffer() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. """ raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" self._buf.append(line) if len(self._buf) > 100: await self._flush_buffer() async def _flush_buffer(self): await self.response_stream.write("\n".join(self._buf).encode() + b"\n") self._buf = [] class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): res = await self.rpc_client.Stats(StatsRequest()) stats = json_format.MessageToDict( res, including_default_value_fields=True, preserving_proto_field_name=True ) # Int64 fields are serialized as strings by default. for descriptor in res.DESCRIPTOR.fields: if descriptor.type == descriptor.TYPE_INT64: try: stats[descriptor.name] = int(stats[descriptor.name]) except KeyError: pass json_body = json.dumps(stats, indent=4, sort_keys=True) return aiohttp.web.Response(body=json_body, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" async def prepare_response(self): src = self.request.match_info["src"] self.traversal_request = TraversalRequest( src=[src], edges=self.get_edges(), direction=self.get_direction(), return_nodes=NodeFilter(types=self.get_return_types()), mask=FieldMask(paths=["swhid"]), + max_matching_nodes=self.get_max_matching_nodes(), ) if self.get_max_edges(): self.traversal_request.max_edges = self.get_max_edges() await self.check_swhid(src) self.configure_request() self.nodes_stream = self.rpc_client.Traverse(self.traversal_request) # Force gRPC to query the server and fetch the first nodes; so errors # are raised early, so we can return HTTP 503 before HTTP 200 await self.nodes_stream.wait_for_connection() def configure_request(self): pass async def stream_response(self): async for node in self.nodes_stream: await self.stream_line(node.swhid) class LeavesView(SimpleTraversalView): def configure_request(self): self.traversal_request.return_nodes.max_traversal_successors = 0 class NeighborsView(SimpleTraversalView): def configure_request(self): self.traversal_request.min_depth = 1 self.traversal_request.max_depth = 1 class VisitNodesView(SimpleTraversalView): pass class VisitEdgesView(SimpleTraversalView): def configure_request(self): self.traversal_request.mask.paths.extend(["successor", "successor.swhid"]) # self.traversal_request.return_fields.successor = True async def stream_response(self): async for node in self.rpc_client.Traverse(self.traversal_request): for succ in node.successor: await self.stream_line(node.swhid + " " + succ.swhid) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): src = self.request.match_info["src"] self.traversal_request = TraversalRequest( src=[src], edges=self.get_edges(), direction=self.get_direction(), return_nodes=NodeFilter(types=self.get_return_types()), mask=FieldMask(paths=["swhid"]), + max_matching_nodes=self.get_max_matching_nodes(), ) if self.get_max_edges(): self.traversal_request.max_edges = self.get_max_edges() self.configure_request() res = await self.rpc_client.CountNodes(self.traversal_request) return aiohttp.web.Response( body=str(res.count), content_type="application/json" ) def configure_request(self): pass class CountNeighborsView(CountView): def configure_request(self): self.traversal_request.min_depth = 1 self.traversal_request.max_depth = 1 class CountLeavesView(CountView): def configure_request(self): self.traversal_request.return_nodes.max_traversal_successors = 0 class CountVisitNodesView(CountView): pass def make_app(config=None): """Create an aiohttp server for the HTTP RPC frontend to the swh-graph API. It may either connect to an existing grpc server (cls="remote") or spawn a local grpc server (cls="local"). ``config`` is expected to be a dict like:: graph: cls: "local" grpc_server: port: 50091 http_rpc_server: debug: true or:: graph: cls: "remote" url: "localhost:50091" http_rpc_server: debug: true See: - :mod:`swh.graph.grpc_server` for more details of the content of the grpc_server section, - :class:`~.GraphServerApp` class for more details of the content of the http_rpc_server section. """ if config is None: config = {} if "graph" not in config: logger.info( "Missing 'graph' configuration; default to a locally spawn" "grpc server listening on 0.0.0.0:50091" ) cfg = {"cls": "local", "grpc_server": {"port": 50091}} else: cfg = config["graph"].copy() cls = cfg.pop("cls") grpc_cfg = cfg.pop("grpc_server", {}) app = GraphServerApp(**cfg.get("http_rpc_server", {})) if cls == "remote": if "url" not in cfg: raise KeyError("Missing 'url' configuration entry in the [graph] section") rpc_url = cfg["url"] elif cls == "local": app["local_server"], port = spawn_java_grpc_server(**grpc_cfg) rpc_url = f"localhost:{port}" else: raise ValueError(f"Unknown swh.graph class cls={cls}") app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["rpc_url"] = rpc_url return app def make_app_from_configfile(): """Load configuration and then build application to run""" config_file = os.environ.get("SWH_CONFIG_FILENAME") config = config_read(config_file) return make_app(config=config) diff --git a/swh/graph/tests/test_http_client.py b/swh/graph/tests/test_http_client.py index 21021b3..1878029 100644 --- a/swh/graph/tests/test_http_client.py +++ b/swh/graph/tests/test_http_client.py @@ -1,378 +1,408 @@ # Copyright (c) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import pytest from pytest import raises from swh.core.api import RemoteException from swh.graph.http_client import GraphArgumentException TEST_ORIGIN_ID = "swh:1:ori:{}".format( hashlib.sha1(b"https://example.com/swh/graph").hexdigest() ) def test_stats(graph_client): stats = graph_client.stats() assert stats["num_nodes"] == 21 assert stats["num_edges"] == 23 assert isinstance(stats["compression_ratio"], float) assert isinstance(stats["bits_per_node"], float) assert isinstance(stats["bits_per_edge"], float) assert isinstance(stats["avg_locality"], float) assert stats["indegree_min"] == 0 assert stats["indegree_max"] == 3 assert isinstance(stats["indegree_avg"], float) assert stats["outdegree_min"] == 0 assert stats["outdegree_max"] == 3 assert isinstance(stats["outdegree_avg"], float) def test_leaves(graph_client): actual = list(graph_client.leaves(TEST_ORIGIN_ID)) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) +@pytest.mark.parametrize("max_matching_nodes", [0, 1, 2, 3, 4, 5, 10, 1 << 31]) +def test_leaves_with_limit(graph_client, max_matching_nodes): + actual = list( + graph_client.leaves(TEST_ORIGIN_ID, max_matching_nodes=max_matching_nodes) + ) + expected = [ + "swh:1:cnt:0000000000000000000000000000000000000001", + "swh:1:cnt:0000000000000000000000000000000000000004", + "swh:1:cnt:0000000000000000000000000000000000000005", + "swh:1:cnt:0000000000000000000000000000000000000007", + ] + + if max_matching_nodes == 0: + assert set(actual) == set(expected) + else: + assert set(actual) <= set(expected) + assert len(actual) == min(4, max_matching_nodes) + + def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_nodes_filtered(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="dir", ) ) expected = [ "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ] assert set(actual) == set(expected) def test_visit_nodes_filtered_star(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="*", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_limited(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", max_edges=4, edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] # As there are four valid answers (up to reordering), we cannot check for # equality. Instead, we check the client returned all edges but one. assert set(actual).issubset(set(expected)) assert len(actual) == 3 def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( "swh:1:rev:0000000000000000000000000000000000000009", edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) @pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_type(graph_client): """as the walk is random, we test a visit from a cnt node to a release reachable from every single path in the backward graph, and only check the final node of the path (i.e., the release) """ args = ("swh:1:cnt:0000000000000000000000000000000000000015", "rel") kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no release directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 @pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_node(graph_client): """Same as test_random_walk_dst_is_type, but we target the specific release node instead of a type """ args = ( "swh:1:cnt:0000000000000000000000000000000000000015", "swh:1:rel:0000000000000000000000000000000000000019", ) kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves(TEST_ORIGIN_ID) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 +@pytest.mark.parametrize("max_matching_nodes", [0, 1, 2, 3, 4, 5, 10, 1 << 31]) +def test_count_with_limit(graph_client, max_matching_nodes): + actual = graph_client.count_leaves( + TEST_ORIGIN_ID, max_matching_nodes=max_matching_nodes + ) + if max_matching_nodes == 0: + assert actual == 4 + else: + assert actual == min(4, max_matching_nodes) + + def test_param_validation(graph_client): with raises(GraphArgumentException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:rel:00ffffffff000000000000000000000000000010")) if exc_info.value.response: assert exc_info.value.response.status_code == 404 with raises(GraphArgumentException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:rel:00ffffffff00000000zzzzzzz000000000000010") ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed edge specificaiton list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400