Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/http_rpc_server.py
- This file was moved from swh/graph/http_server.py.
# Copyright (C) 2019-2022 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||
""" | """ | ||||||||||||||||
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using | ||||||||||||||||
FIFO as a transport to stream integers between the two languages. | FIFO as a transport to stream integers between the two languages. | ||||||||||||||||
""" | """ | ||||||||||||||||
import json | import json | ||||||||||||||||
import logging | |||||||||||||||||
import os | import os | ||||||||||||||||
from typing import Optional | from typing import Optional | ||||||||||||||||
import aiohttp.test_utils | import aiohttp.test_utils | ||||||||||||||||
import aiohttp.web | import aiohttp.web | ||||||||||||||||
from google.protobuf import json_format | from google.protobuf import json_format | ||||||||||||||||
from google.protobuf.field_mask_pb2 import FieldMask | from google.protobuf.field_mask_pb2 import FieldMask | ||||||||||||||||
import grpc | import grpc | ||||||||||||||||
from swh.core.api.asynchronous import RPCServerApp | from swh.core.api.asynchronous import RPCServerApp | ||||||||||||||||
from swh.core.config import read as config_read | from swh.core.config import read as config_read | ||||||||||||||||
from swh.graph.rpc.swhgraph_pb2 import ( | from swh.graph.grpc.swhgraph_pb2 import ( | ||||||||||||||||
GetNodeRequest, | GetNodeRequest, | ||||||||||||||||
NodeFilter, | NodeFilter, | ||||||||||||||||
StatsRequest, | StatsRequest, | ||||||||||||||||
TraversalRequest, | TraversalRequest, | ||||||||||||||||
) | ) | ||||||||||||||||
from swh.graph.rpc.swhgraph_pb2_grpc import TraversalServiceStub | from swh.graph.grpc.swhgraph_pb2_grpc import TraversalServiceStub | ||||||||||||||||
from swh.graph.rpc_server import spawn_java_rpc_server, stop_java_rpc_server | from swh.graph.grpc_server import spawn_java_grpc_server, stop_java_grpc_server | ||||||||||||||||
from swh.model.swhids import EXTENDED_SWHID_TYPES | from swh.model.swhids import EXTENDED_SWHID_TYPES | ||||||||||||||||
try: | try: | ||||||||||||||||
from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||||||||||||
except ImportError: | except ImportError: | ||||||||||||||||
# Compatibility with 3.6 backport | # Compatibility with 3.6 backport | ||||||||||||||||
from async_generator import asynccontextmanager # type: ignore | from async_generator import asynccontextmanager # type: ignore | ||||||||||||||||
# maximum number of retries for random walks | # maximum number of retries for random walks | ||||||||||||||||
RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration | RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration | ||||||||||||||||
logger = logging.getLogger(__name__) | |||||||||||||||||
async def _aiorpcerror_middleware(app, handler): | async def _aiorpcerror_middleware(app, handler): | ||||||||||||||||
async def middleware_handler(request): | async def middleware_handler(request): | ||||||||||||||||
try: | try: | ||||||||||||||||
return await handler(request) | return await handler(request) | ||||||||||||||||
except grpc.aio.AioRpcError as e: | except grpc.aio.AioRpcError as e: | ||||||||||||||||
# The default error handler of the RPC framework tries to serialize this | # The default error handler of the RPC framework tries to serialize this | ||||||||||||||||
# with msgpack; which for some unknown reason causes it to raise | # with msgpack; which for some unknown reason causes it to raise | ||||||||||||||||
Show All 20 Lines | async def _start(app): | ||||||||||||||||
await app["channel"].__aenter__() | await app["channel"].__aenter__() | ||||||||||||||||
app["rpc_client"] = TraversalServiceStub(app["channel"]) | app["rpc_client"] = TraversalServiceStub(app["channel"]) | ||||||||||||||||
await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True) | await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True) | ||||||||||||||||
@staticmethod | @staticmethod | ||||||||||||||||
async def _stop(app): | async def _stop(app): | ||||||||||||||||
await app["channel"].__aexit__(None, None, None) | await app["channel"].__aexit__(None, None, None) | ||||||||||||||||
if app.get("local_server"): | if app.get("local_server"): | ||||||||||||||||
stop_java_rpc_server(app["local_server"]) | stop_java_grpc_server(app["local_server"]) | ||||||||||||||||
async def index(request): | async def index(request): | ||||||||||||||||
return aiohttp.web.Response( | return aiohttp.web.Response( | ||||||||||||||||
content_type="text/html", | content_type="text/html", | ||||||||||||||||
body="""<html> | body="""<html> | ||||||||||||||||
<head><title>Software Heritage graph server</title></head> | <head><title>Software Heritage graph server</title></head> | ||||||||||||||||
<body> | <body> | ||||||||||||||||
▲ Show 20 Lines • Show All 239 Lines • ▼ Show 20 Lines | class CountLeavesView(CountView): | ||||||||||||||||
def configure_request(self): | def configure_request(self): | ||||||||||||||||
self.traversal_request.return_nodes.max_traversal_successors = 0 | self.traversal_request.return_nodes.max_traversal_successors = 0 | ||||||||||||||||
class CountVisitNodesView(CountView): | class CountVisitNodesView(CountView): | ||||||||||||||||
pass | pass | ||||||||||||||||
def make_app(config=None, rpc_url=None, spawn_rpc_port=50091, **kwargs): | def make_app(config=None): | ||||||||||||||||
app = GraphServerApp(**kwargs) | """Create an aiohttp server for the HTTP RPC frontend to the swh-graph API. | ||||||||||||||||
if rpc_url is None: | It may either connect to an existing grpc server (cls="remote") or spawn a | ||||||||||||||||
app["local_server"], port = spawn_java_rpc_server(config, port=spawn_rpc_port) | local grpc server (cls="local"). | ||||||||||||||||
rpc_url = f"localhost:{port}" | |||||||||||||||||
vlorentzUnsubmitted Not Done Inline Actions
vlorentz: | |||||||||||||||||
Done Inline Actionssome day I'll remember this... douardda: some day I'll remember this... | |||||||||||||||||
``config`` is expected to be a dict like:: | |||||||||||||||||
graph: | |||||||||||||||||
cls: "local" | |||||||||||||||||
grpc_server: | |||||||||||||||||
port: 50091 | |||||||||||||||||
http_rpc_server: | |||||||||||||||||
debug: true | |||||||||||||||||
or:: | |||||||||||||||||
graph: | |||||||||||||||||
cls: "remote" | |||||||||||||||||
url: "localhost:50091" | |||||||||||||||||
http_rpc_server: | |||||||||||||||||
debug: true | |||||||||||||||||
See: | |||||||||||||||||
- :mod:`swh.graph.grpc_server` for more details of the content of the | |||||||||||||||||
grpc_server section, | |||||||||||||||||
- :class:`~.GraphServerApp` class for more details of the content of the | |||||||||||||||||
Not Done Inline Actions
vlorentz: | |||||||||||||||||
http_rpc_server section. | |||||||||||||||||
""" | |||||||||||||||||
if config is None: | |||||||||||||||||
config = {} | |||||||||||||||||
if "graph" not in config: | |||||||||||||||||
logger.info( | |||||||||||||||||
"Missing 'graph' configuration; default to a locally spawn" | |||||||||||||||||
"grpc server listening on 0.0.0.0:50091" | |||||||||||||||||
) | |||||||||||||||||
cfg = {"cls": "local", "grpc_server": {"port": 50091}} | |||||||||||||||||
else: | |||||||||||||||||
cfg = config["graph"].copy() | |||||||||||||||||
cls = cfg.pop("cls") | |||||||||||||||||
grpc_cfg = cfg.pop("grpc_server", {}) | |||||||||||||||||
app = GraphServerApp(**cfg.get("http_rpc_server", {})) | |||||||||||||||||
if cls == "remote": | |||||||||||||||||
if "url" not in cfg: | |||||||||||||||||
raise KeyError("Missing 'url' configuration entry in the [graph] section") | |||||||||||||||||
rpc_url = cfg["url"] | |||||||||||||||||
elif cls == "local": | |||||||||||||||||
app["local_server"], port = spawn_java_grpc_server(**grpc_cfg) | |||||||||||||||||
rpc_url = f"localhost:{port}" | |||||||||||||||||
else: | |||||||||||||||||
raise ValueError(f"Unknown swh.graph class cls={cls}") | |||||||||||||||||
app.add_routes( | app.add_routes( | ||||||||||||||||
[ | [ | ||||||||||||||||
aiohttp.web.get("/", index), | aiohttp.web.get("/", index), | ||||||||||||||||
aiohttp.web.get("/graph", index), | aiohttp.web.get("/graph", index), | ||||||||||||||||
aiohttp.web.view("/graph/stats", StatsView), | aiohttp.web.view("/graph/stats", StatsView), | ||||||||||||||||
aiohttp.web.view("/graph/leaves/{src}", LeavesView), | aiohttp.web.view("/graph/leaves/{src}", LeavesView), | ||||||||||||||||
aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), | aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), | ||||||||||||||||
aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), | aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), | ||||||||||||||||
Show All 16 Lines |