D1991.id6705.diff

diff --git a/java/server/src/main/java/org/softwareheritage/graph/Entry.java b/java/server/src/main/java/org/softwareheritage/graph/Entry.java
--- a/java/server/src/main/java/org/softwareheritage/graph/Entry.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/Entry.java
@@ -1,12 +1,17 @@
package org.softwareheritage.graph;
+import java.util.Map;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import py4j.GatewayServer;
import org.softwareheritage.graph.Graph;
+import org.softwareheritage.graph.Node;
+import org.softwareheritage.graph.algo.Stats;
import org.softwareheritage.graph.algo.NodeIdsConsumer;
import org.softwareheritage.graph.algo.Traversal;
@@ -19,21 +24,98 @@
System.err.println("Graph loaded.");
}
- public void visit(long srcNodeId, String direction, String edgesFmt,
- String clientFIFO) {
- Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ public String stats() {
try {
- FileOutputStream file = new FileOutputStream(clientFIFO);
- DataOutputStream data = new DataOutputStream(file);
- t.visitNodesVisitor(srcNodeId, (nodeId) -> {
- try {
- data.writeLong(nodeId);
- } catch (IOException e) {
- throw new RuntimeException("cannot write response to client: " + e);
- }});
- data.close();
+ Stats stats = new Stats(graph.getPath());
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
+ String res = objectMapper.writeValueAsString(stats);
+ return res;
} catch (IOException e) {
- System.err.println("cannot write response to client: " + e);
+ throw new RuntimeException("Cannot read stats: " + e);
+ }
+ }
+
+ public QueryHandler get_handler(String clientFIFO) {
+ return new QueryHandler(this.graph.copy(), clientFIFO);
+ }
+
+ public class QueryHandler {
+ Graph graph;
+ DataOutputStream out;
+ String clientFIFO;
+
+ public QueryHandler(Graph graph, String clientFIFO) {
+ this.graph = graph;
+ this.clientFIFO = clientFIFO;
+ this.out = null;
+ }
+
+ public void writeNode(long nodeId) {
+ try {
+ out.writeLong(nodeId);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot write response to client: " + e);
+ }
+ }
+
+ public void open() {
+ try {
+ FileOutputStream file = new FileOutputStream(this.clientFIFO);
+ this.out = new DataOutputStream(file);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create FIFO: " + e);
+ }
+ }
+
+ public void close() {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot write response to client: " + e);
+ }
+ }
+
+ public void leaves(String direction, String edgesFmt, long srcNodeId) {
+ open();
+ Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ t.leavesVisitor(srcNodeId, this::writeNode);
+ close();
+ }
+
+ public void neighbors(String direction, String edgesFmt, long srcNodeId) {
+ open();
+ Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ t.neighborsVisitor(srcNodeId, this::writeNode);
+ close();
+ }
+
+ public void visit_nodes(String direction, String edgesFmt, long srcNodeId) {
+ open();
+ Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ t.visitNodesVisitor(srcNodeId, this::writeNode);
+ close();
+ }
+
+ public void walk(String direction, String edgesFmt, String algorithm,
+ long srcNodeId, long dstNodeId) {
+ open();
+ Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ for (Long nodeId : t.walk(srcNodeId, dstNodeId, algorithm)) {
+ writeNode(nodeId);
+ }
+ close();
+ }
+
+ public void walk_type(String direction, String edgesFmt, String algorithm,
+ long srcNodeId, String dst) {
+ open();
+ Node.Type dstType = Node.Type.fromStr(dst);
+ Traversal t = new Traversal(this.graph, direction, edgesFmt);
+ for (Long nodeId : t.walk(srcNodeId, dstType, algorithm)) {
+ writeNode(nodeId);
+ }
+ close();
}
}
}
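
Note on the wire format: QueryHandler.writeNode() above streams each node id over the client FIFO as a big-endian 64-bit long (DataOutputStream.writeLong). A minimal sketch of the consuming side in Python, with the FIFO path as a placeholder, could look like this:

    import struct

    BIN_FMT = '>q'        # big-endian 64-bit integer, as written by DataOutputStream.writeLong
    BUF_SIZE = 64 * 1024

    def read_node_ids(fifo_path):
        """Yield node ids streamed by Entry.QueryHandler.writeNode()."""
        with open(fifo_path, 'rb') as f:
            while True:
                buf = f.read(BUF_SIZE)
                if not buf:
                    break
                for (node_id,) in struct.iter_unpack(BIN_FMT, buf):
                    yield node_id

    # Hypothetical usage, assuming the Java side writes to the same FIFO:
    # for node_id in read_node_ids('/tmp/swh-graph.fifo'):
    #     print(node_id)

This is essentially what JavaStreamProxy.read_node_ids in swh/graph/server/backend.py (further down in this diff) does, minus the asyncio executor calls.
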
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
aiohttp
click
vcversioner
+py4j
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -103,7 +103,7 @@
def restore_map(ctx, map_type, length, filename):
"""restore a binary PID<->int map from textual format"""
if map_type == 'pid2int':
- restore_pid2int(filename, length)
+ restore_pid2int(filename)
elif map_type == 'int2pid':
if length is None:
raise click.UsageError(
diff --git a/swh/graph/server/__main__.py b/swh/graph/server/__main__.py
--- a/swh/graph/server/__main__.py
+++ b/swh/graph/server/__main__.py
@@ -5,32 +5,79 @@
import argparse
+import contextlib
import aiohttp.web
from swh.core.api.asynchronous import RPCServerApp
from swh.graph.server.backend import Backend
+@contextlib.asynccontextmanager
+async def stream_response(request, *args, **kwargs):
+ response = aiohttp.web.StreamResponse(*args, **kwargs)
+ await response.prepare(request)
+ yield response
+ await response.write_eof()
+
+
async def index(request):
return aiohttp.web.Response(body="SWH Graph API server")
+async def stats(request):
+ stats = request.app['backend'].stats()
+ return aiohttp.web.Response(body=stats, content_type='application/json')
+
+
+async def _simple_traversal(request, ttype):
+ assert ttype in ('leaves', 'neighbors', 'visit_nodes')
+ method = getattr(request.app['backend'], ttype)
+
+ src = request.match_info['src']
+ edges = request.query.get('edges', '*')
+ direction = request.query.get('direction', 'forward')
+
+ async with stream_response(request) as response:
+ async for res_pid in method(direction, edges, src):
+ await response.write('{}\n'.format(res_pid).encode())
+ return response
+
+
+async def leaves(request):
+ return (await _simple_traversal(request, 'leaves'))
+
+
+async def neighbors(request):
+ return (await _simple_traversal(request, 'neighbors'))
+
+
async def visit(request):
- node_id = int(request.match_info['id'])
- response = aiohttp.web.StreamResponse(status=200)
- await response.prepare(request)
- async for node_id in request.app['backend'].visit(node_id):
- await response.write('{}\n'.format(node_id).encode())
- await response.write_eof()
- return response
+ return (await _simple_traversal(request, 'visit_nodes'))
+
+
+async def walk(request):
+ src = request.match_info['src']
+ dst = request.match_info['dst']
+ edges = request.query.get('edges', '*')
+ direction = request.query.get('direction', 'forward')
+ algo = request.query.get('traversal', 'dfs')
+
+ it = request.app['backend'].walk(direction, edges, algo, src, dst)
+ async with stream_response(request) as response:
+ async for res_pid in it:
+ await response.write('{}\n'.format(res_pid).encode())
+ return response
def make_app(backend, **kwargs):
app = RPCServerApp(**kwargs)
app.router.add_route('GET', '/', index)
-
- # Endpoints used by the web API
- app.router.add_route('GET', '/visit/{id}', visit)
+ app.router.add_route('GET', '/graph/stats', stats)
+ app.router.add_route('GET', '/graph/leaves/{src}', leaves)
+ app.router.add_route('GET', '/graph/neighbors/{src}', neighbors)
+ app.router.add_route('GET', '/graph/walk/{src}/{dst}', walk)
+ app.router.add_route('GET', '/graph/visit/nodes/{src}', visit)
+ # TODO: graph/visit/paths/ ?
app['backend'] = backend
return app
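
For illustration, a client could consume the streaming endpoints registered above as in the sketch below; it assumes aiohttp on the client side and uses a placeholder server URL and source PID:

    import asyncio
    import aiohttp

    async def stream_endpoint(base_url, path, **params):
        """Yield one PID per line from a streaming /graph/... endpoint."""
        async with aiohttp.ClientSession() as session:
            async with session.get(base_url + path, params=params) as resp:
                async for line in resp.content:   # responses are newline-delimited PIDs
                    yield line.decode().rstrip('\n')

    async def main():
        # Placeholder URL and source PID, for illustration only.
        base_url = 'http://localhost:5009'
        src = 'swh:1:dir:0000000000000000000000000000000000000000'
        async for pid in stream_endpoint(base_url, '/graph/leaves/' + src,
                                         direction='forward', edges='*'):
            print(pid)

    # asyncio.run(main())
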
diff --git a/swh/graph/server/backend.py b/swh/graph/server/backend.py
--- a/swh/graph/server/backend.py
+++ b/swh/graph/server/backend.py
@@ -1,4 +1,5 @@
import asyncio
+import contextlib
import os
import struct
import sys
@@ -6,23 +7,12 @@
from py4j.java_gateway import JavaGateway
-GATEWAY_SERVER_PORT = 25335
+from swh.graph.pid import IntToPidMap, PidToIntMap
BUF_SIZE = 64*1024
BIN_FMT = '>q' # 64 bit integer, big endian
-async def read_node_ids(fname):
- loop = asyncio.get_event_loop()
- with open(fname, 'rb') as f:
- while True:
- data = await loop.run_in_executor(None, f.read, BUF_SIZE)
- if not data:
- break
- for data in struct.iter_unpack(BIN_FMT, data):
- yield data[0]
-
-
class Backend:
def __init__(self, graph_path):
self.gateway = None
@@ -31,30 +21,118 @@
def __enter__(self):
self.gateway = JavaGateway.launch_gateway(
- port=GATEWAY_SERVER_PORT,
- classpath='java/server/target/swh-graph-0.0.2-jar-with-dependencies.jar',
+ java_path=None,
+ classpath='java/server/target/swh-graph-0.0.2-jar-with-dependencies.jar', # noqa
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=sys.stderr,
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
+ self.node2pid = IntToPidMap(self.graph_path + '.node2pid.bin')
+ self.pid2node = PidToIntMap(self.graph_path + '.pid2node.bin')
+ self.stream_proxy = JavaStreamProxy(self.entry)
# "/home/seirl/swh-graph/sample/big/compressed/swh-graph")
- def __exit__(self):
+ def __exit__(self, exc_type, exc_value, tb):
self.gateway.shutdown()
- async def visit(self, node_id):
+ def stats(self):
+ return self.entry.stats()
+
+ async def _simple_traversal(self, ttype, direction, edges_fmt, src):
+ assert ttype in ('leaves', 'neighbors', 'visit_nodes')
+ src_id = self.pid2node[src]
+ method = getattr(self.stream_proxy, ttype)
+ async for node_id in method(direction, edges_fmt, src_id):
+ yield self.node2pid[node_id]
+
+ async def leaves(self, *args):
+ async for res_pid in self._simple_traversal('leaves', *args):
+ yield res_pid
+
+ async def neighbors(self, *args):
+ async for res_pid in self._simple_traversal('neighbors', *args):
+ yield res_pid
+
+ async def visit_nodes(self, *args):
+ async for res_pid in self._simple_traversal('visit_nodes', *args):
+ yield res_pid
+
+ async def walk(self, direction, edges_fmt, algo, src, dst):
+ src_id = self.pid2node[src]
+ if dst in ('cnt', 'dir', 'rel', 'rev', 'snp', 'ori'):
+ it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
+ src_id, dst)
+ else:
+ dst_id = self.pid2node[dst]
+ it = self.stream_proxy.walk(direction, edges_fmt, algo,
+ src_id, dst_id)
+
+ async for node_id in it:
+ yield self.node2pid[node_id]
+
+
+class JavaStreamProxy:
+ """A proxy class for the org.softwareheritage.graph.Entry Java class that
+ takes care of the setup and teardown of the named-pipe FIFO communication
+ between Python and Java.
+
+ Initialize JavaStreamProxy using:
+
+ proxy = JavaStreamProxy(swh_entry_class_instance)
+
+ Then you can call an Entry method and iterate on the FIFO results like
+ this:
+
+ async for value in proxy.java_method(arg1, arg2):
+ print(value)
+ """
+
+ def __init__(self, entry):
+ self.entry = entry
+
+ async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
+ with (await loop.run_in_executor(None, open, fname, 'rb')) as f:
+ while True:
+ data = await loop.run_in_executor(None, f.read, BUF_SIZE)
+ if not data:
+ break
+ for data in struct.iter_unpack(BIN_FMT, data):
+ yield data[0]
+
+ class _HandlerWrapper:
+ def __init__(self, handler):
+ self._handler = handler
+ def __getattr__(self, name):
+ func = getattr(self._handler, name)
+
+ async def java_call(*args, **kwargs):
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, lambda: func(*args, **kwargs))
+
+ def java_task(*args, **kwargs):
+ return asyncio.create_task(java_call(*args, **kwargs))
+
+ return java_task
+
+ @contextlib.contextmanager
+ def get_handler(self):
with tempfile.TemporaryDirectory() as tmpdirname:
cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
os.mkfifo(cli_fifo)
+ reader = self.read_node_ids(cli_fifo)
+ query_handler = self.entry.get_handler(cli_fifo)
+ handler = self._HandlerWrapper(query_handler)
+ yield (handler, reader)
- def _visit():
- return self.entry.visit(node_id, 'forward', '*', cli_fifo)
-
- java_call = loop.run_in_executor(None, _visit)
- async for node_id in read_node_ids(cli_fifo):
- yield node_id
- await java_call
+ def __getattr__(self, name):
+ async def java_call_iterator(*args, **kwargs):
+ with self.get_handler() as (handler, reader):
+ java_task = getattr(handler, name)(*args, **kwargs)
+ async for value in reader:
+ yield value
+ await java_task
+ return java_call_iterator
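
End to end, the Backend above can be driven as in the following sketch; the graph path and source PID are placeholders, and the compiled swh-graph jar is assumed to be on the classpath configured in __enter__:

    import asyncio

    from swh.graph.server.backend import Backend

    async def print_leaves(graph_path, src_pid):
        """Print the PIDs of all leaves reachable from src_pid."""
        with Backend(graph_path) as backend:
            async for pid in backend.leaves('forward', '*', src_pid):
                print(pid)

    # Placeholder arguments, for illustration only:
    # asyncio.run(print_leaves('/srv/graph/compressed/graph',
    #                          'swh:1:rev:0000000000000000000000000000000000000000'))
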
