diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -3,14 +3,58 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime +from enum import Enum from typing import Any, Union import msgpack -from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.model import KeyType +class MsgpackExtTypeCodes(Enum): + LONG_INT = 1 + LONG_NEG_INT = 2 + + +# this has been copied from swh.core.api.serializers +# TODO refactor swh.core to make this function available +def _msgpack_encode_int(value): + # needed because msgpack will not handle long integers with more than 64 bits + # which we unfortunately happen to have to deal with from time to time + if value > 0: + code = MsgpackExtTypeCodes.LONG_INT.value + else: + code = MsgpackExtTypeCodes.LONG_NEG_INT.value + value = -value + length, rem = divmod(value.bit_length(), 8) + if rem: + length += 1 + return msgpack.ExtType(code, int.to_bytes(value, length, "big")) + + +def msgpack_ext_encode_types(obj): + if isinstance(obj, int): + return _msgpack_encode_int(obj) + return obj + + +def msgpack_ext_hook(code, data): + if code == MsgpackExtTypeCodes.LONG_INT.value: + return int.from_bytes(data, "big") + if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: + return -int.from_bytes(data, "big") + + raise ValueError("Unknown msgpack extended code %s" % code) + + +# for BW compat: decode datetimes stored as {b"swhtype": "datetime", b"d": iso} +def decode_types_bw(obj): + if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": + return datetime.datetime.fromisoformat(obj[b"d"]) + return obj + + def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): @@ -49,12 +93,24 @@ def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" - return msgpack_dumps(value) + return msgpack.packb( + value, 
use_bin_type=True, + datetime=True, # encode datetime as msgpack.Timestamp + default=msgpack_ext_encode_types, + ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" - value = msgpack_loads(kafka_value) + value = msgpack.unpackb( + kafka_value, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + timestamp=3, # convert Timestamp into datetime objects (tz UTC) + ) return ensure_tuples(value)