Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict | from typing import Any, Dict | ||||
from attr import asdict, has | from attr import asdict, has | ||||
from confluent_kafka import Consumer | from confluent_kafka import Consumer | ||||
from hypothesis import given | from hypothesis import given | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | |||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.model.model import Origin, OriginVisit, Person | from swh.model.model import Origin, OriginVisit, Person | ||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
Show All 11 Lines | def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for obj_type, objs in TEST_OBJECTS.items(): | for obj_type, objs in TEST_OBJECTS.items(): | ||||
method = getattr(storage, obj_type + "_add") | method = getattr(storage, obj_type + "_add") | ||||
if obj_type in ( | if obj_type in ( | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"extid", | |||||
"metadata_authority", | "metadata_authority", | ||||
"metadata_fetcher", | "metadata_fetcher", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"origin", | "origin", | ||||
"origin_visit_status", | "origin_visit_status", | ||||
"raw_extrinsic_metadata", | "raw_extrinsic_metadata", | ||||
Show All 14 Lines | existing_topics = set( | ||||
for topic in consumer.list_topics(timeout=10).topics.keys() | for topic in consumer.list_topics(timeout=10).topics.keys() | ||||
if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
for obj_type in ( | for obj_type in ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"extid", | |||||
"metadata_authority", | "metadata_authority", | ||||
"metadata_fetcher", | "metadata_fetcher", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
"origin_visit_status", | "origin_visit_status", | ||||
"raw_extrinsic_metadata", | "raw_extrinsic_metadata", | ||||
"release", | "release", | ||||
"revision", | "revision", | ||||
Show All 40 Lines | existing_topics = set( | ||||
for topic in consumer.list_topics(timeout=10).topics.keys() | for topic in consumer.list_topics(timeout=10).topics.keys() | ||||
if topic.startswith(kafka_prefix) | if topic.startswith(kafka_prefix) | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
for obj_type in ( | for obj_type in ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"extid", | |||||
"metadata_authority", | "metadata_authority", | ||||
"metadata_fetcher", | "metadata_fetcher", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
"origin_visit_status", | "origin_visit_status", | ||||
"raw_extrinsic_metadata", | "raw_extrinsic_metadata", | ||||
"release", | "release", | ||||
"revision", | "revision", | ||||
Show All 23 Lines |