Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index 0305bc3..0f5f3ef 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,174 +1,182 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
import pathlib
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
from swh.graph.pid import IntToPidMap, PidToIntMap
from swh.model.identifiers import PID_TYPES
# Number of bytes requested per read() on the result FIFO (64 KiB).
BUF_SIZE = 64*1024
BIN_FMT = '>q' # 64 bit integer, big endian
# Sentinel node id that terminates each path in the visit_paths stream.
PATH_SEPARATOR_ID = -1
# Suffixes appended to "<graph_path>." to locate the node-id -> PID and
# PID -> node-id map files.
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
    """Return the path of the swh-graph jar as a string.

    Two locations are searched, in order: ``java/server/target/`` under the
    directory two levels above this module's directory, then
    ``share/swh-graph/`` under ``sys.prefix``. The first location that
    contains a file matching ``swh-graph-*.jar`` wins.

    Raises:
        RuntimeError: if no matching jar exists in either location.
    """
    swh_graph_root = pathlib.Path(__file__).parents[2]
    try_paths = [
        swh_graph_root / 'java/server/target/',
        pathlib.Path(sys.prefix) / 'share/swh-graph/',
    ]
    for path in try_paths:
        # glob() yields entries in arbitrary filesystem order; sort them so
        # the selected jar is the same on every run when several match.
        jars = sorted(path.glob('swh-graph-*.jar'))
        if jars:
            return str(jars[0])
    raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr():
    """Pick the stderr redirection target for the Java subprocess.

    Returns ``sys.stderr`` when it is backed by a file descriptor, and
    ``subprocess.STDOUT`` otherwise.
    """
    try:
        sys.stderr.fileno()
    except io.UnsupportedOperation:
        # No usable descriptor (e.g. when running with Jupyter): pipe to
        # stdout instead.
        return subprocess.STDOUT
    return sys.stderr
class Backend:
    """Context manager around the Java graph server reached through py4j.

    Entering the context launches a Java gateway, creates an
    ``org.softwareheritage.graph.Entry`` object, loads the graph found at
    ``graph_path`` and opens the node-id/PID maps stored next to it.
    Leaving the context shuts the gateway down.
    """

    def __init__(self, graph_path):
        self.gateway = None
        self.entry = None
        self.graph_path = graph_path

    def __enter__(self):
        self.gateway = JavaGateway.launch_gateway(
            java_path=None,
            classpath=find_graph_jar(),
            die_on_exit=True,
            redirect_stdout=sys.stdout,
            redirect_stderr=_get_pipe_stderr(),
        )
        try:
            self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
            self.entry.load_graph(self.graph_path)
            self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
            self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
            self.stream_proxy = JavaStreamProxy(self.entry)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the gateway
            # launched above must be shut down here or it would leak.
            try:
                self.gateway.shutdown()
            finally:
                self.gateway = None
                self.entry = None
            raise
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.gateway.shutdown()

    def stats(self):
        """Return whatever the Java entry object reports as statistics."""
        return self.entry.stats()

    async def simple_traversal(self, ttype, direction, edges_fmt, src):
        """Yield the node ids streamed by the ``ttype`` traversal from ``src``.

        ``ttype`` must be one of ``'leaves'``, ``'neighbors'`` or
        ``'visit_nodes'``; it names the stream-proxy method that is called.
        """
        assert ttype in ('leaves', 'neighbors', 'visit_nodes')
        method = getattr(self.stream_proxy, ttype)
        async for node_id in method(direction, edges_fmt, src):
            yield node_id

    async def walk(self, direction, edges_fmt, algo, src, dst):
        """Yield the node ids of a walk from ``src`` towards ``dst``.

        When ``dst`` is one of ``PID_TYPES`` the Java ``walk_type`` method is
        used; otherwise ``dst`` is passed to the Java ``walk`` method.
        """
        if dst in PID_TYPES:
            it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
                                             src, dst)
        else:
            it = self.stream_proxy.walk(direction, edges_fmt, algo,
                                        src, dst)
        async for node_id in it:
            yield node_id

    async def visit_paths(self, direction, edges_fmt, src):
        """Yield each path (a list of node ids) found starting at ``src``.

        The Java side streams node ids and marks the end of every path with
        ``PATH_SEPARATOR_ID``; a path is yielded each time that marker is
        read.
        """
        path = []
        async for node in self.stream_proxy.visit_paths(
                direction, edges_fmt, src):
            if node == PATH_SEPARATOR_ID:
                yield path
                path = []
            else:
                path.append(node)
