Changeset View
Changeset View
Standalone View
Standalone View
swh/core/api/serializers.py
Show All 13 Lines | |||||
import iso8601 | import iso8601 | ||||
import msgpack | import msgpack | ||||
from typing import Any, Dict, Union, Tuple | from typing import Any, Dict, Union, Tuple | ||||
from requests import Response | from requests import Response | ||||
# Registry of (type, tag, encode-function) triples shared by the JSON and
# msgpack serializers.  A matching value is wrapped as
# {"swhtype": tag, "d": encode-function(value)} on the wire.
ENCODERS = [
    (arrow.Arrow, "arrow", arrow.Arrow.isoformat),
    (datetime.datetime, "datetime", datetime.datetime.isoformat),
    (
        datetime.timedelta,
        "timedelta",
        lambda td: {
            unit: getattr(td, unit) for unit in ("days", "seconds", "microseconds")
        },
    ),
    (UUID, "uuid", str),
    # Only for JSON:
    (bytes, "bytes", lambda raw: base64.b85encode(raw).decode("ascii")),
]
# Inverse of ENCODERS: maps a "swhtype" tag back to its decode function.
DECODERS = {
    "arrow": arrow.get,
    # default_timezone=None keeps naive datetimes naive instead of
    # assuming UTC.
    "datetime": lambda value: iso8601.parse_date(value, default_timezone=None),
    "timedelta": lambda value: datetime.timedelta(**value),
    "uuid": UUID,
    # Only for JSON:
    "bytes": base64.b85decode,
}
def encode_data_client(data: Any, extra_encoders=None) -> bytes:
    """Serialize ``data`` as a msgpack byte stream for sending to a server.

    Args:
        data: any value supported by :func:`msgpack_dumps` (including the
            types registered in ``ENCODERS`` and ``extra_encoders``).
        extra_encoders: optional extra (type, tag, encoder) triples.

    Returns:
        The msgpack-encoded payload.

    Raises:
        ValueError: when msgpack limits are reached (e.g. an integer too
            large to pack), with the original error chained as the cause.
    """
    try:
        return msgpack_dumps(data, extra_encoders=extra_encoders)
    except OverflowError as e:
        # Chain the OverflowError explicitly so the root cause is shown
        # as "The above exception was the direct cause of ..." in tracebacks.
        raise ValueError(
            "Limits were reached. Please, check your input.\n" + str(e)
        ) from e
def decode_response(response: Response, extra_decoders=None) -> Any:
    """Deserialize a ``requests`` response according to its content-type.

    Supports msgpack and JSON payloads (decoded with the registered
    decoders) and plain text; anything else raises :exc:`ValueError`.
    """
    content_type = response.headers["content-type"]
    if content_type.startswith("application/x-msgpack"):
        return msgpack_loads(response.content, extra_decoders=extra_decoders)
    if content_type.startswith("application/json"):
        return json_loads(response.text, extra_decoders=extra_decoders)
    if content_type.startswith("text/"):
        return response.text
    raise ValueError("Wrong content type `%s` for API response" % content_type)
class SWHJSONEncoder(json.JSONEncoder): | class SWHJSONEncoder(json.JSONEncoder): | ||||
"""JSON encoder for data structures generated by Software Heritage. | """JSON encoder for data structures generated by Software Heritage. | ||||
This JSON encoder extends the default Python JSON encoder and adds | This JSON encoder extends the default Python JSON encoder and adds | ||||
Show All 18 Lines | class SWHJSONEncoder(json.JSONEncoder): | ||||
""" | """ | ||||
def __init__(self, extra_encoders=None, **kwargs): | def __init__(self, extra_encoders=None, **kwargs): | ||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
self.encoders = ENCODERS | self.encoders = ENCODERS | ||||
if extra_encoders: | if extra_encoders: | ||||
self.encoders += extra_encoders | self.encoders += extra_encoders | ||||
def default(self, o: Any | def default(self, o: Any) -> Union[Dict[str, Union[Dict[str, int], str]], list]: | ||||
) -> Union[Dict[str, Union[Dict[str, int], str]], list]: | |||||
for (type_, type_name, encoder) in self.encoders: | for (type_, type_name, encoder) in self.encoders: | ||||
if isinstance(o, type_): | if isinstance(o, type_): | ||||
return { | return {"swhtype": type_name, "d": encoder(o)} | ||||
'swhtype': type_name, | |||||
'd': encoder(o), | |||||
} | |||||
try: | try: | ||||
return super().default(o) | return super().default(o) | ||||
except TypeError as e: | except TypeError as e: | ||||
try: | try: | ||||
iterable = iter(o) | iterable = iter(o) | ||||
except TypeError: | except TypeError: | ||||
raise e from None | raise e from None | ||||
else: | else: | ||||
Show All 23 Lines | class SWHJSONDecoder(json.JSONDecoder): | ||||
def __init__(self, extra_decoders=None, **kwargs): | def __init__(self, extra_decoders=None, **kwargs): | ||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
self.decoders = DECODERS | self.decoders = DECODERS | ||||
if extra_decoders: | if extra_decoders: | ||||
self.decoders = {**self.decoders, **extra_decoders} | self.decoders = {**self.decoders, **extra_decoders} | ||||
def decode_data(self, o: Any) -> Any: | def decode_data(self, o: Any) -> Any: | ||||
if isinstance(o, dict): | if isinstance(o, dict): | ||||
if set(o.keys()) == {'d', 'swhtype'}: | if set(o.keys()) == {"d", "swhtype"}: | ||||
if o['swhtype'] == 'bytes': | if o["swhtype"] == "bytes": | ||||
return base64.b85decode(o['d']) | return base64.b85decode(o["d"]) | ||||
decoder = self.decoders.get(o['swhtype']) | decoder = self.decoders.get(o["swhtype"]) | ||||
if decoder: | if decoder: | ||||
return decoder(self.decode_data(o['d'])) | return decoder(self.decode_data(o["d"])) | ||||
return {key: self.decode_data(value) for key, value in o.items()} | return {key: self.decode_data(value) for key, value in o.items()} | ||||
if isinstance(o, list): | if isinstance(o, list): | ||||
return [self.decode_data(value) for value in o] | return [self.decode_data(value) for value in o] | ||||
else: | else: | ||||
return o | return o | ||||
def raw_decode(self, s: str, idx: int = 0) -> Tuple[Any, int]: | def raw_decode(self, s: str, idx: int = 0) -> Tuple[Any, int]: | ||||
data, index = super().raw_decode(s, idx) | data, index = super().raw_decode(s, idx) | ||||
return self.decode_data(data), index | return self.decode_data(data), index | ||||
def json_dumps(data: Any, extra_encoders=None) -> str:
    """Serialize ``data`` to JSON, supporting the types in ``ENCODERS``."""
    return json.dumps(
        data,
        cls=SWHJSONEncoder,
        extra_encoders=extra_encoders,
    )
