Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
Show All 10 Lines | |||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from confluent_kafka import Consumer, KafkaException, Producer | from confluent_kafka import Consumer, KafkaException, Producer | ||||
from confluent_kafka.admin import AdminClient | from confluent_kafka.admin import AdminClient | ||||
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key | from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key | ||||
from swh.journal.tests.journal_data import ( | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
TEST_OBJECTS, | |||||
TEST_OBJECT_DICTS, | |||||
MODEL_OBJECTS, | |||||
) | |||||
def consume_messages(consumer, kafka_prefix, expected_messages): | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
"""Consume expected_messages from the consumer; | """Consume expected_messages from the consumer; | ||||
Sort them all into a consumed_objects dict""" | Sort them all into a consumed_objects dict""" | ||||
consumed_messages = defaultdict(list) | consumed_messages = defaultdict(list) | ||||
fetched_messages = 0 | fetched_messages = 0 | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | for object_type, known_objects in TEST_OBJECTS.items(): | ||||
assert key in received_keys, ( | assert key in received_keys, ( | ||||
f"expected {object_type} key {pprint_key(key)} " | f"expected {object_type} key {pprint_key(key)} " | ||||
"absent from consumed messages" | "absent from consumed messages" | ||||
) | ) | ||||
if exclude and object_type in exclude: | if exclude and object_type in exclude: | ||||
continue | continue | ||||
received_objects = [ | |||||
MODEL_OBJECTS[object_type].from_dict(d) for d in received_values | |||||
] | |||||
for value in known_objects: | for value in known_objects: | ||||
assert value in received_objects, ( | expected_value = value.to_dict() | ||||
if value.object_type in ("content", "skipped_content"): | |||||
del expected_value["ctime"] | |||||
assert expected_value in received_values, ( | |||||
f"expected {object_type} value {value!r} is " | f"expected {object_type} value {value!r} is " | ||||
"absent from consumed messages" | "absent from consumed messages" | ||||
) | ) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
"""Pick a random prefix for kafka topics on each call""" | """Pick a random prefix for kafka topics on each call""" | ||||
return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) | return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def kafka_consumer_group(kafka_prefix: str): | def kafka_consumer_group(kafka_prefix: str): | ||||
"""Pick a random consumer group for kafka consumers on each call""" | """Pick a random consumer group for kafka consumers on each call""" | ||||
return "test-consumer-%s" % kafka_prefix | return "test-consumer-%s" % kafka_prefix | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def object_types(): | def object_types(): | ||||
"""Set of object types to precreate topics for.""" | """Set of object types to precreate topics for.""" | ||||
return set(TEST_OBJECT_DICTS.keys()) | return set(TEST_OBJECTS.keys()) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def privileged_object_types(): | def privileged_object_types(): | ||||
"""Set of object types to precreate privileged topics for.""" | """Set of object types to precreate privileged topics for.""" | ||||
return {"revision", "release"} | return {"revision", "release"} | ||||
▲ Show 20 Lines • Show All 112 Lines • Show Last 20 Lines |