class JavaStreamProxy:
    """A proxy class for the org.softwareheritage.graph.Entry Java class that
    takes care of the setup and teardown of the named-pipe FIFO communication
    between Python and Java.

    Initialize JavaStreamProxy using:

        proxy = JavaStreamProxy(swh_entry_class_instance)

    Then you can call an Entry method and iterate on the FIFO results like
    this:

        async for value in proxy.java_method(arg1, arg2):
            print(value)
    """

    def __init__(self, entry):
        self.entry = entry

    async def read_node_ids(self, fname):
        """Yield the integers read from the FIFO at ``fname``.

        The stream is decoded as consecutive ``BIN_FMT`` records, read in
        chunks of ``BUF_SIZE`` bytes through the default executor.

        Raises:
            asyncio.TimeoutError: if the FIFO could not be opened within two
                seconds.
        """
        loop = asyncio.get_event_loop()
        open_thread = loop.run_in_executor(None, open, fname, 'rb')

        # Since the open() call on the FIFO is blocking until it is also
        # opened on the Java side, we await it with a timeout in case there
        # is an exception that prevents the write-side open().
        with (await asyncio.wait_for(open_thread, timeout=2)) as f:
            while True:
                data = await loop.run_in_executor(None, f.read, BUF_SIZE)
                if not data:
                    break
                for (node_id,) in struct.iter_unpack(BIN_FMT, data):
                    yield node_id

    class _HandlerWrapper:
        """Turn every method of a Java handler into an asyncio task factory.

        Accessing an attribute returns a function that, when called, runs
        the handler method of that name in the default executor and returns
        the ``asyncio.Task`` tracking it.
        """

        def __init__(self, handler):
            self._handler = handler

        def __getattr__(self, name):
            func = getattr(self._handler, name)

            async def java_call(*args, **kwargs):
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, lambda: func(*args, **kwargs))

            def java_task(*args, **kwargs):
                return asyncio.create_task(java_call(*args, **kwargs))

            return java_task

    @contextlib.contextmanager
    def get_handler(self):
        """Yield a ``(handler, reader)`` pair bound to a fresh FIFO.

        The FIFO lives in a temporary directory that is removed when the
        context exits. ``handler`` wraps the object returned by the Java
        entry for that FIFO; ``reader`` is the async generator of the values
        read from it.
        """
        with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
            cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
            os.mkfifo(cli_fifo)
            reader = self.read_node_ids(cli_fifo)
            query_handler = self.entry.get_handler(cli_fifo)
            handler = self._HandlerWrapper(query_handler)
            yield (handler, reader)

    def __getattr__(self, name):
        async def java_call_iterator(*args, **kwargs):
            with self.get_handler() as (handler, reader):
                java_task = getattr(handler, name)(*args, **kwargs)
                try:
                    async for value in reader:
                        yield value
                except asyncio.TimeoutError:
                    # If the read-side open() times out, an exception on the
                    # Java side probably happened that prevented the
                    # write-side open(). We propagate this exception here if
                    # that is the case.
                    # Task.exception() raises InvalidStateError on a task
                    # that is still running, which would hide the timeout,
                    # so only query tasks that have actually finished.
                    if java_task.done() and not java_task.cancelled():
                        task_exc = java_task.exception()
                        if task_exc:
                            raise task_exc
                    raise
                await java_task
        return java_call_iterator
diff --git a/swh/graph/dot.py b/swh/graph/dot.py
index 265f06b..b2d455a 100644
--- a/swh/graph/dot.py
+++ b/swh/graph/dot.py
@@ -1,55 +1,65 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import lru_cache
import subprocess
import collections
+# Node shape used in the generated DOT output for each kind of node, keyed
+# by the kind field of the node's PID.
+KIND_TO_SHAPE = {
+ 'ori': 'egg',
+ 'snp': 'doubleoctagon',
+ 'rel': 'octagon',
+ 'rev': 'diamond',
+ 'dir': 'folder',
+ 'cnt': 'oval',
+}
+
+
@lru_cache()
def dot_to_svg(dot):
    """Convert DOT source text to SVG by running the ``dot`` program.

    Results are memoized per input string.

    Raises:
        RuntimeError: carrying the captured stderr when ``dot`` exits with a
            non-zero status.
    """
    command = ['dot', '-Tsvg']
    try:
        completed = subprocess.run(
            command,
            input=dot,
            universal_newlines=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.stderr) from err
    return completed.stdout
