Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_pytest_plugin.py
Show All 14 Lines | def test_kafka_server(kafka_server_base: str): | ||||
admin = AdminClient({"bootstrap.servers": kafka_server_base}) | admin = AdminClient({"bootstrap.servers": kafka_server_base}) | ||||
topics = admin.list_topics() | topics = admin.list_topics() | ||||
assert len(topics.brokers) == 1 | assert len(topics.brokers) == 1 | ||||
def test_kafka_server_with_topics( | def test_kafka_server_with_topics( | ||||
kafka_server: str, kafka_prefix: str, object_types: Iterator[str] | kafka_server: str, | ||||
kafka_prefix: str, | |||||
object_types: Iterator[str], | |||||
privileged_object_types: Iterator[str], | |||||
): | ): | ||||
admin = AdminClient({"bootstrap.servers": kafka_server}) | admin = AdminClient({"bootstrap.servers": kafka_server}) | ||||
# check unprivileged topics are present | |||||
topics = { | topics = { | ||||
topic | topic | ||||
for topic in admin.list_topics().topics | for topic in admin.list_topics().topics | ||||
if topic.startswith(f"{kafka_prefix}.") | if topic.startswith(f"{kafka_prefix}.") | ||||
} | } | ||||
assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} | assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} | ||||
# check privileged topics are present | |||||
topics = { | |||||
topic | |||||
for topic in admin.list_topics().topics | |||||
if topic.startswith(f"{kafka_prefix}_privileged.") | |||||
} | |||||
assert topics == { | |||||
f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types | |||||
} | |||||
def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): | def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): | ||||
assert test_config == { | assert test_config == { | ||||
"consumer_id": "swh.journal.consumer", | "consumer_id": "swh.journal.consumer", | ||||
"stop_after_objects": 1, | "stop_after_objects": 1, | ||||
"storage": {"cls": "memory", "args": {}}, | "storage": {"cls": "memory", "args": {}}, | ||||
"object_types": { | "object_types": { | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
"release", | "release", | ||||
"revision", | "revision", | ||||
"snapshot", | "snapshot", | ||||
"skipped_content", | "skipped_content", | ||||
}, | }, | ||||
"privileged_object_types": {"release", "revision",}, | |||||
"brokers": [kafka_server_base], | "brokers": [kafka_server_base], | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
} | } |