Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import random | import random | ||||
import string | import string | ||||
from typing import Collection, Dict, Iterator, Optional | from typing import Any, Collection, Dict, Iterator, Optional | ||||
import attr | import attr | ||||
from confluent_kafka import Consumer, KafkaException, Producer | from confluent_kafka import Consumer, KafkaException, Producer | ||||
from confluent_kafka.admin import AdminClient | from confluent_kafka.admin import AdminClient | ||||
import pytest | import pytest | ||||
from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key | from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
def ensure_lists(value: Any) -> Any: | |||||
""" | |||||
>>> ensure_lists(["foo", 42]) | |||||
["foo", 42] | |||||
>>> ensure_lists(("foo", 42)) | |||||
["foo", 42] | |||||
>>> ensure_lists({"a": ["foo", 42]}) | |||||
{"a": ["foo", 42]} | |||||
>>> ensure_lists({"a": ("foo", 42)}) | |||||
{"a": ["foo", 42]} | |||||
""" | |||||
if isinstance(value, (tuple, list)): | |||||
return list(map(ensure_lists, value)) | |||||
elif isinstance(value, dict): | |||||
return dict(ensure_lists(list(value.items()))) | |||||
else: | |||||
return value | |||||
def consume_messages(consumer, kafka_prefix, expected_messages): | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
"""Consume expected_messages from the consumer; | """Consume expected_messages from the consumer; | ||||
Sort them all into a consumed_objects dict""" | Sort them all into a consumed_objects dict""" | ||||
consumed_messages = defaultdict(list) | consumed_messages = defaultdict(list) | ||||
fetched_messages = 0 | fetched_messages = 0 | ||||
retries_left = 1000 | retries_left = 1000 | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | for object_type, known_objects in TEST_OBJECTS.items(): | ||||
if exclude and object_type in exclude: | if exclude and object_type in exclude: | ||||
continue | continue | ||||
for value in known_objects: | for value in known_objects: | ||||
expected_value = value.to_dict() | expected_value = value.to_dict() | ||||
if value.object_type in ("content", "skipped_content"): | if value.object_type in ("content", "skipped_content"): | ||||
expected_value.pop("ctime", None) | expected_value.pop("ctime", None) | ||||
assert expected_value in received_values, ( | assert ensure_lists(expected_value) in received_values, ( | ||||
f"expected {object_type} value {value!r} is " | f"expected {object_type} value {value!r} is " | ||||
"absent from consumed messages" | "absent from consumed messages" | ||||
) | ) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
"""Pick a random prefix for kafka topics on each call""" | """Pick a random prefix for kafka topics on each call""" | ||||
▲ Show 20 Lines • Show All 133 Lines • Show Last 20 Lines |