Add an in-process Python graph API (swh.graph.graph) on top of the Java backend.

* Entry.java: add get_graph(), which returns a copy of the loaded graph.
* backend.py: renamed from swh/graph/server/backend.py to swh/graph/backend.py.
  The traversal methods now take and yield node ids; the PID <-> node id
  translation is done by the HTTP handlers in swh/graph/server/app.py.
* dot.py, graph.py: new modules (DOT/SVG rendering, Graph/GraphNode wrappers).
* tests: new module-scoped `graph` fixture and test_graph.py.

NOTE(review): this patch was rebuilt from a copy whose line breaks and
indentation had been collapsed. Blank lines were placed so that every hunk
matches its line counts; indentation (including the whitespace inside the DOT
template string in graph_dot) is inferred and must be checked against the tree
before applying. Docstrings and NOTE(review) comments were added to the two
new files dot.py and graph.py only, and their hunk sizes updated accordingly.

diff --git a/java/server/src/main/java/org/softwareheritage/graph/Entry.java b/java/server/src/main/java/org/softwareheritage/graph/Entry.java
--- a/java/server/src/main/java/org/softwareheritage/graph/Entry.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/Entry.java
@@ -27,6 +27,10 @@
         System.err.println("Graph loaded.");
     }
 
+    public Graph get_graph() {
+        return graph.copy();
+    }
+
     public String stats() {
         try {
             Stats stats = new Stats(graph.getPath());
diff --git a/swh/graph/server/backend.py b/swh/graph/backend.py
rename from swh/graph/server/backend.py
rename to swh/graph/backend.py
--- a/swh/graph/server/backend.py
+++ b/swh/graph/backend.py
@@ -5,10 +5,11 @@
 
 import asyncio
 import contextlib
-import json
+import io
 import os
 import pathlib
 import struct
+import subprocess
 import sys
 import tempfile
 
@@ -25,7 +26,7 @@
 
 
 def find_graph_jar():
-    swh_graph_root = pathlib.Path(__file__).parents[3]
+    swh_graph_root = pathlib.Path(__file__).parents[2]
     try_paths = [
         swh_graph_root / 'java/server/target/',
         pathlib.Path(sys.prefix) / 'share/swh-graph/',
@@ -37,6 +38,16 @@
     raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
 
 
+def _get_pipe_stderr():
+    # Get stderr if possible, or pipe to stdout if running with Jupyter.
+    try:
+        sys.stderr.fileno()
+    except io.UnsupportedOperation:
+        return subprocess.STDOUT
+    else:
+        return sys.stderr
+
+
 class Backend:
     def __init__(self, graph_path):
         self.gateway = None
@@ -49,13 +60,14 @@
             classpath=find_graph_jar(),
             die_on_exit=True,
             redirect_stdout=sys.stdout,
-            redirect_stderr=sys.stderr,
+            redirect_stderr=_get_pipe_stderr(),
         )
         self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
         self.entry.load_graph(self.graph_path)
         self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
         self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
         self.stream_proxy = JavaStreamProxy(self.entry)
+        return self
 
     def __exit__(self, exc_type, exc_value, tb):
         self.gateway.shutdown()
@@ -64,36 +76,30 @@
         return self.entry.stats()
 
     async def simple_traversal(self, ttype, direction, edges_fmt, src):
-        assert ttype in ('leaves', 'neighbors', 'visit_nodes', 'visit_paths')
-        src_id = self.pid2node[src]
+        assert ttype in ('leaves', 'neighbors', 'visit_nodes')
         method = getattr(self.stream_proxy, ttype)
-        async for node_id in method(direction, edges_fmt, src_id):
-            if node_id == PATH_SEPARATOR_ID:
-                yield None
-            else:
-                yield self.node2pid[node_id]
+        async for node_id in method(direction, edges_fmt, src):
+            yield node_id
 
     async def walk(self, direction, edges_fmt, algo, src, dst):
-        src_id = self.pid2node[src]
         if dst in PID_TYPES:
             it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
-                                             src_id, dst)
+                                             src, dst)
         else:
-            dst_id = self.pid2node[dst]
             it = self.stream_proxy.walk(direction, edges_fmt, algo,
-                                        src_id, dst_id)
-
+                                        src, dst)
         async for node_id in it:
