diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -3,14 +3,58 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime +from enum import Enum from typing import Any, Union import msgpack -from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.model import KeyType +class MsgpackExtTypeCodes(Enum): + LONG_INT = 1 + LONG_NEG_INT = 2 + + +# this has been copied from swh.core.api.serializers +# TODO refactor swh.core to make this function available +def _msgpack_encode_longint(value): + # needed because msgpack will not handle long integers with more than 64 bits + # which we unfortunately happen to have to deal with from time to time + if value > 0: + code = MsgpackExtTypeCodes.LONG_INT.value + else: + code = MsgpackExtTypeCodes.LONG_NEG_INT.value + value = -value + length, rem = divmod(value.bit_length(), 8) + if rem: + length += 1 + return msgpack.ExtType(code, int.to_bytes(value, length, "big")) + + +def msgpack_ext_encode_types(obj): + if isinstance(obj, int): + return _msgpack_encode_longint(obj) + return obj + + +def msgpack_ext_hook(code, data): + if code == MsgpackExtTypeCodes.LONG_INT.value: + return int.from_bytes(data, "big") + if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: + return -int.from_bytes(data, "big") + + raise ValueError("Unknown msgpack extended code %s" % code) + + +# for BW compat: turn {b"swhtype": "datetime", b"d": <ISO string>} dicts into datetimes +def decode_types_bw(obj): + if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": + return datetime.datetime.fromisoformat(obj[b"d"]) + return obj + + def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): @@ -49,12 +93,24 @@ def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" - return msgpack_dumps(value) + return msgpack.packb( + value, 
+ use_bin_type=True, + datetime=True, # encode datetime as msgpack.Timestamp + default=msgpack_ext_encode_types, + ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" - value = msgpack_loads(kafka_value) + value = msgpack.unpackb( + kafka_value, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + timestamp=3, # convert Timestamp into datetime objects (tz UTC) + ) return ensure_tuples(value) diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -4,9 +4,12 @@ # See top-level LICENSE file for more information from collections import OrderedDict +from datetime import datetime, timedelta, timezone import itertools from typing import Iterable +import pytest + from swh.journal import serializers from .conftest import TEST_OBJECTS @@ -67,3 +70,47 @@ v = serializers.kafka_to_key(ktk) assert v == key + + +# limits of the int values natively supported by msgpack +MININT = -(2 ** 63) +MAXINT = 2 ** 64 - 1 + +intvalues = [ + MININT * 2, + MININT - 1, + MININT, + MININT + 1, + -10, + 0, + 10, + MAXINT - 1, + MAXINT, + MAXINT + 1, + MAXINT * 2, +] + + +@pytest.mark.parametrize("value", intvalues) +def test_encode_int(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +datevalues = [ + datetime.now(tz=timezone.utc), + datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), + datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), + datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), +] + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime_bw(value): + bwdate = {b"swhtype": "datetime", b"d": 
value.isoformat()} + assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value