Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/server/app.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
import asyncio | import asyncio | ||||
from collections import deque | from collections import deque | ||||
import json | |||||
import os | import os | ||||
from typing import Optional | from typing import Optional | ||||
import aiohttp.web | import aiohttp.web | ||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||
from swh.core.config import read as config_read | from swh.core.config import read as config_read | ||||
from swh.graph.backend import Backend | from swh.graph.backend import Backend | ||||
from swh.model.exceptions import ValidationError | |||||
from swh.model.swhids import EXTENDED_SWHID_TYPES | from swh.model.swhids import EXTENDED_SWHID_TYPES | ||||
try: | try: | ||||
from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||
except ImportError: | except ImportError: | ||||
# Compatibility with 3.6 backport | # Compatibility with 3.6 backport | ||||
from async_generator import asynccontextmanager # type: ignore | from async_generator import asynccontextmanager # type: ignore | ||||
Show All 38 Lines | |||||
class GraphView(aiohttp.web.View): | class GraphView(aiohttp.web.View): | ||||
"""Base class for views working on the graph, with utility functions""" | """Base class for views working on the graph, with utility functions""" | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.backend = self.request.app["backend"] | self.backend = self.request.app["backend"] | ||||
def node_of_swhid(self, swhid): | |||||
"""Lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if | |||||
needed.""" | |||||
try: | |||||
return self.backend.swhid2node[swhid] | |||||
except KeyError: | |||||
raise aiohttp.web.HTTPNotFound(text=f"SWHID not found: {swhid}") | |||||
except ValidationError: | |||||
raise aiohttp.web.HTTPBadRequest(text=f"malformed SWHID: {swhid}") | |||||
def swhid_of_node(self, node): | |||||
"""Lookup a node in a node2swhid map, failing in an HTTP-nice way if | |||||
needed.""" | |||||
try: | |||||
return self.backend.node2swhid[node] | |||||
except KeyError: | |||||
raise aiohttp.web.HTTPInternalServerError( | |||||
text=f"reverse lookup failed for node id: {node}" | |||||
) | |||||
def get_direction(self): | def get_direction(self): | ||||
"""Validate HTTP query parameter `direction`""" | """Validate HTTP query parameter `direction`""" | ||||
s = self.request.query.get("direction", "forward") | s = self.request.query.get("direction", "forward") | ||||
if s not in ("forward", "backward"): | if s not in ("forward", "backward"): | ||||
raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") | raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") | ||||
return s | return s | ||||
def get_edges(self): | def get_edges(self): | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def get_max_edges(self): | ||||
"""Validate HTTP query parameter 'max_edges', i.e., | """Validate HTTP query parameter 'max_edges', i.e., | ||||
the limit of the number of edges that can be visited""" | the limit of the number of edges that can be visited""" | ||||
s = self.request.query.get("max_edges", "0") | s = self.request.query.get("max_edges", "0") | ||||
try: | try: | ||||
return int(s) | return int(s) | ||||
except ValueError: | except ValueError: | ||||
raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") | raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") | ||||
def check_swhid(self, swhid): | |||||
"""Validate that the given SWHID exists in the graph""" | |||||
try: | |||||
self.backend.check_swhid(swhid) | |||||
except (NameError, ValueError) as e: | |||||
raise aiohttp.web.HTTPBadRequest(text=str(e)) | |||||
class StreamingGraphView(GraphView): | class StreamingGraphView(GraphView): | ||||
"""Base class for views streaming their response line by line.""" | """Base class for views streaming their response line by line.""" | ||||
content_type = "text/plain" | content_type = "text/plain" | ||||
@asynccontextmanager | @asynccontextmanager | ||||
async def response_streamer(self, *args, **kwargs): | async def response_streamer(self, *args, **kwargs): | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | |||||
class SimpleTraversalView(StreamingGraphView): | class SimpleTraversalView(StreamingGraphView): | ||||
"""Base class for views of simple traversals""" | """Base class for views of simple traversals""" | ||||
simple_traversal_type: Optional[str] = None | simple_traversal_type: Optional[str] = None | ||||
async def prepare_response(self): | async def prepare_response(self): | ||||
src = self.request.match_info["src"] | self.src = self.request.match_info["src"] | ||||
self.src_node = self.node_of_swhid(src) | |||||
self.edges = self.get_edges() | self.edges = self.get_edges() | ||||
self.direction = self.get_direction() | self.direction = self.get_direction() | ||||
self.max_edges = self.get_max_edges() | self.max_edges = self.get_max_edges() | ||||
self.return_types = self.get_return_types() | self.return_types = self.get_return_types() | ||||
self.check_swhid(self.src) | |||||
async def stream_response(self): | async def stream_response(self): | ||||
async for res_node in self.backend.simple_traversal( | async for res_line in self.backend.traversal( | ||||
self.simple_traversal_type, | self.simple_traversal_type, | ||||
self.direction, | self.direction, | ||||
self.edges, | self.edges, | ||||
self.src_node, | self.src, | ||||
self.max_edges, | self.max_edges, | ||||
self.return_types, | self.return_types, | ||||
): | ): | ||||
res_swhid = self.swhid_of_node(res_node) | await self.stream_line(res_line) | ||||
await self.stream_line(res_swhid) | |||||
class LeavesView(SimpleTraversalView): | class LeavesView(SimpleTraversalView): | ||||
simple_traversal_type = "leaves" | simple_traversal_type = "leaves" | ||||
class NeighborsView(SimpleTraversalView): | class NeighborsView(SimpleTraversalView): | ||||
simple_traversal_type = "neighbors" | simple_traversal_type = "neighbors" | ||||
class VisitNodesView(SimpleTraversalView): | class VisitNodesView(SimpleTraversalView): | ||||
simple_traversal_type = "visit_nodes" | simple_traversal_type = "visit_nodes" | ||||
class VisitEdgesView(SimpleTraversalView): | |||||
simple_traversal_type = "visit_edges" | |||||
class WalkView(StreamingGraphView): | class WalkView(StreamingGraphView): | ||||
async def prepare_response(self): | async def prepare_response(self): | ||||
src = self.request.match_info["src"] | self.src = self.request.match_info["src"] | ||||
dst = self.request.match_info["dst"] | self.dst = self.request.match_info["dst"] | ||||
self.src_node = self.node_of_swhid(src) | |||||
if dst not in EXTENDED_SWHID_TYPES: | |||||
self.dst_thing = self.node_of_swhid(dst) | |||||
else: | |||||
self.dst_thing = dst | |||||
self.edges = self.get_edges() | self.edges = self.get_edges() | ||||
self.direction = self.get_direction() | self.direction = self.get_direction() | ||||
self.algo = self.get_traversal() | self.algo = self.get_traversal() | ||||
self.limit = self.get_limit() | self.limit = self.get_limit() | ||||
self.return_types = self.get_return_types() | |||||
self.check_swhid(self.src) | |||||
if self.dst not in EXTENDED_SWHID_TYPES: | |||||
self.check_swhid(self.dst) | |||||
async def get_walk_iterator(self): | async def get_walk_iterator(self): | ||||
return self.backend.walk( | return self.backend.traversal( | ||||
self.direction, self.edges, self.algo, self.src_node, self.dst_thing | "walk", self.direction, self.edges, self.algo, self.src, self.dst | ||||
) | ) | ||||
async def stream_response(self): | async def stream_response(self): | ||||
it = self.get_walk_iterator() | it = self.get_walk_iterator() | ||||
if self.limit < 0: | if self.limit < 0: | ||||
queue = deque(maxlen=-self.limit) | queue = deque(maxlen=-self.limit) | ||||
async for res_node in it: | async for res_swhid in it: | ||||
res_swhid = self.swhid_of_node(res_node) | |||||
queue.append(res_swhid) | queue.append(res_swhid) | ||||
while queue: | while queue: | ||||
await self.stream_line(queue.popleft()) | await self.stream_line(queue.popleft()) | ||||
else: | else: | ||||
count = 0 | count = 0 | ||||
async for res_node in it: | async for res_swhid in it: | ||||
if self.limit == 0 or count < self.limit: | if self.limit == 0 or count < self.limit: | ||||
res_swhid = self.swhid_of_node(res_node) | |||||
await self.stream_line(res_swhid) | await self.stream_line(res_swhid) | ||||
count += 1 | count += 1 | ||||
else: | else: | ||||
break | break | ||||
class RandomWalkView(WalkView): | class RandomWalkView(WalkView): | ||||
def get_walk_iterator(self): | def get_walk_iterator(self): | ||||
return self.backend.random_walk( | return self.backend.traversal( | ||||
"random_walk", | |||||
self.direction, | self.direction, | ||||
self.edges, | self.edges, | ||||
RANDOM_RETRIES, | RANDOM_RETRIES, | ||||
self.src_node, | self.src, | ||||
self.dst_thing, | self.dst, | ||||
self.return_types, | |||||
) | |||||
class VisitEdgesView(SimpleTraversalView): | |||||
async def stream_response(self): | |||||
it = self.backend.visit_edges( | |||||
self.direction, self.edges, self.src_node, self.max_edges | |||||
) | |||||
async for (res_src, res_dst) in it: | |||||
res_src_swhid = self.swhid_of_node(res_src) | |||||
res_dst_swhid = self.swhid_of_node(res_dst) | |||||
await self.stream_line("{} {}".format(res_src_swhid, res_dst_swhid)) | |||||
class VisitPathsView(SimpleTraversalView): | |||||
content_type = "application/x-ndjson" | |||||
async def stream_response(self): | |||||
it = self.backend.visit_paths( | |||||
self.direction, self.edges, self.src_node, self.max_edges | |||||
) | ) | ||||
async for res_path in it: | |||||
res_path_swhid = [self.swhid_of_node(n) for n in res_path] | |||||
line = json.dumps(res_path_swhid) | |||||
await self.stream_line(line) | |||||
class CountView(GraphView): | class CountView(GraphView): | ||||
"""Base class for counting views.""" | """Base class for counting views.""" | ||||
count_type: Optional[str] = None | count_type: Optional[str] = None | ||||
async def get(self): | async def get(self): | ||||
src = self.request.match_info["src"] | self.src = self.request.match_info["src"] | ||||
self.src_node = self.node_of_swhid(src) | self.check_swhid(self.src) | ||||
self.edges = self.get_edges() | self.edges = self.get_edges() | ||||
self.direction = self.get_direction() | self.direction = self.get_direction() | ||||
loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | ||||
cnt = await loop.run_in_executor( | cnt = await loop.run_in_executor( | ||||
None, | None, | ||||
self.backend.count, | self.backend.count, | ||||
self.count_type, | self.count_type, | ||||
self.direction, | self.direction, | ||||
self.edges, | self.edges, | ||||
self.src_node, | self.src, | ||||
) | ) | ||||
return aiohttp.web.Response(body=str(cnt), content_type="application/json") | return aiohttp.web.Response(body=str(cnt), content_type="application/json") | ||||
class CountNeighborsView(CountView): | class CountNeighborsView(CountView): | ||||
count_type = "neighbors" | count_type = "neighbors" | ||||
Show All 15 Lines | app.add_routes( | ||||
[ | [ | ||||
aiohttp.web.get("/", index), | aiohttp.web.get("/", index), | ||||
aiohttp.web.get("/graph", index), | aiohttp.web.get("/graph", index), | ||||
aiohttp.web.view("/graph/stats", StatsView), | aiohttp.web.view("/graph/stats", StatsView), | ||||
aiohttp.web.view("/graph/leaves/{src}", LeavesView), | aiohttp.web.view("/graph/leaves/{src}", LeavesView), | ||||
aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), | aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), | ||||
aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), | aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), | ||||
aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), | aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), | ||||
aiohttp.web.view("/graph/visit/paths/{src}", VisitPathsView), | |||||
# temporarily disabled in wait of a proper fix for T1969 | # temporarily disabled in wait of a proper fix for T1969 | ||||
# aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) | # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) | ||||
aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), | aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), | ||||
aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), | aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), | ||||
aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), | aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), | ||||
aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), | aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), | ||||
] | ] | ||||
) | ) | ||||
Show All 12 Lines |