Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show All 13 Lines | |||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | ||||
from swh.model.model import Content | from swh.model.model import Content | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.writer import anonymize | |||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import process_replay_objects | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | |||||
from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter | from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter | ||||
from swh.journal.tests.conftest import ( | from swh.journal.tests.conftest import ( | ||||
TEST_OBJECT_DICTS, | TEST_OBJECT_DICTS, | ||||
DUPLICATE_CONTENTS, | DUPLICATE_CONTENTS, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 424 Lines • ▼ Show 20 Lines | return [ | ||||
"status": "absent", | "status": "absent", | ||||
"reason": "why not", | "reason": "why not", | ||||
"origin": f"https://somewhere/{i}", | "origin": f"https://somewhere/{i}", | ||||
"ctime": now, | "ctime": now, | ||||
}, | }, | ||||
) | ) | ||||
for i in range(n) | for i in range(n) | ||||
] | ] | ||||
def test_storage_play_anonymized(
    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
    """Replayer scenario with an anonymizing journal writer.

    This:

    - writes test objects to Kafka through a writer configured with
      ``anonymize: True`` (person data goes to ``:anonymized`` topics)
    - replays the anonymized topics into a fresh destination storage
    - checks that every replayed person record is the anonymized form of
      the corresponding source person
    """
    writer_config = {
        "cls": "kafka",
        "brokers": [kafka_server],
        "client_id": "kafka_writer",
        "prefix": kafka_prefix,
        # the point of this test: person data is written anonymized
        "anonymize": True,
    }
    src_config = {"cls": "memory", "journal_writer": writer_config}

    storage = get_storage(**src_config)

    # Fill the src storage
    nb_sent = 0
    for obj_type, objs in TEST_OBJECTS.items():
        if obj_type == "origin_visit":
            # these have non-consistent API and are unrelated with what we
            # want to test here
            continue
        method = getattr(storage, obj_type + "_add")
        method(objs)
        nb_sent += len(objs)

    # Destination storage, filled only through the replayer below.
    dst_storage = get_storage(cls="memory")
    caplog.set_level(logging.ERROR, "swh.journal.replay")

    # Fill the storage from Kafka using anonymized topics.  Only release
    # and revision carry person data, hence only they have an
    # ":anonymized" topic variant.
    object_types = {
        "content",
        "directory",
        "origin",
        "origin_visit",
        "release:anonymized",
        "revision:anonymized",
        "snapshot",
        "skipped_content",
    }
    replayer = JournalClient(
        brokers=kafka_server,
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
        object_types=list(object_types),
    )
    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
    nb_inserted = replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # Every person seen by the destination must be the anonymized form of
    # a person held by the source storage.
    expected_persons = {anonymize(person) for person in storage._persons}
    assert set(dst_storage._persons) == expected_persons
no need for the while