Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/serializers.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict, Union | from typing import Any, Dict, Union | ||||
import msgpack | import msgpack | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] | |||||
def key_to_kafka(key: Union[bytes, Dict]) -> bytes: | |||||
def key_to_kafka(key: KeyType) -> bytes: | |||||
"""Serialize a key, possibly a dict, in a predictable way""" | """Serialize a key, possibly a dict, in a predictable way""" | ||||
p = msgpack.Packer(use_bin_type=True) | p = msgpack.Packer(use_bin_type=True) | ||||
if isinstance(key, dict): | if isinstance(key, dict): | ||||
return p.pack_map_pairs(sorted(key.items())) | return p.pack_map_pairs(sorted(key.items())) | ||||
else: | else: | ||||
return p.pack(key) | return p.pack(key) | ||||
def kafka_to_key(kafka_key: bytes) -> Union[bytes, Dict]: | def kafka_to_key(kafka_key: bytes) -> KeyType: | ||||
"""Deserialize a key""" | """Deserialize a key""" | ||||
return msgpack.loads(kafka_key) | return msgpack.loads(kafka_key) | ||||
def value_to_kafka(value: Any) -> bytes: | def value_to_kafka(value: Any) -> bytes: | ||||
"""Serialize some data for storage in kafka""" | """Serialize some data for storage in kafka""" | ||||
return msgpack_dumps(value) | return msgpack_dumps(value) | ||||
def kafka_to_value(kafka_value: bytes) -> Any: | def kafka_to_value(kafka_value: bytes) -> Any: | ||||
"""Deserialize some data stored in kafka""" | """Deserialize some data stored in kafka""" | ||||
return msgpack_loads(kafka_value) | return msgpack_loads(kafka_value) |