diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index f6d7311..36af3a1 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,235 +1,246 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import string from typing import Collection, Dict, Iterator, Optional from collections import defaultdict +import attr import pytest from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key -from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS +from swh.journal.tests.journal_data import ( + TEST_OBJECTS, + TEST_OBJECT_DICTS, + MODEL_OBJECTS, +) def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( f"{kafka_prefix}_privileged." ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): - """Check whether all objects from TEST_OBJECT_DICTS have been consumed + """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). """ - for object_type, known_values in TEST_OBJECT_DICTS.items(): - known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] + for object_type, known_objects in TEST_OBJECTS.items(): + known_keys = [object_key(object_type, obj) for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: del value["ctime"] + if object_type == "content": + known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue - for value in known_values: - assert value in received_values, ( + received_objects = [ + MODEL_OBJECTS[object_type].from_dict(d) for d in received_values + ] + + for value in known_objects: + assert value in received_objects, ( f"expected {object_type} value {value!r} is " "absent from consumed messages" ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECT_DICTS.keys()) @pytest.fixture(scope="function") def privileged_object_types(): """Set of object types to precreate privileged topics for.""" return {"revision", "release"} @pytest.fixture(scope="function") def kafka_server( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type from the ``object_types`` list. Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with object_type from the ``privileged_object_types`` list. """ topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture def test_config( kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. """ consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) prefix = test_config["prefix"] kafka_topics = [ f"{prefix}.{object_type}" for object_type in test_config["object_types"] ] + [ f"{prefix}_privileged.{object_type}" for object_type in test_config["privileged_object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py index 9076b1d..4aad169 100644 --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -1,319 +1,325 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, List, Type from swh.model.hashutil import MultiHash, hash_to_bytes from swh.journal.serializers import ModelObject from swh.model.model import ( BaseModel, Content, Directory, Origin, OriginVisit, OriginVisitStatus, Release, Revision, SkippedContent, Snapshot, ) +MODEL_CLASSES = ( + Content, + Directory, + Origin, + OriginVisit, + OriginVisitStatus, + Release, + Revision, + SkippedContent, + Snapshot, +) OBJECT_TYPES: Dict[Type[BaseModel], str] = { - Content: "content", - Directory: "directory", - Origin: "origin", - OriginVisit: "origin_visit", - OriginVisitStatus: "origin_visit_status", - Release: "release", - Revision: "revision", - SkippedContent: "skipped_content", - Snapshot: "snapshot", + cls: cls.object_type for cls in MODEL_CLASSES # type: ignore +} +MODEL_OBJECTS: Dict[str, Type[BaseModel]] = { + cls.object_type: cls for cls in MODEL_CLASSES # type: ignore } UTC = datetime.timezone.utc CONTENTS = [ { **MultiHash.from_data(f"foo{i}".encode()).digest(), "length": 4, + "data": f"foo{i}".encode(), "status": "visible", } for i in range(10) ] + [ { **MultiHash.from_data(f"forbidden foo{i}".encode()).digest(), "length": 14, + "data": f"forbidden foo{i}".encode(), "status": "hidden", } for i in range(10) ] SKIPPED_CONTENTS = [ { **MultiHash.from_data(f"bar{i}".encode()).digest(), "length": 4, "status": "absent", "reason": f"because chr({i}) != '*'", } for i in range(2) ] duplicate_content1 = { "length": 4, "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": b"another-foo", "blake2s256": b"another-bar", "sha256": b"another-baz", "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() sha1_array = bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ {"fullname": b"foo", "name": b"foo", "email": b"",}, {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, { "timestamp": {"seconds": 1234567892, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, ] REVISIONS = [ { "id": hash_to_bytes("4ca486e65eb68e4986aeef8227d2db1d56ce51b3"), "message": b"hello", "date": DATES[0], "committer": COMMITTERS[0], "author": COMMITTERS[0], "committer_date": DATES[0], "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": (), }, { "id": hash_to_bytes("677063f5c405d6fc1781fc56379c9a9adf43d3a0"), "message": b"hello again", "date": DATES[1], "committer": COMMITTERS[1], "author": COMMITTERS[1], "committer_date": DATES[1], "type": "hg", "directory": b"\x02" * 20, "synthetic": False, "metadata": None, "parents": (), }, ] RELEASES = [ { "id": hash_to_bytes("8059dc4e17fcd0e51ca3bcd6b80f4577d281fd08"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, "author": COMMITTERS[0], "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=UTC), "visit": 1, "type": "git", }, { "origin": ORIGINS[1]["url"], "date": datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=UTC), "visit": 1, "type": "hg", }, { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), "visit": 2, "type": "git", }, { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2018, 11, 27, 17, 20, 39, tzinfo=UTC), "visit": 3, "type": "git", }, { "origin": ORIGINS[1]["url"], "date": datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=UTC), "visit": 2, "type": "hg", }, ] # The origin-visit-status dates needs to be shifted slightly in the future from their # visit dates counterpart. Otherwise, we are hitting storage-wise the "on conflict" # ignore policy (because origin-visit-add creates an origin-visit-status with the same # parameters from the origin-visit {origin, visit, date}... ORIGIN_VISIT_STATUSES = [ { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2013, 5, 7, 4, 20, 39, 432222, tzinfo=UTC), "visit": 1, "status": "ongoing", "snapshot": None, "metadata": None, }, { "origin": ORIGINS[1]["url"], "date": datetime.datetime(2014, 11, 27, 17, 21, 12, tzinfo=UTC), "visit": 1, "status": "ongoing", "snapshot": None, "metadata": None, }, { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2018, 11, 27, 17, 20, 59, tzinfo=UTC), "visit": 2, "status": "ongoing", "snapshot": None, "metadata": None, }, { "origin": ORIGINS[0]["url"], "date": datetime.datetime(2018, 11, 27, 17, 20, 49, tzinfo=UTC), "visit": 3, "status": "full", "snapshot": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), "metadata": None, }, { "origin": ORIGINS[1]["url"], "date": datetime.datetime(2015, 11, 27, 17, 22, 18, tzinfo=UTC), "visit": 2, "status": "partial", "snapshot": hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), "metadata": None, }, ] DIRECTORIES = [ {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()}, { "id": hash_to_bytes("21416d920e0ebf0df4a7888bed432873ed5cb3a7"), "entries": ( { "name": b"file1.ext", "perms": 0o644, "type": "file", "target": CONTENTS[0]["sha1_git"], }, { "name": b"dir1", "perms": 0o755, "type": "dir", "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), }, { "name": b"subprepo1", "perms": 0o160000, "type": "rev", "target": REVISIONS[1]["id"], }, ), }, ] SNAPSHOTS = [ { "id": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), "branches": { b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]} }, }, { "id": hash_to_bytes("8ce268b87faf03850693673c3eb5c9bb66e1ca38"), "branches": { b"target/revision": { "target_type": "revision", "target": REVISIONS[0]["id"], }, b"target/alias": {"target_type": "alias", "target": b"target/revision"}, b"target/directory": { "target_type": "directory", "target": DIRECTORIES[0]["id"], }, b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]}, b"target/snapshot": { "target_type": "snapshot", "target": hash_to_bytes("17d0066a4a80aba4a0e913532ee8ff2014f006a9"), }, }, }, ] TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { "content": CONTENTS, "directory": DIRECTORIES, "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, "origin_visit_status": ORIGIN_VISIT_STATUSES, "release": RELEASES, "revision": REVISIONS, "snapshot": SNAPSHOTS, "skipped_content": SKIPPED_CONTENTS, } -MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} - TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, objects in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): if object_type == "content": - obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now(tz=UTC)} + obj_d = {**obj_d, "ctime": datetime.datetime.now(tz=UTC)} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects