Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/serializers.py
Show First 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | return msgpack.packb( | ||||
use_bin_type=True, | use_bin_type=True, | ||||
datetime=True, # encode datetime as msgpack.Timestamp | datetime=True, # encode datetime as msgpack.Timestamp | ||||
default=msgpack_ext_encode_types, | default=msgpack_ext_encode_types, | ||||
) | ) | ||||
def kafka_to_value(kafka_value: bytes) -> Any: | def kafka_to_value(kafka_value: bytes) -> Any: | ||||
"""Deserialize some data stored in kafka""" | """Deserialize some data stored in kafka""" | ||||
value = msgpack.unpackb( | return msgpack.unpackb( | ||||
kafka_value, | kafka_value, | ||||
raw=False, | raw=False, | ||||
object_hook=decode_types_bw, | object_hook=decode_types_bw, | ||||
ext_hook=msgpack_ext_hook, | ext_hook=msgpack_ext_hook, | ||||
strict_map_key=False, | strict_map_key=False, | ||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | timestamp=3, # convert Timestamp in datetime objects (tz UTC) | ||||
) | ) | ||||
return ensure_tuples(value) | |||||
def ensure_tuples(value: Any) -> Any: | |||||
if isinstance(value, (tuple, list)): | |||||
return tuple(map(ensure_tuples, value)) | |||||
elif isinstance(value, dict): | |||||
return dict(ensure_tuples(list(value.items()))) | |||||
else: | |||||
return value |