diff --git a/swh/core/api/__init__.py b/swh/core/api/__init__.py index 8ffc3b0..e41ef11 100644 --- a/swh/core/api/__init__.py +++ b/swh/core/api/__init__.py @@ -1,456 +1,464 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import abc import functools import inspect import logging import pickle from typing import ( Any, Callable, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar, Union, ) from flask import Flask, Request, Response, abort, request import requests from werkzeug.exceptions import HTTPException from .negotiation import Formatter as FormatterBase from .negotiation import Negotiator as NegotiatorBase from .negotiation import negotiate as _negotiate from .serializers import ( exception_to_dict, json_dumps, json_loads, msgpack_dumps, msgpack_loads, ) from .serializers import decode_response from .serializers import encode_data_client as encode_data logger = logging.getLogger(__name__) # support for content negotiation class Negotiator(NegotiatorBase): def best_mimetype(self): return request.accept_mimetypes.best_match( self.accept_mimetypes, "application/json" ) def _abort(self, status_code, err=None): return abort(status_code, err) def negotiate(formatter_cls, *args, **kwargs): return _negotiate(Negotiator, formatter_cls, *args, **kwargs) class Formatter(FormatterBase): def _make_response(self, body, content_type): return Response(body, content_type=content_type) def configure(self, extra_encoders=None): self.extra_encoders = extra_encoders class JSONFormatter(Formatter): format = "json" mimetypes = ["application/json"] def render(self, obj): return json_dumps(obj, extra_encoders=self.extra_encoders) class MsgpackFormatter(Formatter): format = "msgpack" mimetypes = ["application/x-msgpack"] def render(self, obj): return msgpack_dumps(obj, extra_encoders=self.extra_encoders) # base API classes class RemoteException(Exception): """raised when remote returned an out-of-band failure notification, e.g., as a HTTP status code or serialized exception Attributes: response: HTTP response corresponding to the failure """ def __init__( self, payload: Optional[Any] = None, response: Optional[requests.Response] = None, ): if payload is not None: super().__init__(payload) else: super().__init__() self.response = response def __str__(self): if ( self.args and isinstance(self.args[0], dict) and "type" in self.args[0] and "args" in self.args[0] ): return ( f"' ) else: return super().__str__() F = TypeVar("F", bound=Callable) def remote_api_endpoint(path: str, method: str = "POST") -> Callable[[F], F]: def dec(f: F) -> F: f._endpoint_path = path # type: ignore f._method = method # type: ignore return f return dec class APIError(Exception): """API Error""" def __str__(self): return "An unexpected error occurred in the backend: {}".format(self.args) class MetaRPCClient(type): """Metaclass for RPCClient, which adds a method for each endpoint of the database it is designed to access. See for example :class:`swh.indexer.storage.api.client.RemoteStorage`""" def __new__(cls, name, bases, attributes): # For each method wrapped with @remote_api_endpoint in an API backend # (eg. :class:`swh.indexer.storage.IndexerStorage`), add a new # method in RemoteStorage, with the same documentation. # # Note that, despite the usage of decorator magic (eg. functools.wrap), # this never actually calls an IndexerStorage method. backend_class = attributes.get("backend_class", None) for base in bases: if backend_class is not None: break backend_class = getattr(base, "backend_class", None) if backend_class: for (meth_name, meth) in backend_class.__dict__.items(): if hasattr(meth, "_endpoint_path"): cls.__add_endpoint(meth_name, meth, attributes) return super().__new__(cls, name, bases, attributes) @staticmethod def __add_endpoint(meth_name, meth, attributes): wrapped_meth = inspect.unwrap(meth) @functools.wraps(meth) # Copy signature and doc def meth_(*args, **kwargs): # Match arguments and parameters post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) # Remove arguments that should not be passed self = post_data.pop("self") post_data.pop("cur", None) post_data.pop("db", None) # Send the request. return self.post(meth._endpoint_path, post_data) if meth_name not in attributes: attributes[meth_name] = meth_ class RPCClient(metaclass=MetaRPCClient): """Proxy to an internal SWH RPC """ backend_class = None # type: ClassVar[Optional[type]] """For each method of `backend_class` decorated with :func:`remote_api_endpoint`, a method with the same prototype and docstring will be added to this class. Calls to this new method will be translated into HTTP requests to a remote server. This backend class will never be instantiated, it only serves as a template.""" api_exception = APIError # type: ClassVar[Type[Exception]] """The exception class to raise in case of communication error with the server.""" reraise_exceptions: ClassVar[List[Type[Exception]]] = [] """On server errors, if any of the exception classes in this list has the same name as the error name, then the exception will be instantiated and raised instead of a generic RemoteException.""" extra_type_encoders: List[Tuple[type, str, Callable]] = [] """Value of `extra_encoders` passed to `json_dumps` or `msgpack_dumps` to be able to serialize more object types.""" extra_type_decoders: Dict[str, Callable] = {} """Value of `extra_decoders` passed to `json_loads` or `msgpack_loads` to be able to deserialize more object types.""" def __init__( self, url, api_exception=None, timeout=None, chunk_size=4096, reraise_exceptions=None, **kwargs, ): if api_exception: self.api_exception = api_exception if reraise_exceptions: self.reraise_exceptions = reraise_exceptions base_url = url if url.endswith("/") else url + "/" self.url = base_url self.session = requests.Session() adapter = requests.adapters.HTTPAdapter( max_retries=kwargs.get("max_retries", 3), pool_connections=kwargs.get("pool_connections", 20), pool_maxsize=kwargs.get("pool_maxsize", 100), ) self.session.mount(self.url, adapter) self.timeout = timeout self.chunk_size = chunk_size def _url(self, endpoint): return "%s%s" % (self.url, endpoint) def raw_verb(self, verb, endpoint, **opts): if "chunk_size" in opts: # if the chunk_size argument has been passed, consider the user # also wants stream=True, otherwise, what's the point. opts["stream"] = True if self.timeout and "timeout" not in opts: opts["timeout"] = self.timeout try: return getattr(self.session, verb)(self._url(endpoint), **opts) except requests.exceptions.ConnectionError as e: raise self.api_exception(e) def post(self, endpoint, data, **opts): if isinstance(data, (abc.Iterator, abc.Generator)): data = (self._encode_data(x) for x in data) else: data = self._encode_data(data) chunk_size = opts.pop("chunk_size", self.chunk_size) response = self.raw_verb( "post", endpoint, data=data, headers={ "content-type": "application/x-msgpack", "accept": "application/x-msgpack", }, **opts, ) if opts.get("stream") or response.headers.get("transfer-encoding") == "chunked": self.raise_for_status(response) return response.iter_content(chunk_size) else: return self._decode_response(response) def _encode_data(self, data): return encode_data(data, extra_encoders=self.extra_type_encoders) post_stream = post def get(self, endpoint, **opts): chunk_size = opts.pop("chunk_size", self.chunk_size) response = self.raw_verb( "get", endpoint, headers={"accept": "application/x-msgpack"}, **opts ) if opts.get("stream") or response.headers.get("transfer-encoding") == "chunked": self.raise_for_status(response) return response.iter_content(chunk_size) else: return self._decode_response(response) def get_stream(self, endpoint, **opts): return self.get(endpoint, stream=True, **opts) def raise_for_status(self, response) -> None: """check response HTTP status code and raise an exception if it denotes an error; do nothing otherwise """ status_code = response.status_code status_class = response.status_code // 100 if status_code == 404: raise RemoteException(payload="404 not found", response=response) exception = None # TODO: only old servers send pickled error; stop trying to unpickle # after they are all upgraded try: if status_class == 4: data = self._decode_response(response, check_status=False) if isinstance(data, dict): for exc_type in self.reraise_exceptions: - if exc_type.__name__ == data["exception"]["type"]: - exception = exc_type(*data["exception"]["args"]) + if exc_type.__name__ == data["type"]: + exception = exc_type(*data["args"]) break - else: + # old dict encoded exception schema + # TODO: Remove that code once all servers are using new schema + if "exception" in data: exception = RemoteException( payload=data["exception"], response=response ) + else: + exception = RemoteException(payload=data, response=response) else: exception = pickle.loads(data) elif status_class == 5: data = self._decode_response(response, check_status=False) if "exception_pickled" in data: exception = pickle.loads(data["exception_pickled"]) - else: + # old dict encoded exception schema + # TODO: Remove that code once all servers are using new schema + elif "exception" in data: exception = RemoteException( payload=data["exception"], response=response ) + else: + exception = RemoteException(payload=data, response=response) except (TypeError, pickle.UnpicklingError): raise RemoteException(payload=data, response=response) if exception: raise exception from None if status_class != 2: raise RemoteException( payload=f"API HTTP error: {status_code} {response.content}", response=response, ) def _decode_response(self, response, check_status=True): if check_status: self.raise_for_status(response) return decode_response(response, extra_decoders=self.extra_type_decoders) def __repr__(self): return "<{} url={}>".format(self.__class__.__name__, self.url) class BytesRequest(Request): """Request with proper escaping of arbitrary byte sequences.""" encoding = "utf-8" encoding_errors = "surrogateescape" ENCODERS: Dict[str, Callable[[Any], Union[bytes, str]]] = { "application/x-msgpack": msgpack_dumps, "application/json": json_dumps, } def encode_data_server( data, content_type="application/x-msgpack", extra_type_encoders=None ): encoded_data = ENCODERS[content_type](data, extra_encoders=extra_type_encoders) return Response(encoded_data, mimetype=content_type,) def decode_request(request, extra_decoders=None): content_type = request.mimetype data = request.get_data() if not data: return {} if content_type == "application/x-msgpack": r = msgpack_loads(data, extra_decoders=extra_decoders) elif content_type == "application/json": # XXX this .decode() is needed for py35. # Should not be needed any more with py37 r = json_loads(data.decode("utf-8"), extra_decoders=extra_decoders) else: raise ValueError("Wrong content type `%s` for API request" % content_type) return r def error_handler(exception, encoder, status_code=500): logging.exception(exception) response = encoder(exception_to_dict(exception)) if isinstance(exception, HTTPException): response.status_code = exception.code else: # TODO: differentiate between server errors and client errors response.status_code = status_code return response class RPCServerApp(Flask): """For each endpoint of the given `backend_class`, tells app.route to call a function that decodes the request and sends it to the backend object provided by the factory. :param Any backend_class: The class of the backend, which will be analyzed to look for API endpoints. :param Optional[Callable[[], backend_class]] backend_factory: A function with no argument that returns an instance of `backend_class`. If unset, defaults to calling `backend_class` constructor directly. """ request_class = BytesRequest extra_type_encoders: List[Tuple[type, str, Callable]] = [] """Value of `extra_encoders` passed to `json_dumps` or `msgpack_dumps` to be able to serialize more object types.""" extra_type_decoders: Dict[str, Callable] = {} """Value of `extra_decoders` passed to `json_loads` or `msgpack_loads` to be able to deserialize more object types.""" def __init__(self, *args, backend_class=None, backend_factory=None, **kwargs): super().__init__(*args, **kwargs) if backend_class is None and backend_factory is not None: raise ValueError( "backend_factory should only be provided if backend_class is" ) self.backend_class = backend_class if backend_class is not None: backend_factory = backend_factory or backend_class for (meth_name, meth) in backend_class.__dict__.items(): if hasattr(meth, "_endpoint_path"): self.__add_endpoint(meth_name, meth, backend_factory) def __add_endpoint(self, meth_name, meth, backend_factory): from flask import request @self.route("/" + meth._endpoint_path, methods=["POST"]) @negotiate(MsgpackFormatter, extra_encoders=self.extra_type_encoders) @negotiate(JSONFormatter, extra_encoders=self.extra_type_encoders) @functools.wraps(meth) # Copy signature and doc def _f(): # Call the actual code obj_meth = getattr(backend_factory(), meth_name) kw = decode_request(request, extra_decoders=self.extra_type_decoders) return obj_meth(**kw) diff --git a/swh/core/api/serializers.py b/swh/core/api/serializers.py index 3b6c139..16298c2 100644 --- a/swh/core/api/serializers.py +++ b/swh/core/api/serializers.py @@ -1,293 +1,300 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import datetime from enum import Enum import json import traceback import types from typing import Any, Dict, Tuple, Union from uuid import UUID import arrow import iso8601 import msgpack from requests import Response from swh.core.api.classes import PagedResult def encode_datetime(dt: datetime.datetime) -> str: """Wrapper of datetime.datetime.isoformat() that forbids naive datetimes.""" if dt.tzinfo is None: raise ValueError(f"{dt} is a naive datetime.") return dt.isoformat() def _encode_paged_result(obj: PagedResult) -> Dict[str, Any]: """Serialize PagedResult to a Dict.""" return { "results": obj.results, "next_page_token": obj.next_page_token, } def _decode_paged_result(obj: Dict[str, Any]) -> PagedResult: """Deserialize Dict into PagedResult""" return PagedResult(results=obj["results"], next_page_token=obj["next_page_token"],) +def exception_to_dict(exception: Exception) -> Dict[str, Any]: + tb = traceback.format_exception(None, exception, exception.__traceback__) + exc_type = type(exception) + return { + "type": exc_type.__name__, + "module": exc_type.__module__, + "args": exception.args, + "message": str(exception), + "traceback": tb, + } + + +def dict_to_exception(exc_dict: Dict[str, Any]) -> Exception: + temp = __import__(exc_dict["module"], fromlist=[exc_dict["type"]]) + return getattr(temp, exc_dict["type"])(*exc_dict["args"]) + + ENCODERS = [ (arrow.Arrow, "arrow", arrow.Arrow.isoformat), (datetime.datetime, "datetime", encode_datetime), ( datetime.timedelta, "timedelta", lambda o: { "days": o.days, "seconds": o.seconds, "microseconds": o.microseconds, }, ), (UUID, "uuid", str), (PagedResult, "paged_result", _encode_paged_result), # Only for JSON: (bytes, "bytes", lambda o: base64.b85encode(o).decode("ascii")), + (Exception, "exception", exception_to_dict), ] DECODERS = { "arrow": arrow.get, "datetime": lambda d: iso8601.parse_date(d, default_timezone=None), "timedelta": lambda d: datetime.timedelta(**d), "uuid": UUID, "paged_result": _decode_paged_result, # Only for JSON: "bytes": base64.b85decode, + "exception": dict_to_exception, } class MsgpackExtTypeCodes(Enum): LONG_INT = 1 def encode_data_client(data: Any, extra_encoders=None) -> bytes: try: return msgpack_dumps(data, extra_encoders=extra_encoders) except OverflowError as e: raise ValueError("Limits were reached. Please, check your input.\n" + str(e)) def decode_response(response: Response, extra_decoders=None) -> Any: content_type = response.headers["content-type"] if content_type.startswith("application/x-msgpack"): r = msgpack_loads(response.content, extra_decoders=extra_decoders) elif content_type.startswith("application/json"): r = json_loads(response.text, extra_decoders=extra_decoders) elif content_type.startswith("text/"): r = response.text else: raise ValueError("Wrong content type `%s` for API response" % content_type) return r class SWHJSONEncoder(json.JSONEncoder): """JSON encoder for data structures generated by Software Heritage. This JSON encoder extends the default Python JSON encoder and adds awareness for the following specific types: - bytes (get encoded as a Base85 string); - datetime.datetime (get encoded as an ISO8601 string). Non-standard types get encoded as a a dictionary with two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. SWHJSONEncoder also encodes arbitrary iterables as a list (allowing serialization of generators). Caveats: Limitations in the JSONEncoder extension mechanism prevent us from "escaping" dictionaries that only contain the swhtype and d keys, and therefore arbitrary data structures can't be round-tripped through SWHJSONEncoder and SWHJSONDecoder. """ def __init__(self, extra_encoders=None, **kwargs): super().__init__(**kwargs) self.encoders = ENCODERS if extra_encoders: self.encoders += extra_encoders def default(self, o: Any) -> Union[Dict[str, Union[Dict[str, int], str]], list]: for (type_, type_name, encoder) in self.encoders: if isinstance(o, type_): return { "swhtype": type_name, "d": encoder(o), } try: return super().default(o) except TypeError as e: try: iterable = iter(o) except TypeError: raise e from None else: return list(iterable) class SWHJSONDecoder(json.JSONDecoder): """JSON decoder for data structures encoded with SWHJSONEncoder. This JSON decoder extends the default Python JSON decoder, allowing the decoding of: - bytes (encoded as a Base85 string); - datetime.datetime (encoded as an ISO8601 string). Non-standard types must be encoded as a a dictionary with exactly two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. To limit the impact our encoding, if the swhtype key doesn't contain a known value, the dictionary is decoded as-is. """ def __init__(self, extra_decoders=None, **kwargs): super().__init__(**kwargs) self.decoders = DECODERS if extra_decoders: self.decoders = {**self.decoders, **extra_decoders} def decode_data(self, o: Any) -> Any: if isinstance(o, dict): if set(o.keys()) == {"d", "swhtype"}: if o["swhtype"] == "bytes": return base64.b85decode(o["d"]) decoder = self.decoders.get(o["swhtype"]) if decoder: return decoder(self.decode_data(o["d"])) return {key: self.decode_data(value) for key, value in o.items()} if isinstance(o, list): return [self.decode_data(value) for value in o] else: return o def raw_decode(self, s: str, idx: int = 0) -> Tuple[Any, int]: data, index = super().raw_decode(s, idx) return self.decode_data(data), index def json_dumps(data: Any, extra_encoders=None) -> str: return json.dumps(data, cls=SWHJSONEncoder, extra_encoders=extra_encoders) def json_loads(data: str, extra_decoders=None) -> Any: return json.loads(data, cls=SWHJSONDecoder, extra_decoders=extra_decoders) def msgpack_dumps(data: Any, extra_encoders=None) -> bytes: """Write data as a msgpack stream""" encoders = ENCODERS if extra_encoders: encoders += extra_encoders def encode_types(obj): if isinstance(obj, int): # integer overflowed while packing. Handle it as an extended type length, rem = divmod(obj.bit_length(), 8) if rem: length += 1 return msgpack.ExtType( MsgpackExtTypeCodes.LONG_INT.value, int.to_bytes(obj, length, "big") ) if isinstance(obj, types.GeneratorType): return list(obj) for (type_, type_name, encoder) in encoders: if isinstance(obj, type_): return { b"swhtype": type_name, b"d": encoder(obj), } return obj return msgpack.packb(data, use_bin_type=True, default=encode_types) def msgpack_loads(data: bytes, extra_decoders=None) -> Any: """Read data as a msgpack stream. .. Caution:: This function is used by swh.journal to decode the contents of the journal. This function **must** be kept backwards-compatible. """ decoders = DECODERS if extra_decoders: decoders = {**decoders, **extra_decoders} def ext_hook(code, data): if code == MsgpackExtTypeCodes.LONG_INT.value: return int.from_bytes(data, "big") raise ValueError("Unknown msgpack extended code %s" % code) def decode_types(obj): # Support for current encodings if set(obj.keys()) == {b"d", b"swhtype"}: decoder = decoders.get(obj[b"swhtype"]) if decoder: return decoder(obj[b"d"]) # Support for legacy encodings if b"__datetime__" in obj and obj[b"__datetime__"]: return iso8601.parse_date(obj[b"s"], default_timezone=None) if b"__uuid__" in obj and obj[b"__uuid__"]: return UUID(obj[b"s"]) if b"__timedelta__" in obj and obj[b"__timedelta__"]: return datetime.timedelta(**obj[b"s"]) if b"__arrow__" in obj and obj[b"__arrow__"]: return arrow.get(obj[b"s"]) # Fallthrough return obj try: try: return msgpack.unpackb( data, raw=False, object_hook=decode_types, ext_hook=ext_hook, strict_map_key=False, ) except TypeError: # msgpack < 0.6.0 return msgpack.unpackb( data, raw=False, object_hook=decode_types, ext_hook=ext_hook ) except TypeError: # msgpack < 0.5.2 return msgpack.unpackb( data, encoding="utf-8", object_hook=decode_types, ext_hook=ext_hook ) - - -def exception_to_dict(exception): - tb = traceback.format_exception(None, exception, exception.__traceback__) - return { - "exception": { - "type": type(exception).__name__, - "args": exception.args, - "message": str(exception), - "traceback": tb, - } - } diff --git a/swh/core/api/tests/test_async.py b/swh/core/api/tests/test_async.py index dafbb81..1fad189 100644 --- a/swh/core/api/tests/test_async.py +++ b/swh/core/api/tests/test_async.py @@ -1,256 +1,256 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import msgpack import pytest from swh.core.api.asynchronous import ( Response, RPCServerApp, decode_data, decode_request, encode_msgpack, ) from swh.core.api.serializers import SWHJSONEncoder, json_dumps, msgpack_dumps pytest_plugins = ["aiohttp.pytest_plugin", "pytester"] class TestServerException(Exception): pass class TestClientError(Exception): pass async def root(request): return Response("toor") STRUCT = { "txt": "something stupid", # 'date': datetime.date(2019, 6, 9), # not supported "datetime": datetime.datetime(2019, 6, 9, 10, 12, tzinfo=datetime.timezone.utc), "timedelta": datetime.timedelta(days=-2, hours=3), "int": 42, "float": 3.14, "subdata": { "int": 42, "datetime": datetime.datetime( 2019, 6, 10, 11, 12, tzinfo=datetime.timezone.utc ), }, "list": [ 42, datetime.datetime(2019, 9, 10, 11, 12, tzinfo=datetime.timezone.utc), "ok", ], } async def struct(request): return Response(STRUCT) async def echo(request): data = await decode_request(request) return Response(data) async def server_exception(request): raise TestServerException() async def client_error(request): raise TestClientError() async def echo_no_nego(request): # let the content negotiation handle the serialization for us... data = await decode_request(request) ret = encode_msgpack(data) return ret def check_mimetype(src, dst): src = src.split(";")[0].strip() dst = dst.split(";")[0].strip() assert src == dst @pytest.fixture def async_app(): app = RPCServerApp() app.client_exception_classes = (TestClientError,) app.router.add_route("GET", "/", root) app.router.add_route("GET", "/struct", struct) app.router.add_route("POST", "/echo", echo) app.router.add_route("GET", "/server_exception", server_exception) app.router.add_route("GET", "/client_error", client_error) app.router.add_route("POST", "/echo-no-nego", echo_no_nego) return app @pytest.fixture def cli(async_app, aiohttp_client, loop): return loop.run_until_complete(aiohttp_client(async_app)) async def test_get_simple(cli) -> None: resp = await cli.get("/") assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") data = await resp.read() value = msgpack.unpackb(data, raw=False) assert value == "toor" async def test_get_server_exception(cli) -> None: resp = await cli.get("/server_exception") assert resp.status == 500 data = await resp.read() data = msgpack.unpackb(data, raw=False) - assert data["exception"]["type"] == "TestServerException" + assert data["type"] == "TestServerException" async def test_get_client_error(cli) -> None: resp = await cli.get("/client_error") assert resp.status == 400 data = await resp.read() data = msgpack.unpackb(data, raw=False) - assert data["exception"]["type"] == "TestClientError" + assert data["type"] == "TestClientError" async def test_get_simple_nego(cli) -> None: for ctype in ("x-msgpack", "json"): resp = await cli.get("/", headers={"Accept": "application/%s" % ctype}) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/%s" % ctype) assert (await decode_request(resp)) == "toor" async def test_get_struct(cli) -> None: """Test returned structured from a simple GET data is OK""" resp = await cli.get("/struct") assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") assert (await decode_request(resp)) == STRUCT async def test_get_struct_nego(cli) -> None: """Test returned structured from a simple GET data is OK""" for ctype in ("x-msgpack", "json"): resp = await cli.get("/struct", headers={"Accept": "application/%s" % ctype}) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/%s" % ctype) assert (await decode_request(resp)) == STRUCT async def test_post_struct_msgpack(cli) -> None: """Test that msgpack encoded posted struct data is returned as is""" # simple struct resp = await cli.post( "/echo", headers={"Content-Type": "application/x-msgpack"}, data=msgpack_dumps({"toto": 42}), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") assert (await decode_request(resp)) == {"toto": 42} # complex struct resp = await cli.post( "/echo", headers={"Content-Type": "application/x-msgpack"}, data=msgpack_dumps(STRUCT), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") assert (await decode_request(resp)) == STRUCT async def test_post_struct_json(cli) -> None: """Test that json encoded posted struct data is returned as is""" resp = await cli.post( "/echo", headers={"Content-Type": "application/json"}, data=json.dumps({"toto": 42}, cls=SWHJSONEncoder), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") assert (await decode_request(resp)) == {"toto": 42} resp = await cli.post( "/echo", headers={"Content-Type": "application/json"}, data=json.dumps(STRUCT, cls=SWHJSONEncoder), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") # assert resp.headers['Content-Type'] == 'application/x-msgpack' assert (await decode_request(resp)) == STRUCT async def test_post_struct_nego(cli) -> None: """Test that json encoded posted struct data is returned as is using content negotiation (accept json or msgpack). """ for ctype in ("x-msgpack", "json"): resp = await cli.post( "/echo", headers={ "Content-Type": "application/json", "Accept": "application/%s" % ctype, }, data=json.dumps(STRUCT, cls=SWHJSONEncoder), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/%s" % ctype) assert (await decode_request(resp)) == STRUCT async def test_post_struct_no_nego(cli) -> None: """Test that json encoded posted struct data is returned as msgpack when using non-negotiation-compatible handlers. """ for ctype in ("x-msgpack", "json"): resp = await cli.post( "/echo-no-nego", headers={ "Content-Type": "application/json", "Accept": "application/%s" % ctype, }, data=json.dumps(STRUCT, cls=SWHJSONEncoder), ) assert resp.status == 200 check_mimetype(resp.headers["Content-Type"], "application/x-msgpack") assert (await decode_request(resp)) == STRUCT def test_async_decode_data_failure(): with pytest.raises(ValueError, match="Wrong content type"): decode_data("some-data", "unknown-content-type") @pytest.mark.parametrize("data", [None, "", {}, []]) def test_async_decode_data_empty_cases(data): assert decode_data(data, "unknown-content-type") == {} @pytest.mark.parametrize( "data,content_type,encode_data_fn", [ ({"a": 1}, "application/json", json_dumps), ({"a": 1}, "application/x-msgpack", msgpack_dumps), ], ) def test_async_decode_data_nominal(data, content_type, encode_data_fn): actual_data = decode_data(encode_data_fn(data), content_type) assert actual_data == data diff --git a/swh/core/api/tests/test_rpc_client_server.py b/swh/core/api/tests/test_rpc_client_server.py index 81b0afa..0d4e70a 100644 --- a/swh/core/api/tests/test_rpc_client_server.py +++ b/swh/core/api/tests/test_rpc_client_server.py @@ -1,117 +1,130 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.core.api import ( RemoteException, RPCClient, RPCServerApp, encode_data_server, error_handler, remote_api_endpoint, ) # this class is used on the server part class RPCTest: @remote_api_endpoint("endpoint_url") def endpoint(self, test_data, db=None, cur=None): assert test_data == "spam" return "egg" @remote_api_endpoint("path/to/endpoint") def something(self, data, db=None, cur=None): return data @remote_api_endpoint("raises_typeerror") def raise_typeerror(self): raise TypeError("Did I pass through?") + @remote_api_endpoint("raise_exception_exc_arg") + def raise_exception_exc_arg(self): + raise Exception(Exception("error")) + # this class is used on the client part. We cannot inherit from RPCTest # because the automagic metaclass based code that generates the RPCClient # proxy class from this does not handle inheritance properly. # We do add an endpoint on the client side that has no implementation # server-side to test this very situation (in should generate a 404) class RPCTest2: @remote_api_endpoint("endpoint_url") def endpoint(self, test_data, db=None, cur=None): assert test_data == "spam" return "egg" @remote_api_endpoint("path/to/endpoint") def something(self, data, db=None, cur=None): return data @remote_api_endpoint("not_on_server") def not_on_server(self, db=None, cur=None): return "ok" @remote_api_endpoint("raises_typeerror") def raise_typeerror(self): return "data" class RPCTestClient(RPCClient): backend_class = RPCTest2 @pytest.fixture def app(): # This fixture is used by the 'swh_rpc_adapter' fixture # which is defined in swh/core/pytest_plugin.py application = RPCServerApp("testapp", backend_class=RPCTest) @application.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data_server) return application @pytest.fixture def swh_rpc_client_class(): # This fixture is used by the 'swh_rpc_client' fixture # which is defined in swh/core/pytest_plugin.py return RPCTestClient def test_api_client_endpoint_missing(swh_rpc_client): with pytest.raises(AttributeError): swh_rpc_client.missing(data="whatever") def test_api_server_endpoint_missing(swh_rpc_client): # A 'missing' endpoint (server-side) should raise an exception # due to a 404, since at the end, we do a GET/POST an inexistent URL with pytest.raises(Exception, match="404 not found"): swh_rpc_client.not_on_server() def test_api_endpoint_kwargs(swh_rpc_client): res = swh_rpc_client.something(data="whatever") assert res == "whatever" res = swh_rpc_client.endpoint(test_data="spam") assert res == "egg" def test_api_endpoint_args(swh_rpc_client): res = swh_rpc_client.something("whatever") assert res == "whatever" res = swh_rpc_client.endpoint("spam") assert res == "egg" def test_api_typeerror(swh_rpc_client): with pytest.raises(RemoteException) as exc_info: swh_rpc_client.raise_typeerror() assert exc_info.value.args[0]["type"] == "TypeError" assert exc_info.value.args[0]["args"] == ["Did I pass through?"] assert ( str(exc_info.value) == "" ) + + +def test_api_raise_exception_exc_arg(swh_rpc_client): + with pytest.raises(RemoteException) as exc_info: + swh_rpc_client.post("raise_exception_exc_arg", data={}) + + assert exc_info.value.args[0]["type"] == "Exception" + assert type(exc_info.value.args[0]["args"][0]) == Exception + assert str(exc_info.value.args[0]["args"][0]) == "error" diff --git a/swh/core/api/tests/test_serializers.py b/swh/core/api/tests/test_serializers.py index 9d7c261..f242541 100644 --- a/swh/core/api/tests/test_serializers.py +++ b/swh/core/api/tests/test_serializers.py @@ -1,250 +1,271 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json from typing import Any, Callable, List, Tuple, Union from uuid import UUID import arrow from arrow import Arrow import pytest import requests +from requests.exceptions import ConnectionError from swh.core.api.classes import PagedResult from swh.core.api.serializers import ( SWHJSONDecoder, SWHJSONEncoder, decode_response, msgpack_dumps, msgpack_loads, ) class ExtraType: def __init__(self, arg1, arg2): self.arg1 = arg1 self.arg2 = arg2 def __repr__(self): return f"ExtraType({self.arg1}, {self.arg2})" def __eq__(self, other): return isinstance(other, ExtraType) and (self.arg1, self.arg2) == ( other.arg1, other.arg2, ) extra_encoders: List[Tuple[type, str, Callable[..., Any]]] = [ (ExtraType, "extratype", lambda o: (o.arg1, o.arg2)) ] extra_decoders = { "extratype": lambda o: ExtraType(*o), } TZ = datetime.timezone(datetime.timedelta(minutes=118)) DATA_BYTES = b"123456789\x99\xaf\xff\x00\x12" ENCODED_DATA_BYTES = {"swhtype": "bytes", "d": "F)}kWH8wXmIhn8j01^"} DATA_DATETIME = datetime.datetime(2015, 3, 4, 18, 25, 13, 1234, tzinfo=TZ,) ENCODED_DATA_DATETIME = { "swhtype": "datetime", "d": "2015-03-04T18:25:13.001234+01:58", } DATA_TIMEDELTA = datetime.timedelta(64) ENCODED_DATA_TIMEDELTA = { "swhtype": "timedelta", "d": {"days": 64, "seconds": 0, "microseconds": 0}, } DATA_ARROW = arrow.get("2018-04-25T16:17:53.533672+00:00") ENCODED_DATA_ARROW = {"swhtype": "arrow", "d": "2018-04-25T16:17:53.533672+00:00"} DATA_UUID = UUID("cdd8f804-9db6-40c3-93ab-5955d3836234") ENCODED_DATA_UUID = {"swhtype": "uuid", "d": "cdd8f804-9db6-40c3-93ab-5955d3836234"} # For test demonstration purposes TestPagedResultStr = PagedResult[ Union[UUID, datetime.datetime, datetime.timedelta], str ] DATA_PAGED_RESULT = TestPagedResultStr( results=[DATA_UUID, DATA_DATETIME, DATA_TIMEDELTA], next_page_token="10", ) ENCODED_DATA_PAGED_RESULT = { "d": { "results": [ENCODED_DATA_UUID, ENCODED_DATA_DATETIME, ENCODED_DATA_TIMEDELTA,], "next_page_token": "10", }, "swhtype": "paged_result", } TestPagedResultTuple = PagedResult[Union[str, bytes, Arrow], List[Union[str, UUID]]] DATA_PAGED_RESULT2 = TestPagedResultTuple( results=["data0", DATA_BYTES, DATA_ARROW], next_page_token=["10", DATA_UUID], ) ENCODED_DATA_PAGED_RESULT2 = { "d": { "results": ["data0", ENCODED_DATA_BYTES, ENCODED_DATA_ARROW,], "next_page_token": ["10", ENCODED_DATA_UUID], }, "swhtype": "paged_result", } DATA = { "bytes": DATA_BYTES, "datetime_tz": DATA_DATETIME, "datetime_utc": datetime.datetime( 2015, 3, 4, 18, 25, 13, 1234, tzinfo=datetime.timezone.utc ), "datetime_delta": DATA_TIMEDELTA, "arrow_date": DATA_ARROW, "swhtype": "fake", "swh_dict": {"swhtype": 42, "d": "test"}, "random_dict": {"swhtype": 43}, "uuid": DATA_UUID, "paged-result": DATA_PAGED_RESULT, "paged-result2": DATA_PAGED_RESULT2, } ENCODED_DATA = { "bytes": ENCODED_DATA_BYTES, "datetime_tz": ENCODED_DATA_DATETIME, "datetime_utc": {"swhtype": "datetime", "d": "2015-03-04T18:25:13.001234+00:00",}, "datetime_delta": ENCODED_DATA_TIMEDELTA, "arrow_date": ENCODED_DATA_ARROW, "swhtype": "fake", "swh_dict": {"swhtype": 42, "d": "test"}, "random_dict": {"swhtype": 43}, "uuid": ENCODED_DATA_UUID, "paged-result": ENCODED_DATA_PAGED_RESULT, "paged-result2": ENCODED_DATA_PAGED_RESULT2, } def test_serializers_round_trip_json(): json_data = json.dumps(DATA, cls=SWHJSONEncoder) actual_data = json.loads(json_data, cls=SWHJSONDecoder) assert actual_data == DATA def test_serializers_round_trip_json_extra_types(): expected_original_data = [ExtraType("baz", DATA), "qux"] data = json.dumps( expected_original_data, cls=SWHJSONEncoder, extra_encoders=extra_encoders ) actual_data = json.loads(data, cls=SWHJSONDecoder, extra_decoders=extra_decoders) assert actual_data == expected_original_data +def test_exception_serializer_round_trip_json(): + error_message = "unreachable host" + json_data = json.dumps( + {"exception": ConnectionError(error_message)}, cls=SWHJSONEncoder + ) + actual_data = json.loads(json_data, cls=SWHJSONDecoder) + assert "exception" in actual_data + assert type(actual_data["exception"]) == ConnectionError + assert str(actual_data["exception"]) == error_message + + def test_serializers_encode_swh_json(): json_str = json.dumps(DATA, cls=SWHJSONEncoder) actual_data = json.loads(json_str) assert actual_data == ENCODED_DATA def test_serializers_round_trip_msgpack(): expected_original_data = { **DATA, "none_dict_key": {None: 42}, "long_int_is_loooong": 10000000000000000000000000000000, } data = msgpack_dumps(expected_original_data) actual_data = msgpack_loads(data) assert actual_data == expected_original_data def test_serializers_round_trip_msgpack_extra_types(): original_data = [ExtraType("baz", DATA), "qux"] data = msgpack_dumps(original_data, extra_encoders=extra_encoders) actual_data = msgpack_loads(data, extra_decoders=extra_decoders) assert actual_data == original_data +def test_exception_serializer_round_trip_msgpack(): + error_message = "unreachable host" + data = msgpack_dumps({"exception": ConnectionError(error_message)}) + actual_data = msgpack_loads(data) + assert "exception" in actual_data + assert type(actual_data["exception"]) == ConnectionError + assert str(actual_data["exception"]) == error_message + + def test_serializers_generator_json(): data = json.dumps((i for i in range(5)), cls=SWHJSONEncoder) assert json.loads(data, cls=SWHJSONDecoder) == [i for i in range(5)] def test_serializers_generator_msgpack(): data = msgpack_dumps((i for i in range(5))) assert msgpack_loads(data) == [i for i in range(5)] def test_serializers_decode_response_json(requests_mock): requests_mock.get( "https://example.org/test/data", json=ENCODED_DATA, headers={"content-type": "application/json"}, ) response = requests.get("https://example.org/test/data") assert decode_response(response) == DATA def test_serializers_decode_legacy_msgpack(): legacy_msgpack = { "bytes": b"\xc4\x0e123456789\x99\xaf\xff\x00\x12", "datetime_tz": ( b"\x82\xc4\x0c__datetime__\xc3\xc4\x01s\xd9 " b"2015-03-04T18:25:13.001234+01:58" ), "datetime_utc": ( b"\x82\xc4\x0c__datetime__\xc3\xc4\x01s\xd9 " b"2015-03-04T18:25:13.001234+00:00" ), "datetime_delta": ( b"\x82\xc4\r__timedelta__\xc3\xc4\x01s\x83\xa4" b"days@\xa7seconds\x00\xacmicroseconds\x00" ), "arrow_date": ( b"\x82\xc4\t__arrow__\xc3\xc4\x01s\xd9 2018-04-25T16:17:53.533672+00:00" ), "swhtype": b"\xa4fake", "swh_dict": b"\x82\xa7swhtype*\xa1d\xa4test", "random_dict": b"\x81\xa7swhtype+", "uuid": ( b"\x82\xc4\x08__uuid__\xc3\xc4\x01s\xd9$" b"cdd8f804-9db6-40c3-93ab-5955d3836234" ), } for k, v in legacy_msgpack.items(): assert msgpack_loads(v) == DATA[k] def test_serializers_encode_native_datetime(): dt = datetime.datetime(2015, 1, 1, 12, 4, 42, 231455) with pytest.raises(ValueError, match="naive datetime"): msgpack_dumps(dt) def test_serializers_decode_naive_datetime(): expected_dt = datetime.datetime(2015, 1, 1, 12, 4, 42, 231455) # Current encoding assert ( msgpack_loads( b"\x82\xc4\x07swhtype\xa8datetime\xc4\x01d\xba" b"2015-01-01T12:04:42.231455" ) == expected_dt ) # Legacy encoding assert ( msgpack_loads( b"\x82\xc4\x0c__datetime__\xc3\xc4\x01s\xba2015-01-01T12:04:42.231455" ) == expected_dt )