def graph_dot(nodes):
    """Return DOT source for the subgraph induced by ``nodes``.

    Each node must expose ``id``, ``children()``, ``parents()`` and
    ``dot_fragment()``. An edge is emitted only when both of its endpoints
    belong to ``nodes``; edges discovered from both ends are emitted once.

    ``nodes`` may be any iterable, including a one-shot iterator. Edges are
    written in sorted order so that the same graph always produces the same
    text (node ids are assumed to be mutually comparable).
    """
    # The collection is traversed several times below; materialize it so
    # that generators are not exhausted after the first pass.
    nodes = list(nodes)
    ids = {n.id for n in nodes}

    edges = set()
    for node in nodes:
        edges.update(
            (node.id, child.id)
            for child in node.children()
            if child.id in ids
        )
        edges.update(
            (parent.id, node.id)
            for parent in node.parents()
            if parent.id in ids
        )

    edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in sorted(edges))
    nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes)
    s = """digraph G {{
ranksep=1;
nodesep=0.5;
{nodes}
{edges}
}}""".format(nodes=nodes_fmt, edges=edges_fmt)
    return s
diff --git a/swh/graph/graph.py b/swh/graph/graph.py
index 9bcd080..b0dda0b 100644
--- a/swh/graph/graph.py
+++ b/swh/graph/graph.py
@@ -1,167 +1,157 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
from swh.graph.backend import Backend
-from swh.graph.dot import dot_to_svg, graph_dot
-
-
-KIND_TO_SHAPE = {
- 'ori': 'egg',
- 'snp': 'doubleoctagon',
- 'rel': 'octagon',
- 'rev': 'diamond',
- 'dir': 'folder',
- 'cnt': 'oval',
-}
+from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE
# URL template for each kind of node, keyed by the kind field of a PID.
# The placeholder is filled with the hash field of the PID when building the
# node's link in the DOT output.
KIND_TO_URL = {
'ori': 'https://archive.softwareheritage.org/browse/origin/{}',
'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}',
'rel': 'https://archive.softwareheritage.org/browse/release/{}',
'rev': 'https://archive.softwareheritage.org/browse/revision/{}',
'dir': 'https://archive.softwareheritage.org/browse/directory/{}',
'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/',
}
def call_async_gen(generator, *args, **kwargs):
    """Drive an async generator from synchronous code.

    ``generator(*args, **kwargs)`` is called to obtain the async iterable;
    each of its items is fetched by running the current event loop until the
    next item is available, then yielded.

    If the consumer stops early (``break``, ``close()`` or garbage
    collection), the underlying async generator is closed right away so that
    its ``finally`` blocks and context managers run.
    """
    loop = asyncio.get_event_loop()
    it = generator(*args, **kwargs).__aiter__()
    try:
        while True:
            try:
                res = loop.run_until_complete(it.__anext__())
            except StopAsyncIteration:
                break
            yield res
    finally:
        # Plain async iterators have no aclose(); a closed loop cannot run
        # the cleanup coroutine.
        aclose = getattr(it, 'aclose', None)
        if aclose is not None and not loop.is_closed():
            loop.run_until_complete(aclose())
class Neighbors:
    """Iterator over neighbor nodes whose length comes from a callback.

    ``iterator`` is an object whose ``nextLong()`` returns successive node
    ids and ``-1`` once exhausted; ``length_func`` is called with no
    arguments to answer ``len()``.
    """

    def __init__(self, graph, iterator, length_func):
        self.graph = graph
        self.iterator = iterator
        self.length_func = length_func

    def __iter__(self):
        return self

    def __next__(self):
        node_id = self.iterator.nextLong()
        if node_id != -1:
            return GraphNode(self.graph, node_id)
        # -1 marks the end of the underlying iterator
        raise StopIteration

    def __len__(self):
        return self.length_func()
