Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8391896
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Subscribers
None
View Options
diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index 0305bc3..0f5f3ef 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,174 +1,182 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
import pathlib
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
from swh.graph.pid import IntToPidMap, PidToIntMap
from swh.model.identifiers import PID_TYPES
# Maximum number of bytes requested per read() on the result FIFO.
BUF_SIZE = 64*1024
# struct format used to decode each node id read from the FIFO.
BIN_FMT = '>q' # 64 bit integer, big endian
# Sentinel node id that marks the end of one path in the visit_paths stream.
PATH_SEPARATOR_ID = -1
# Suffixes appended to "<graph_path>." to locate the node id <-> PID maps.
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
    """Return the path (as a string) of the first swh-graph jar found.

    Two directories are searched, in order: ``java/server/target/`` under
    ``parents[2]`` of this file, then ``share/swh-graph/`` under
    ``sys.prefix``.

    Raises:
        RuntimeError: if neither directory contains a ``swh-graph-*.jar``.
    """
    source_root = pathlib.Path(__file__).parents[2]
    search_dirs = (
        source_root / 'java/server/target/',
        pathlib.Path(sys.prefix) / 'share/swh-graph/',
    )
    for directory in search_dirs:
        # Only the first match of each directory is ever used.
        first_jar = next(iter(directory.glob('swh-graph-*.jar')), None)
        if first_jar is not None:
            return str(first_jar)
    raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr():
    """Pick the stderr redirection target for a child process.

    Returns ``sys.stderr`` when it exposes a file descriptor, and
    ``subprocess.STDOUT`` otherwise (e.g. when running with Jupyter, where
    ``fileno()`` raises ``io.UnsupportedOperation``).
    """
    stream = sys.stderr
    try:
        # Only probing for a usable descriptor; the value itself is unused.
        stream.fileno()
    except io.UnsupportedOperation:
        return subprocess.STDOUT
    return stream
