diff --git a/swh/graph/backend.py b/swh/graph/backend.py index 569b6f7..0305bc3 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,166 +1,174 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os import pathlib import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.pid import IntToPidMap, PidToIntMap from swh.model.identifiers import PID_TYPES BUF_SIZE = 64*1024 BIN_FMT = '>q' # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 NODE2PID_EXT = 'node2pid.bin' PID2NODE_EXT = 'pid2node.bin' def find_graph_jar(): swh_graph_root = pathlib.Path(__file__).parents[2] try_paths = [ swh_graph_root / 'java/server/target/', pathlib.Path(sys.prefix) / 'share/swh-graph/', ] for path in try_paths: glob = list(path.glob('swh-graph-*.jar')) if glob: return str(glob[0]) raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?") def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path): self.gateway = None self.entry = None self.graph_path = graph_path def __enter__(self): self.gateway = JavaGateway.launch_gateway( java_path=None, classpath=find_graph_jar(), die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT) self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() async def simple_traversal(self, ttype, direction, edges_fmt, src): - assert ttype in ('leaves', 'neighbors', 'visit_nodes', 'visit_paths') + assert ttype in ('leaves', 'neighbors', 'visit_nodes') method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): if dst in PID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id - async def visit_paths(self, *args): + async def visit_paths(self, direction, edges_fmt, src): path = [] - async for node in self.simple_traversal('visit_paths', *args): + async for node in self.stream_proxy.visit_paths( + direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() - with (await loop.run_in_executor(None, open, fname, 'rb')) as f: + with (await asyncio.wait_for(loop.run_in_executor( + None, open, fname, 'rb'), timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) - async for value in reader: - yield value + try: + async for value in reader: + yield value + except asyncio.TimeoutError: + task_exc = java_task.exception() + if task_exc: + raise task_exc + raise await java_task return java_call_iterator diff --git a/swh/graph/graph.py b/swh/graph/graph.py index b863c34..9bcd080 100644 --- a/swh/graph/graph.py +++ b/swh/graph/graph.py @@ -1,164 +1,167 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib from swh.graph.backend import Backend from swh.graph.dot import dot_to_svg, graph_dot KIND_TO_SHAPE = { 'ori': 'egg', 'snp': 'doubleoctagon', 'rel': 'octagon', 'rev': 'diamond', 'dir': 'folder', 'cnt': 'oval', } KIND_TO_URL = { 'ori': 'https://archive.softwareheritage.org/browse/origin/{}', 'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}', 'rel': 'https://archive.softwareheritage.org/browse/release/{}', 'rev': 'https://archive.softwareheritage.org/browse/revision/{}', 'dir': 'https://archive.softwareheritage.org/browse/directory/{}', 'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/', } def call_async_gen(generator, *args, **kwargs): loop = asyncio.get_event_loop() it = generator(*args, **kwargs).__aiter__() while True: try: res = loop.run_until_complete(it.__anext__()) yield res except StopAsyncIteration: break class Neighbors: """Neighbor iterator with custom O(1) length method""" def __init__(self, parent_graph, iterator, length_func): self.parent_graph = parent_graph self.iterator = iterator self.length_func = length_func def __iter__(self): return self def __next__(self): succ = self.iterator.nextLong() if succ == -1: raise StopIteration return GraphNode(self.parent_graph, succ) def __len__(self): return self.length_func() class GraphNode: """Node in the SWH graph""" def __init__(self, parent_graph, node_id): self.parent_graph = parent_graph self.id = node_id def children(self): return Neighbors( self.parent_graph, self.parent_graph.java_graph.successors(self.id), lambda: self.parent_graph.java_graph.outdegree(self.id)) def parents(self): return Neighbors( self.parent_graph, self.parent_graph.java_graph.predecessors(self.id), lambda: self.parent_graph.java_graph.indegree(self.id)) def simple_traversal(self, ttype, direction='forward', edges='*'): for node in call_async_gen( self.parent_graph.backend.simple_traversal, ttype, direction, edges, self.id ): yield self.parent_graph[node] def leaves(self, *args, **kwargs): yield from self.simple_traversal('leaves', *args, **kwargs) def visit_nodes(self, *args, **kwargs): yield from self.simple_traversal('visit_nodes', *args, **kwargs) def visit_paths(self, direction='forward', edges='*'): for path in call_async_gen( self.parent_graph.backend.visit_paths, direction, edges, self.id ): yield [self.parent_graph[node] for node in path] def walk(self, dst, direction='forward', edges='*', traversal='dfs'): for node in call_async_gen( self.parent_graph.backend.walk, direction, edges, traversal, self.id, dst ): yield self.parent_graph[node] @property def pid(self): return self.parent_graph.node2pid[self.id] @property def kind(self): return self.pid.split(':')[2] def __str__(self): return self.pid def __repr__(self): return '<{}>'.format(self.pid) def dot_fragment(self): swh, version, kind, hash = self.pid.split(':') label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:]) url = KIND_TO_URL[kind].format(hash) shape = KIND_TO_SHAPE[kind] return ('{} [label="{}", href="{}", target="_blank", shape="{}"];' .format(self.id, label, url, shape)) def _repr_svg_(self): nodes = [self, *list(self.children()), *list(self.parents())] dot = graph_dot(nodes) svg = dot_to_svg(dot) return svg class Graph: def __init__(self, backend, node2pid, pid2node): self.backend = backend self.java_graph = backend.entry.get_graph() self.node2pid = node2pid self.pid2node = pid2node + def stats(self): + return self.backend.stats() + @property def path(self): return self.java_graph.getPath() def __len__(self): return self.java_graph.getNbNodes() def __getitem__(self, node_id): if isinstance(node_id, int): self.node2pid[node_id] # check existence return GraphNode(self, node_id) elif isinstance(node_id, str): node_id = self.pid2node[node_id] return GraphNode(self, node_id) @contextlib.contextmanager def load(graph_path): with Backend(graph_path) as backend: yield Graph(backend, backend.node2pid, backend.pid2node)