class GraphNode:
    """A single node of the SWH graph, identified by its integer id."""

    def __init__(self, graph, node_id):
        self.graph = graph
        self.id = node_id

    def children(self):
        """Return the successors of this node as a ``Neighbors`` iterator."""
        java_graph = self.graph.java_graph
        successors = java_graph.successors(self.id)
        return Neighbors(
            self.graph,
            successors,
            lambda: self.graph.java_graph.outdegree(self.id),
        )

    def parents(self):
        """Return the predecessors of this node as a ``Neighbors`` iterator."""
        java_graph = self.graph.java_graph
        predecessors = java_graph.predecessors(self.id)
        return Neighbors(
            self.graph,
            predecessors,
            lambda: self.graph.java_graph.indegree(self.id),
        )

    def simple_traversal(self, ttype, direction='forward', edges='*'):
        """Yield the nodes returned by the backend's ``ttype`` traversal."""
        traversal = self.graph.backend.simple_traversal
        found = call_async_gen(traversal, ttype, direction, edges, self.id)
        for node_id in found:
            yield self.graph[node_id]

    def leaves(self, *args, **kwargs):
        yield from self.simple_traversal('leaves', *args, **kwargs)

    def visit_nodes(self, *args, **kwargs):
        yield from self.simple_traversal('visit_nodes', *args, **kwargs)

    def visit_paths(self, direction='forward', edges='*'):
        """Yield each path from this node as a list of ``GraphNode``."""
        visitor = self.graph.backend.visit_paths
        for id_path in call_async_gen(visitor, direction, edges, self.id):
            yield [self.graph[node_id] for node_id in id_path]

    def walk(self, dst, direction='forward', edges='*', traversal='dfs'):
        """Yield the nodes of a backend walk from this node towards ``dst``."""
        walker = self.graph.backend.walk
        found = call_async_gen(
            walker, direction, edges, traversal, self.id, dst)
        for node_id in found:
            yield self.graph[node_id]

    @property
    def pid(self):
        """The PID mapped to this node id by the graph's ``node2pid`` map."""
        return self.graph.node2pid[self.id]

    @property
    def kind(self):
        """The third colon-separated field of the PID."""
        fields = self.pid.split(':')
        return fields[2]

    def __str__(self):
        return self.pid

    def __repr__(self):
        return '<{}>'.format(self.pid)

    def dot_fragment(self):
        """Return the DOT statement declaring this node."""
        _scheme, _version, kind, digest = self.pid.split(':')
        label = '{}:{}..{}'.format(kind, digest[0:2], digest[-2:])
        url = KIND_TO_URL[kind].format(digest)
        shape = KIND_TO_SHAPE[kind]
        template = '{} [label="{}", href="{}", target="_blank", shape="{}"];'
        return template.format(self.id, label, url, shape)

    def _repr_svg_(self):
        """Render this node with its direct children and parents as SVG."""
        nodes = [self]
        nodes.extend(self.children())
        nodes.extend(self.parents())
        return dot_to_svg(graph_dot(nodes))
class Graph:
def __init__(self, backend, node2pid, pid2node):
self.backend = backend
self.java_graph = backend.entry.get_graph()
self.node2pid = node2pid
self.pid2node = pid2node
def stats(self):
return self.backend.stats()
@property
def path(self):
return self.java_graph.getPath()
def __len__(self):
return self.java_graph.getNbNodes()
def __getitem__(self, node_id):
if isinstance(node_id, int):
self.node2pid[node_id] # check existence
return GraphNode(self, node_id)
elif isinstance(node_id, str):
node_id = self.pid2node[node_id]
return GraphNode(self, node_id)
@contextlib.contextmanager
def load(graph_path):
    """Context manager yielding a ``Graph`` for the graph at ``graph_path``.

    A ``Backend`` is entered for the duration of the context and left when
    the context exits.
    """
    with Backend(graph_path) as backend:
        graph = Graph(backend, backend.node2pid, backend.pid2node)
        yield graph

File Metadata

Mime Type
text/x-diff
Expires
Jun 4 2025, 6:51 PM (13 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3213018

Event Timeline