Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/server/app.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
import asyncio | import asyncio | ||||
import json | import json | ||||
import aiohttp.web | import aiohttp.web | ||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.model.identifiers import PID_TYPES | from swh.model.identifiers import PID_TYPES | ||||
from swh.model.exceptions import ValidationError | |||||
try: | try: | ||||
from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||
except ImportError: | except ImportError: | ||||
# Compatibility with 3.6 backport | # Compatibility with 3.6 backport | ||||
from async_generator import asynccontextmanager # type: ignore | from async_generator import asynccontextmanager # type: ignore | ||||
Show All 31 Lines | |||||
def get_simple_traversal_handler(ttype): | def get_simple_traversal_handler(ttype): | ||||
async def simple_traversal(request): | async def simple_traversal(request): | ||||
backend = request.app['backend'] | backend = request.app['backend'] | ||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
try: | |||||
src_node = backend.pid2node[src] | src_node = backend.pid2node[src] | ||||
except KeyError: | |||||
return aiohttp.web.HTTPNotFound() | |||||
except ValidationError: | |||||
return aiohttp.web.HTTPBadRequest() | |||||
async with stream_response(request) as response: | async with stream_response(request) as response: | ||||
async for res_node in backend.simple_traversal( | async for res_node in backend.simple_traversal( | ||||
ttype, direction, edges, src_node | ttype, direction, edges, src_node | ||||
): | ): | ||||
res_pid = backend.node2pid[res_node] | res_pid = backend.node2pid[res_node] | ||||
await response.write('{}\n'.format(res_pid).encode()) | await response.write('{}\n'.format(res_pid).encode()) | ||||
return response | return response | ||||
return simple_traversal | return simple_traversal | ||||
async def walk(request): | async def walk(request): | ||||
backend = request.app['backend'] | backend = request.app['backend'] | ||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
dst = request.match_info['dst'] | dst = request.match_info['dst'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
algo = request.query.get('traversal', 'dfs') | algo = request.query.get('traversal', 'dfs') | ||||
try: | |||||
src_node = backend.pid2node[src] | src_node = backend.pid2node[src] | ||||
if dst not in PID_TYPES: | if dst not in PID_TYPES: | ||||
dst = backend.pid2node[dst] | dst = backend.pid2node[dst] | ||||
except KeyError: | |||||
return aiohttp.web.HTTPNotFound() | |||||
except ValidationError: | |||||
return aiohttp.web.HTTPBadRequest() | |||||
async with stream_response(request) as response: | async with stream_response(request) as response: | ||||
async for res_node in backend.walk( | async for res_node in backend.walk( | ||||
direction, edges, algo, src_node, dst | direction, edges, algo, src_node, dst | ||||
): | ): | ||||
res_pid = backend.node2pid[res_node] | res_pid = backend.node2pid[res_node] | ||||
await response.write('{}\n'.format(res_pid).encode()) | await response.write('{}\n'.format(res_pid).encode()) | ||||
return response | return response | ||||
async def visit_paths(request): | async def visit_paths(request): | ||||
backend = request.app['backend'] | backend = request.app['backend'] | ||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
try: | |||||
src_node = backend.pid2node[src] | src_node = backend.pid2node[src] | ||||
except KeyError: | |||||
return aiohttp.web.HTTPNotFound() | |||||
except ValidationError: | |||||
return aiohttp.web.HTTPBadRequest() | |||||
it = backend.visit_paths(direction, edges, src_node) | it = backend.visit_paths(direction, edges, src_node) | ||||
async with stream_response(request, content_type='application/x-ndjson') \ | async with stream_response(request, content_type='application/x-ndjson') \ | ||||
as response: | as response: | ||||
async for res_path in it: | async for res_path in it: | ||||
res_path_pid = [backend.node2pid[n] for n in res_path] | res_path_pid = [backend.node2pid[n] for n in res_path] | ||||
line = json.dumps(res_path_pid) | line = json.dumps(res_path_pid) | ||||
await response.write('{}\n'.format(line).encode()) | await response.write('{}\n'.format(line).encode()) | ||||
return response | return response | ||||
def get_count_handler(ttype): | def get_count_handler(ttype): | ||||
async def count(request): | async def count(request): | ||||
loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | ||||
backend = request.app['backend'] | backend = request.app['backend'] | ||||
src = request.match_info['src'] | src = request.match_info['src'] | ||||
edges = request.query.get('edges', '*') | edges = request.query.get('edges', '*') | ||||
direction = request.query.get('direction', 'forward') | direction = request.query.get('direction', 'forward') | ||||
try: | |||||
src_node = backend.pid2node[src] | src_node = backend.pid2node[src] | ||||
except KeyError: | |||||
return aiohttp.web.HTTPNotFound() | |||||
except ValidationError: | |||||
return aiohttp.web.HTTPBadRequest() | |||||
cnt = await loop.run_in_executor( | cnt = await loop.run_in_executor( | ||||
None, backend.count, ttype, direction, edges, src_node) | None, backend.count, ttype, direction, edges, src_node) | ||||
return aiohttp.web.Response(body=str(cnt), | return aiohttp.web.Response(body=str(cnt), | ||||
content_type='application/json') | content_type='application/json') | ||||
return count | return count | ||||
Show All 23 Lines |