diff --git a/swh/graph/backend.py b/swh/graph/backend.py index 0305bc3..0f5f3ef 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,174 +1,182 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os import pathlib import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.pid import IntToPidMap, PidToIntMap from swh.model.identifiers import PID_TYPES BUF_SIZE = 64*1024 BIN_FMT = '>q' # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 NODE2PID_EXT = 'node2pid.bin' PID2NODE_EXT = 'pid2node.bin' def find_graph_jar(): swh_graph_root = pathlib.Path(__file__).parents[2] try_paths = [ swh_graph_root / 'java/server/target/', pathlib.Path(sys.prefix) / 'share/swh-graph/', ] for path in try_paths: glob = list(path.glob('swh-graph-*.jar')) if glob: return str(glob[0]) raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?") def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path): self.gateway = None self.entry = None self.graph_path = graph_path def __enter__(self): self.gateway = JavaGateway.launch_gateway( java_path=None, classpath=find_graph_jar(), die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT) self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() async def simple_traversal(self, ttype, direction, edges_fmt, src): assert ttype in ('leaves', 'neighbors', 'visit_nodes') method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): if dst in PID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id async def visit_paths(self, direction, edges_fmt, src): path = [] async for node in self.stream_proxy.visit_paths( direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() - with (await asyncio.wait_for(loop.run_in_executor( - None, open, fname, 'rb'), timeout=2)) as f: + open_thread = loop.run_in_executor(None, open, fname, 'rb') + + # Since the open() call on the FIFO is blocking until it is also opened + # on the Java side, we await it with a timeout in case there is an + # exception that prevents the write-side open(). + with (await asyncio.wait_for(open_thread, timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: + # If the read-side open() timeouts, an exception on the + # Java side probably happened that prevented the + # write-side open(). We propagate this exception here if + # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task return java_call_iterator diff --git a/swh/graph/dot.py b/swh/graph/dot.py index 265f06b..b2d455a 100644 --- a/swh/graph/dot.py +++ b/swh/graph/dot.py @@ -1,55 +1,65 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import lru_cache import subprocess import collections +KIND_TO_SHAPE = { + 'ori': 'egg', + 'snp': 'doubleoctagon', + 'rel': 'octagon', + 'rev': 'diamond', + 'dir': 'folder', + 'cnt': 'oval', +} + + @lru_cache() def dot_to_svg(dot): try: p = subprocess.run( ['dot', '-Tsvg'], input=dot, universal_newlines=True, capture_output=True, check=True ) except subprocess.CalledProcessError as e: raise RuntimeError(e.stderr) from e return p.stdout def graph_dot(nodes): ids = {n.id for n in nodes} by_kind = collections.defaultdict(list) for n in nodes: by_kind[n.kind].append(n) forward_edges = [ (node.id, child.id) for node in nodes for child in node.children() if child.id in ids ] backward_edges = [ (parent.id, node.id) for node in nodes for parent in node.parents() if parent.id in ids ] edges = set(forward_edges + backward_edges) edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in edges) nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes) s = """digraph G {{ ranksep=1; nodesep=0.5; {nodes} {edges} }}""".format(nodes=nodes_fmt, edges=edges_fmt) return s diff --git a/swh/graph/graph.py b/swh/graph/graph.py index 9bcd080..b0dda0b 100644 --- a/swh/graph/graph.py +++ b/swh/graph/graph.py @@ -1,167 +1,157 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib from swh.graph.backend import Backend -from swh.graph.dot import dot_to_svg, graph_dot - - -KIND_TO_SHAPE = { - 'ori': 'egg', - 'snp': 'doubleoctagon', - 'rel': 'octagon', - 'rev': 'diamond', - 'dir': 'folder', - 'cnt': 'oval', -} +from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE KIND_TO_URL = { 'ori': 'https://archive.softwareheritage.org/browse/origin/{}', 'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}', 'rel': 'https://archive.softwareheritage.org/browse/release/{}', 'rev': 'https://archive.softwareheritage.org/browse/revision/{}', 'dir': 'https://archive.softwareheritage.org/browse/directory/{}', 'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/', } def call_async_gen(generator, *args, **kwargs): loop = asyncio.get_event_loop() it = generator(*args, **kwargs).__aiter__() while True: try: res = loop.run_until_complete(it.__anext__()) yield res except StopAsyncIteration: break class Neighbors: """Neighbor iterator with custom O(1) length method""" - def __init__(self, parent_graph, iterator, length_func): - self.parent_graph = parent_graph + def __init__(self, graph, iterator, length_func): + self.graph = graph self.iterator = iterator self.length_func = length_func def __iter__(self): return self def __next__(self): succ = self.iterator.nextLong() if succ == -1: raise StopIteration - return GraphNode(self.parent_graph, succ) + return GraphNode(self.graph, succ) def __len__(self): return self.length_func() class GraphNode: """Node in the SWH graph""" - def __init__(self, parent_graph, node_id): - self.parent_graph = parent_graph + def __init__(self, graph, node_id): + self.graph = graph self.id = node_id def children(self): return Neighbors( - self.parent_graph, - self.parent_graph.java_graph.successors(self.id), - lambda: self.parent_graph.java_graph.outdegree(self.id)) + self.graph, + self.graph.java_graph.successors(self.id), + lambda: self.graph.java_graph.outdegree(self.id)) def parents(self): return Neighbors( - self.parent_graph, - self.parent_graph.java_graph.predecessors(self.id), - lambda: self.parent_graph.java_graph.indegree(self.id)) + self.graph, + self.graph.java_graph.predecessors(self.id), + lambda: self.graph.java_graph.indegree(self.id)) def simple_traversal(self, ttype, direction='forward', edges='*'): for node in call_async_gen( - self.parent_graph.backend.simple_traversal, + self.graph.backend.simple_traversal, ttype, direction, edges, self.id ): - yield self.parent_graph[node] + yield self.graph[node] def leaves(self, *args, **kwargs): yield from self.simple_traversal('leaves', *args, **kwargs) def visit_nodes(self, *args, **kwargs): yield from self.simple_traversal('visit_nodes', *args, **kwargs) def visit_paths(self, direction='forward', edges='*'): for path in call_async_gen( - self.parent_graph.backend.visit_paths, + self.graph.backend.visit_paths, direction, edges, self.id ): - yield [self.parent_graph[node] for node in path] + yield [self.graph[node] for node in path] def walk(self, dst, direction='forward', edges='*', traversal='dfs'): for node in call_async_gen( - self.parent_graph.backend.walk, + self.graph.backend.walk, direction, edges, traversal, self.id, dst ): - yield self.parent_graph[node] + yield self.graph[node] @property def pid(self): - return self.parent_graph.node2pid[self.id] + return self.graph.node2pid[self.id] @property def kind(self): return self.pid.split(':')[2] def __str__(self): return self.pid def __repr__(self): return '<{}>'.format(self.pid) def dot_fragment(self): swh, version, kind, hash = self.pid.split(':') label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:]) url = KIND_TO_URL[kind].format(hash) shape = KIND_TO_SHAPE[kind] return ('{} [label="{}", href="{}", target="_blank", shape="{}"];' .format(self.id, label, url, shape)) def _repr_svg_(self): nodes = [self, *list(self.children()), *list(self.parents())] dot = graph_dot(nodes) svg = dot_to_svg(dot) return svg class Graph: def __init__(self, backend, node2pid, pid2node): self.backend = backend self.java_graph = backend.entry.get_graph() self.node2pid = node2pid self.pid2node = pid2node def stats(self): return self.backend.stats() @property def path(self): return self.java_graph.getPath() def __len__(self): return self.java_graph.getNbNodes() def __getitem__(self, node_id): if isinstance(node_id, int): self.node2pid[node_id] # check existence return GraphNode(self, node_id) elif isinstance(node_id, str): node_id = self.pid2node[node_id] return GraphNode(self, node_id) @contextlib.contextmanager def load(graph_path): with Backend(graph_path) as backend: yield Graph(backend, backend.node2pid, backend.pid2node)