Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show First 20 Lines • Show All 306 Lines • ▼ Show 20 Lines | replayer = JournalClient( | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=nb_sent, | stop_after_objects=nb_sent, | ||||
privileged=False, | privileged=False, | ||||
) | ) | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
replayer.consumer.commit() | |||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
check_replayed(storage, dst_storage, expected_anonymized=True) | check_replayed(storage, dst_storage, expected_anonymized=True) | ||||
# Fill a destination storage from Kafka **with stock (non-anonymized) topics** | # Fill a destination storage from Kafka **with stock (non-anonymized) topics** | ||||
dst_storage = get_storage(cls="memory") | dst_storage = get_storage(cls="memory") | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
brokers=kafka_server, | brokers=kafka_server, | ||||
group_id=kafka_consumer_group, | group_id=f"{kafka_consumer_group}-2", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
stop_after_objects=nb_sent, | stop_after_objects=nb_sent, | ||||
privileged=True, | privileged=True, | ||||
) | ) | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
replayer.consumer.commit() | |||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
check_replayed(storage, dst_storage, expected_anonymized=False) | check_replayed(storage, dst_storage, expected_anonymized=False) | ||||
def check_replayed(src, dst, expected_anonymized=False): | def check_replayed(src, dst, expected_anonymized=False): | ||||
"""Simple utility function to compare the content of 2 in_memory storages | """Simple utility function to compare the content of 2 in_memory storages | ||||
If expected_anonymized is True, objects from the source storage are anonymized | If expected_anonymized is True, objects from the source storage are anonymized | ||||
Show All 35 Lines |