Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/server/app.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
import json | |||||
import contextlib | import contextlib | ||||
import aiohttp.web | import aiohttp.web | ||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.model.identifiers import PID_TYPES | |||||
@contextlib.asynccontextmanager | @contextlib.asynccontextmanager | ||||
async def stream_response(request, *args, **kwargs): | async def stream_response(request, *args, **kwargs): | ||||
response = aiohttp.web.StreamResponse(*args, **kwargs) | response = aiohttp.web.StreamResponse(*args, **kwargs) | ||||
await response.prepare(request) | await response.prepare(request) | ||||
yield response | yield response | ||||
await response.write_eof() | await response.write_eof() | ||||
Show All 17 Lines | |||||
async def stats(request): | async def stats(request): | ||||
stats = request.app['backend'].stats() | stats = request.app['backend'].stats() | ||||
return aiohttp.web.Response(body=stats, content_type='application/json') | return aiohttp.web.Response(body=stats, content_type='application/json') | ||||
def get_simple_traversal_handler(ttype): | def get_simple_traversal_handler(ttype): | ||||
async def simple_traversal(request): | async def simple_traversal(request): | ||||
backend = request.app['backend'] | |||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
src_node = backend.pid2node[src] | |||||
async with stream_response(request) as response: | async with stream_response(request) as response: | ||||
async for res_pid in request.app['backend'].simple_traversal( | async for res_node in backend.simple_traversal( | ||||
ttype, direction, edges, src | ttype, direction, edges, src_node | ||||
): | ): | ||||
res_pid = backend.node2pid[res_node] | |||||
await response.write('{}\n'.format(res_pid).encode()) | await response.write('{}\n'.format(res_pid).encode()) | ||||
return response | return response | ||||
return simple_traversal | return simple_traversal | ||||
async def walk(request): | async def walk(request): | ||||
backend = request.app['backend'] | |||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
dst = request.match_info['dst'] | dst = request.match_info['dst'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
algo = request.query.get('traversal', 'dfs') | algo = request.query.get('traversal', 'dfs') | ||||
it = request.app['backend'].walk(direction, edges, algo, src, dst) | src_node = backend.pid2node[src] | ||||
if dst not in PID_TYPES: | |||||
dst = backend.pid2node[dst] | |||||
async with stream_response(request) as response: | async with stream_response(request) as response: | ||||
async for res_pid in it: | async for res_node in backend.walk( | ||||
direction, edges, algo, src_node, dst | |||||
): | |||||
res_pid = backend.node2pid[res_node] | |||||
await response.write('{}\n'.format(res_pid).encode()) | await response.write('{}\n'.format(res_pid).encode()) | ||||
return response | return response | ||||
async def visit_paths(request): | async def visit_paths(request): | ||||
backend = request.app['backend'] | |||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
it = request.app['backend'].visit_paths(direction, edges, src) | src_node = backend.pid2node[src] | ||||
it = backend.visit_paths(direction, edges, src_node) | |||||
async with stream_response(request) as response: | async with stream_response(request) as response: | ||||
async for res_pid in it: | async for res_path in it: | ||||
await response.write('{}\n'.format(res_pid).encode()) | res_path_pid = [backend.node2pid[n] for n in res_path] | ||||
line = json.dumps(res_path_pid) | |||||
await response.write('{}\n'.format(line).encode()) | |||||
return response | return response | ||||
def make_app(backend, **kwargs): | def make_app(backend, **kwargs): | ||||
app = RPCServerApp(**kwargs) | app = RPCServerApp(**kwargs) | ||||
app.router.add_route('GET', '/', index) | app.router.add_route('GET', '/', index) | ||||
app.router.add_route('GET', '/graph/stats', stats) | app.router.add_route('GET', '/graph/stats', stats) | ||||
app.router.add_route('GET', '/graph/leaves/{src}', | app.router.add_route('GET', '/graph/leaves/{src}', | ||||
Show All 10 Lines |