diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -77,7 +77,7 @@
 
 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
-    return msgpack.loads(kafka_key)
+    return msgpack.loads(kafka_key, raw=False, strict_map_key=False)
 
 
 def value_to_kafka(value: Any) -> bytes:
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -15,7 +15,7 @@
 
 from hypothesis.strategies import one_of
 from subprocess import Popen
-from typing import Any, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Tuple
 from pathlib import Path
 
 from pytest_kafka import (
@@ -145,19 +145,19 @@
     },
 ]
 
-TEST_OBJECT_DICTS: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] = {
-    "content": ("sha1", CONTENTS),
-    "revision": ("id", REVISIONS),
-    "release": ("id", RELEASES),
-    "origin": (None, ORIGINS),
-    "origin_visit": (None, ORIGIN_VISITS),
+TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
+    "content": CONTENTS,
+    "revision": REVISIONS,
+    "release": RELEASES,
+    "origin": ORIGINS,
+    "origin_visit": ORIGIN_VISITS,
 }
 
 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
 
 TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
 
-for object_type, (_, objects) in TEST_OBJECT_DICTS.items():
+for object_type, objects in TEST_OBJECT_DICTS.items():
     converted_objects: List[ModelObject] = []
     model = MODEL_OBJECTS[object_type]
 
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -11,7 +11,7 @@
 
 from swh.storage import get_storage
 
-from swh.journal.serializers import kafka_to_key, kafka_to_value
+from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.writer.kafka import KafkaJournalWriter
 
 from swh.model.model import Origin, OriginVisit
@@ -58,22 +58,23 @@
 
 def assert_all_objects_consumed(consumed_messages):
     """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
-    for (object_type, (key_name, objects)) in TEST_OBJECT_DICTS.items():
-        (keys, values) = zip(*consumed_messages[object_type])
-        if key_name:
-            assert list(keys) == [object_[key_name] for object_ in objects]
-        else:
-            pass  # TODO
+    for object_type, known_values in TEST_OBJECT_DICTS.items():
+        known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+
+        (received_keys, received_values) = zip(*consumed_messages[object_type])
 
         if object_type == "origin_visit":
-            for value in values:
+            for value in received_values:
                 del value["visit"]
         elif object_type == "content":
-            for value in values:
+            for value in received_values:
                 del value["ctime"]
 
-        for object_ in objects:
-            assert object_ in values
+        for key in known_keys:
+            assert key in received_keys
+
+        for value in known_values:
+            assert value in received_values
 
 
 def test_kafka_writer(
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -66,7 +66,7 @@
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
-    for (object_type, (_, objects)) in TEST_OBJECT_DICTS.items():
+    for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
@@ -98,17 +98,15 @@
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
-    assert TEST_OBJECT_DICTS["revision"][1] == list(
-        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"][1]])
+    assert TEST_OBJECT_DICTS["revision"] == list(
+        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
-    assert TEST_OBJECT_DICTS["release"][1] == list(
-        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"][1]])
+    assert TEST_OBJECT_DICTS["release"] == list(
+        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )
 
-    origins = list(
-        storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"][1]])
-    )
-    assert TEST_OBJECT_DICTS["origin"][1] == [{"url": orig["url"]} for orig in origins]
+    origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
+    assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
@@ -117,7 +115,7 @@
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
-            for visit in TEST_OBJECT_DICTS["origin_visit"][1]
+            for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
@@ -125,7 +123,7 @@
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits
 
-    input_contents = TEST_OBJECT_DICTS["content"][1]
+    input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}
@@ -171,7 +169,7 @@
     # Fill Kafka
    nb_sent = 0
     nb_visits = 0
-    for (object_type, (_, objects)) in TEST_OBJECT_DICTS.items():
+    for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
@@ -213,17 +211,15 @@
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
-    assert TEST_OBJECT_DICTS["revision"][1] == list(
-        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"][1]])
+    assert TEST_OBJECT_DICTS["revision"] == list(
+        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
-    assert TEST_OBJECT_DICTS["release"][1] == list(
-        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"][1]])
+    assert TEST_OBJECT_DICTS["release"] == list(
+        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )
 
-    origins = list(
-        storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"][1]])
-    )
-    assert TEST_OBJECT_DICTS["origin"][1] == [{"url": orig["url"]} for orig in origins]
+    origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
+    assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
@@ -232,7 +228,7 @@
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
-            for visit in TEST_OBJECT_DICTS["origin_visit"][1]
+            for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
@@ -240,7 +236,7 @@
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits
 
-    input_contents = TEST_OBJECT_DICTS["content"][1]
+    input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}
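
Note: the updated tests import object_key from swh.journal.serializers, but its definition is not part of the hunks above. Purely as an illustrative sketch (an assumption, not the actual implementation in this patch), such a helper would derive the per-object-type Kafka message key that KafkaJournalWriter emits, roughly along these lines:

# Hypothetical sketch only -- the real object_key() lives in
# swh/journal/serializers.py and may differ; shown here to make the
# test changes above easier to follow. KeyType is assumed to mirror
# the annotation used by kafka_to_key() in the first hunk.
from typing import Any, Dict, Union

KeyType = Union[Dict[str, Any], bytes]


def object_key(object_type: str, object_) -> KeyType:
    """Return the Kafka message key for a model object (illustrative only)."""
    if object_type in ("revision", "release", "directory", "snapshot"):
        return object_.id
    elif object_type == "content":
        return object_.sha1
    elif object_type == "origin":
        return {"url": object_.url}
    elif object_type == "origin_visit":
        return {"origin": object_.origin, "date": str(object_.date)}
    else:
        raise ValueError("Unknown object type: %s" % object_type)

With such a helper, assert_all_objects_consumed can compare the keys received from Kafka against keys computed from TEST_OBJECTS, instead of relying on the per-type key_name that the old TEST_OBJECT_DICTS tuples carried.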