diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ -swh.core[db,http] >= 0.0.60 +swh.core[db,http] >= 0.0.70 swh.model >= 0.0.40 swh.storage >= 0.0.147 diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -3,23 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import msgpack - +from swh.core.bencode import encode as bencode_encode from swh.core.api.serializers import msgpack_dumps, msgpack_loads def key_to_kafka(key): """Serialize a key, possibly a dict, in a predictable way""" - p = msgpack.Packer(use_bin_type=True) - if isinstance(key, dict): - return p.pack_map_pairs(sorted(key.items())) - else: - return p.pack(key) - - -def kafka_to_key(kafka_key): - """Deserialize a key""" - return msgpack.loads(kafka_key) + return bencode_encode(key) def value_to_kafka(value): diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -140,7 +140,7 @@ producer.produce( topic=kafka_prefix+'.content', key=key_to_kafka(sha1), - value=key_to_kafka({ + value=value_to_kafka({ 'sha1': sha1, 'status': 'visible', }), diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -13,9 +13,7 @@ from swh.storage import get_storage from swh.journal.writer.kafka import KafkaJournalWriter -from swh.journal.serializers import ( - kafka_to_key, kafka_to_value -) +from swh.journal.serializers import kafka_to_value, key_to_kafka from .conftest import OBJECT_TYPE_KEYS @@ -45,14 +43,15 @@ fetched_messages += 1 consumed_objects[msg.topic()].append( - (kafka_to_key(msg.key()), kafka_to_value(msg.value())) + (msg.key(), kafka_to_value(msg.value())) ) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: - assert list(keys) == [object_[key_name] for object_ in objects] + assert list(keys) == [key_to_kafka(object_[key_name]) + for object_ in objects] else: pass # TODO diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -3,7 +3,6 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import OrderedDict import itertools import unittest @@ -22,7 +21,7 @@ key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): - d = OrderedDict() + d = {} for k in dict_keys: d[k] = base_dict[k]