diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,2 @@
 swh.core[db,http] >= 0.0.60
-swh.model >= 0.2
+swh.model >= 0.3
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -6,7 +6,7 @@
 import random
 import string
 
-from typing import Dict, Iterator
+from typing import Collection, Dict, Iterator, Optional
 from collections import defaultdict
 
 import pytest
@@ -14,6 +14,7 @@
 from confluent_kafka import Consumer, KafkaException, Producer
 from confluent_kafka.admin import AdminClient
 
+from swh.model.hashutil import hash_to_hex
 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
 
@@ -60,8 +61,15 @@
     return consumed_messages
 
 
-def assert_all_objects_consumed(consumed_messages):
-    """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
+def assert_all_objects_consumed(
+    consumed_messages: Dict, exclude: Optional[Collection] = None
+):
+    """Check whether all objects from TEST_OBJECT_DICTS have been consumed
+
+    `exclude` can be a list of object types for which we do not want to compare the
+    values (e.g. for anonymized objects).
+
+    """
     for object_type, known_values in TEST_OBJECT_DICTS.items():
         known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
@@ -70,18 +78,24 @@
 
         (received_keys, received_values) = zip(*consumed_messages[object_type])
 
-        if object_type == "origin_visit":
-            for value in received_values:
-                del value["visit"]
-        elif object_type == "content":
+        if object_type in ("content", "skipped_content"):
             for value in received_values:
                 del value["ctime"]
 
         for key in known_keys:
-            assert key in received_keys
+            assert key in received_keys, (
+                f"expected {object_type} key {hash_to_hex(key)} "
+                "is absent from consumed messages"
+            )
+
+        if exclude and object_type in exclude:
+            continue
 
         for value in known_values:
-            assert value in received_values
+            assert value in received_values, (
+                f"expected {object_type} value {value!r} is "
+                "absent from consumed messages"
+            )
 
 
 @pytest.fixture(scope="function")
diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -109,4 +109,13 @@
 
 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
-    return msgpack_loads(kafka_value)
+    value = msgpack_loads(kafka_value)
+    if isinstance(value, list):
+        return tuple(value)
+    if isinstance(value, dict):
+        return ensure_tuples(value)
+    return value
+
+
+def ensure_tuples(value: Dict) -> Dict:
+    return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()}
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -37,7 +37,21 @@
 UTC = datetime.timezone.utc
 
 CONTENTS = [
-    {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
+    {
+        **MultiHash.from_data(f"foo{i}".encode()).digest(),
+        "length": 4,
+        "status": "visible",
+    }
+    for i in range(10)
+]
+SKIPPED_CONTENTS = [
+    {
+        **MultiHash.from_data(f"bar{i}".encode()).digest(),
+        "length": 4,
+        "status": "absent",
+        "reason": f"because chr({i}) != '*'",
+    }
+    for i in range(2)
 ]
 
 duplicate_content1 = {
@@ -89,7 +103,7 @@
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
-        "parents": [],
+        "parents": (),
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
@@ -102,7 +116,7 @@
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
-        "parents": [],
+        "parents": (),
     },
 ]
 
@@ -132,30 +146,117 @@
     {
         "origin": ORIGINS[0]["url"],
         "date": "2013-05-07 04:20:39.369271+00:00",
-        "snapshot": None,  # TODO
-        "status": "ongoing",  # TODO
+        "snapshot": None,
+        "status": "ongoing",
         "metadata": {"foo": "bar"},
         "type": "git",
+        "visit": 1,
+    },
+    {
+        "origin": ORIGINS[1]["url"],
+        "date": "2014-11-27 17:20:39+00:00",
+        "snapshot": None,
+        "status": "ongoing",
+        "metadata": {"baz": "qux"},
+        "type": "hg",
+        "visit": 1,
+    },
+    {
+        "origin": ORIGINS[0]["url"],
+        "date": "2018-11-27 17:20:39+00:00",
+        "snapshot": None,
+        "status": "ongoing",
+        "metadata": {"baz": "qux"},
+        "type": "git",
+        "visit": 2,
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": "2018-11-27 17:20:39+00:00",
-        "snapshot": None,  # TODO
-        "status": "ongoing",  # TODO
+        "snapshot": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+        "status": "full",
         "metadata": {"baz": "qux"},
         "type": "git",
+        "visit": 3,
+    },
+    {
+        "origin": ORIGINS[1]["url"],
+        "date": "2015-11-27 17:20:39+00:00",
+        "snapshot": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
+        "status": "partial",
+        "metadata": {"something": "wrong occurred"},
+        "type": "hg",
+        "visit": 2,
     },
 ]
 
+
+DIRECTORIES = [
+    {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()},
+    {
+        "id": hash_to_bytes("cc13247a0d6584f297ca37b5868d2cbd242aea03"),
+        "entries": (
+            {
+                "name": b"file1.ext",
+                "perms": 0o644,
+                "type": "file",
+                "target": CONTENTS[0]["sha1_git"],
+            },
+            {
+                "name": b"dir1",
+                "perms": 0o755,
+                "type": "dir",
+                "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
+            },
+            {
+                "name": b"subprepo1",
+                "perms": 0o160000,
+                "type": "rev",
+                "target": REVISIONS[1]["id"],
+            },
+        ),
+    },
+]
+
+
+SNAPSHOTS = [
+    {
+        "id": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+        "branches": {
+            b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]}
+        },
+    },
+    {
+        "id": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
+        "branches": {
+            b"target/revision": {
+                "target_type": "revision",
+                "target": REVISIONS[0]["id"],
+            },
+            b"target/alias": {"target_type": "alias", "target": b"target/revision"},
+            b"target/directory": {
+                "target_type": "directory",
+                "target": DIRECTORIES[0]["id"],
+            },
+            b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]},
+            b"target/snapshot": {
+                "target_type": "snapshot",
+                "target": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+            },
+        },
+    },
+]
+
+
 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
-    "directory": [],
+    "directory": DIRECTORIES,
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
     "release": RELEASES,
     "revision": REVISIONS,
-    "snapshot": [],
-    "skipped_content": [],
+    "snapshot": SNAPSHOTS,
+    "skipped_content": SKIPPED_CONTENTS,
 }
 
 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
@@ -167,9 +268,7 @@
     model = MODEL_OBJECTS[object_type]
 
     for (num, obj_d) in enumerate(objects):
-        if object_type == "origin_visit":
-            obj_d = {**obj_d, "visit": num}
-        elif object_type == "content":
+        if object_type == "content":
             obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now(tz=UTC)}
         converted_objects.append(model.from_dict(obj_d))
 
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -35,7 +35,7 @@
     ),
     "synthetic": False,
     "metadata": None,
-    "parents": [],
+    "parents": (),
    "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
 }
 
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -77,7 +77,7 @@
         expected_messages += len(objects)
 
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
-    assert_all_objects_consumed(consumed_messages)
+    assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"])
 
     for key, obj_dict in consumed_messages["revision"]:
         obj = Revision.from_dict(obj_dict)
@@ -121,7 +121,7 @@
         brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )
 
-    empty_dir = Directory(entries=[])
+    empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)
 
@@ -153,7 +153,7 @@
         producer_class=MockProducer,
     )
 
-    empty_dir = Directory(entries=[])
+    empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)