diff --git a/swh/graph/server/backend.py b/swh/graph/backend.py similarity index 84% rename from swh/graph/server/backend.py rename to swh/graph/backend.py index a0d2ccb..569b6f7 100644 --- a/swh/graph/server/backend.py +++ b/swh/graph/backend.py @@ -1,162 +1,166 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib -import json +import io import os import pathlib import struct +import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.pid import IntToPidMap, PidToIntMap from swh.model.identifiers import PID_TYPES BUF_SIZE = 64*1024 BIN_FMT = '>q' # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 NODE2PID_EXT = 'node2pid.bin' PID2NODE_EXT = 'pid2node.bin' def find_graph_jar(): - swh_graph_root = pathlib.Path(__file__).parents[3] + swh_graph_root = pathlib.Path(__file__).parents[2] try_paths = [ swh_graph_root / 'java/server/target/', pathlib.Path(sys.prefix) / 'share/swh-graph/', ] for path in try_paths: glob = list(path.glob('swh-graph-*.jar')) if glob: return str(glob[0]) raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?") +def _get_pipe_stderr(): + # Get stderr if possible, or pipe to stdout if running with Jupyter. + try: + sys.stderr.fileno() + except io.UnsupportedOperation: + return subprocess.STDOUT + else: + return sys.stderr + + class Backend: def __init__(self, graph_path): self.gateway = None self.entry = None self.graph_path = graph_path def __enter__(self): self.gateway = JavaGateway.launch_gateway( java_path=None, classpath=find_graph_jar(), die_on_exit=True, redirect_stdout=sys.stdout, - redirect_stderr=sys.stderr, + redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT) self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() async def simple_traversal(self, ttype, direction, edges_fmt, src): assert ttype in ('leaves', 'neighbors', 'visit_nodes', 'visit_paths') - src_id = self.pid2node[src] method = getattr(self.stream_proxy, ttype) - async for node_id in method(direction, edges_fmt, src_id): - if node_id == PATH_SEPARATOR_ID: - yield None - else: - yield self.node2pid[node_id] + async for node_id in method(direction, edges_fmt, src): + yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): - src_id = self.pid2node[src] if dst in PID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, - src_id, dst) + src, dst) else: - dst_id = self.pid2node[dst] it = self.stream_proxy.walk(direction, edges_fmt, algo, - src_id, dst_id) - + src, dst) async for node_id in it: - yield self.node2pid[node_id] + yield node_id async def visit_paths(self, *args): - buffer = [] - async for res_pid in self.simple_traversal('visit_paths', *args): - if res_pid is None: # Path separator, flush - yield json.dumps(buffer) - buffer = [] + path = [] + async for node in self.simple_traversal('visit_paths', *args): + if node == PATH_SEPARATOR_ID: + yield path + path = [] else: - buffer.append(res_pid) + path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() with (await loop.run_in_executor(None, open, fname, 'rb')) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) async for value in reader: yield value await java_task return java_call_iterator diff --git a/swh/graph/cli.py b/swh/graph/cli.py index c0f5626..97c0ad2 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,150 +1,150 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import aiohttp import click import sys from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.graph import client from swh.graph.pid import PidToIntMap, IntToPidMap from swh.graph.server.app import make_app -from swh.graph.server.backend import Backend +from swh.graph.backend import Backend @click.group(name='graph', context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.pass_context def cli(ctx): """Software Heritage graph tools.""" ctx.ensure_object(dict) @cli.command('api-client') @click.option('--host', default='localhost', help='Graph server host') @click.option('--port', default='5009', help='Graph server port') @click.pass_context def api_client(ctx, host, port): """Client for the Software Heritage Graph REST service """ url = 'http://{}:{}'.format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) @cli.group('map') @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass def dump_pid2int(filename): for (pid, int) in PidToIntMap(filename): print('{}\t{}'.format(pid, int)) def dump_int2pid(filename): for (int, pid) in IntToPidMap(filename): print('{}\t{}'.format(int, pid)) def restore_pid2int(filename): """read a textual PID->int map from stdin and write its binary version to filename """ with open(filename, 'wb') as dst: for line in sys.stdin: (str_pid, str_int) = line.split() PidToIntMap.write_record(dst, str_pid, int(str_int)) def restore_int2pid(filename, length): """read a textual int->PID map from stdin and write its binary version to filename """ int2pid = IntToPidMap(filename, mode='wb', length=length) for line in sys.stdin: (str_int, str_pid) = line.split() int2pid[int(str_int)] = str_pid int2pid.close() @map.command('dump') @click.option('--type', '-t', 'map_type', required=True, type=click.Choice(['pid2int', 'int2pid']), help='type of map to dump') @click.argument('filename', required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): """dump a binary PID<->int map to textual format""" if map_type == 'pid2int': dump_pid2int(filename) elif map_type == 'int2pid': dump_int2pid(filename) else: raise ValueError('invalid map type: ' + map_type) pass @map.command('restore') @click.option('--type', '-t', 'map_type', required=True, type=click.Choice(['pid2int', 'int2pid']), help='type of map to dump') @click.option('--length', '-l', type=int, help='''map size in number of logical records (required for int2pid maps)''') @click.argument('filename', required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): """restore a binary PID<->int map from textual format""" if map_type == 'pid2int': restore_pid2int(filename) elif map_type == 'int2pid': if length is None: raise click.UsageError( 'map length is required when restoring {} maps'.format( map_type), ctx) restore_int2pid(filename, length) else: raise ValueError('invalid map type: ' + map_type) @cli.group('graph') @click.pass_context def graph(ctx): """Manage swh-graph on-disk maps""" pass @graph.command(name='rpc-serve') @click.option('--host', default='0.0.0.0', metavar='IP', show_default=True, help="Host ip address to bind the server on") @click.option('--port', default=5009, type=click.INT, metavar='PORT', show_default=True, help="Binding port of the server") @click.option('--graph', required=True, metavar='GRAPH', help="Path prefix of the graph to load") @click.pass_context def serve(ctx, host, port, graph): backend = Backend(graph_path=graph) app = make_app(backend=backend) with backend: aiohttp.web.run_app(app, host=host, port=port) def main(): return cli(auto_envvar_prefix='SWH_GRAPH') if __name__ == '__main__': main() diff --git a/swh/graph/dot.py b/swh/graph/dot.py index accdc56..265f06b 100644 --- a/swh/graph/dot.py +++ b/swh/graph/dot.py @@ -1,49 +1,55 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import lru_cache import subprocess import collections @lru_cache() def dot_to_svg(dot): - p = subprocess.run(['dot', '-Tsvg'], input=dot, universal_newlines=True, - capture_output=True, check=True) + try: + p = subprocess.run( + ['dot', '-Tsvg'], input=dot, + universal_newlines=True, capture_output=True, + check=True + ) + except subprocess.CalledProcessError as e: + raise RuntimeError(e.stderr) from e return p.stdout def graph_dot(nodes): ids = {n.id for n in nodes} by_kind = collections.defaultdict(list) for n in nodes: by_kind[n.kind].append(n) forward_edges = [ (node.id, child.id) for node in nodes for child in node.children() if child.id in ids ] backward_edges = [ (parent.id, node.id) for node in nodes for parent in node.parents() if parent.id in ids ] edges = set(forward_edges + backward_edges) edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in edges) nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes) s = """digraph G {{ ranksep=1; nodesep=0.5; {nodes} {edges} }}""".format(nodes=nodes_fmt, edges=edges_fmt) return s diff --git a/swh/graph/graph.py b/swh/graph/graph.py index f295fb4..b863c34 100644 --- a/swh/graph/graph.py +++ b/swh/graph/graph.py @@ -1,113 +1,164 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import asyncio import contextlib -from swh.graph.server.backend import Backend +from swh.graph.backend import Backend from swh.graph.dot import dot_to_svg, graph_dot KIND_TO_SHAPE = { 'ori': 'egg', 'snp': 'doubleoctagon', 'rel': 'octagon', 'rev': 'diamond', 'dir': 'folder', 'cnt': 'oval', } +KIND_TO_URL = { + 'ori': 'https://archive.softwareheritage.org/browse/origin/{}', + 'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}', + 'rel': 'https://archive.softwareheritage.org/browse/release/{}', + 'rev': 'https://archive.softwareheritage.org/browse/revision/{}', + 'dir': 'https://archive.softwareheritage.org/browse/directory/{}', + 'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/', +} + + +def call_async_gen(generator, *args, **kwargs): + loop = asyncio.get_event_loop() + it = generator(*args, **kwargs).__aiter__() + while True: + try: + res = loop.run_until_complete(it.__anext__()) + yield res + except StopAsyncIteration: + break + + class Neighbors: """Neighbor iterator with custom O(1) length method""" def __init__(self, parent_graph, iterator, length_func): self.parent_graph = parent_graph self.iterator = iterator self.length_func = length_func def __iter__(self): return self def __next__(self): succ = self.iterator.nextLong() if succ == -1: raise StopIteration return GraphNode(self.parent_graph, succ) def __len__(self): return self.length_func() class GraphNode: """Node in the SWH graph""" def __init__(self, parent_graph, node_id): self.parent_graph = parent_graph self.id = node_id def children(self): return Neighbors( self.parent_graph, self.parent_graph.java_graph.successors(self.id), lambda: self.parent_graph.java_graph.outdegree(self.id)) def parents(self): return Neighbors( self.parent_graph, self.parent_graph.java_graph.predecessors(self.id), lambda: self.parent_graph.java_graph.indegree(self.id)) + def simple_traversal(self, ttype, direction='forward', edges='*'): + for node in call_async_gen( + self.parent_graph.backend.simple_traversal, + ttype, direction, edges, self.id + ): + yield self.parent_graph[node] + + def leaves(self, *args, **kwargs): + yield from self.simple_traversal('leaves', *args, **kwargs) + + def visit_nodes(self, *args, **kwargs): + yield from self.simple_traversal('visit_nodes', *args, **kwargs) + + def visit_paths(self, direction='forward', edges='*'): + for path in call_async_gen( + self.parent_graph.backend.visit_paths, + direction, edges, self.id + ): + yield [self.parent_graph[node] for node in path] + + def walk(self, dst, direction='forward', edges='*', traversal='dfs'): + for node in call_async_gen( + self.parent_graph.backend.walk, + direction, edges, traversal, self.id, dst + ): + yield self.parent_graph[node] + @property def pid(self): return self.parent_graph.node2pid[self.id] @property def kind(self): return self.pid.split(':')[2] def __str__(self): return self.pid def __repr__(self): return '<{}>'.format(self.pid) def dot_fragment(self): swh, version, kind, hash = self.pid.split(':') label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:]) + url = KIND_TO_URL[kind].format(hash) shape = KIND_TO_SHAPE[kind] - return '{} [label="{}", shape="{}"];'.format(self.id, label, shape) + return ('{} [label="{}", href="{}", target="_blank", shape="{}"];' + .format(self.id, label, url, shape)) def _repr_svg_(self): nodes = [self, *list(self.children()), *list(self.parents())] dot = graph_dot(nodes) svg = dot_to_svg(dot) return svg class Graph: - def __init__(self, java_graph, node2pid, pid2node): - self.java_graph = java_graph + def __init__(self, backend, node2pid, pid2node): + self.backend = backend + self.java_graph = backend.entry.get_graph() self.node2pid = node2pid self.pid2node = pid2node @property def path(self): return self.java_graph.getPath() def __len__(self): return self.java_graph.getNbNodes() def __getitem__(self, node_id): if isinstance(node_id, int): self.node2pid[node_id] # check existence return GraphNode(self, node_id) elif isinstance(node_id, str): node_id = self.pid2node[node_id] return GraphNode(self, node_id) @contextlib.contextmanager def load(graph_path): with Backend(graph_path) as backend: - yield Graph(backend.entry.get_graph(), - backend.node2pid, backend.pid2node) + yield Graph(backend, backend.node2pid, backend.pid2node) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index 21b9dbc..8e57946 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,102 +1,120 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ +import json import contextlib import aiohttp.web from swh.core.api.asynchronous import RPCServerApp +from swh.model.identifiers import PID_TYPES @contextlib.asynccontextmanager async def stream_response(request, *args, **kwargs): response = aiohttp.web.StreamResponse(*args, **kwargs) await response.prepare(request) yield response await response.write_eof() async def index(request): return aiohttp.web.Response( content_type='text/html', body=""" Software Heritage storage server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""") async def stats(request): stats = request.app['backend'].stats() return aiohttp.web.Response(body=stats, content_type='application/json') def get_simple_traversal_handler(ttype): async def simple_traversal(request): + backend = request.app['backend'] + src = request.match_info['src'] edges = request.query.get('edges', '*') direction = request.query.get('direction', 'forward') + src_node = backend.pid2node[src] async with stream_response(request) as response: - async for res_pid in request.app['backend'].simple_traversal( - ttype, direction, edges, src + async for res_node in backend.simple_traversal( + ttype, direction, edges, src_node ): + res_pid = backend.node2pid[res_node] await response.write('{}\n'.format(res_pid).encode()) return response return simple_traversal async def walk(request): + backend = request.app['backend'] + src = request.match_info['src'] dst = request.match_info['dst'] edges = request.query.get('edges', '*') direction = request.query.get('direction', 'forward') algo = request.query.get('traversal', 'dfs') - it = request.app['backend'].walk(direction, edges, algo, src, dst) + src_node = backend.pid2node[src] + if dst not in PID_TYPES: + dst = backend.pid2node[dst] async with stream_response(request) as response: - async for res_pid in it: + async for res_node in backend.walk( + direction, edges, algo, src_node, dst + ): + res_pid = backend.node2pid[res_node] await response.write('{}\n'.format(res_pid).encode()) return response async def visit_paths(request): + backend = request.app['backend'] + src = request.match_info['src'] edges = request.query.get('edges', '*') direction = request.query.get('direction', 'forward') - it = request.app['backend'].visit_paths(direction, edges, src) + src_node = backend.pid2node[src] + it = backend.visit_paths(direction, edges, src_node) async with stream_response(request) as response: - async for res_pid in it: - await response.write('{}\n'.format(res_pid).encode()) + async for res_path in it: + res_path_pid = [backend.node2pid[n] for n in res_path] + line = json.dumps(res_path_pid) + await response.write('{}\n'.format(line).encode()) return response def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/graph/stats', stats) app.router.add_route('GET', '/graph/leaves/{src}', get_simple_traversal_handler('leaves')) app.router.add_route('GET', '/graph/neighbors/{src}', get_simple_traversal_handler('neighbors')) app.router.add_route('GET', '/graph/visit/nodes/{src}', get_simple_traversal_handler('visit_nodes')) app.router.add_route('GET', '/graph/visit/paths/{src}', visit_paths) app.router.add_route('GET', '/graph/walk/{src}/{dst}', walk) app['backend'] = backend return app diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py index eff5f0d..1c391a8 100644 --- a/swh/graph/tests/conftest.py +++ b/swh/graph/tests/conftest.py @@ -1,46 +1,46 @@ import multiprocessing import pytest from pathlib import Path from aiohttp.test_utils import TestServer, TestClient, loop_context from swh.graph.graph import load as graph_load from swh.graph.client import RemoteGraphClient -from swh.graph.server.backend import Backend +from swh.graph.backend import Backend from swh.graph.server.app import make_app SWH_GRAPH_ROOT = Path(__file__).parents[3] TEST_GRAPH_PATH = SWH_GRAPH_ROOT / 'tests/dataset/output/example' class GraphServerProcess(multiprocessing.Process): def __init__(self, q, *args, **kwargs): self.q = q super().__init__(*args, **kwargs) def run(self): backend = Backend(graph_path=str(TEST_GRAPH_PATH)) with backend: with loop_context() as loop: - app = make_app(backend=backend) + app = make_app(backend=backend, debug=True) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) url = client.make_url('/graph/') self.q.put(url) loop.run_forever() @pytest.fixture(scope="module") def graph_client(): queue = multiprocessing.Queue() server = GraphServerProcess(queue) server.start() url = queue.get() yield RemoteGraphClient(str(url)) server.terminate() @pytest.fixture(scope="module") def graph(): with graph_load(str(TEST_GRAPH_PATH)) as g: yield g diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py index 36d1cf1..ef5919b 100644 --- a/swh/graph/tests/test_graph.py +++ b/swh/graph/tests/test_graph.py @@ -1,38 +1,103 @@ import pytest def test_graph(graph): assert len(graph) == 21 obj = 'swh:1:dir:0000000000000000000000000000000000000008' node = graph[obj] assert str(node) == obj assert len(node.children()) == 3 assert len(node.parents()) == 2 actual = {p.pid for p in node.children()} expected = { 'swh:1:cnt:0000000000000000000000000000000000000001', 'swh:1:dir:0000000000000000000000000000000000000006', 'swh:1:cnt:0000000000000000000000000000000000000007' } assert expected == actual actual = {p.pid for p in node.parents()} expected = { 'swh:1:rev:0000000000000000000000000000000000000009', 'swh:1:dir:0000000000000000000000000000000000000012', } assert expected == actual def test_invalid_pid(graph): with pytest.raises(IndexError): graph[1337] with pytest.raises(IndexError): graph[len(graph) + 1] with pytest.raises(KeyError): graph['swh:1:dir:0000000000000000000000000000000420000012'] + + +def test_leaves(graph): + actual = list(graph['swh:1:ori:0000000000000000000000000000000000000021'] + .leaves()) + actual = [p.pid for p in actual] + expected = [ + 'swh:1:cnt:0000000000000000000000000000000000000001', + 'swh:1:cnt:0000000000000000000000000000000000000004', + 'swh:1:cnt:0000000000000000000000000000000000000005', + 'swh:1:cnt:0000000000000000000000000000000000000007' + ] + assert set(actual) == set(expected) + + +def test_visit_nodes(graph): + actual = list(graph['swh:1:rel:0000000000000000000000000000000000000010'] + .visit_nodes(edges='rel:rev,rev:rev')) + actual = [p.pid for p in actual] + expected = [ + 'swh:1:rel:0000000000000000000000000000000000000010', + 'swh:1:rev:0000000000000000000000000000000000000009', + 'swh:1:rev:0000000000000000000000000000000000000003' + ] + assert set(actual) == set(expected) + + +def test_visit_paths(graph): + actual = list(graph['swh:1:snp:0000000000000000000000000000000000000020'] + .visit_paths(edges='snp:*,rev:*')) + actual = [tuple(n.pid for n in path) for path in actual] + expected = [ + ( + 'swh:1:snp:0000000000000000000000000000000000000020', + 'swh:1:rev:0000000000000000000000000000000000000009', + 'swh:1:rev:0000000000000000000000000000000000000003', + 'swh:1:dir:0000000000000000000000000000000000000002' + ), + ( + 'swh:1:snp:0000000000000000000000000000000000000020', + 'swh:1:rev:0000000000000000000000000000000000000009', + 'swh:1:dir:0000000000000000000000000000000000000008' + ), + ( + 'swh:1:snp:0000000000000000000000000000000000000020', + 'swh:1:rel:0000000000000000000000000000000000000010' + ) + ] + assert set(actual) == set(expected) + + +def test_walk(graph): + actual = list(graph['swh:1:dir:0000000000000000000000000000000000000016'] + .walk('rel', + edges='dir:dir,dir:rev,rev:*', + direction='backward', + traversal='bfs')) + actual = [p.pid for p in actual] + expected = [ + 'swh:1:dir:0000000000000000000000000000000000000016', + 'swh:1:dir:0000000000000000000000000000000000000017', + 'swh:1:rev:0000000000000000000000000000000000000018', + 'swh:1:rel:0000000000000000000000000000000000000019' + ] + assert set(actual) == set(expected)