Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/backend.py
- This file was moved from swh/graph/server/backend.py.
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | import asyncio | ||||
import contextlib | import contextlib | ||||
import json | import io | ||||
import os | import os | ||||
import pathlib | import pathlib | ||||
import struct | import struct | ||||
import subprocess | |||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
from py4j.java_gateway import JavaGateway | from py4j.java_gateway import JavaGateway | ||||
from swh.graph.pid import IntToPidMap, PidToIntMap | from swh.graph.pid import IntToPidMap, PidToIntMap | ||||
from swh.model.identifiers import PID_TYPES | from swh.model.identifiers import PID_TYPES | ||||
# Size of the chunks read at once from the Python/Java FIFO.
BUF_SIZE = 64 * 1024
# Wire format of each streamed node id: 64-bit integer, big endian.
BIN_FMT = '>q'
# Sentinel node id used by the Java side to separate consecutive paths.
PATH_SEPARATOR_ID = -1
# File-name extensions of the node <-> PID map files next to the graph.
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
    """Return the path of the compiled swh-graph JAR.

    Looks first in the in-tree Java build directory, then in the installed
    share directory under sys.prefix.

    Raises:
        RuntimeError: if no swh-graph JAR can be found in either location.
    """
    # Repository root: three levels up from this module file.
    project_root = pathlib.Path(__file__).parents[2]
    candidate_dirs = (
        project_root / 'java/server/target/',
        pathlib.Path(sys.prefix) / 'share/swh-graph/',
    )
    for directory in candidate_dirs:
        jars = list(directory.glob('swh-graph-*.jar'))
        if jars:
            return str(jars[0])
    raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr(): | |||||
# Get stderr if possible, or pipe to stdout if running with Jupyter. | |||||
try: | |||||
sys.stderr.fileno() | |||||
except io.UnsupportedOperation: | |||||
return subprocess.STDOUT | |||||
else: | |||||
return sys.stderr | |||||
class Backend:
    """Context-manager wrapper around the Java graph server.

    On __enter__, launches a py4j gateway, loads the compressed graph and the
    node <-> PID map files; on __exit__, shuts the gateway down.
    """

    def __init__(self, graph_path):
        # Gateway and Java entry point are only created on __enter__.
        self.gateway = None
        self.entry = None
        self.graph_path = graph_path

    def __enter__(self):
        self.gateway = JavaGateway.launch_gateway(
            java_path=None,
            classpath=find_graph_jar(),
            die_on_exit=True,
            redirect_stdout=sys.stdout,
            redirect_stderr=_get_pipe_stderr(),
        )
        self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
        self.entry.load_graph(self.graph_path)
        self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
        self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
        self.stream_proxy = JavaStreamProxy(self.entry)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.gateway.shutdown()

    def stats(self):
        # Statistics are computed entirely on the Java side.
        return self.entry.stats()

    async def simple_traversal(self, ttype, direction, edges_fmt, src):
        """Stream the node ids reached by a single-source traversal."""
        assert ttype in ('leaves', 'neighbors', 'visit_nodes')
        traversal = getattr(self.stream_proxy, ttype)
        async for node_id in traversal(direction, edges_fmt, src):
            yield node_id

    async def walk(self, direction, edges_fmt, algo, src, dst):
        """Stream the node ids of a walk from src towards dst.

        dst is either a concrete node, or a PID type name, in which case the
        walk stops at the first node of that type.
        """
        if dst in PID_TYPES:
            stream = self.stream_proxy.walk_type(direction, edges_fmt, algo,
                                                 src, dst)
        else:
            stream = self.stream_proxy.walk(direction, edges_fmt, algo,
                                            src, dst)
        async for node_id in stream:
            yield node_id

    async def visit_paths(self, direction, edges_fmt, src):
        """Stream complete paths (as lists of node ids) visited from src.

        The Java side emits PATH_SEPARATOR_ID between consecutive paths; the
        accumulator is flushed each time the separator is seen.
        """
        path = []
        async for node in self.stream_proxy.visit_paths(
                direction, edges_fmt, src):
            if node == PATH_SEPARATOR_ID:
                yield path
                path = []
            else:
                path.append(node)
