diff --git a/swh/core/api/negotiation.py b/swh/core/api/negotiation.py index de57742..1322862 100644 --- a/swh/core/api/negotiation.py +++ b/swh/core/api/negotiation.py @@ -1,153 +1,159 @@ # This code is a partial and adapted copy of # https://github.com/nickstenning/negotiate # # Copyright 2012-2013 Nick Stenning # 2019 The Software Heritage developers # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # from collections import defaultdict from decorator import decorator from inspect import getcallargs -from typing import Any, List, Optional +from typing import Any, List, Optional, Callable, \ + Type, NoReturn, DefaultDict + +from requests import Response class FormatterNotFound(Exception): pass class Formatter: - format = None # type: Optional[str] - mimetypes = [] # type: List[Any] + format: Optional[str] = None + mimetypes: List[str] = [] - def __init__(self, request_mimetype=None): + def __init__(self, request_mimetype: Optional[str] = None) -> None: if request_mimetype is None or request_mimetype not in self.mimetypes: try: self.response_mimetype = self.mimetypes[0] except IndexError: raise NotImplementedError( "%s.mimetypes should be a non-empty list" % self.__class__.__name__) else: self.response_mimetype = request_mimetype - def configure(self): + def configure(self) -> None: pass - def render(self, obj): + def render(self, obj: Any) -> bytes: raise NotImplementedError( "render() should be implemented by Formatter subclasses") - def __call__(self, obj): + def __call__(self, obj: Any) -> Response: return self._make_response( self.render(obj), content_type=self.response_mimetype) - def _make_response(self, body, content_type): + def _make_response(self, body: bytes, content_type: str) -> Response: raise NotImplementedError( "_make_response() should be implemented by " "framework-specific subclasses of Formatter" ) class Negotiator: - def __init__(self, func): + def __init__(self, func: Callable[..., Any]) -> None: self.func = func - self._formatters = [] - self._formatters_by_format = defaultdict(list) - self._formatters_by_mimetype = defaultdict(list) + self._formatters: List[Type[Formatter]] = [] + self._formatters_by_format: DefaultDict = defaultdict(list) + self._formatters_by_mimetype: DefaultDict = defaultdict(list) - def __call__(self, *args, **kwargs): + def __call__(self, *args, **kwargs) -> Response: result = self.func(*args, **kwargs) format = getcallargs(self.func, *args, **kwargs).get('format') mimetype = self.best_mimetype() try: formatter = self.get_formatter(format, mimetype) except FormatterNotFound as e: return self._abort(404, str(e)) return formatter(result) - def register_formatter(self, formatter, *args, **kwargs): + def register_formatter(self, formatter: Type[Formatter], + *args, **kwargs) -> None: self._formatters.append(formatter) self._formatters_by_format[formatter.format].append( (formatter, args, kwargs)) for mimetype in formatter.mimetypes: self._formatters_by_mimetype[mimetype].append( (formatter, args, kwargs)) - def get_formatter(self, format=None, mimetype=None): + def get_formatter(self, format: Optional[str] = None, + mimetype: Optional[str] = None) -> Formatter: if format is None and mimetype is None: raise TypeError( "get_formatter expects one of the 'format' or 'mimetype' " "kwargs to be set") if format is not None: try: # the first added will be the most specific formatter_cls, args, kwargs = ( self._formatters_by_format[format][0]) except IndexError: raise FormatterNotFound( "Formatter for format '%s' not found!" % format) elif mimetype is not None: try: # the first added will be the most specific formatter_cls, args, kwargs = ( self._formatters_by_mimetype[mimetype][0]) except IndexError: raise FormatterNotFound( "Formatter for mimetype '%s' not found!" % mimetype) formatter = formatter_cls(request_mimetype=mimetype) formatter.configure(*args, **kwargs) return formatter @property - def accept_mimetypes(self): + def accept_mimetypes(self) -> List[str]: return [m for f in self._formatters for m in f.mimetypes] - def best_mimetype(self): + def best_mimetype(self) -> str: raise NotImplementedError( "best_mimetype() should be implemented in " "framework-specific subclasses of Negotiator" ) - def _abort(self, status_code, err=None): + def _abort(self, status_code: int, err: Optional[str] = None) -> NoReturn: raise NotImplementedError( "_abort() should be implemented in framework-specific " "subclasses of Negotiator" ) -def negotiate(negotiator_cls, formatter_cls, *args, **kwargs): +def negotiate(negotiator_cls: Type[Negotiator], formatter_cls: Type[Formatter], + *args, **kwargs) -> Callable: def _negotiate(f, *args, **kwargs): return f.negotiator(*args, **kwargs) def decorate(f): if not hasattr(f, 'negotiator'): f.negotiator = negotiator_cls(f) f.negotiator.register_formatter(formatter_cls, *args, **kwargs) return decorator(_negotiate, f) return decorate diff --git a/swh/core/api/serializers.py b/swh/core/api/serializers.py index ac43bfb..f9aca96 100644 --- a/swh/core/api/serializers.py +++ b/swh/core/api/serializers.py @@ -1,193 +1,198 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import datetime import json import types from uuid import UUID import arrow import dateutil.parser import msgpack +from typing import Any, Dict, Union, Tuple +from requests import Response -def encode_data_client(data): + +def encode_data_client(data: Any) -> bytes: try: return msgpack_dumps(data) except OverflowError as e: raise ValueError('Limits were reached. Please, check your input.\n' + str(e)) -def decode_response(response): +def decode_response(response: Response) -> Any: content_type = response.headers['content-type'] if content_type.startswith('application/x-msgpack'): r = msgpack_loads(response.content) elif content_type.startswith('application/json'): r = json.loads(response.text, cls=SWHJSONDecoder) elif content_type.startswith('text/'): r = response.text else: raise ValueError('Wrong content type `%s` for API response' % content_type) return r class SWHJSONEncoder(json.JSONEncoder): """JSON encoder for data structures generated by Software Heritage. This JSON encoder extends the default Python JSON encoder and adds awareness for the following specific types: - bytes (get encoded as a Base85 string); - datetime.datetime (get encoded as an ISO8601 string). Non-standard types get encoded as a a dictionary with two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. SWHJSONEncoder also encodes arbitrary iterables as a list (allowing serialization of generators). Caveats: Limitations in the JSONEncoder extension mechanism prevent us from "escaping" dictionaries that only contain the swhtype and d keys, and therefore arbitrary data structures can't be round-tripped through SWHJSONEncoder and SWHJSONDecoder. """ - def default(self, o): + def default(self, o: Any + ) -> Union[Dict[str, Union[Dict[str, int], str]], list]: if isinstance(o, bytes): return { 'swhtype': 'bytes', 'd': base64.b85encode(o).decode('ascii'), } elif isinstance(o, datetime.datetime): return { 'swhtype': 'datetime', 'd': o.isoformat(), } elif isinstance(o, UUID): return { 'swhtype': 'uuid', 'd': str(o), } elif isinstance(o, datetime.timedelta): return { 'swhtype': 'timedelta', 'd': { 'days': o.days, 'seconds': o.seconds, 'microseconds': o.microseconds, }, } elif isinstance(o, arrow.Arrow): return { 'swhtype': 'arrow', 'd': o.isoformat(), } try: return super().default(o) except TypeError as e: try: iterable = iter(o) except TypeError: raise e from None else: return list(iterable) class SWHJSONDecoder(json.JSONDecoder): """JSON decoder for data structures encoded with SWHJSONEncoder. This JSON decoder extends the default Python JSON decoder, allowing the decoding of: - bytes (encoded as a Base85 string); - datetime.datetime (encoded as an ISO8601 string). Non-standard types must be encoded as a a dictionary with exactly two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. To limit the impact our encoding, if the swhtype key doesn't contain a known value, the dictionary is decoded as-is. """ - def decode_data(self, o): + + def decode_data(self, o: Any) -> Any: if isinstance(o, dict): if set(o.keys()) == {'d', 'swhtype'}: datatype = o['swhtype'] if datatype == 'bytes': return base64.b85decode(o['d']) elif datatype == 'datetime': return dateutil.parser.parse(o['d']) elif datatype == 'uuid': return UUID(o['d']) elif datatype == 'timedelta': return datetime.timedelta(**o['d']) elif datatype == 'arrow': return arrow.get(o['d']) return {key: self.decode_data(value) for key, value in o.items()} if isinstance(o, list): return [self.decode_data(value) for value in o] else: return o - def raw_decode(self, s, idx=0): + def raw_decode(self, s: str, idx: int = 0) -> Tuple[Any, int]: data, index = super().raw_decode(s, idx) return self.decode_data(data), index -def msgpack_dumps(data): +def msgpack_dumps(data: Any) -> bytes: """Write data as a msgpack stream""" def encode_types(obj): if isinstance(obj, datetime.datetime): return {b'__datetime__': True, b's': obj.isoformat()} if isinstance(obj, types.GeneratorType): return list(obj) if isinstance(obj, UUID): return {b'__uuid__': True, b's': str(obj)} if isinstance(obj, datetime.timedelta): return { b'__timedelta__': True, b's': { 'days': obj.days, 'seconds': obj.seconds, 'microseconds': obj.microseconds, }, } if isinstance(obj, arrow.Arrow): return {b'__arrow__': True, b's': obj.isoformat()} return obj return msgpack.packb(data, use_bin_type=True, default=encode_types) -def msgpack_loads(data): +def msgpack_loads(data: bytes) -> Any: """Read data as a msgpack stream""" def decode_types(obj): if b'__datetime__' in obj and obj[b'__datetime__']: return dateutil.parser.parse(obj[b's']) if b'__uuid__' in obj and obj[b'__uuid__']: return UUID(obj[b's']) if b'__timedelta__' in obj and obj[b'__timedelta__']: return datetime.timedelta(**obj[b's']) if b'__arrow__' in obj and obj[b'__arrow__']: return arrow.get(obj[b's']) return obj try: return msgpack.unpackb(data, raw=False, object_hook=decode_types) except TypeError: # msgpack < 0.5.2 return msgpack.unpackb(data, encoding='utf-8', object_hook=decode_types)