class Backend:
    """Context manager owning the Java gateway that serves graph queries.

    Entering the context launches a JVM through py4j, instantiates the Java
    ``Entry`` object, loads the graph at ``graph_path`` and opens the two
    node id <-> PID maps stored next to it. Leaving the context shuts the
    gateway down.

    The traversal methods are async generators yielding node ids (or lists
    of node ids for ``visit_paths``) read from ``self.stream_proxy``.
    """

    def __init__(self, graph_path):
        """Remember ``graph_path``; nothing is launched until ``__enter__``."""
        self.gateway = None
        self.entry = None
        self.graph_path = graph_path

    def __enter__(self):
        """Launch the gateway and load the graph; return ``self``.

        If anything fails after the gateway has been launched, the gateway
        is shut down before the exception propagates: ``__exit__`` is not
        invoked when ``__enter__`` raises, so the JVM would otherwise stay
        alive for as long as the Python process does.
        """
        self.gateway = JavaGateway.launch_gateway(
            java_path=None,
            classpath=find_graph_jar(),
            die_on_exit=True,
            redirect_stdout=sys.stdout,
            redirect_stderr=_get_pipe_stderr(),
        )
        try:
            self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
            self.entry.load_graph(self.graph_path)
            self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
            self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
            self.stream_proxy = JavaStreamProxy(self.entry)
        except BaseException:
            # Cleanup only; the original exception is always re-raised.
            self.gateway.shutdown()
            raise
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Shut the gateway down; exceptions from the body are not swallowed."""
        self.gateway.shutdown()

    def stats(self):
        """Return whatever the Java ``Entry.stats()`` call returns."""
        return self.entry.stats()

    async def simple_traversal(self, ttype, direction, edges_fmt, src):
        """Yield the node ids produced by the ``ttype`` traversal from ``src``.

        ``ttype`` must be one of ``'leaves'``, ``'neighbors'`` or
        ``'visit_nodes'``; it names the stream proxy method that is called
        with ``(direction, edges_fmt, src)``.
        """
        assert ttype in ('leaves', 'neighbors', 'visit_nodes')
        method = getattr(self.stream_proxy, ttype)
        async for node_id in method(direction, edges_fmt, src):
            yield node_id

    async def walk(self, direction, edges_fmt, algo, src, dst):
        """Yield the node ids of a walk from ``src`` to ``dst``.

        When ``dst`` is one of ``PID_TYPES`` the ``walk_type`` method of the
        stream proxy is used, otherwise ``walk`` is used.
        """
        if dst in PID_TYPES:
            it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
                                             src, dst)
        else:
            it = self.stream_proxy.walk(direction, edges_fmt, algo,
                                        src, dst)
        async for node_id in it:
            yield node_id

    async def visit_paths(self, direction, edges_fmt, src):
        """Yield paths (lists of node ids) visited from ``src``.

        The underlying stream is flat: node ids are accumulated until a
        ``PATH_SEPARATOR_ID`` is read, at which point the accumulated path
        is yielded and a new one is started.
        """
        path = []
        async for node in self.stream_proxy.visit_paths(
                direction, edges_fmt, src):
            if node == PATH_SEPARATOR_ID:
                yield path
                path = []
            else:
                path.append(node)
class JavaStreamProxy:
"""A proxy class for the org.softwareheritage.graph.Entry Java class that
takes care of the setup and teardown of the named-pipe FIFO communication
between Python and Java.
Initialize JavaStreamProxy using:
proxy = JavaStreamProxy(swh_entry_class_instance)
Then you can call an Entry method and iterate on the FIFO results like
this:
async for value in proxy.java_method(arg1, arg2):
print(value)
"""
def __init__(self, entry):
self.entry = entry
async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
- with (await asyncio.wait_for(loop.run_in_executor(
- None, open, fname, 'rb'), timeout=2)) as f:
+ open_thread = loop.run_in_executor(None, open, fname, 'rb')
+
+ # Since the open() call on the FIFO is blocking until it is also opened
+ # on the Java side, we await it with a timeout in case there is an
+ # exception that prevents the write-side open().
+ with (await asyncio.wait_for(open_thread, timeout=2)) as f:
while True:
data = await loop.run_in_executor(None, f.read, BUF_SIZE)
if not data:
break
for data in struct.iter_unpack(BIN_FMT, data):
yield data[0]
class _HandlerWrapper:
def __init__(self, handler):
self._handler = handler
def __getattr__(self, name):
func = getattr(self._handler, name)
async def java_call(*args, **kwargs):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def java_task(*args, **kwargs):
return asyncio.create_task(java_call(*args, **kwargs))
return java_task
@contextlib.contextmanager
def get_handler(self):
with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
os.mkfifo(cli_fifo)
reader = self.read_node_ids(cli_fifo)
query_handler = self.entry.get_handler(cli_fifo)
handler = self._HandlerWrapper(query_handler)
yield (handler, reader)
def __getattr__(self, name):
async def java_call_iterator(*args, **kwargs):
with self.get_handler() as (handler, reader):
java_task = getattr(handler, name)(*args, **kwargs)
try:
async for value in reader:
yield value
except asyncio.TimeoutError:
+ # If the read-side open() times out, an exception probably
+ # occurred on the Java side and prevented the write-side
+ # open(). We propagate that exception here if that is the
+ # case.
task_exc = java_task.exception()
if task_exc:
raise task_exc
raise
await java_task
return java_call_iterator
diff --git a/swh/graph/dot.py b/swh/graph/dot.py
index 265f06b..b2d455a 100644
--- a/swh/graph/dot.py
+++ b/swh/graph/dot.py
@@ -1,55 +1,65 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import lru_cache
import subprocess
import collections
+KIND_TO_SHAPE = {
+ 'ori': 'egg',
+ 'snp': 'doubleoctagon',
+ 'rel': 'octagon',
+ 'rev': 'diamond',
+ 'dir': 'folder',
+ 'cnt': 'oval',
+}
+
+
@lru_cache()
def dot_to_svg(dot):
    """Convert DOT source text to SVG by running ``dot -Tsvg``.

    Results are memoised per input string through ``lru_cache``.

    Raises:
        RuntimeError: if ``dot`` exits with a non-zero status; the message
            is the captured standard error of the process.
    """
    command = ['dot', '-Tsvg']
    try:
        completed = subprocess.run(
            command,
            input=dot,
            universal_newlines=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.stderr) from err
    else:
        return completed.stdout
def graph_dot(nodes):
    """Return Graphviz DOT source for ``nodes`` and the edges between them.

    Each node must expose ``id``, ``children()``, ``parents()`` and
    ``dot_fragment()``. An edge is emitted only when both of its endpoints
    belong to ``nodes``; edges discovered from both the child side and the
    parent side are de-duplicated.

    ``nodes`` may be any iterable (it is materialised once). Edge
    statements are emitted in sorted order so that the same graph always
    produces the same text.
    """
    # The nodes are walked several times below, so a one-shot iterable
    # must be materialised first.
    nodes = list(nodes)
    ids = {n.id for n in nodes}

    edges = set()
    for node in nodes:
        edges.update(
            (node.id, child.id)
            for child in node.children()
            if child.id in ids
        )
        edges.update(
            (parent.id, node.id)
            for parent in node.parents()
            if parent.id in ids
        )

    edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in sorted(edges))
    nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes)

    return '\n'.join([
        'digraph G {',
        'ranksep=1;',
        'nodesep=0.5;',
        nodes_fmt,
        edges_fmt,
        '}',
    ])
diff --git a/swh/graph/graph.py b/swh/graph/graph.py
index 9bcd080..b0dda0b 100644
--- a/swh/graph/graph.py
+++ b/swh/graph/graph.py
@@ -1,167 +1,157 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
from swh.graph.backend import Backend
-from swh.graph.dot import dot_to_svg, graph_dot
-
-
-KIND_TO_SHAPE = {
- 'ori': 'egg',
- 'snp': 'doubleoctagon',
- 'rel': 'octagon',
- 'rev': 'diamond',
- 'dir': 'folder',
- 'cnt': 'oval',
-}
+from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE
# Archive browse URL template for each node kind (the third field of a PID).
# The single `{}` placeholder is filled with the hash part of the PID when a
# node's DOT fragment is built.
KIND_TO_URL = {
'ori': 'https://archive.softwareheritage.org/browse/origin/{}',
'snp': 'https://archive.softwareheritage.org/browse/snapshot/{}',
'rel': 'https://archive.softwareheritage.org/browse/release/{}',
'rev': 'https://archive.softwareheritage.org/browse/revision/{}',
'dir': 'https://archive.softwareheritage.org/browse/directory/{}',
'cnt': 'https://archive.softwareheritage.org/browse/content/sha1_git:{}/',
}
def call_async_gen(generator, *args, **kwargs):
    """Drive an async generator from synchronous code.

    ``generator(*args, **kwargs)`` must return an async iterable. Each item
    is obtained by running ``__anext__()`` to completion on the current
    event loop and is then yielded from this (synchronous) generator.

    If no event loop is set for the current thread, a new one is created
    and installed. When the consumer stops iterating early (or an error
    propagates), the async generator is closed so that its ``finally`` /
    ``with`` cleanup runs immediately instead of being deferred.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions no longer create a loop implicitly.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    it = generator(*args, **kwargs).__aiter__()
    try:
        while True:
            try:
                res = loop.run_until_complete(it.__anext__())
            except StopAsyncIteration:
                break
            yield res
    finally:
        # Plain async iterators may not define aclose(); closing is also
        # skipped when the loop cannot be driven from here.
        aclose = getattr(it, 'aclose', None)
        if (aclose is not None
                and not loop.is_closed()
                and not loop.is_running()):
            loop.run_until_complete(aclose())
