diff --git a/swh/graph/cli.py b/swh/graph/cli.py index b35ccbc..161ac10 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,200 +1,200 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Set, Tuple # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group if TYPE_CHECKING: from swh.graph.webgraph import CompressionStep # noqa class StepOption(click.ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step" def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep] from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa steps: Set[CompressionStep] = set() specs = value.split(",") for spec in specs: if "-" in spec: # step range (raw_l, raw_r) = spec.split("-", maxsplit=1) if raw_l == "": # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == "": # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: self.fail(f"invalid step specification: {value}, " f"see --help") l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union( set(CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1)) ) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: self.fail( f"invalid step specification: {value}, " f"see --help" ) return steps class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})} @swh_cli_group.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="YAML configuration file", ) @click.pass_context def graph_cli_group(ctx, config_file): """Software Heritage graph tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) if "graph" not in conf: raise ValueError( 'no "graph" stanza found in configuration file %s' % config_file ) ctx.obj["config"] = conf @graph_cli_group.command(name="rpc-serve") @click.option( "--host", "-h", default="0.0.0.0", metavar="IP", show_default=True, help="host IP address to bind the server on", ) @click.option( "--port", "-p", default=5009, type=click.INT, metavar="PORT", show_default=True, help="port to bind the server on", ) @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.pass_context def serve(ctx, host, port, graph): """run the graph RPC service""" - import aiohttp + import aiohttp.web - from swh.graph.server.app import make_app + from swh.graph.http_server import make_app config = ctx.obj["config"] config.setdefault("graph", {}) config["graph"]["path"] = graph app = make_app(config=config) aiohttp.web.run_app(app, host=host, port=port) @graph_cli_group.command() @click.option( "--input-dataset", "-i", required=True, type=PathlibPath(), help="graph dataset directory, in ORC format", ) @click.option( "--output-directory", "-o", required=True, type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--graph-name", "-g", default="graph", metavar="NAME", help="name of the output graph (default: 'graph')", ) @click.option( "--steps", "-s", metavar="STEPS", type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.pass_context def compress(ctx, input_dataset, output_directory, graph_name, steps): """Compress a graph using WebGraph Input: a directory containing a graph dataset in ORC format Output: a directory containing a WebGraph compressed graph Compression steps are: (1) extract_nodes, (2) mph, (3) bv, (4) bfs, (5) permute_bfs, (6) transpose_bfs, (7) simplify, (8) llp, (9) permute_llp, (10) obl, (11) compose_orders, (12) stats, (13) transpose, (14) transpose_obl, (15) maps, (16) extract_persons, (17) mph_persons, (18) node_properties, (19) mph_labels, (20) fcl_labels, (21) edge_labels, (22) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ from swh.graph import webgraph try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, input_dataset, output_directory, steps, conf) def main(): return graph_cli_group(auto_envvar_prefix="SWH_GRAPH") if __name__ == "__main__": main() diff --git a/swh/graph/client.py b/swh/graph/http_client.py similarity index 100% rename from swh/graph/client.py rename to swh/graph/http_client.py diff --git a/swh/graph/naive_client.py b/swh/graph/http_naive_client.py similarity index 99% rename from swh/graph/naive_client.py rename to swh/graph/http_naive_client.py index 1b58f6d..c8bf729 100644 --- a/swh/graph/naive_client.py +++ b/swh/graph/http_naive_client.py @@ -1,395 +1,395 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import inspect import re import statistics from typing import ( Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union, ) from swh.model.swhids import CoreSWHID, ExtendedSWHID, ValidationError -from .client import GraphArgumentException +from .http_client import GraphArgumentException _NODE_TYPES = "ori|snp|rel|rev|dir|cnt" NODES_RE = re.compile(rf"(\*|{_NODE_TYPES})") EDGES_RE = re.compile(rf"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})") T = TypeVar("T", bound=Callable) SWHIDlike = Union[CoreSWHID, ExtendedSWHID, str] def check_arguments(f: T) -> T: """Decorator for generic argument checking for methods of NaiveClient. Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format.""" signature = inspect.signature(f) @functools.wraps(f) def newf(*args, **kwargs): __tracebackhide__ = True # for pytest try: bound_args = signature.bind(*args, **kwargs) except TypeError as e: # rethrow the exception from here so pytest doesn't flood the terminal # with signature.bind's call stack. raise TypeError(*e.args) from None self = bound_args.arguments["self"] src = bound_args.arguments.get("src") if src: self._check_swhid(src) edges = bound_args.arguments.get("edges") if edges: if edges != "*" and not EDGES_RE.match(edges): raise GraphArgumentException(f"invalid edge restriction: {edges}") return_types = bound_args.arguments.get("return_types") if return_types: if not NODES_RE.match(return_types): raise GraphArgumentException( f"invalid return_types restriction: {return_types}" ) return f(*args, **kwargs) return newf # type: ignore def filter_node_types(node_types: str, nodes: Iterable[str]) -> Iterator[str]: if node_types == "*": yield from nodes else: prefixes = tuple(f"swh:1:{type_}:" for type_ in node_types.split(",")) for node in nodes: if node.startswith(prefixes): yield node class NaiveClient: """An alternative implementation of the graph server, written in pure-python and meant for simulating it in other components' test cases; constructed from a list of nodes and (directed) edges, both represented as SWHIDs. It is NOT meant to be efficient in any way; only to be a very simple implementation that provides the same behavior. >>> nodes = [ ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ] >>> edges = [ ... ( ... "swh:1:rev:1111111111111111111111111111111111111111", ... "swh:1:rev:2222222222222222222222222222222222222222", ... ), ... ( ... "swh:1:rev:2222222222222222222222222222222222222222", ... "swh:1:rev:3333333333333333333333333333333333333333", ... ), ... ] >>> c = NaiveClient(nodes=nodes, edges=edges) >>> list(c.leaves("swh:1:rev:1111111111111111111111111111111111111111")) ['swh:1:rev:3333333333333333333333333333333333333333'] """ def __init__( self, *, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.graph = Graph(nodes, edges) def _check_swhid(self, swhid): try: ExtendedSWHID.from_string(swhid) except ValidationError as e: raise GraphArgumentException(*e.args) from None if swhid not in self.graph.nodes: raise GraphArgumentException(f"SWHID not found: {swhid}") def stats(self) -> Dict: return { "num_nodes": len(self.graph.nodes), "num_edges": sum(map(len, self.graph.forward_edges.values())), "compression": 1.0, "bits_per_edge": 100.0, "bits_per_node": 100.0, "avg_locality": 0.0, "indegree_min": min(map(len, self.graph.backward_edges.values())), "indegree_max": max(map(len, self.graph.backward_edges.values())), "indegree_avg": statistics.mean( map(len, self.graph.backward_edges.values()) ), "outdegree_min": min(map(len, self.graph.forward_edges.values())), "outdegree_max": max(map(len, self.graph.forward_edges.values())), "outdegree_avg": statistics.mean( map(len, self.graph.forward_edges.values()) ), } @check_arguments def leaves( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, [ node for node in self.graph.get_subgraph(src, edges, direction) if not self.graph.get_filtered_neighbors(node, edges, direction) ], ) @check_arguments def neighbors( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_filtered_neighbors(src, edges, direction) ) @check_arguments def visit_nodes( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_subgraph(src, edges, direction) ) @check_arguments def visit_edges( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[Tuple[str, str]]: if max_edges == 0: max_edges = None # type: ignore else: max_edges -= 1 yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges] @check_arguments def visit_paths( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[List[str]]: # TODO: max_edges for path in self.graph.iter_paths_dfs(direction, edges, src): if path[-1] in self.leaves(src, edges, direction): yield list(path) @check_arguments def walk( self, src: str, dst: str, edges: str = "*", traversal: str = "dfs", direction: str = "forward", limit: Optional[int] = None, ) -> Iterator[str]: # TODO: implement algo="bfs" # TODO: limit match_path: Callable[[str], bool] if ":" in dst: match_path = dst.__eq__ self._check_swhid(dst) else: match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa for path in self.graph.iter_paths_dfs(direction, edges, src): if match_path(path[-1]): if not limit: # 0 or None yield from path elif limit > 0: yield from path[0:limit] else: yield from path[limit:] @check_arguments def random_walk( self, src: str, dst: str, edges: str = "*", direction: str = "forward", limit: Optional[int] = None, ): # TODO: limit yield from self.walk(src, dst, edges, "dfs", direction, limit) @check_arguments def count_leaves( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(list(self.leaves(src, edges, direction))) @check_arguments def count_neighbors( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_filtered_neighbors(src, edges, direction)) @check_arguments def count_visit_nodes( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_subgraph(src, edges, direction)) class Graph: def __init__( self, nodes: List[SWHIDlike], edges: List[Tuple[SWHIDlike, SWHIDlike]] ): self.nodes = [str(node) for node in nodes] self.forward_edges: Dict[str, List[str]] = {} self.backward_edges: Dict[str, List[str]] = {} for node in nodes: self.forward_edges[str(node)] = [] self.backward_edges[str(node)] = [] for (src, dst) in edges: self.forward_edges[str(src)].append(str(dst)) self.backward_edges[str(dst)].append(str(src)) def get_filtered_neighbors( self, src: str, edges_fmt: str, direction: str, ) -> Set[str]: if direction == "forward": edges = self.forward_edges elif direction == "backward": edges = self.backward_edges else: raise GraphArgumentException(f"invalid direction: {direction}") neighbors = edges.get(src, []) if edges_fmt == "*": return set(neighbors) else: filtered_neighbors: Set[str] = set() for edges_fmt_item in edges_fmt.split(","): (src_fmt, dst_fmt) = edges_fmt_item.split(":") if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): continue if dst_fmt == "*": filtered_neighbors.update(neighbors) else: prefix = f"swh:1:{dst_fmt}:" filtered_neighbors.update( n for n in neighbors if n.startswith(prefix) ) return filtered_neighbors def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]: seen = set() to_visit = {src} while to_visit: node = to_visit.pop() seen.add(node) neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction)) new_nodes = neighbors - seen to_visit.update(new_nodes) return seen def iter_paths_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, ...]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): yield path + (node,) def iter_edges_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, str]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): if len(path) > 0: yield (path[-1], node) class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): self.graph = graph self.direction = direction self.edges_fmt = edges_fmt self.seen: Set[str] = set() self.src = src def more_work(self) -> bool: raise NotImplementedError() def pop(self) -> Tuple[Tuple[str, ...], str]: raise NotImplementedError() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: raise NotImplementedError() def __next__(self) -> Tuple[Tuple[str, ...], str]: # Stores (path, next_node) if not self.more_work(): raise StopIteration() (path, node) = self.pop() new_path = path + (node,) if node not in self.seen: neighbors = self.graph.get_filtered_neighbors( node, self.edges_fmt, self.direction ) # We want to visit the first neighbor first, and to_visit is a stack; # so we need to reversed() the list of neighbors to get it on top # of the stack. for neighbor in reversed(list(neighbors)): self.push(new_path, neighbor) self.seen.add(node) return (path, node) class DfsSubgraphIterator(SubgraphIterator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)] def more_work(self) -> bool: return bool(self.to_visit) def pop(self) -> Tuple[Tuple[str, ...], str]: return self.to_visit.pop() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: self.to_visit.append((new_path, neighbor)) diff --git a/swh/graph/server/app.py b/swh/graph/http_server.py similarity index 94% rename from swh/graph/server/app.py rename to swh/graph/http_server.py index 57ed7b9..8a9bbc6 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/http_server.py @@ -1,363 +1,344 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ import json import os -import subprocess from typing import Optional import aiohttp.test_utils import aiohttp.web from google.protobuf import json_format import grpc from swh.core.api.asynchronous import RPCServerApp from swh.core.config import read as config_read -from swh.graph.config import check_config from swh.graph.rpc.swhgraph_pb2 import ( CheckSwhidRequest, NodeFields, NodeFilter, StatsRequest, TraversalRequest, ) from swh.graph.rpc.swhgraph_pb2_grpc import TraversalServiceStub +from swh.graph.rpc_server import spawn_java_rpc_server from swh.model.swhids import EXTENDED_SWHID_TYPES try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration class GraphServerApp(RPCServerApp): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.on_startup.append(self._start) self.on_shutdown.append(self._stop) @staticmethod async def _start(app): app["channel"] = grpc.aio.insecure_channel(app["rpc_url"]) await app["channel"].__aenter__() app["rpc_client"] = TraversalServiceStub(app["channel"]) await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True) @staticmethod async def _stop(app): await app["channel"].__aexit__(None, None, None) if app.get("local_server"): app["local_server"].terminate() async def index(request): return aiohttp.web.Response( content_type="text/html", body=""" Software Heritage graph server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rpc_client: TraversalServiceStub = self.request.app["rpc_client"] def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") return s.upper() def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}") return s def get_return_types(self): """Validate HTTP query parameter 'return types', i.e, a set of types which we will filter the query results with""" s = self.request.query.get("return_types", "*") if any( node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for node_type in s.split(",") ): raise aiohttp.web.HTTPBadRequest( text=f"invalid type for filtering res: {s}" ) # if the user puts a star, # then we filter nothing, we don't need the other information if "*" in s: return "*" else: return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}") def get_max_edges(self): """Validate HTTP query parameter 'max_edges', i.e., the limit of the number of edges that can be visited""" s = self.request.query.get("max_edges", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") async def check_swhid(self, swhid): """Validate that the given SWHID exists in the graph""" r = await self.rpc_client.CheckSwhid(CheckSwhidRequest(swhid=swhid)) if not r.exists: raise aiohttp.web.HTTPBadRequest(text=str(r.details)) class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: self._buf = [] try: await self.stream_response() finally: await self._flush_buffer() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. """ raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" self._buf.append(line) if len(self._buf) > 100: await self._flush_buffer() async def _flush_buffer(self): await self.response_stream.write("\n".join(self._buf).encode() + b"\n") self._buf = [] class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): res = await self.rpc_client.Stats(StatsRequest()) stats = json_format.MessageToDict( res, including_default_value_fields=True, preserving_proto_field_name=True ) # Int64 fields are serialized as strings by default. for descriptor in res.DESCRIPTOR.fields: if descriptor.type == descriptor.TYPE_INT64: try: stats[descriptor.name] = int(stats[descriptor.name]) except KeyError: pass json_body = json.dumps(stats, indent=4, sort_keys=True) return aiohttp.web.Response(body=json_body, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" async def prepare_response(self): src = self.request.match_info["src"] self.traversal_request = TraversalRequest( src=[src], edges=self.get_edges(), direction=self.get_direction(), return_nodes=NodeFilter(types=self.get_return_types()), return_fields=NodeFields(), ) if self.get_max_edges(): self.traversal_request.max_edges = self.get_max_edges() await self.check_swhid(src) self.configure_request() def configure_request(self): pass async def stream_response(self): async for node in self.rpc_client.Traverse(self.traversal_request): await self.stream_line(node.swhid) class LeavesView(SimpleTraversalView): def configure_request(self): self.traversal_request.return_nodes.max_traversal_successors = 0 class NeighborsView(SimpleTraversalView): def configure_request(self): self.traversal_request.min_depth = 1 self.traversal_request.max_depth = 1 class VisitNodesView(SimpleTraversalView): pass class VisitEdgesView(SimpleTraversalView): def configure_request(self): self.traversal_request.return_fields.successor = True async def stream_response(self): async for node in self.rpc_client.Traverse(self.traversal_request): for succ in node.successor: await self.stream_line(node.swhid + " " + succ.swhid) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): src = self.request.match_info["src"] self.traversal_request = TraversalRequest( src=[src], edges=self.get_edges(), direction=self.get_direction(), return_nodes=NodeFilter(types=self.get_return_types()), return_fields=NodeFields(), ) if self.get_max_edges(): self.traversal_request.max_edges = self.get_max_edges() self.configure_request() res = await self.rpc_client.CountNodes(self.traversal_request) return aiohttp.web.Response( body=str(res.count), content_type="application/json" ) def configure_request(self): pass class CountNeighborsView(CountView): def configure_request(self): self.traversal_request.min_depth = 1 self.traversal_request.max_depth = 1 class CountLeavesView(CountView): def configure_request(self): self.traversal_request.return_nodes.max_traversal_successors = 0 class CountVisitNodesView(CountView): pass -def spawn_java_rpc_server(config, port=None): - if port is None: - port = aiohttp.test_utils.unused_port() - config = check_config(config or {}) - cmd = [ - "java", - "-cp", - config["classpath"], - *config["java_tool_options"].split(), - "org.softwareheritage.graph.rpc.GraphServer", - "--port", - str(port), - config["graph"]["path"], - ] - server = subprocess.Popen(cmd) - rpc_url = f"localhost:{port}" - return server, rpc_url - - def make_app(config=None, rpc_url=None, **kwargs): app = GraphServerApp(**kwargs) if rpc_url is None: - app["local_server"], rpc_url = spawn_java_rpc_server(config) + app["local_server"], port = spawn_java_rpc_server(config) + rpc_url = f"localhost:{port}" app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["rpc_url"] = rpc_url return app def make_app_from_configfile(): """Load configuration and then build application to run""" config_file = os.environ.get("SWH_CONFIG_FILENAME") config = config_read(config_file) return make_app(config=config) diff --git a/swh/graph/rpc_server.py b/swh/graph/rpc_server.py new file mode 100644 index 0000000..e4b4f1e --- /dev/null +++ b/swh/graph/rpc_server.py @@ -0,0 +1,33 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +""" +A simple tool to start the swh-graph GRPC server in Java. +""" + +import subprocess + +import aiohttp.test_utils +import aiohttp.web + +from swh.graph.config import check_config + + +def spawn_java_rpc_server(config, port=None): + if port is None: + port = aiohttp.test_utils.unused_port() + config = check_config(config or {}) + cmd = [ + "java", + "-cp", + config["classpath"], + *config["java_tool_options"].split(), + "org.softwareheritage.graph.rpc.GraphServer", + "--port", + str(port), + config["graph"]["path"], + ] + server = subprocess.Popen(cmd) + return server, port diff --git a/swh/graph/server/__init__.py b/swh/graph/server/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py index 6554d27..f66d2b3 100644 --- a/swh/graph/tests/conftest.py +++ b/swh/graph/tests/conftest.py @@ -1,70 +1,70 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import multiprocessing from pathlib import Path import subprocess from aiohttp.test_utils import TestClient, TestServer, loop_context import pytest -from swh.graph.client import RemoteGraphClient -from swh.graph.naive_client import NaiveClient +from swh.graph.http_client import RemoteGraphClient +from swh.graph.http_naive_client import NaiveClient SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0] TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/compressed/example" class GraphServerProcess(multiprocessing.Process): def __init__(self, q, *args, **kwargs): self.q = q super().__init__(*args, **kwargs) def run(self): # Lazy import to allow debian packaging - from swh.graph.server.app import make_app + from swh.graph.http_server import make_app try: config = {"graph": {"path": TEST_GRAPH_PATH}} with loop_context() as loop: app = make_app(config=config, debug=True) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) url = client.make_url("/graph/") self.q.put(url) loop.run_forever() except Exception as e: self.q.put(e) @pytest.fixture(scope="module", params=["remote", "naive"]) def graph_client(request): if request.param == "remote": queue = multiprocessing.Queue() server = GraphServerProcess(queue) server.start() res = queue.get() if isinstance(res, Exception): raise res yield RemoteGraphClient(str(res)) server.terminate() else: def zstdcat(*files): p = subprocess.run(["zstdcat", *files], stdout=subprocess.PIPE) return p.stdout.decode() edges_dataset = SWH_GRAPH_TESTS_ROOT / "dataset/edges" edge_files = edges_dataset.glob("*/*.edges.csv.zst") node_files = edges_dataset.glob("*/*.nodes.csv.zst") nodes = set(zstdcat(*node_files).strip().split("\n")) edge_lines = [line.split() for line in zstdcat(*edge_files).strip().split("\n")] edges = [(src, dst) for src, dst, *_ in edge_lines] for src, dst in edges: nodes.add(src) nodes.add(dst) yield NaiveClient(nodes=list(nodes), edges=edges) diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_http_client.py similarity index 99% rename from swh/graph/tests/test_api_client.py rename to swh/graph/tests/test_http_client.py index 5eaea60..daaa37c 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_http_client.py @@ -1,373 +1,373 @@ import hashlib import pytest from pytest import raises from swh.core.api import RemoteException -from swh.graph.client import GraphArgumentException +from swh.graph.http_client import GraphArgumentException TEST_ORIGIN_ID = "swh:1:ori:{}".format( hashlib.sha1(b"https://example.com/swh/graph").hexdigest() ) def test_stats(graph_client): stats = graph_client.stats() assert stats["num_nodes"] == 21 assert stats["num_edges"] == 23 assert isinstance(stats["compression"], float) assert isinstance(stats["bits_per_node"], float) assert isinstance(stats["bits_per_edge"], float) assert isinstance(stats["avg_locality"], float) assert stats["indegree_min"] == 0 assert stats["indegree_max"] == 3 assert isinstance(stats["indegree_avg"], float) assert stats["outdegree_min"] == 0 assert stats["outdegree_max"] == 3 assert isinstance(stats["outdegree_avg"], float) def test_leaves(graph_client): actual = list(graph_client.leaves(TEST_ORIGIN_ID)) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_nodes_filtered(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="dir", ) ) expected = [ "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ] assert set(actual) == set(expected) def test_visit_nodes_filtered_star(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="*", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_limited(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", max_edges=4, edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] # As there are four valid answers (up to reordering), we cannot check for # equality. Instead, we check the client returned all edges but one. assert set(actual).issubset(set(expected)) assert len(actual) == 3 def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( "swh:1:rev:0000000000000000000000000000000000000009", edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) @pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_type(graph_client): """as the walk is random, we test a visit from a cnt node to a release reachable from every single path in the backward graph, and only check the final node of the path (i.e., the release) """ args = ("swh:1:cnt:0000000000000000000000000000000000000015", "rel") kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no release directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 @pytest.mark.skip(reason="Random walk is deprecated") def test_random_walk_dst_is_node(graph_client): """Same as test_random_walk_dst_is_type, but we target the specific release node instead of a type """ args = ( "swh:1:cnt:0000000000000000000000000000000000000015", "swh:1:rel:0000000000000000000000000000000000000019", ) kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves(TEST_ORIGIN_ID) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): with raises(GraphArgumentException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:rel:00ffffffff000000000000000000000000000010")) if exc_info.value.response: assert exc_info.value.response.status_code == 404 with raises(GraphArgumentException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:rel:00ffffffff00000000zzzzzzz000000000000010") ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed edge specificaiton list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400