diff --git a/swh/graph/backend.py b/swh/graph/backend.py
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -61,6 +61,9 @@
     def __exit__(self, exc_type, exc_value, tb):
         self.gateway.shutdown()
 
+    def get_graph(self):
+        return self.entry.get_graph()
+
     def stats(self):
         return self.entry.stats()
 
diff --git a/swh/graph/graph.py b/swh/graph/graph.py
--- a/swh/graph/graph.py
+++ b/swh/graph/graph.py
@@ -149,7 +149,7 @@
 class Graph:
     def __init__(self, backend, node2swhid, swhid2node):
         self.backend = backend
-        self.java_graph = backend.entry.get_graph()
+        self.java_graph = backend.get_graph()
         self.node2swhid = node2swhid
         self.swhid2node = swhid2node
 
@@ -185,6 +185,10 @@
 
 
 @contextlib.contextmanager
-def load(graph_path):
-    with Backend(graph_path) as backend:
+def load(graph_path, backend=None):
+    """Yield a :class:`Graph` on ``backend``, or on a new :class:`Backend` if None."""
+    if backend is None:
+        with Backend(graph_path) as backend:
+            yield Graph(backend, backend.node2swhid, backend.swhid2node)
+    else:
         yield Graph(backend, backend.node2swhid, backend.swhid2node)
