Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/server/app.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
import asyncio | import asyncio | ||||
import json | import json | ||||
import aiohttp.web | import aiohttp.web | ||||
from collections import deque | from collections import deque | ||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.model.identifiers import PID_TYPES | from swh.model.identifiers import PID_TYPES as SWHID_TYPES | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
try: | try: | ||||
from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||
except ImportError: | except ImportError: | ||||
# Compatibility with 3.6 backport | # Compatibility with 3.6 backport | ||||
from async_generator import asynccontextmanager # type: ignore | from async_generator import asynccontextmanager # type: ignore | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def get_direction(request): | ||||
return s | return s | ||||
def get_edges(request):
    """validate HTTP query parameter `edges`, i.e., edge restrictions

    The value is read from ``request.query`` and defaults to ``"*"`` when the
    parameter is absent. It is split on ``:`` into edges, and each edge is
    split on its first ``,`` into node types. Every node type must be either
    ``"*"`` or a member of ``SWHID_TYPES``.

    Returns:
        the raw parameter string, unchanged

    Raises:
        aiohttp.web.HTTPBadRequest: if any node type is not recognised
    """
    s = request.query.get("edges", "*")
    # A generator (rather than a list) lets any() stop at the first invalid
    # node type instead of checking the whole restriction string up front.
    if any(
        node_type != "*" and node_type not in SWHID_TYPES
        for edge in s.split(":")
        for node_type in edge.split(",", maxsplit=1)
    ):
        raise aiohttp.web.HTTPBadRequest(body=f"invalid edge restriction: {s}")
    return s
# [diff view: 9 unchanged lines elided above]
def get_limit(request):
    """validate HTTP query parameter `limit`, i.e., number of results

    Returns:
        the parameter parsed with ``int()``; 0 when the parameter is absent

    Raises:
        aiohttp.web.HTTPBadRequest: if the value cannot be parsed as an int
    """
    s = request.query.get("limit", "0")
    try:
        return int(s)
    except ValueError as exc:
        # Chain explicitly so the HTTP error is visibly a deliberate
        # translation of the parsing failure, not a second, unrelated error.
        raise aiohttp.web.HTTPBadRequest(
            body=f"invalid limit value: {s}"
        ) from exc
def node_of_swhid(swhid, backend):
    """lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if needed

    Returns:
        the value of ``backend.swhid2node[swhid]``

    Raises:
        aiohttp.web.HTTPNotFound: if the lookup raises KeyError
        aiohttp.web.HTTPBadRequest: if the lookup raises ValidationError
    """
    try:
        return backend.swhid2node[swhid]
    except KeyError as exc:
        # Explicit chaining keeps the lookup failure attached as the cause
        raise aiohttp.web.HTTPNotFound(body=f"SWHID not found: {swhid}") from exc
    except ValidationError as exc:
        raise aiohttp.web.HTTPBadRequest(body=f"malformed SWHID: {swhid}") from exc
def swhid_of_node(node, backend):
    """lookup a node in a node2swhid map, failing in an HTTP-nice way if needed

    Returns:
        the value of ``backend.node2swhid[node]``

    Raises:
        aiohttp.web.HTTPInternalServerError: if the lookup raises KeyError
    """
    try:
        return backend.node2swhid[node]
    except KeyError as exc:
        # Explicit chaining keeps the failed lookup attached as the cause of
        # the resulting server error.
        raise aiohttp.web.HTTPInternalServerError(
            body=f"reverse lookup failed for node id: {node}"
        ) from exc
def get_simple_traversal_handler(ttype):
    """build a request handler for the simple traversal of kind `ttype`

    The returned coroutine validates the `edges` and direction query
    parameters, resolves the `src` path component to a node, then writes one
    SWHID per line for each node yielded by ``backend.simple_traversal``.
    """

    async def simple_traversal(request):
        backend = request.app["backend"]

        # All inputs are validated before the streaming response is opened
        src = request.match_info["src"]
        edges = get_edges(request)
        direction = get_direction(request)
        start = node_of_swhid(src, backend)

        async with stream_response(request) as response:
            visited = backend.simple_traversal(ttype, direction, edges, start)
            async for node in visited:
                line = f"{swhid_of_node(node, backend)}\n"
                await response.write(line.encode())
            return response

    return simple_traversal
