Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show First 20 Lines • Show All 263 Lines • ▼ Show 20 Lines | return [ | ||||
"origin": f"https://somewhere/{i}", | "origin": f"https://somewhere/{i}", | ||||
"ctime": now, | "ctime": now, | ||||
}, | }, | ||||
) | ) | ||||
for i in range(n) | for i in range(n) | ||||
] | ] | ||||
@pytest.mark.parametrize("privileged", [True, False]) | |||||
def test_storage_play_anonymized( | def test_storage_play_anonymized( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool, | ||||
): | ): | ||||
"""Optimal replayer scenario. | """Optimal replayer scenario. | ||||
This: | This: | ||||
- writes objects to the topic | - writes objects to the topic | ||||
- replayer consumes objects from the topic and replays them | - replayer consumes objects from the topic and replays them | ||||
This tests the behavior with both a privileged and non-privileged replayer | |||||
""" | """ | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": True, | "anonymize": True, | ||||
} | } | ||||
src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} | src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} | ||||
storage = get_storage(**src_config) | storage = get_storage(**src_config) | ||||
# Fill the src storage | # Fill the src storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for obj_type, objs in TEST_OBJECTS.items(): | for obj_type, objs in TEST_OBJECTS.items(): | ||||
if obj_type in ("origin_visit", "origin_visit_status"): | if obj_type in ("origin_visit", "origin_visit_status"): | ||||
# these are unrelated to what we want to test here | # these are unrelated to what we want to test here | ||||
continue | continue | ||||
method = getattr(storage, obj_type + "_add") | method = getattr(storage, obj_type + "_add") | ||||
method(objs) | method(objs) | ||||
nb_sent += len(objs) | nb_sent += len(objs) | ||||
# Fill a destination storage from Kafka **using anonymized topics** | # Fill a destination storage from Kafka, potentially using privileged topics | ||||
dst_storage = get_storage(cls="memory") | dst_storage = get_storage(cls="memory") | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
brokers=kafka_server, | brokers=kafka_server, | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=nb_sent, | stop_after_objects=nb_sent, | ||||
privileged=False, | privileged=privileged, | ||||
) | ) | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
replayer.consumer.commit() | replayer.consumer.commit() | ||||
assert nb_sent == nb_inserted | |||||
check_replayed(storage, dst_storage, expected_anonymized=True) | |||||
# Fill a destination storage from Kafka **with stock (non-anonymized) topics** | |||||
dst_storage = get_storage(cls="memory") | |||||
replayer = JournalClient( | |||||
brokers=kafka_server, | |||||
group_id=f"{kafka_consumer_group}-2", | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=nb_sent, | |||||
privileged=True, | |||||
) | |||||
worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | |||||
nb_inserted = replayer.process(worker_fn) | |||||
replayer.consumer.commit() | |||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
check_replayed(storage, dst_storage, expected_anonymized=False) | # Check the contents of the destination storage, and whether the anonymization was | ||||
# properly used | |||||
check_replayed(storage, dst_storage, expected_anonymized=not privileged) | |||||
def check_replayed(src, dst, expected_anonymized=False): | def check_replayed(src, dst, expected_anonymized=False): | ||||
"""Simple utility function to compare the content of 2 in_memory storages | """Simple utility function to compare the content of 2 in_memory storages | ||||
If expected_anonymized is True, objects from the source storage are anonymized | If expected_anonymized is True, objects from the source storage are anonymized | ||||
before comparing with the destination storage. | before comparing with the destination storage. | ||||
Show All 33 Lines |