Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show First 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | |||||
def replayer_storage_and_client( | def replayer_storage_and_client( | ||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str | kafka_prefix: str, kafka_consumer_group: str, kafka_server: str | ||||
): | ): | ||||
journal_writer_config = { | journal_writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"auto_flush": False, | |||||
} | } | ||||
storage_config: Dict[str, Any] = { | storage_config: Dict[str, Any] = { | ||||
"cls": "memory", | "cls": "memory", | ||||
"journal_writer": journal_writer_config, | "journal_writer": journal_writer_config, | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
deserializer = ModelObjectDeserializer() | deserializer = ModelObjectDeserializer() | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
Show All 20 Lines | def test_storage_replayer(replayer_storage_and_client, caplog): | ||||
src, replayer = replayer_storage_and_client | src, replayer = replayer_storage_and_client | ||||
# Fill Kafka using a source storage | # Fill Kafka using a source storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
method = getattr(src, object_type + "_add") | method = getattr(src, object_type + "_add") | ||||
method(objects) | method(objects) | ||||
nb_sent += len(objects) | nb_sent += len(objects) | ||||
src.journal_writer.journal.flush() | |||||
caplog.set_level(logging.ERROR, "swh.journal.replay") | caplog.set_level(logging.ERROR, "swh.journal.replay") | ||||
# Fill the destination storage from Kafka | # Fill the destination storage from Kafka | ||||
dst = get_storage(cls="memory") | dst = get_storage(cls="memory") | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst) | worker_fn = functools.partial(process_replay_objects, storage=dst) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
Show All 23 Lines | def test_storage_replay_with_collision(replayer_storage_and_client, caplog): | ||||
src, replayer = replayer_storage_and_client | src, replayer = replayer_storage_and_client | ||||
# Fill Kafka using a source storage | # Fill Kafka using a source storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
method = getattr(src, object_type + "_add") | method = getattr(src, object_type + "_add") | ||||
method(objects) | method(objects) | ||||
nb_sent += len(objects) | nb_sent += len(objects) | ||||
src.journal_writer.journal.flush() | |||||
# Create collision in input data | # Create collision in input data | ||||
# These should not be written in the destination | # These should not be written in the destination | ||||
producer = src.journal_writer.journal.producer | producer = src.journal_writer.journal.producer | ||||
prefix = src.journal_writer.journal._prefix | prefix = src.journal_writer.journal._prefix | ||||
for content in DUPLICATE_CONTENTS: | for content in DUPLICATE_CONTENTS: | ||||
topic = f"{prefix}.content" | topic = f"{prefix}.content" | ||||
key = content.sha1 | key = content.sha1 | ||||
▲ Show 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | ): | ||||
This tests the behavior with both a privileged and non-privileged replayer | This tests the behavior with both a privileged and non-privileged replayer | ||||
""" | """ | ||||
writer_config = { | writer_config = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer", | "client_id": "kafka_writer", | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
"anonymize": True, | "anonymize": True, | ||||
"auto_flush": False, | |||||
} | } | ||||
src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} | src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} | ||||
storage = get_storage(**src_config) | storage = get_storage(**src_config) | ||||
# Fill the src storage | # Fill the src storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for obj_type, objs in TEST_OBJECTS.items(): | for obj_type, objs in TEST_OBJECTS.items(): | ||||
if obj_type in ("origin_visit", "origin_visit_status"): | if obj_type in ("origin_visit", "origin_visit_status"): | ||||
# these are unrelated with what we want to test here | # these are unrelated with what we want to test here | ||||
continue | continue | ||||
method = getattr(storage, obj_type + "_add") | method = getattr(storage, obj_type + "_add") | ||||
method(objs) | method(objs) | ||||
nb_sent += len(objs) | nb_sent += len(objs) | ||||
assert storage.journal_writer is not None | |||||
storage.journal_writer.journal.flush() | |||||
# Fill a destination storage from Kafka, potentially using privileged topics | # Fill a destination storage from Kafka, potentially using privileged topics | ||||
dst_storage = get_storage(cls="memory") | dst_storage = get_storage(cls="memory") | ||||
deserializer = ModelObjectDeserializer( | deserializer = ModelObjectDeserializer( | ||||
validate=False | validate=False | ||||
) # we cannot validate an anonymized replay | ) # we cannot validate an anonymized replay | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
brokers=kafka_server, | brokers=kafka_server, | ||||
Show All 33 Lines | ): | ||||
replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set) | replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set) | ||||
# Fill Kafka using a source storage | # Fill Kafka using a source storage | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
method = getattr(src, object_type + "_add") | method = getattr(src, object_type + "_add") | ||||
method(objects) | method(objects) | ||||
nb_sent += len(objects) | nb_sent += len(objects) | ||||
src.journal_writer.journal.flush() | |||||
# Fill the destination storage from Kafka | # Fill the destination storage from Kafka | ||||
dst = get_storage(cls="memory") | dst = get_storage(cls="memory") | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst) | worker_fn = functools.partial(process_replay_objects, storage=dst) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# check we do not have invalid objects reported | # check we do not have invalid objects reported | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | dict_repr = { | ||||
# copy each dir entry twice | # copy each dir entry twice | ||||
"entries": TEST_OBJECTS["directory"][1].to_dict()["entries"] * 2, | "entries": TEST_OBJECTS["directory"][1].to_dict()["entries"] * 2, | ||||
"id": b"\x01" * 20, | "id": b"\x01" * 20, | ||||
} | } | ||||
topic = f"{src.journal_writer.journal._prefix}.directory" | topic = f"{src.journal_writer.journal._prefix}.directory" | ||||
src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr) | src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
src.journal_writer.journal.flush() | |||||
# Fill the destination storage from Kafka | # Fill the destination storage from Kafka | ||||
dst = get_storage(cls="memory") | dst = get_storage(cls="memory") | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst) | worker_fn = functools.partial(process_replay_objects, storage=dst) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# check we do have invalid objects reported | # check we do have invalid objects reported | ||||
invalid = 0 | invalid = 0 | ||||
▲ Show 20 Lines • Show All 96 Lines • Show Last 20 Lines |