def get_walk_handler(random=False):
    """build a request handler for walks from `src` to `dst`

    Args:
        random: when true the handler uses ``backend.random_walk`` (with
            ``RANDOM_RETRIES``), otherwise ``backend.walk`` with the traversal
            algorithm taken from the request

    The returned coroutine writes one SWHID per line. The `limit` query
    parameter selects what is written: 0 means every node, a positive value
    means the first `limit` nodes, and a negative value means the last
    `-limit` nodes.
    """

    async def walk(request):
        backend = request.app["backend"]

        src = request.match_info["src"]
        dst = request.match_info["dst"]
        edges = get_edges(request)
        direction = get_direction(request)
        algo = get_traversal(request)
        limit = get_limit(request)

        src_node = node_of_swhid(src, backend)
        # dst is passed through untouched when it is one of SWHID_TYPES,
        # otherwise it is treated as a SWHID and resolved to a node
        if dst not in SWHID_TYPES:
            dst = node_of_swhid(dst, backend)

        async with stream_response(request) as response:
            if random:
                it = backend.random_walk(
                    direction, edges, RANDOM_RETRIES, src_node, dst
                )
            else:
                it = backend.walk(direction, edges, algo, src_node, dst)

            if limit < 0:
                # The tail is only known once the walk is over: a bounded
                # deque retains the last -limit lines, flushed afterwards.
                queue = deque(maxlen=-limit)
                async for res_node in it:
                    res_swhid = swhid_of_node(res_node, backend)
                    queue.append(f"{res_swhid}\n".encode())
                while queue:
                    await response.write(queue.popleft())
            else:
                count = 0
                async for res_node in it:
                    res_swhid = swhid_of_node(res_node, backend)
                    await response.write(f"{res_swhid}\n".encode())
                    count += 1
                    # Stop right after the limit-th line rather than pulling
                    # one more node from the backend just to discard it.
                    # With limit == 0 this never matches (count >= 1 here).
                    if count == limit:
                        break
            return response

    return walk
async def visit_paths(request):
    """request handler streaming the paths visited from `src`

    Validates the `edges` and direction query parameters, resolves the `src`
    path component to a node, then writes each path yielded by
    ``backend.visit_paths`` as one JSON array of SWHIDs per line, using the
    ``application/x-ndjson`` content type.
    """
    backend = request.app["backend"]

    src = request.match_info["src"]
    edges = get_edges(request)
    direction = get_direction(request)
    start = node_of_swhid(src, backend)

    paths = backend.visit_paths(direction, edges, start)
    ndjson = "application/x-ndjson"
    async with stream_response(request, content_type=ndjson) as response:
        async for path in paths:
            swhids = [swhid_of_node(node, backend) for node in path]
            await response.write(f"{json.dumps(swhids)}\n".encode())
        return response
def get_count_handler(ttype):
    """build a request handler returning the count for traversal kind `ttype`

    The returned coroutine validates the `edges` and direction query
    parameters, resolves the `src` path component to a node, and calls
    ``backend.count`` through ``run_in_executor`` (default executor) before
    answering with the result rendered by ``str()`` as the response body.
    """

    async def count(request):
        backend = request.app["backend"]

        src = request.match_info["src"]
        edges = get_edges(request)
        direction = get_direction(request)
        start = node_of_swhid(src, backend)

        # backend.count is an ordinary call: run it through the executor and
        # await the resulting future
        loop = asyncio.get_event_loop()
        pending = loop.run_in_executor(
            None, backend.count, ttype, direction, edges, start
        )
        total = await pending
        return aiohttp.web.Response(body=str(total), content_type="application/json")

    return count
Show All 31 Lines |