Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_backfill.py
Show First 20 Lines • Show All 229 Lines • ▼ Show 20 Lines | ): | ||||
prefix1 = f"{kafka_prefix}-1" | prefix1 = f"{kafka_prefix}-1" | ||||
prefix2 = f"{kafka_prefix}-2" | prefix2 = f"{kafka_prefix}-2" | ||||
journal1 = { | journal1 = { | ||||
"cls": "kafka", | "cls": "kafka", | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer-1", | "client_id": "kafka_writer-1", | ||||
"prefix": prefix1, | "prefix": prefix1, | ||||
"auto_flush": False, | |||||
} | } | ||||
swh_storage_backend_config["journal_writer"] = journal1 | swh_storage_backend_config["journal_writer"] = journal1 | ||||
storage = get_storage(**swh_storage_backend_config) | storage = get_storage(**swh_storage_backend_config) | ||||
# fill the storage and the journal (under prefix1) | # fill the storage and the journal (under prefix1) | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
method = getattr(storage, object_type + "_add") | method = getattr(storage, object_type + "_add") | ||||
method(objects) | method(objects) | ||||
assert storage.journal_writer is not None | |||||
storage.journal_writer.journal.flush() | |||||
# now apply the backfiller on the storage to fill the journal under prefix2 | # now apply the backfiller on the storage to fill the journal under prefix2 | ||||
backfiller_config = { | backfiller_config = { | ||||
"journal_writer": { | "journal_writer": { | ||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer-2", | "client_id": "kafka_writer-2", | ||||
"prefix": prefix2, | "prefix": prefix2, | ||||
"auto_flush": False, | |||||
}, | }, | ||||
"storage": swh_storage_backend_config, | "storage": swh_storage_backend_config, | ||||
} | } | ||||
# Backfilling | # Backfilling | ||||
backfiller = JournalBackfiller(backfiller_config) | backfiller = JournalBackfiller(backfiller_config) | ||||
for object_type in TEST_OBJECTS: | for object_type in TEST_OBJECTS: | ||||
backfiller.run(object_type, None, None) | backfiller.run(object_type, None, None) | ||||
backfiller.writer.journal.flush() | |||||
# Trace log messages for unhandled object types in the replayer | # Trace log messages for unhandled object types in the replayer | ||||
caplog.set_level(logging.DEBUG, "swh.storage.replay") | caplog.set_level(logging.DEBUG, "swh.storage.replay") | ||||
# now check journal content are the same under both topics | # now check journal content are the same under both topics | ||||
# use the replayer scaffolding to fill storages to make is a bit easier | # use the replayer scaffolding to fill storages to make is a bit easier | ||||
# Replaying #1 | # Replaying #1 | ||||
deserializer = ModelObjectDeserializer() | deserializer = ModelObjectDeserializer() | ||||
▲ Show 20 Lines • Show All 97 Lines • Show Last 20 Lines |