Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_kafka_writer.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict | |||||
from confluent_kafka import Consumer | from confluent_kafka import Consumer | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.model.model import Origin, OriginVisit | from swh.model.model import Origin, OriginVisit | ||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.model.model import Person | from swh.model.model import Person | ||||
from attr import asdict, has | from attr import asdict, has | ||||
from hypothesis import given | from hypothesis import given | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": False, | "anonymize": False, | ||||
} | } | ||||
storage_config = { | storage_config: Dict[str, Any] = { | ||||
"cls": "pipeline", | "cls": "pipeline", | ||||
"steps": [{"cls": "memory", "journal_writer": writer_config},], | "steps": [{"cls": "memory", "journal_writer": writer_config},], | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | ): | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": True, | "anonymize": True, | ||||
} | } | ||||
storage_config = { | storage_config: Dict[str, Any] = { | ||||
"cls": "pipeline", | "cls": "pipeline", | ||||
"steps": [{"cls": "memory", "journal_writer": writer_config},], | "steps": [{"cls": "memory", "journal_writer": writer_config},], | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines |