Changeset View
Standalone View
swh/graph/server/app.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
import asyncio | import asyncio | ||||
import json | import json | ||||
import aiohttp.web | import aiohttp.web | ||||
from collections import deque | |||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.model.identifiers import PID_TYPES | from swh.model.identifiers import PID_TYPES | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
try: | try: | ||||
from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||
except ImportError: | except ImportError: | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | if s not in ('forward', 'backward'): | ||||
raise aiohttp.web.HTTPBadRequest(body=f'invalid direction: {s}') | raise aiohttp.web.HTTPBadRequest(body=f'invalid direction: {s}') | ||||
return s | return s | ||||
def get_edges(request):
    """validate HTTP query parameter `edges`, i.e., edge restrictions

    The value is a `:`-separated list of edges; each edge is split on its
    first `,` into node types. Every node type must be either `*` or a member
    of PID_TYPES. The parameter defaults to `*` when absent.

    Returns:
        the raw (validated) query string value

    Raises:
        aiohttp.web.HTTPBadRequest: if any node type is not recognized

    """
    s = request.query.get('edges', '*')
    # generator (not a list) so that validation stops at the first bad type
    if any(node_type != '*' and node_type not in PID_TYPES
           for edge in s.split(':')
           for node_type in edge.split(',', maxsplit=1)):
        raise aiohttp.web.HTTPBadRequest(body=f'invalid edge restriction: {s}')
    return s
def get_traversal(request):
    """validate HTTP query parameter `traversal`, i.e., visit order

    Accepted values are `bfs` and `dfs`; the parameter defaults to `dfs`
    when absent.

    Returns:
        the validated traversal order, as a string

    Raises:
        aiohttp.web.HTTPBadRequest: if the value is neither `bfs` nor `dfs`

    """
    s = request.query.get('traversal', 'dfs')
    if s not in ('bfs', 'dfs'):
        raise aiohttp.web.HTTPBadRequest(body=f'invalid traversal order: {s}')
    return s
def get_limit(request):
    """validate HTTP query parameter `limit`, i.e., number of results

    The default is the *string* `'0'` (not the integer), so that the value is
    typed consistently and always goes through the same `int()` conversion,
    whether or not the parameter was given.

    In the walk handler of this module, `0` means "no limit", a positive
    value keeps the first N results and a negative value keeps the last N.

    Returns:
        the limit, as an integer (possibly zero or negative)

    Raises:
        aiohttp.web.HTTPBadRequest: if the value is not a valid integer

    """
    s = request.query.get('limit', '0')
    try:
        return int(s)
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(body=f'invalid limit value: {s}')
def node_of_pid(pid, backend):
    """lookup a PID in a pid2node map, failing in an HTTP-nice way if needed

    Args:
        pid: the persistent identifier to resolve
        backend: object exposing a `pid2node` mapping

    Returns:
        the value `backend.pid2node` associates with `pid`

    Raises:
        aiohttp.web.HTTPNotFound: if the lookup raises KeyError
        aiohttp.web.HTTPBadRequest: if the lookup raises ValidationError

    """
    try:
        return backend.pid2node[pid]
    except KeyError:
        raise aiohttp.web.HTTPNotFound(body=f'PID not found: {pid}')
    except ValidationError:
        raise aiohttp.web.HTTPBadRequest(body=f'malformed PID: {pid}')
Show All 25 Lines | async def simple_traversal(request): | ||||
): | ): | ||||
res_pid = pid_of_node(res_node, backend) | res_pid = pid_of_node(res_node, backend) | ||||
await response.write('{}\n'.format(res_pid).encode()) | await response.write('{}\n'.format(res_pid).encode()) | ||||
return response | return response | ||||
return simple_traversal | return simple_traversal | ||||
def get_walk_handler(random=False):
    """build the HTTP handler for the walk (or random walk) endpoints

    The returned coroutine streams one PID per line. The `limit` query
    parameter selects which part of the walk is returned:

    - `limit == 0` (default): every node of the walk
    - `limit > 0`: only the first `limit` nodes (head)
    - `limit < 0`: only the last `-limit` nodes (tail)

    Following the review discussion, negative limits subsume the former
    `last=True` mode (`limit=-1` yields the last node only), which is why
    this factory no longer takes a `last` argument.

    Args:
        random: if true use `backend.random_walk`, else `backend.walk`

    """
    async def walk(request):
        backend = request.app['backend']

        src = request.match_info['src']
        dst = request.match_info['dst']
        edges = get_edges(request)
        direction = get_direction(request)
        algo = get_traversal(request)
        limit = get_limit(request)

        src_node = node_of_pid(src, backend)
        # a dst listed in PID_TYPES is passed through untouched; anything
        # else is treated as a PID and resolved to a node
        if dst not in PID_TYPES:
            dst = node_of_pid(dst, backend)

        async with stream_response(request) as response:
            if random:
                it = backend.random_walk(direction, edges, RANDOM_RETRIES,
                                         src_node, dst)
            else:
                it = backend.walk(direction, edges, algo, src_node, dst)

            if limit < 0:
                # tail: the walk must be consumed entirely before anything
                # can be sent; the bounded deque drops older lines on its own
                queue = deque(maxlen=-limit)
                async for res_node in it:
                    res_pid = pid_of_node(res_node, backend)
                    queue.append('{}\n'.format(res_pid).encode())
                for line in queue:
                    await response.write(line)
            else:
                count = 0
                async for res_node in it:
                    res_pid = pid_of_node(res_node, backend)
                    await response.write('{}\n'.format(res_pid).encode())
                    count += 1
                    # stop right after the limit-th result, without pulling
                    # an extra node; never true when limit == 0 (unlimited)
                    if count == limit:
                        break
            return response

    return walk
async def visit_paths(request): | async def visit_paths(request): | ||||
backend = request.app['backend'] | backend = request.app['backend'] | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def make_app(backend, **kwargs): | ||||
# temporarily disabled in wait of a proper fix for T1969 | # temporarily disabled in wait of a proper fix for T1969 | ||||
# app.router.add_get('/graph/walk/{src}/{dst}', | # app.router.add_get('/graph/walk/{src}/{dst}', | ||||
# get_walk_handler(random=False)) | # get_walk_handler(random=False)) | ||||
# app.router.add_get('/graph/walk/last/{src}/{dst}', | # app.router.add_get('/graph/walk/last/{src}/{dst}', | ||||
# get_walk_handler(random=False, last=True)) | # get_walk_handler(random=False, last=True)) | ||||
app.router.add_get('/graph/randomwalk/{src}/{dst}', | app.router.add_get('/graph/randomwalk/{src}/{dst}', | ||||
get_walk_handler(random=True, last=False)) | get_walk_handler(random=True)) | ||||
app.router.add_get('/graph/randomwalk/last/{src}/{dst}', | |||||
get_walk_handler(random=True, last=True)) | |||||
app.router.add_get('/graph/neighbors/count/{src}', | app.router.add_get('/graph/neighbors/count/{src}', | ||||
get_count_handler('neighbors')) | get_count_handler('neighbors')) | ||||
app.router.add_get('/graph/leaves/count/{src}', | app.router.add_get('/graph/leaves/count/{src}', | ||||
get_count_handler('leaves')) | get_count_handler('leaves')) | ||||
app.router.add_get('/graph/visit/nodes/count/{src}', | app.router.add_get('/graph/visit/nodes/count/{src}', | ||||
get_count_handler('visit_nodes')) | get_count_handler('visit_nodes')) | ||||
app['backend'] = backend | app['backend'] = backend | ||||
return app | return app |
vlorentz (inline comment on `get_limit`): `'-1'` for consistent typing