Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/serializers.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import msgpack | from swh.core.bencode import encode as bencode_encode | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
def key_to_kafka(key): | def key_to_kafka(key): | ||||
"""Serialize a key, possibly a dict, in a predictable way""" | """Serialize a key, possibly a dict, in a predictable way""" | ||||
p = msgpack.Packer(use_bin_type=True) | return bencode_encode(key) | ||||
if isinstance(key, dict): | |||||
return p.pack_map_pairs(sorted(key.items())) | |||||
else: | |||||
return p.pack(key) | |||||
def kafka_to_key(kafka_key): | |||||
"""Deserialize a key""" | |||||
return msgpack.loads(kafka_key) | |||||
def value_to_kafka(value): | def value_to_kafka(value): | ||||
"""Serialize some data for storage in kafka""" | """Serialize some data for storage in kafka""" | ||||
return msgpack_dumps(value) | return msgpack_dumps(value) | ||||
def kafka_to_value(kafka_value): | def kafka_to_value(kafka_value): | ||||
"""Deserialize some data stored in kafka""" | """Deserialize some data stored in kafka""" | ||||
return msgpack_loads(kafka_value) | return msgpack_loads(kafka_value) |