diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 07ecca7..41c48e8 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,67 +1,123 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime +from enum import Enum from typing import Any, Union import msgpack -from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.model import KeyType +class MsgpackExtTypeCodes(Enum): + LONG_INT = 1 + LONG_NEG_INT = 2 + + +# this has been copied from swh.core.api.serializers +# TODO refactor swh.core to make this function available +def _msgpack_encode_longint(value): + # needed because msgpack will not handle long integers with more than 64 bits + # which we unfortunately happen to have to deal with from time to time + if value > 0: + code = MsgpackExtTypeCodes.LONG_INT.value + else: + code = MsgpackExtTypeCodes.LONG_NEG_INT.value + value = -value + length, rem = divmod(value.bit_length(), 8) + if rem: + length += 1 + return msgpack.ExtType(code, int.to_bytes(value, length, "big")) + + +def msgpack_ext_encode_types(obj): + if isinstance(obj, int): + return _msgpack_encode_longint(obj) + return obj + + +def msgpack_ext_hook(code, data): + if code == MsgpackExtTypeCodes.LONG_INT.value: + return int.from_bytes(data, "big") + if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: + return -int.from_bytes(data, "big") + + raise ValueError("Unknown msgpack extended code %s" % code) + + +# for BW compat +def decode_types_bw(obj): + if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": + return datetime.datetime.fromisoformat(obj[b"d"]) + return obj + + def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v 
if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" - return msgpack_dumps(value) + return msgpack.packb( + value, + use_bin_type=True, + datetime=True, # encode datetime as msgpack.Timestamp + default=msgpack_ext_encode_types, + ) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" - value = msgpack_loads(kafka_value) + value = msgpack.unpackb( + kafka_value, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + timestamp=3, # convert Timestamp in datetime objects (tz UTC) + ) return ensure_tuples(value) def ensure_tuples(value: Any) -> Any: if isinstance(value, (tuple, list)): return tuple(map(ensure_tuples, value)) elif isinstance(value, dict): return dict(ensure_tuples(list(value.items()))) else: return value diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index f2b9671..9420b10 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,69 +1,116 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import 
OrderedDict +from datetime import datetime, timedelta, timezone import itertools from typing import Iterable +import pytest + from swh.journal import serializers from .conftest import TEST_OBJECTS def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) def test_pprint_key(): """Test whether get_key works on all our objects""" for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() pprinted_key = serializers.pprint_key(key) assert isinstance(pprinted_key, str) if isinstance(key, dict): assert pprinted_key[0], pprinted_key[-1] == "{}" for dict_key in key.keys(): assert f"{dict_key}:" in pprinted_key if isinstance(key, bytes): assert pprinted_key == key.hex() def test_kafka_to_key(): """Standard back and forth serialization with keys """ # All KeyType(s) keys: Iterable[serializers.KeyType] = [ {"a": "foo", "b": "bar", "c": "baz",}, {"a": b"foobarbaz",}, b"foo", ] for object_type, objects in TEST_OBJECTS.items(): for obj in objects: key = obj.unique_key() keys.append(key) for key in keys: ktk = serializers.key_to_kafka(key) v = serializers.kafka_to_key(ktk) assert v == key + + +# limits of supported int values by msgpack +MININT = -(2 ** 63) +MAXINT = 2 ** 64 - 1 + +intvalues = [ + MININT * 2, + MININT - 1, + MININT, + MININT + 1, + -10, + 0, + 10, + MAXINT - 1, + MAXINT, + MAXINT + 1, + MAXINT * 2, +] + + +@pytest.mark.parametrize("value", intvalues) +def test_encode_int(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +datevalues = [ + datetime.now(tz=timezone.utc), + datetime.now(tz=timezone(timedelta(hours=-23, minutes=-59))), + datetime.now(tz=timezone(timedelta(hours=23, minutes=59))), + datetime(1, 1, 1, 1, 1, 
tzinfo=timezone.utc), + datetime(2100, 1, 1, 1, 1, tzinfo=timezone.utc), +] + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime(value): + assert serializers.kafka_to_value(serializers.value_to_kafka(value)) == value + + +@pytest.mark.parametrize("value", datevalues) +def test_encode_datetime_bw(value): + bwdate = {b"swhtype": "datetime", b"d": value.isoformat()} + assert serializers.kafka_to_value(serializers.value_to_kafka(bwdate)) == value