Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | import random | ||||
import string | import string | ||||
from typing import Collection, Dict, Iterator, Optional | from typing import Collection, Dict, Iterator, Optional | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import pytest | import pytest | ||||
from confluent_kafka import Consumer, KafkaException, Producer | from confluent_kafka import Consumer, KafkaException, Producer | ||||
from confluent_kafka.admin import AdminClient | from confluent_kafka.admin import AdminClient | ||||
from swh.model.hashutil import hash_to_hex | from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value, pprint_key | ||||
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value | |||||
from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS | from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS | ||||
def consume_messages(consumer, kafka_prefix, expected_messages): | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
"""Consume expected_messages from the consumer; | """Consume expected_messages from the consumer; | ||||
Sort them all into a consumed_objects dict""" | Sort them all into a consumed_objects dict""" | ||||
consumed_messages = defaultdict(list) | consumed_messages = defaultdict(list) | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | for object_type, known_values in TEST_OBJECT_DICTS.items(): | ||||
(received_keys, received_values) = zip(*consumed_messages[object_type]) | (received_keys, received_values) = zip(*consumed_messages[object_type]) | ||||
if object_type in ("content", "skipped_content"): | if object_type in ("content", "skipped_content"): | ||||
for value in received_values: | for value in received_values: | ||||
del value["ctime"] | del value["ctime"] | ||||
for key in known_keys: | for key in known_keys: | ||||
assert key in received_keys, ( | assert key in received_keys, ( | ||||
f"expected {object_type} key {hash_to_hex(key)} " | f"expected {object_type} key {pprint_key(key)} " | ||||
"absent from consumed messages" | "absent from consumed messages" | ||||
) | ) | ||||
if exclude and object_type in exclude: | if exclude and object_type in exclude: | ||||
continue | continue | ||||
for value in known_values: | for value in known_values: | ||||
if object_type == "origin_visit": | if object_type == "origin_visit": | ||||
value["date"] = str(value["date"]) | value["date"] = str(value["date"]) | ||||
assert value in received_values, ( | assert value in received_values, ( | ||||
f"expected {object_type} value {value!r} is " | f"expected {object_type} value {value!r} is " | ||||
ardumont: (forgot to submit) or we can use `{key!r}` (in the diff change above) as in here?
| |||||
"absent from consumed messages" | "absent from consumed messages" | ||||
) | ) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def kafka_prefix(): | def kafka_prefix(): | ||||
"""Pick a random prefix for kafka topics on each call""" | """Pick a random prefix for kafka topics on each call""" | ||||
return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) | return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) | ||||
▲ Show 20 Lines • Show All 132 Lines • Show Last 20 Lines |
(forgot to submit) or we can use {key!r} (in the diff change above) as in here?