-            yield self.node2pid[node_id]
-
-    async def visit_paths(self, *args):
-        buffer = []
-        async for res_pid in self.simple_traversal('visit_paths', *args):
-            if res_pid is None:  # Path separator, flush
-                yield json.dumps(buffer)
-                buffer = []
+            yield node_id
+
+    async def visit_paths(self, direction, edges_fmt, src):
+        path = []
+        async for node in self.stream_proxy.visit_paths(
+                direction, edges_fmt, src):
+            if node == PATH_SEPARATOR_ID:
+                yield path
+                path = []
             else:
-                buffer.append(res_pid)
+                path.append(node)
 
 
 class JavaStreamProxy:
@@ -117,7 +123,12 @@
 
     async def read_node_ids(self, fname):
         loop = asyncio.get_event_loop()
-        with (await loop.run_in_executor(None, open, fname, 'rb')) as f:
+        open_thread = loop.run_in_executor(None, open, fname, 'rb')
+
+        # Since the open() call on the FIFO is blocking until it is also opened
+        # on the Java side, we await it with a timeout in case there is an
+        # exception that prevents the write-side open().
+        with (await asyncio.wait_for(open_thread, timeout=2)) as f:
             while True:
                 data = await loop.run_in_executor(None, f.read, BUF_SIZE)
                 if not data:
@@ -155,7 +166,17 @@
         async def java_call_iterator(*args, **kwargs):
             with self.get_handler() as (handler, reader):
                 java_task = getattr(handler, name)(*args, **kwargs)
-                async for value in reader:
-                    yield value
+                try:
+                    async for value in reader:
+                        yield value
+                except asyncio.TimeoutError:
+                    # If the read-side open() times out, an exception on the
+                    # Java side probably happened that prevented the
+                    # write-side open(). We propagate this exception here if
+                    # that is the case.
+                    task_exc = java_task.exception()
+                    if task_exc:
+                        raise task_exc
+                    raise
                 await java_task
         return java_call_iterator
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -11,7 +11,7 @@
 from swh.graph import client
 from swh.graph.pid import PidToIntMap, IntToPidMap
 from swh.graph.server.app import make_app