class JavaStreamProxy:
    """A proxy class for the org.softwareheritage.graph.Entry Java class that
    takes care of the setup and teardown of the named-pipe FIFO communication
    between Python and Java.

    Initialize JavaStreamProxy using:

        proxy = JavaStreamProxy(swh_entry_class_instance)

    Then you can call an Entry method and iterate on the FIFO results like
    this:

        async for value in proxy.java_method(arg1, arg2):
            print(value)
    """

    def __init__(self, entry):
        # Java-side Entry instance whose methods are dispatched lazily.
        self.entry = entry
async def read_node_ids(self, fname): | async def read_node_ids(self, fname): | ||||
loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | ||||
with (await loop.run_in_executor(None, open, fname, 'rb')) as f: | open_thread = loop.run_in_executor(None, open, fname, 'rb') | ||||
zack: why the explicit timeout here?
can't this fail for the wrong reason, e.g., when calling the… | |||||
Done Inline ActionsI'm gonna add a comment for the timeout. No, this is only for opening the FIFO, not reading it, so it can't fail like that. The FIFO is opened on both sides at the beginning. The problem is, if there is an exception in the java call thread and the FIFO is never created, this thread will block endlessly without timeout, because the FIFO has to be opened from both sides for the open(2) call to return. Adding a timeout for this open() call allows Java exceptions that happen between the Java call and the open() on the Java side to be propagated to the main thread. seirl: I'm gonna add a comment for the timeout.
No, this is only for opening the FIFO, not reading it… | |||||
# Since the open() call on the FIFO is blocking until it is also opened | |||||
# on the Java side, we await it with a timeout in case there is an | |||||
# exception that prevents the write-side open(). | |||||
with (await asyncio.wait_for(open_thread, timeout=2)) as f: | |||||
while True: | while True: | ||||
data = await loop.run_in_executor(None, f.read, BUF_SIZE) | data = await loop.run_in_executor(None, f.read, BUF_SIZE) | ||||
if not data: | if not data: | ||||
break | break | ||||
for data in struct.iter_unpack(BIN_FMT, data): | for data in struct.iter_unpack(BIN_FMT, data): | ||||
yield data[0] | yield data[0] | ||||
class _HandlerWrapper: | class _HandlerWrapper: | ||||
Show All 21 Lines | def get_handler(self): | ||||
query_handler = self.entry.get_handler(cli_fifo) | query_handler = self.entry.get_handler(cli_fifo) | ||||
handler = self._HandlerWrapper(query_handler) | handler = self._HandlerWrapper(query_handler) | ||||
yield (handler, reader) | yield (handler, reader) | ||||
def __getattr__(self, name): | def __getattr__(self, name): | ||||
async def java_call_iterator(*args, **kwargs): | async def java_call_iterator(*args, **kwargs): | ||||
with self.get_handler() as (handler, reader): | with self.get_handler() as (handler, reader): | ||||
java_task = getattr(handler, name)(*args, **kwargs) | java_task = getattr(handler, name)(*args, **kwargs) | ||||
try: | |||||
async for value in reader: | async for value in reader: | ||||
yield value | yield value | ||||
except asyncio.TimeoutError: | |||||
# If the read-side open() timeouts, an exception on the | |||||
# Java side probably happened that prevented the | |||||
# write-side open(). We propagate this exception here if | |||||
# that is the case. | |||||
task_exc = java_task.exception() | |||||
if task_exc: | |||||
raise task_exc | |||||
raise | |||||
await java_task | await java_task | ||||
return java_call_iterator | return java_call_iterator |
why the explicit timeout here?
can't this fail for the wrong reason, e.g., when calling the leaves method on large graphs?