def json_loads(data: str, extra_decoders=None) -> Any:
    """Deserialize JSON ``data``, resolving the tags in ``DECODERS``."""
    return json.loads(
        data,
        cls=SWHJSONDecoder,
        extra_decoders=extra_decoders,
    )
def msgpack_dumps(data: Any, extra_encoders=None) -> bytes:
    """Write data as a msgpack stream.

    Args:
        data: the value to serialize; generators are materialized as lists.
        extra_encoders: optional extra (type, tag, encoder) triples.

    Returns:
        The msgpack-encoded payload.
    """
    # Build a new list instead of `encoders += extra_encoders`: `+=` on the
    # bare name mutated the shared module-level ENCODERS list in place, so
    # extra encoders from one call leaked into every later call.
    encoders = ENCODERS
    if extra_encoders:
        encoders = encoders + extra_encoders

    def encode_types(obj):
        # Generators cannot be packed directly; materialize them.
        if isinstance(obj, types.GeneratorType):
            return list(obj)
        for (type_, type_name, encoder) in encoders:
            if isinstance(obj, type_):
                return {b"swhtype": type_name, b"d": encoder(obj)}
        return obj

    return msgpack.packb(data, use_bin_type=True, default=encode_types)
def msgpack_loads(data: bytes, extra_decoders=None) -> Any:
    """Read data as a msgpack stream.

    Args:
        data: a msgpack-encoded payload.
        extra_decoders: optional mapping of tag -> decode function, merged
            over the default ``DECODERS``.

    Returns:
        The decoded Python value, with tagged wrappers (current and legacy
        formats) replaced by their original objects.
    """
    decoders = DECODERS
    if extra_decoders:
        # Dict unpacking builds a fresh mapping; DECODERS is never mutated.
        decoders = {**decoders, **extra_decoders}

    def decode_types(obj):
        # Support for current encodings: {b"swhtype": tag, b"d": payload}.
        if set(obj.keys()) == {b"d", b"swhtype"}:
            decoder = decoders.get(obj[b"swhtype"])
            if decoder:
                return decoder(obj[b"d"])
        # Support for legacy encodings ({b"__datetime__": True, b"s": ...}
        # and friends).  Checked in order; each marker must be present AND
        # truthy.
        if b"__datetime__" in obj and obj[b"__datetime__"]:
            return iso8601.parse_date(obj[b"s"], default_timezone=None)
        if b"__uuid__" in obj and obj[b"__uuid__"]:
            return UUID(obj[b"s"])
        if b"__timedelta__" in obj and obj[b"__timedelta__"]:
            return datetime.timedelta(**obj[b"s"])
        if b"__arrow__" in obj and obj[b"__arrow__"]:
            return arrow.get(obj[b"s"])
        # Fallthrough: unknown dict, return as-is.
        return obj

    # Nested fallbacks for older msgpack versions that reject newer keyword
    # arguments with TypeError; the innermost call is the modern API.
    try:
        try:
            return msgpack.unpackb(
                data, raw=False, object_hook=decode_types, strict_map_key=False
            )
        except TypeError:  # msgpack < 0.6.0
            return msgpack.unpackb(data, raw=False, object_hook=decode_types)
    except TypeError:  # msgpack < 0.5.2
        return msgpack.unpackb(data, encoding="utf-8", object_hook=decode_types)
def exception_to_dict(exception):
    """Build a JSON-serializable representation of ``exception``.

    Returns:
        A dict of the form ``{"exception": {"type", "args", "message",
        "traceback"}}`` where ``traceback`` is the list of formatted
        traceback lines.
    """
    formatted_tb = traceback.format_exception(
        None, exception, exception.__traceback__
    )
    details = {
        "type": type(exception).__name__,
        "args": exception.args,
        "message": str(exception),
        "traceback": formatted_tb,
    }
    return {"exception": details}