class Neighbors:
"""Neighbor iterator with custom O(1) length method"""
- def __init__(self, parent_graph, iterator, length_func):
- self.parent_graph = parent_graph
+ def __init__(self, graph, iterator, length_func):
+ self.graph = graph
self.iterator = iterator
self.length_func = length_func
def __iter__(self):
return self
def __next__(self):
succ = self.iterator.nextLong()
if succ == -1:
raise StopIteration
- return GraphNode(self.parent_graph, succ)
+ return GraphNode(self.graph, succ)
def __len__(self):
return self.length_func()
class GraphNode:
"""Node in the SWH graph"""
- def __init__(self, parent_graph, node_id):
- self.parent_graph = parent_graph
+ def __init__(self, graph, node_id):
+ self.graph = graph
self.id = node_id
def children(self):
return Neighbors(
- self.parent_graph,
- self.parent_graph.java_graph.successors(self.id),
- lambda: self.parent_graph.java_graph.outdegree(self.id))
+ self.graph,
+ self.graph.java_graph.successors(self.id),
+ lambda: self.graph.java_graph.outdegree(self.id))
def parents(self):
return Neighbors(
- self.parent_graph,
- self.parent_graph.java_graph.predecessors(self.id),
- lambda: self.parent_graph.java_graph.indegree(self.id))
+ self.graph,
+ self.graph.java_graph.predecessors(self.id),
+ lambda: self.graph.java_graph.indegree(self.id))
def simple_traversal(self, ttype, direction='forward', edges='*'):
for node in call_async_gen(
- self.parent_graph.backend.simple_traversal,
+ self.graph.backend.simple_traversal,
ttype, direction, edges, self.id
):
- yield self.parent_graph[node]
+ yield self.graph[node]
def leaves(self, *args, **kwargs):
yield from self.simple_traversal('leaves', *args, **kwargs)
def visit_nodes(self, *args, **kwargs):
yield from self.simple_traversal('visit_nodes', *args, **kwargs)
def visit_paths(self, direction='forward', edges='*'):
for path in call_async_gen(
- self.parent_graph.backend.visit_paths,
+ self.graph.backend.visit_paths,
direction, edges, self.id
):
- yield [self.parent_graph[node] for node in path]
+ yield [self.graph[node] for node in path]
def walk(self, dst, direction='forward', edges='*', traversal='dfs'):
for node in call_async_gen(
- self.parent_graph.backend.walk,
+ self.graph.backend.walk,
direction, edges, traversal, self.id, dst
):
- yield self.parent_graph[node]
+ yield self.graph[node]
@property
def pid(self):
- return self.parent_graph.node2pid[self.id]
+ return self.graph.node2pid[self.id]
@property
def kind(self):
return self.pid.split(':')[2]
def __str__(self):
return self.pid
def __repr__(self):
return '<{}>'.format(self.pid)
def dot_fragment(self):
swh, version, kind, hash = self.pid.split(':')
label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:])
url = KIND_TO_URL[kind].format(hash)
shape = KIND_TO_SHAPE[kind]
return ('{} [label="{}", href="{}", target="_blank", shape="{}"];'
.format(self.id, label, url, shape))
def _repr_svg_(self):
nodes = [self, *list(self.children()), *list(self.parents())]
dot = graph_dot(nodes)
svg = dot_to_svg(dot)
return svg
class Graph:
    """Python view of a loaded graph.

    Wraps a backend (used for statistics and traversals), the Java graph
    object obtained from ``backend.entry.get_graph()``, and the two maps
    translating between integer node ids and PID strings.
    """

    def __init__(self, backend, node2pid, pid2node):
        """Store the backend and id maps and fetch the Java graph object."""
        self.backend = backend
        self.java_graph = backend.entry.get_graph()
        self.node2pid = node2pid
        self.pid2node = pid2node

    def stats(self):
        """Return the statistics reported by the backend."""
        return self.backend.stats()

    @property
    def path(self):
        """Path of the graph as reported by the Java graph object."""
        return self.java_graph.getPath()

    def __len__(self):
        """Number of nodes as reported by the Java graph object."""
        return self.java_graph.getNbNodes()

    def __getitem__(self, node_id):
        """Return the ``GraphNode`` for an integer node id or a PID string.

        An ``int`` is checked for existence through ``node2pid``; a ``str``
        is translated to a node id through ``pid2node``. Lookup errors
        raised by those maps propagate unchanged.

        Raises:
            TypeError: if ``node_id`` is neither an ``int`` nor a ``str``
                (previously this case silently returned ``None``).
        """
        if isinstance(node_id, int):
            self.node2pid[node_id]  # check existence
        elif isinstance(node_id, str):
            node_id = self.pid2node[node_id]
        else:
            raise TypeError(
                'graph indices must be int node ids or str PIDs, not {}'
                .format(type(node_id).__name__))
        return GraphNode(self, node_id)
@contextlib.contextmanager
def load(graph_path):
    """Context manager yielding a ``Graph`` for the graph at ``graph_path``.

    A ``Backend`` is entered for the duration of the context and exited
    when the context is left; the yielded ``Graph`` uses that backend and
    its ``node2pid`` / ``pid2node`` maps.
    """
    backend_ctx = Backend(graph_path)
    with backend_ctx as backend:
        graph = Graph(backend, backend.node2pid, backend.pid2node)
        yield graph
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jun 4 2025, 6:51 PM (13 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3213018
Attached To
rDGRPH Compressed graph representation
Event Timeline
Log In to Comment