-from swh.graph.server.backend import Backend
+from swh.graph.backend import Backend
 
 
 @click.group(name='graph', context_settings=CONTEXT_SETTINGS,
diff --git a/swh/graph/dot.py b/swh/graph/dot.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/dot.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import lru_cache
+import subprocess
+import collections
+
+
+KIND_TO_SHAPE = {
+    'ori': 'egg',
+    'snp': 'doubleoctagon',
+    'rel': 'octagon',
+    'rev': 'diamond',
+    'dir': 'folder',
+    'cnt': 'oval',
+}
+
+
+@lru_cache()
+def dot_to_svg(dot):
+    """Render DOT source to SVG using the external ``dot`` program.
+
+    Raises RuntimeError carrying dot's stderr if it exits with a non-zero
+    status. Results are memoized per DOT string by ``lru_cache``.
+    """
+    try:
+        p = subprocess.run(
+            ['dot', '-Tsvg'], input=dot,
+            universal_newlines=True, capture_output=True,
+            check=True
+        )
+    except subprocess.CalledProcessError as e:
+        raise RuntimeError(e.stderr) from e
+    return p.stdout
+
+
+def graph_dot(nodes):
+    """Build the DOT source of a digraph over ``nodes``.
+
+    Only edges whose two endpoints are both in ``nodes`` are emitted.
+    """
+    ids = {n.id for n in nodes}
+
+    # NOTE(review): by_kind is built but not used below.
+    by_kind = collections.defaultdict(list)
+    for n in nodes:
+        by_kind[n.kind].append(n)
+
+    forward_edges = [
+        (node.id, child.id)
+        for node in nodes
+        for child in node.children()
+        if child.id in ids
+    ]
+    backward_edges = [
+        (parent.id, node.id)
+        for node in nodes
+        for parent in node.parents()
+        if parent.id in ids
+    ]
+    edges = set(forward_edges + backward_edges)
+    edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in edges)
+    nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes)
+
+    s = """digraph G {{
+        ranksep=1;
+        nodesep=0.5;
+
+        {nodes}
+        {edges}
+
+    }}""".format(nodes=nodes_fmt, edges=edges_fmt)
+    return s
diff --git a/swh/graph/graph.py b/swh/graph/graph.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/graph.py
@@ -0,0 +1,164 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import asyncio
+import contextlib
+from swh.graph.backend import Backend
+from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE
+
+
+KIND_TO_URL = {
+    'ori': 'https://archive.softwareheritage.org/browse/origin/{}',
+    'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}',
+    'rel': 'https://archive.softwareheritage.org/browse/release/{}',
+    'rev': 'https://archive.softwareheritage.org/browse/revision/{}',
+    'dir': 'https://archive.softwareheritage.org/browse/directory/{}',
+    'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/',
+}
+
+
+def call_async_gen(generator, *args, **kwargs):
+    """Synchronously iterate over the async generator ``generator(...)``.
+
+    Each item is obtained with ``loop.run_until_complete``.
+    """
+    loop = asyncio.get_event_loop()
+    it = generator(*args, **kwargs).__aiter__()
+    while True:
+        try:
+            res = loop.run_until_complete(it.__anext__())
+            yield res
+        except StopAsyncIteration:
+            break
+
+
+class Neighbors:
+    """Neighbor iterator with custom O(1) length method"""
+    def __init__(self, graph, iterator, length_func):
+        self.graph = graph
+        self.iterator = iterator
+        self.length_func = length_func
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        succ = self.iterator.nextLong()
+        if succ == -1:
+            raise StopIteration
+        return GraphNode(self.graph, succ)
+
+    def __len__(self):
+        return self.length_func()
+
+
+class GraphNode:
+    """Node in the SWH graph"""
+
+    def __init__(self, graph, node_id):
+        self.graph = graph
+        self.id = node_id
+
+    def children(self):
+        return Neighbors(
+            self.graph,
+            self.graph.java_graph.successors(self.id),
+            lambda: self.graph.java_graph.outdegree(self.id))
+
+    def parents(self):
+        return Neighbors(
+            self.graph,
+            self.graph.java_graph.predecessors(self.id),
+            lambda: self.graph.java_graph.indegree(self.id))
+
+    def simple_traversal(self, ttype, direction='forward', edges='*'):
+        for node in call_async_gen(
+            self.graph.backend.simple_traversal,
+            ttype, direction, edges, self.id
+        ):
+            yield self.graph[node]
+
+    def leaves(self, *args, **kwargs):
+        yield from self.simple_traversal('leaves', *args, **kwargs)
+
+    def visit_nodes(self, *args, **kwargs):
+        yield from self.simple_traversal('visit_nodes', *args, **kwargs)
+
+    def visit_paths(self, direction='forward', edges='*'):
+        for path in call_async_gen(
+            self.graph.backend.visit_paths,
+            direction, edges, self.id
+        ):
+            yield [self.graph[node] for node in path]
+
+    def walk(self, dst, direction='forward', edges='*', traversal='dfs'):
+        for node in call_async_gen(
+            self.graph.backend.walk,
+            direction, edges, traversal, self.id, dst
+        ):
+            yield self.graph[node]
+
+    @property
+    def pid(self):
+        return self.graph.node2pid[self.id]
+
+    @property
+    def kind(self):
+        return self.pid.split(':')[2]
+
+    def __str__(self):
+        return self.pid
+
+    def __repr__(self):
+        return '<{}>'.format(self.pid)
+
+    def dot_fragment(self):
+        swh, version, kind, hash = self.pid.split(':')
+        label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:])
+        url = KIND_TO_URL[kind].format(hash)
+        shape = KIND_TO_SHAPE[kind]
+        return ('{} [label="{}", href="{}", target="_blank", shape="{}"];'
+                .format(self.id, label, url, shape))
+
+    def _repr_svg_(self):
+        nodes = [self, *list(self.children()), *list(self.parents())]
+        dot = graph_dot(nodes)
+        svg = dot_to_svg(dot)
+        return svg
+
+
+class Graph:
+    def __init__(self, backend, node2pid, pid2node):
+        self.backend = backend
+        self.java_graph = backend.entry.get_graph()
+        self.node2pid = node2pid
+        self.pid2node = pid2node
+
+    def stats(self):
+        return self.backend.stats()
+
+    @property
+    def path(self):
+        return self.java_graph.getPath()
+
+    def __len__(self):
+        return self.java_graph.getNbNodes()
+
+    def __getitem__(self, node_id):
+        """Return the GraphNode for an integer node id or a PID string."""
+        if isinstance(node_id, int):
+            self.node2pid[node_id]  # check existence
+            return GraphNode(self, node_id)
+        elif isinstance(node_id, str):
+            node_id = self.pid2node[node_id]
+            return GraphNode(self, node_id)
+        # NOTE(review): any other key type falls through and returns None.
+
+
+@contextlib.contextmanager
+def load(graph_path):
+    """Context manager: enter a Backend on graph_path and yield a Graph."""
+    with Backend(graph_path) as backend:
+        yield Graph(backend, backend.node2pid, backend.pid2node)
diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py
--- a/swh/graph/server/app.py
+++ b/swh/graph/server/app.py
@@ -8,10 +8,12 @@
 FIFO as a transport to stream integers between the two languages.
 """
 
+import json
 import contextlib
 import aiohttp.web
 
 from swh.core.api.asynchronous import RPCServerApp
+from swh.model.identifiers import PID_TYPES
 
 
 @contextlib.asynccontextmanager
@@ -45,14 +47,18 @@
 
 def get_simple_traversal_handler(ttype):
     async def simple_traversal(request):
+        backend = request.app['backend']
+
         src = request.match_info['src']
         edges = request.query.get('edges', '*')
         direction = request.query.get('direction', 'forward')
 
+        src_node = backend.pid2node[src]
         async with stream_response(request) as response:
-            async for res_pid in request.app['backend'].simple_traversal(
-                ttype, direction, edges, src
+            async for res_node in backend.simple_traversal(
+                ttype, direction, edges, src_node
             ):
+                res_pid = backend.node2pid[res_node]
                 await response.write('{}\n'.format(res_pid).encode())
             return response
 
@@ -60,28 +66,40 @@
 
 
 async def walk(request):
+    backend = request.app['backend']
+
     src = request.match_info['src']
     dst = request.match_info['dst']
     edges = request.query.get('edges', '*')
     direction = request.query.get('direction', 'forward')
     algo = request.query.get('traversal', 'dfs')
 
-    it = request.app['backend'].walk(direction, edges, algo, src, dst)
+    src_node = backend.pid2node[src]
+    if dst not in PID_TYPES:
+        dst = backend.pid2node[dst]
     async with stream_response(request) as response:
-        async for res_pid in it:
+        async for res_node in backend.walk(
+                direction, edges, algo, src_node, dst
+        ):
+            res_pid = backend.node2pid[res_node]
             await response.write('{}\n'.format(res_pid).encode())
         return response
 
 
 async def visit_paths(request):
+    backend = request.app['backend']
+
     src = request.match_info['src']
     edges = request.query.get('edges', '*')
     direction = request.query.get('direction', 'forward')
 
-    it = request.app['backend'].visit_paths(direction, edges, src)
+    src_node = backend.pid2node[src]
+    it = backend.visit_paths(direction, edges, src_node)
     async with stream_response(request) as response:
-        async for res_pid in it:
-            await response.write('{}\n'.format(res_pid).encode())
+        async for res_path in it:
+            res_path_pid = [backend.node2pid[n] for n in res_path]
+            line = json.dumps(res_path_pid)
+            await response.write('{}\n'.format(line).encode())
         return response
 
 
diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py
--- a/swh/graph/tests/conftest.py
+++ b/swh/graph/tests/conftest.py
@@ -4,8 +4,9 @@
 
 from aiohttp.test_utils import TestServer, TestClient, loop_context
 
+from swh.graph.graph import load as graph_load
 from swh.graph.client import RemoteGraphClient
-from swh.graph.server.backend import Backend
+from swh.graph.backend import Backend
 from swh.graph.server.app import make_app
 
 SWH_GRAPH_ROOT = Path(__file__).parents[3]
@@ -21,7 +22,7 @@
     backend = Backend(graph_path=str(TEST_GRAPH_PATH))
     with backend:
         with loop_context() as loop:
-            app = make_app(backend=backend)
+            app = make_app(backend=backend, debug=True)
             client = TestClient(TestServer(app), loop=loop)
             loop.run_until_complete(client.start_server())
             url = client.make_url('/graph/')
@@ -37,3 +38,9 @@
     url = queue.get()
     yield RemoteGraphClient(str(url))
     server.terminate()
+
+
+@pytest.fixture(scope="module")
+def graph():
+    with graph_load(str(TEST_GRAPH_PATH)) as g:
+        yield g
diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/test_graph.py
@@ -0,0 +1,103 @@
+import pytest
+
+
+def test_graph(graph):
+    assert len(graph) == 21
+
+    obj = 'swh:1:dir:0000000000000000000000000000000000000008'
+    node = graph[obj]
+
+    assert str(node) == obj
+    assert len(node.children()) == 3
+    assert len(node.parents()) == 2
+
+    actual = {p.pid for p in node.children()}
+    expected = {
+        'swh:1:cnt:0000000000000000000000000000000000000001',
+        'swh:1:dir:0000000000000000000000000000000000000006',
+        'swh:1:cnt:0000000000000000000000000000000000000007'
+    }
+    assert expected == actual
+
+    actual = {p.pid for p in node.parents()}
+    expected = {
+        'swh:1:rev:0000000000000000000000000000000000000009',
+        'swh:1:dir:0000000000000000000000000000000000000012',
+    }
+    assert expected == actual
+
+
+def test_invalid_pid(graph):
+    with pytest.raises(IndexError):
+        graph[1337]
+
+    with pytest.raises(IndexError):
+        graph[len(graph) + 1]
+
+    with pytest.raises(KeyError):
+        graph['swh:1:dir:0000000000000000000000000000000420000012']
+
+
+def test_leaves(graph):
+    actual = list(graph['swh:1:ori:0000000000000000000000000000000000000021']
+                  .leaves())
+    actual = [p.pid for p in actual]
+    expected = [
+        'swh:1:cnt:0000000000000000000000000000000000000001',
+        'swh:1:cnt:0000000000000000000000000000000000000004',
+        'swh:1:cnt:0000000000000000000000000000000000000005',
+        'swh:1:cnt:0000000000000000000000000000000000000007'
+    ]
+    assert set(actual) == set(expected)
+
+
+def test_visit_nodes(graph):
+    actual = list(graph['swh:1:rel:0000000000000000000000000000000000000010']
+                  .visit_nodes(edges='rel:rev,rev:rev'))
+    actual = [p.pid for p in actual]
+    expected = [
+        'swh:1:rel:0000000000000000000000000000000000000010',
+        'swh:1:rev:0000000000000000000000000000000000000009',
+        'swh:1:rev:0000000000000000000000000000000000000003'
+    ]
+    assert set(actual) == set(expected)
+
+
+def test_visit_paths(graph):
+    actual = list(graph['swh:1:snp:0000000000000000000000000000000000000020']
+                  .visit_paths(edges='snp:*,rev:*'))
+    actual = [tuple(n.pid for n in path) for path in actual]
+    expected = [
+        (
+            'swh:1:snp:0000000000000000000000000000000000000020',
+            'swh:1:rev:0000000000000000000000000000000000000009',
+            'swh:1:rev:0000000000000000000000000000000000000003',
+            'swh:1:dir:0000000000000000000000000000000000000002'
+        ),
+        (
+            'swh:1:snp:0000000000000000000000000000000000000020',
+            'swh:1:rev:0000000000000000000000000000000000000009',
+            'swh:1:dir:0000000000000000000000000000000000000008'
+        ),
+        (
+            'swh:1:snp:0000000000000000000000000000000000000020',
+            'swh:1:rel:0000000000000000000000000000000000000010'
+        )
+    ]
+    assert set(actual) == set(expected)
+
+
+def test_walk(graph):
+    actual = list(graph['swh:1:dir:0000000000000000000000000000000000000016']
+                  .walk('rel',
+                        edges='dir:dir,dir:rev,rev:*',
+                        direction='backward',
+                        traversal='bfs'))
+    actual = [p.pid for p in actual]
+    expected = [
+        'swh:1:dir:0000000000000000000000000000000000000016',
+        'swh:1:dir:0000000000000000000000000000000000000017',
+        'swh:1:rev:0000000000000000000000000000000000000018',
+        'swh:1:rel:0000000000000000000000000000000000000019'
+    ]
+    assert set(actual) == set(expected)