Changeset View
Changeset View
Standalone View
Standalone View
swh/core/api/serializers.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
import datetime | import datetime | ||||
import json | import json | ||||
import types | import types | ||||
from uuid import UUID | from uuid import UUID | ||||
import arrow | import arrow | ||||
import dateutil.parser | import dateutil.parser | ||||
import msgpack | import msgpack | ||||
from typing import Any, Dict, Union, Tuple | |||||
from requests import Response | |||||
def encode_data_client(data: Any) -> bytes:
    """Serialize ``data`` into msgpack bytes using ``msgpack_dumps``.

    Args:
        data: the object to serialize.

    Returns:
        The msgpack-encoded representation of ``data``.

    Raises:
        ValueError: if ``msgpack_dumps`` raises ``OverflowError``. The
            message of the original error is appended to the new one and
            the original exception is chained as its cause.
    """
    try:
        return msgpack_dumps(data)
    except OverflowError as e:
        # Chain explicitly so the traceback shows the overflow as the
        # direct cause rather than as an incidental context.
        raise ValueError('Limits were reached. Please, check your input.\n' +
                         str(e)) from e
def decode_response(response): | def decode_response(response: Response) -> Any: | ||||
content_type = response.headers['content-type'] | content_type = response.headers['content-type'] | ||||
if content_type.startswith('application/x-msgpack'): | if content_type.startswith('application/x-msgpack'): | ||||
r = msgpack_loads(response.content) | r = msgpack_loads(response.content) | ||||
elif content_type.startswith('application/json'): | elif content_type.startswith('application/json'): | ||||
r = json.loads(response.text, cls=SWHJSONDecoder) | r = json.loads(response.text, cls=SWHJSONDecoder) | ||||
elif content_type.startswith('text/'): | elif content_type.startswith('text/'): | ||||
r = response.text | r = response.text | ||||
Show All 23 Lines | class SWHJSONEncoder(json.JSONEncoder): | ||||
Caveats: Limitations in the JSONEncoder extension mechanism | Caveats: Limitations in the JSONEncoder extension mechanism | ||||
prevent us from "escaping" dictionaries that only contain the | prevent us from "escaping" dictionaries that only contain the | ||||
swhtype and d keys, and therefore arbitrary data structures can't | swhtype and d keys, and therefore arbitrary data structures can't | ||||
be round-tripped through SWHJSONEncoder and SWHJSONDecoder. | be round-tripped through SWHJSONEncoder and SWHJSONDecoder. | ||||
""" | """ | ||||
def default(self, o): | def default(self, o: Any | ||||
) -> Union[Dict[str, Union[Dict[str, int], str]], list]: | |||||
if isinstance(o, bytes): | if isinstance(o, bytes): | ||||
return { | return { | ||||
'swhtype': 'bytes', | 'swhtype': 'bytes', | ||||
'd': base64.b85encode(o).decode('ascii'), | 'd': base64.b85encode(o).decode('ascii'), | ||||
} | } | ||||
elif isinstance(o, datetime.datetime): | elif isinstance(o, datetime.datetime): | ||||
return { | return { | ||||
'swhtype': 'datetime', | 'swhtype': 'datetime', | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | class SWHJSONDecoder(json.JSONDecoder): | ||||
- swhtype with value 'bytes', 'datetime', 'uuid', 'timedelta' or 'arrow'; | - swhtype with value 'bytes', 'datetime', 'uuid', 'timedelta' or 'arrow'; | ||||
- d containing the encoded value. | - d containing the encoded value. | ||||
To limit the impact of our encoding, if the swhtype key doesn't | To limit the impact of our encoding, if the swhtype key doesn't | ||||
contain a known value, the dictionary is decoded as-is. | contain a known value, the dictionary is decoded as-is. | ||||
""" | """ | ||||
def decode_data(self, o: Any) -> Any:
    """Recursively turn tagged dictionaries back into Python objects.

    A dictionary whose keys are exactly ``swhtype`` and ``d`` is treated
    as an encoded value and converted according to ``swhtype``:

    - ``'bytes'``: ``d`` is base85-decoded;
    - ``'datetime'``: ``d`` is parsed with ``dateutil.parser.parse``;
    - ``'uuid'``: ``d`` is passed to ``UUID``;
    - ``'timedelta'``: ``d`` is expanded as keyword arguments of
      ``datetime.timedelta``;
    - ``'arrow'``: ``d`` is passed to ``arrow.get``.

    Any other dictionary (including one with an unrecognized ``swhtype``)
    has each of its values decoded recursively; lists are decoded element
    by element; every other object is returned unchanged.

    Args:
        o: a value produced by the JSON parser.

    Returns:
        The value with every recognized tagged dictionary replaced.
    """
    if isinstance(o, list):
        return [self.decode_data(item) for item in o]
    if not isinstance(o, dict):
        return o

    if o.keys() == {'d', 'swhtype'}:
        datatype = o['swhtype']
        payload = o['d']
        if datatype == 'bytes':
            return base64.b85decode(payload)
        if datatype == 'datetime':
            return dateutil.parser.parse(payload)
        if datatype == 'uuid':
            return UUID(payload)
        if datatype == 'timedelta':
            return datetime.timedelta(**payload)
        if datatype == 'arrow':
            return arrow.get(payload)
        # Unknown swhtype: fall through and decode as a plain dictionary.

    return {key: self.decode_data(value) for key, value in o.items()}
def raw_decode(self, s: str, idx: int = 0) -> Tuple[Any, int]:
    """Decode a JSON document from ``s`` and post-process the result.

    Delegates parsing to the parent class, then runs the parsed value
    through ``decode_data``.

    Args:
        s: the string containing the JSON document.
        idx: position in ``s`` handed to the parent ``raw_decode``.

    Returns:
        A ``(value, index)`` pair: the post-processed value and the index
        reported by the parent ``raw_decode``.
    """
    parsed, end = super().raw_decode(s, idx)
    return self.decode_data(parsed), end
def msgpack_dumps(data: Any) -> bytes:
    """Write data as a msgpack stream.

    Objects that msgpack cannot pack natively are handed to a fallback
    encoder which converts:

    - ``datetime.datetime`` to ``{b'__datetime__': True, b's': <isoformat>}``;
    - generators to lists;
    - ``UUID`` to ``{b'__uuid__': True, b's': <str>}``;
    - ``datetime.timedelta`` to ``{b'__timedelta__': True, b's': {...}}``
      with ``days``, ``seconds`` and ``microseconds`` entries;
    - ``arrow.Arrow`` to ``{b'__arrow__': True, b's': <isoformat>}``.

    Args:
        data: the object to serialize.

    Returns:
        The result of ``msgpack.packb`` for ``data``.
    """
    def encode_types(obj: Any) -> Any:
        # The marker keys are bytes on purpose: they are the keys that are
        # looked up again when the stream is read back.
        if isinstance(obj, datetime.datetime):
            return {b'__datetime__': True, b's': obj.isoformat()}
        if isinstance(obj, types.GeneratorType):
            return list(obj)
        if isinstance(obj, UUID):
            return {b'__uuid__': True, b's': str(obj)}
        if isinstance(obj, datetime.timedelta):
            return {
                b'__timedelta__': True,
                b's': {
                    'days': obj.days,
                    'seconds': obj.seconds,
                    'microseconds': obj.microseconds,
                },
            }
        if isinstance(obj, arrow.Arrow):
            return {b'__arrow__': True, b's': obj.isoformat()}
        # Not a type handled here: hand the object back untouched.
        return obj

    return msgpack.packb(data, use_bin_type=True, default=encode_types)
def msgpack_loads(data): | def msgpack_loads(data: bytes) -> Any: | ||||
"""Read data as a msgpack stream""" | """Read data as a msgpack stream""" | ||||
def decode_types(obj): | def decode_types(obj): | ||||
if b'__datetime__' in obj and obj[b'__datetime__']: | if b'__datetime__' in obj and obj[b'__datetime__']: | ||||
return dateutil.parser.parse(obj[b's']) | return dateutil.parser.parse(obj[b's']) | ||||
if b'__uuid__' in obj and obj[b'__uuid__']: | if b'__uuid__' in obj and obj[b'__uuid__']: | ||||
return UUID(obj[b's']) | return UUID(obj[b's']) | ||||
if b'__timedelta__' in obj and obj[b'__timedelta__']: | if b'__timedelta__' in obj and obj[b'__timedelta__']: | ||||
return datetime.timedelta(**obj[b's']) | return datetime.timedelta(**obj[b's']) | ||||
Show All 10 Lines |