diff --git a/swh/graph/simple_backend.py b/swh/graph/simple_backend.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/simple_backend.py
@@ -0,0 +1,330 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import collections
+import statistics
+from typing import AsyncIterator, Dict, Iterable, Iterator, List, Set, Tuple
+
+from swh.model.identifiers import ExtendedSWHID
+
+
+class Swhid2NodeDict(collections.UserDict):
+    """SWHID -> node mapping that validates SWHIDs on lookup."""
+
+    def iter_type(self, swhid_type: str) -> Iterator[Tuple[str, str]]:
+        """Yield the ``(swhid, node)`` pairs whose SWHID has the given type."""
+        prefix = "swh:1:{}:".format(swhid_type)
+        for (swhid, node) in self.items():
+            if swhid.startswith(prefix):
+                yield (swhid, node)
+
+    def __getitem__(self, swhid):
+        ExtendedSWHID.from_string(swhid)  # Raises ValidationError, caught by server
+        return self.data[swhid]
+
+
+class Node2SwhidDict(collections.UserDict):
+    """Node -> SWHID mapping that raises :exc:`IndexError` on missing keys."""
+
+    def __getitem__(self, key):
+        try:
+            return self.data[key]
+        except KeyError:
+            # Pretend to be a list
+            raise IndexError(key) from None
+
+
+class JavaIterator:
+    """Wraps a Python iterable and adds a ``nextLong()`` method."""
+
+    def __init__(self, iterator: Iterable):
+        self.iterator = iter(iterator)
+
+    def nextLong(self):
+        return next(self.iterator)
+
+    def __getattr__(self, name):
+        # Delegate everything else to the wrapped iterator.
+        return getattr(self.iterator, name)
+
+
+class SimpleBackend:
+    """An alternative implementation of :class:`swh.graph.backend.Backend`,
+    written in pure-python and meant for simulating it in other components' test
+    cases.
+
+    It is NOT meant to be efficient in any way; only to be a very simple
+    implementation that provides the same behavior."""
+
+    def __init__(self, *, nodes: List[str], edges: List[Tuple[str, str]]):
+        self.graph = Graph(nodes, edges)
+
+        # Used by :class:`swh.graph.graph.Graph`. In the real implementation,
+        # these dicts map between SWHIDs and integers.
+        self.swhid2node = Swhid2NodeDict({node: node for node in nodes})
+        self.node2swhid = Node2SwhidDict({node: node for node in nodes})
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        pass
+
+    def get_graph(self):
+        return self.graph
+
+    @staticmethod
+    def _degree_stats(adjacency: Dict[str, List[str]]) -> Dict:
+        """Return min/max/avg of the neighbor-list lengths in ``adjacency``."""
+        degrees = [len(neighbors) for neighbors in adjacency.values()]
+        if not degrees:
+            # min()/max()/mean() all raise on an empty sequence.
+            return {"min": 0, "max": 0, "avg": 0.0}
+        return {
+            "min": min(degrees),
+            "max": max(degrees),
+            "avg": statistics.mean(degrees),
+        }
+
+    def stats(self) -> Dict:
+        """Return node/edge counts, placeholder ratios, and degree statistics."""
+        return {
+            "counts": {
+                "nodes": len(self.graph.nodes),
+                # forward_edges maps each node to its successors, so the
+                # number of edges is the total length of these lists, not
+                # the number of keys.
+                "edges": sum(map(len, self.graph.forward_edges.values())),
+            },
+            "ratios": {
+                "compression": 1.0,
+                "bits_per_edge": 100,
+                "bits_per_node": 100,
+                "avg_locality": 0,
+            },
+            "indegree": self._degree_stats(self.graph.backward_edges),
+            "outdegree": self._degree_stats(self.graph.forward_edges),
+        }
+
+    def count(self, ttype, direction, edges_fmt, src) -> int:
+        """Count the nodes matched by a ``ttype`` traversal from ``src``."""
+        if ttype == "neighbors":
+            return len(self.graph.get_filtered_neighbors(direction, edges_fmt, src))
+        elif ttype == "visit_nodes":
+            return len(self.graph.get_subgraph(direction, edges_fmt, src))
+        elif ttype == "leaves":
+            return len(list(self.leaves(direction, edges_fmt, src)))
+        else:
+            assert False, f"unknown ttype {ttype!r}"
+
+    async def simple_traversal(self, ttype, direction, edges_fmt, src, max_edges):
+        # TODO: max_edges?
+        if ttype == "visit_nodes":
+            for node in self.graph.get_subgraph(direction, edges_fmt, src):
+                yield node
+        elif ttype == "leaves":
+            for node in self.leaves(direction, edges_fmt, src):
+                yield node
+        else:
+            assert False, f"unknown ttype {ttype!r}"
+
+    def leaves(self, direction, edges_fmt, src) -> Iterator[str]:
+        """Yield the nodes reachable from ``src`` that have no filtered neighbor."""
+        yield from [
+            node
+            for node in self.graph.get_subgraph(direction, edges_fmt, src)
+            if not self.graph.get_filtered_neighbors(direction, edges_fmt, node)
+        ]
+
+    async def walk(self, direction, edges_fmt, algo, src, dst) -> AsyncIterator[str]:
+        # TODO: implement algo="bfs"
+        if ":" in dst:
+            match_path = dst.__eq__
+        else:
+            match_path = lambda node: node.startswith(f"swh:1:{dst}:")  # noqa
+        for path in self.graph.iter_paths_dfs(direction, edges_fmt, src):
+            if match_path(path[-1]):
+                for node in path:
+                    yield node
+
+    async def random_walk(
+        self, direction, edges_fmt, retries, src, dst
+    ) -> AsyncIterator[str]:
+        async for node in self.walk(direction, edges_fmt, "dfs", src, dst):
+            yield node
+
+    async def visit_paths(
+        self, direction, edges_fmt, src, max_edges
+    ) -> AsyncIterator[List[str]]:
+        # TODO: max_edges?
+        # The set of leaves does not depend on the path; compute it once
+        # instead of re-traversing the subgraph for every path.
+        leaves = set(self.leaves(direction, edges_fmt, src))
+        for path in self.graph.iter_paths_dfs(direction, edges_fmt, src):
+            if path[-1] in leaves:
+                yield list(path)
+
+    async def visit_edges(
+        self, direction, edges_fmt, src, max_edges
+    ) -> AsyncIterator[Tuple[str, str]]:
+        if max_edges == 0:
+            max_edges = None
+        else:
+            max_edges -= 1
+        edges = list(self.graph.iter_edges_dfs(direction, edges_fmt, src))
+        for (from_, to) in edges[:max_edges]:
+            yield (from_, to)
+
+
+class Graph:
+    """In-memory directed graph stored as forward/backward adjacency lists."""
+
+    def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]):
+        self.nodes = nodes
+        # Give every known node an (initially empty) adjacency list, so that
+        # nodes without successors/predecessors have degree 0 instead of
+        # raising KeyError, and are accounted for in degree statistics.
+        self.forward_edges: Dict[str, List[str]] = {node: [] for node in nodes}
+        self.backward_edges: Dict[str, List[str]] = {node: [] for node in nodes}
+        for (src, dst) in edges:
+            self.forward_edges.setdefault(src, []).append(dst)
+            self.backward_edges.setdefault(dst, []).append(src)
+
+    def numNodes(self) -> int:
+        return len(self.nodes)
+
+    def successors(self, node: str) -> Iterator[str]:
+        return JavaIterator(self.forward_edges[node])
+
+    def outdegree(self, node: str) -> int:
+        return len(self.forward_edges[node])
+
+    def predecessors(self, node: str) -> Iterator[str]:
+        return JavaIterator(self.backward_edges[node])
+
+    def indegree(self, node: str) -> int:
+        return len(self.backward_edges[node])
+
+    def get_filtered_neighbors(
+        self, direction: str, edges_fmt: str, src: str
+    ) -> Set[str]:
+        """Return the neighbors of ``src`` that ``edges_fmt`` allows.
+
+        ``edges_fmt`` is either ``*`` or a comma-separated list of
+        ``src_type:dst_type`` items, where each side may be ``*``."""
+        if direction == "forward":
+            edges = self.forward_edges
+        elif direction == "backward":
+            edges = self.backward_edges
+        else:
+            assert False, f"unknown direction {direction!r}"
+
+        neighbors = edges.get(src, [])
+
+        if edges_fmt == "*":
+            return set(neighbors)
+        else:
+            filtered_neighbors: Set[str] = set()
+            for edges_fmt_item in edges_fmt.split(","):
+                (src_fmt, dst_fmt) = edges_fmt_item.split(":")
+                if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"):
+                    continue
+                if dst_fmt == "*":
+                    filtered_neighbors.update(neighbors)
+                else:
+                    prefix = f"swh:1:{dst_fmt}:"
+                    filtered_neighbors.update(
+                        n for n in neighbors if n.startswith(prefix)
+                    )
+            return filtered_neighbors
+
+    def get_subgraph(self, direction: str, edges_fmt: str, src: str) -> Set[str]:
+        """Return every node reachable from ``src``, including ``src`` itself."""
+        seen = set()
+        to_visit = {src}
+        while to_visit:
+            node = to_visit.pop()
+            seen.add(node)
+            neighbors = set(self.get_filtered_neighbors(direction, edges_fmt, node))
+            new_nodes = neighbors - seen
+            to_visit.update(new_nodes)
+
+        return seen
+
+    def iter_paths_dfs(
+        self, direction: str, edges_fmt: str, src: str
+    ) -> Iterator[Tuple[str, ...]]:
+        for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src):
+            yield path + (node,)
+
+    def iter_edges_dfs(
+        self, direction: str, edges_fmt: str, src: str
+    ) -> Iterator[Tuple[str, ...]]:
+        for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src):
+            if len(path) > 0:
+                yield (path[-1], node)
+
+
+class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]):
+    """Base class of iterators yielding ``(path, node)`` pairs from ``src``.
+
+    Subclasses choose the traversal order by implementing :meth:`more_work`,
+    :meth:`pop` and :meth:`push`."""
+
+    def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str):
+        self.graph = graph
+        self.direction = direction
+        self.edges_fmt = edges_fmt
+        self.seen: Set[str] = set()
+        self.src = src
+
+    def more_work(self) -> bool:
+        raise NotImplementedError()
+
+    def pop(self) -> Tuple[Tuple[str, ...], str]:
+        raise NotImplementedError()
+
+    def push(self, new_path: Tuple[str, ...], neighbor: str) -> None:
+        raise NotImplementedError()
+
+    def __next__(self) -> Tuple[Tuple[str, ...], str]:
+        # Stores (path, next_node)
+        if not self.more_work():
+            raise StopIteration()
+
+        (path, node) = self.pop()
+
+        new_path = path + (node,)
+
+        if node not in self.seen:
+            neighbors = self.graph.get_filtered_neighbors(
+                self.direction, self.edges_fmt, node
+            )
+
+            # We want to visit the first neighbor first, and to_visit is a stack;
+            # so we need to reversed() the list of neighbors to get it on top
+            # of the stack.
+            for neighbor in reversed(list(neighbors)):
+                self.push(new_path, neighbor)
+
+        self.seen.add(node)
+        return (path, node)
+
+
+class DfsSubgraphIterator(SubgraphIterator):
+    """Depth-first :class:`SubgraphIterator`, backed by a LIFO stack."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)]
+
+    def more_work(self) -> bool:
+        return bool(self.to_visit)
+
+    def pop(self) -> Tuple[Tuple[str, ...], str]:
+        return self.to_visit.pop()
+
+    def push(self, new_path: Tuple[str, ...], neighbor: str) -> None:
+        self.to_visit.append((new_path, neighbor))
diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py
--- a/swh/graph/tests/conftest.py
+++ b/swh/graph/tests/conftest.py
@@ -1,3 +1,9 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import csv
 import multiprocessing
 from pathlib import Path
 
@@ -8,22 +14,23 @@
 from swh.graph.client import RemoteGraphClient
 from swh.graph.graph import load as graph_load
 from swh.graph.server.app import make_app
+from swh.graph.simple_backend import SimpleBackend
 
 SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0]
 TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/output/example"
 
 
 class GraphServerProcess(multiprocessing.Process):
-    def __init__(self, q, *args, **kwargs):
+    def __init__(self, q, *args, backend, **kwargs):
         self.q = q
+        self.backend = backend
         super().__init__(*args, **kwargs)
 
     def run(self):
         try:
-            backend = Backend(graph_path=str(TEST_GRAPH_PATH))
-            with backend:
+            with self.backend:
                 with loop_context() as loop:
-                    app = make_app(backend=backend, debug=True)
+                    app = make_app(backend=self.backend, debug=True)
                     client = TestClient(TestServer(app), loop=loop)
                     loop.run_until_complete(client.start_server())
                     url = client.make_url("/graph/")
@@ -33,19 +40,42 @@
             self.q.put(e)
 
 
-@pytest.fixture(scope="module")
-def graph_client():
+def simple_graph_backend():
+    """Build a :class:`SimpleBackend` from the example dataset's CSV files."""
+    with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.nodes.csv") as fd:
+        nodes = [node for (node,) in csv.reader(fd, delimiter=" ")]
+    with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.edges.csv") as fd:
+        edges = list(csv.reader(fd, delimiter=" "))
+    return SimpleBackend(nodes=nodes, edges=edges)
+
+
+@pytest.fixture(scope="module", params=["main-backend", "simple-backend"])
+def graph_server(request):
     queue = multiprocessing.Queue()
-    server = GraphServerProcess(queue)
+    if request.param == "main-backend":
+        backend = Backend(graph_path=str(TEST_GRAPH_PATH))
+    else:
+        backend = simple_graph_backend()
+    server = GraphServerProcess(queue, backend=backend)
     server.start()
     res = queue.get()
     if isinstance(res, Exception):
         raise res
-    yield RemoteGraphClient(str(res))
+    yield str(res)
     server.terminate()
 
 
 @pytest.fixture(scope="module")
-def graph():
-    with graph_load(str(TEST_GRAPH_PATH)) as g:
+def graph_client(graph_server):
+    yield RemoteGraphClient(graph_server)
+
+
+@pytest.fixture(scope="module", params=["main-backend", "simple-backend"])
+def graph(request):
+    if request.param == "main-backend":
+        backend = None  # will use the default one
+    else:
+        backend = simple_graph_backend()
+
+    with graph_load(str(TEST_GRAPH_PATH), backend=backend) as g:
         yield g