Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_kafka_writer.py
Show All 19 Lines | |||||
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": False, | "anonymize": False, | ||||
"auto_flush": False, | |||||
} | } | ||||
storage_config: Dict[str, Any] = { | storage_config: Dict[str, Any] = { | ||||
"cls": "pipeline", | "cls": "pipeline", | ||||
"steps": [ | "steps": [ | ||||
{"cls": "memory", "journal_writer": writer_config}, | {"cls": "memory", "journal_writer": writer_config}, | ||||
], | ], | ||||
} | } | ||||
Show All 17 Lines | for obj_type, objs in TEST_OBJECTS.items(): | ||||
"origin_visit", | "origin_visit", | ||||
"origin_visit_status", | "origin_visit_status", | ||||
"raw_extrinsic_metadata", | "raw_extrinsic_metadata", | ||||
): | ): | ||||
method(objs) | method(objs) | ||||
expected_messages += len(objs) | expected_messages += len(objs) | ||||
else: | else: | ||||
assert False, obj_type | assert False, obj_type | ||||
assert storage.journal_writer is not None | |||||
storage.journal_writer.journal.flush() | |||||
existing_topics = set( | existing_topics = set( | ||||
topic | topic | ||||
for topic in consumer.list_topics(timeout=10).topics.keys() | for topic in consumer.list_topics(timeout=10).topics.keys() | ||||
if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
Show All 23 Lines | |||||
): | ): | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": True, | "anonymize": True, | ||||
"auto_flush": False, | |||||
} | } | ||||
storage_config: Dict[str, Any] = { | storage_config: Dict[str, Any] = { | ||||
"cls": "pipeline", | "cls": "pipeline", | ||||
"steps": [ | "steps": [ | ||||
{"cls": "memory", "journal_writer": writer_config}, | {"cls": "memory", "journal_writer": writer_config}, | ||||
], | ], | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for obj_type, objs in TEST_OBJECTS.items(): | for obj_type, objs in TEST_OBJECTS.items(): | ||||
if obj_type == "origin_visit": | if obj_type == "origin_visit": | ||||
# these have non-consistent API and are unrelated with what we | # these have non-consistent API and are unrelated with what we | ||||
# want to test here | # want to test here | ||||
continue | continue | ||||
method = getattr(storage, obj_type + "_add") | method = getattr(storage, obj_type + "_add") | ||||
method(objs) | method(objs) | ||||
expected_messages += len(objs) | expected_messages += len(objs) | ||||
assert storage.journal_writer is not None | |||||
storage.journal_writer.journal.flush() | |||||
existing_topics = set( | existing_topics = set( | ||||
topic | topic | ||||
for topic in consumer.list_topics(timeout=10).topics.keys() | for topic in consumer.list_topics(timeout=10).topics.keys() | ||||
if topic.startswith(kafka_prefix) | if topic.startswith(kafka_prefix) | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
Show All 39 Lines |