Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
Show First 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | while fetched_messages < expected_messages: | ||||
if error is not None: | if error is not None: | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
retries_left -= 1 | retries_left -= 1 | ||||
continue | continue | ||||
fetched_messages += 1 | fetched_messages += 1 | ||||
topic = msg.topic() | topic = msg.topic() | ||||
assert topic.startswith(kafka_prefix + "."), "Unexpected topic" | assert topic.startswith(kafka_prefix + ".") or topic.startswith( | ||||
kafka_prefix + "_privileged." | |||||
), "Unexpected topic" | |||||
object_type = topic[len(kafka_prefix + ".") :] | object_type = topic[len(kafka_prefix + ".") :] | ||||
consumed_messages[object_type].append( | consumed_messages[object_type].append( | ||||
(kafka_to_key(msg.key()), kafka_to_value(msg.value())) | (kafka_to_key(msg.key()), kafka_to_value(msg.value())) | ||||
) | ) | ||||
return consumed_messages | return consumed_messages | ||||
Show All 36 Lines | |||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def object_types(): | def object_types(): | ||||
"""Set of object types to precreate topics for.""" | """Set of object types to precreate topics for.""" | ||||
return set(TEST_OBJECT_DICTS.keys()) | return set(TEST_OBJECT_DICTS.keys()) | ||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def privileged_object_types(): | |||||
"""Set of object types to precreate privileged topics for.""" | |||||
return {"revision", "release"} | |||||
@pytest.fixture(scope="function") | |||||
def kafka_server( | def kafka_server( | ||||
kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str] | kafka_server_base: str, | ||||
kafka_prefix: str, | |||||
object_types: Iterator[str], | |||||
privileged_object_types: Iterator[str], | |||||
) -> str: | ) -> str: | ||||
"""A kafka server with existing topics | """A kafka server with existing topics | ||||
topics are built from the ``kafka_prefix`` and the ``object_types`` list""" | Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type | ||||
topics = [f"{kafka_prefix}.{obj}" for obj in object_types] | from the ``object_types`` list. | ||||
Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with | |||||
object_type from the ``privileged_object_types`` list. | |||||
""" | |||||
topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ | |||||
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types | |||||
] | |||||
# unfortunately, the Mock broker does not support the CreatTopic admin API, so we | # unfortunately, the Mock broker does not support the CreatTopic admin API, so we | ||||
# have to create topics using a Producer. | # have to create topics using a Producer. | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server_base, | "bootstrap.servers": kafka_server_base, | ||||
"client.id": "bootstrap producer", | "client.id": "bootstrap producer", | ||||
"acks": "all", | "acks": "all", | ||||
Show All 32 Lines | |||||
TEST_CONFIG = { | TEST_CONFIG = { | ||||
"consumer_id": "swh.journal.consumer", | "consumer_id": "swh.journal.consumer", | ||||
"stop_after_objects": 1, # will read 1 object and stop | "stop_after_objects": 1, # will read 1 object and stop | ||||
"storage": {"cls": "memory", "args": {}}, | "storage": {"cls": "memory", "args": {}}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]): | def test_config( | ||||
kafka_server_base: str, | |||||
kafka_prefix: str, | |||||
object_types: Iterator[str], | |||||
privileged_object_types: Iterator[str], | |||||
): | |||||
"""Test configuration needed for producer/consumer | """Test configuration needed for producer/consumer | ||||
""" | """ | ||||
return { | return { | ||||
**TEST_CONFIG, | **TEST_CONFIG, | ||||
"object_types": object_types, | "object_types": object_types, | ||||
"privileged_object_types": privileged_object_types, | |||||
"brokers": [kafka_server_base], | "brokers": [kafka_server_base], | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def consumer( | def consumer( | ||||
kafka_server: str, test_config: Dict, kafka_consumer_group: str, | kafka_server: str, test_config: Dict, kafka_consumer_group: str | ||||
) -> Consumer: | ) -> Consumer: | ||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
consumer = Consumer( | consumer = Consumer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"auto.offset.reset": "earliest", | "auto.offset.reset": "earliest", | ||||
"enable.auto.commit": True, | "enable.auto.commit": True, | ||||
"group.id": kafka_consumer_group, | "group.id": kafka_consumer_group, | ||||
} | } | ||||
) | ) | ||||
prefix = test_config["prefix"] | |||||
kafka_topics = [ | kafka_topics = [ | ||||
"%s.%s" % (test_config["prefix"], object_type) | f"{prefix}.{object_type}" for object_type in test_config["object_types"] | ||||
for object_type in test_config["object_types"] | ] + [ | ||||
f"{prefix}_privileged.{object_type}" | |||||
for object_type in test_config["privileged_object_types"] | |||||
] | ] | ||||
consumer.subscribe(kafka_topics) | consumer.subscribe(kafka_topics) | ||||
yield consumer | yield consumer | ||||
consumer.close() | consumer.close() |