Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/http_server.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||
""" | """ | ||||
Show All 26 Lines | except ImportError: | ||||
# Compatibility with 3.6 backport | # Compatibility with 3.6 backport | ||||
from async_generator import asynccontextmanager # type: ignore | from async_generator import asynccontextmanager # type: ignore | ||||
# maximum number of retries for random walks | # maximum number of retries for random walks | ||||
RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration | RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration | ||||
async def _aiorpcerror_middleware(app, handler): | |||||
async def middleware_handler(request): | |||||
try: | |||||
return await handler(request) | |||||
except grpc.aio.AioRpcError as e: | |||||
# The default error handler of the RPC framework tries to serialize this | |||||
# with msgpack; which for some unknown reason causes it to raise | |||||
# ValueError("recursion limit exceeded") with a lot of context, causing | |||||
# Sentry to be overflowed with gigabytes of logs (160KB per event, with | |||||
# potentially hundreds of thousands of events per day). | |||||
# Instead, we simply serialize the exception to a string. | |||||
# https://sentry.softwareheritage.org/share/issue/d6d4db971e4b47728a6c1dd06cb9b8a5/ | |||||
raise aiohttp.web.HTTPServiceUnavailable(text=str(e)) | |||||
return middleware_handler | |||||
class GraphServerApp(RPCServerApp): | class GraphServerApp(RPCServerApp): | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, middlewares=(), **kwargs): | ||||
super().__init__(*args, **kwargs) | middlewares = (_aiorpcerror_middleware,) + middlewares | ||||
super().__init__(*args, middlewares=middlewares, **kwargs) | |||||
self.on_startup.append(self._start) | self.on_startup.append(self._start) | ||||
self.on_shutdown.append(self._stop) | self.on_shutdown.append(self._stop) | ||||
@staticmethod | @staticmethod | ||||
async def _start(app): | async def _start(app): | ||||
app["channel"] = grpc.aio.insecure_channel(app["rpc_url"]) | app["channel"] = grpc.aio.insecure_channel(app["rpc_url"]) | ||||
await app["channel"].__aenter__() | await app["channel"].__aenter__() | ||||
app["rpc_client"] = TraversalServiceStub(app["channel"]) | app["rpc_client"] = TraversalServiceStub(app["channel"]) | ||||
▲ Show 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | async def prepare_response(self): | ||||
direction=self.get_direction(), | direction=self.get_direction(), | ||||
return_nodes=NodeFilter(types=self.get_return_types()), | return_nodes=NodeFilter(types=self.get_return_types()), | ||||
mask=FieldMask(paths=["swhid"]), | mask=FieldMask(paths=["swhid"]), | ||||
) | ) | ||||
if self.get_max_edges(): | if self.get_max_edges(): | ||||
self.traversal_request.max_edges = self.get_max_edges() | self.traversal_request.max_edges = self.get_max_edges() | ||||
await self.check_swhid(src) | await self.check_swhid(src) | ||||
self.configure_request() | self.configure_request() | ||||
self.nodes_stream = self.rpc_client.Traverse(self.traversal_request) | |||||
# Force gRPC to query the server and fetch the first nodes; so errors | |||||
# are raised early, so we can return HTTP 503 before HTTP 200 | |||||
await self.nodes_stream.wait_for_connection() | |||||
def configure_request(self): | def configure_request(self): | ||||
pass | pass | ||||
async def stream_response(self): | async def stream_response(self): | ||||
async for node in self.rpc_client.Traverse(self.traversal_request): | async for node in self.nodes_stream: | ||||
await self.stream_line(node.swhid) | await self.stream_line(node.swhid) | ||||
class LeavesView(SimpleTraversalView): | class LeavesView(SimpleTraversalView): | ||||
def configure_request(self): | def configure_request(self): | ||||
self.traversal_request.return_nodes.max_traversal_successors = 0 | self.traversal_request.return_nodes.max_traversal_successors = 0 | ||||
▲ Show 20 Lines • Show All 93 Lines • Show Last 20 Lines |