Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import logging | import logging | ||||
from typing import Any, Container, Dict, Optional | |||||
from typing import Container, Dict, Optional | |||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | ||||
from swh.model.model import Content | from swh.model.model import Content | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.in_memory import InMemoryStorage | from swh.storage.in_memory import InMemoryStorage | ||||
Show All 16 Lines | def replayer_storage_and_client( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str | ||||
): | ): | ||||
journal_writer_config = { | journal_writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
} | } | ||||
storage_config = { | storage_config: Dict[str, Any] = { | ||||
"cls": "memory", | "cls": "memory", | ||||
"journal_writer": journal_writer_config, | "journal_writer": journal_writer_config, | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
brokers=kafka_server, | brokers=kafka_server, | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
▲ Show 20 Lines • Show All 221 Lines • ▼ Show 20 Lines | ): | ||||
""" | """ | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": True, | "anonymize": True, | ||||
} | } | ||||
src_config = {"cls": "memory", "journal_writer": writer_config} | src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} | ||||
storage = get_storage(**src_config) | storage = get_storage(**src_config) | ||||
# Fill the src storage | # Fill the src storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for obj_type, objs in TEST_OBJECTS.items(): | for obj_type, objs in TEST_OBJECTS.items(): | ||||
if obj_type in ("origin_visit", "origin_visit_status"): | if obj_type in ("origin_visit", "origin_visit_status"): | ||||
# these are unrelated with what we want to test here | # these are unrelated with what we want to test here | ||||
▲ Show 20 Lines • Show All 71 Lines • Show Last 20 Lines |