diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ -swh.core[db,http] >= 0.0.60 +swh.core[db,http] >= 0.0.70 swh.model >= 0.0.40 swh.storage >= 0.0.147 diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -3,23 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import msgpack - +from swh.core.bencode import encode as bencode_encode from swh.core.api.serializers import msgpack_dumps, msgpack_loads def key_to_kafka(key): """Serialize a key, possibly a dict, in a predictable way""" - p = msgpack.Packer(use_bin_type=True) - if isinstance(key, dict): - return p.pack_map_pairs(sorted(key.items())) - else: - return p.pack(key) - - -def kafka_to_key(kafka_key): - """Deserialize a key""" - return msgpack.loads(kafka_key) + return bencode_encode(key) def value_to_kafka(value): diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -140,7 +140,7 @@ producer.produce( topic=kafka_prefix+'.content', key=key_to_kafka(sha1), - value=key_to_kafka({ + value=value_to_kafka({ 'sha1': sha1, 'status': 'visible', }), diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -13,9 +13,7 @@ from swh.storage import get_storage from swh.journal.writer.kafka import KafkaJournalWriter -from swh.journal.serializers import ( - kafka_to_key, kafka_to_value -) +from swh.journal.serializers import kafka_to_value, key_to_kafka from .conftest import OBJECT_TYPE_KEYS @@ -45,14 +43,15 @@ fetched_messages += 1 consumed_objects[msg.topic()].append( - (kafka_to_key(msg.key()), kafka_to_value(msg.value())) + (msg.key(), kafka_to_value(msg.value())) ) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: - assert list(keys) == [object_[key_name] for object_ in objects] + assert list(keys) == [key_to_kafka(object_[key_name]) + for object_ in objects] else: pass # TODO diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -130,12 +130,12 @@ # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) - writer.send('origin', 'foo', { + writer.send('origin', b'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: - writer.send('origin_visit', 'foo', visit) + writer.send('origin_visit', b'foo', visit) queue_size = len(queue) diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -3,7 +3,6 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import OrderedDict import itertools import unittest @@ -14,15 +13,15 @@ def test_key_to_kafka_repeatable(self): """Check the kafka key encoding is repeatable""" base_dict = { - 'a': 'foo', - 'b': 'bar', - 'c': 'baz', + b'a': b'foo', + b'b': b'bar', + b'c': b'baz', } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): - d = OrderedDict() + d = {} for k in dict_keys: d[k] = base_dict[k] diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -63,15 +63,21 @@ return object_['sha1'] # TODO: use a dict of hashes elif object_type == 'skipped_content': return { - hash: object_[hash] + hash.encode('ascii'): object_[hash] for hash in DEFAULT_ALGORITHMS } elif object_type == 'origin': - return {'url': object_['url'], 'type': object_['type']} + return { + b'url': object_['url'].encode('ascii'), + b'type': object_['type'].encode('ascii') + } elif object_type == 'origin_visit': return { - 'origin': object_['origin'], - 'date': str(object_['date']), + b'origin': { + b'url': object_['origin']['url'].encode('ascii'), + b'type': object_['origin']['type'].encode('ascii'), + }, + b'date': str(object_['date']).encode('ascii'), } else: raise ValueError('Unknown object type: %s.' % object_type)