swh/storage/tests/test_backfill.py
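This diff threads a `ModelObjectDeserializer` through the backfill round-trip test: the replayer's two `JournalClient` instances now receive `deserializer.convert` as their `value_deserializer`, so messages read back from Kafka are converted into model objects before being handed to `process_replay_objects`.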
(14 lines elided)

```diff
 from swh.storage.backfill import (
     PARTITION_KEY,
     JournalBackfiller,
     byte_ranges,
     compute_query,
     raw_extrinsic_metadata_target_ranges,
 )
 from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
 from swh.storage.tests.test_replay import check_replayed

 TEST_CONFIG = {
     "journal_writer": {
         "brokers": ["localhost"],
         "prefix": "swh.tmp_journal.new",
         "client_id": "swh.journal.client.test",
     },
```
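The only change to the import block is the new `ModelObjectDeserializer`. A minimal sketch of how it plugs into a journal client, mirroring the wiring added further down in this diff; the import path for `JournalClient` and all broker/group/prefix values here are assumptions, not part of this change:

```python
from swh.journal.client import JournalClient  # assumed import path
from swh.storage.replay import ModelObjectDeserializer

# Sketch only: broker, consumer group and topic prefix are placeholders.
deserializer = ModelObjectDeserializer()
client = JournalClient(
    brokers="kafka:9092",
    group_id="example-consumer",
    prefix="swh.journal.objects",
    stop_on_eof=True,
    # convert() turns each raw Kafka message value back into a swh.model
    # object; this is exactly how the test passes it below.
    value_deserializer=deserializer.convert,
)
```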
(202 lines elided)

```diff
 ):
     journal1 = {
         "cls": "kafka",
         "brokers": [kafka_server],
         "client_id": "kafka_writer-1",
         "prefix": prefix1,
     }
     swh_storage_backend_config["journal_writer"] = journal1
     storage = get_storage(**swh_storage_backend_config)

     # fill the storage and the journal (under prefix1)
     for object_type, objects in TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         method(objects)

     # now apply the backfiller on the storage to fill the journal under prefix2
     backfiller_config = {
         "journal_writer": {
```
(10 lines elided)

```diff
     for object_type in TEST_OBJECTS:
         backfiller.run(object_type, None, None)

     # Trace log messages for unhandled object types in the replayer
     caplog.set_level(logging.DEBUG, "swh.storage.replay")

     # now check the journal contents are the same under both topics;
     # use the replayer scaffolding to fill storages, to make it a bit easier

     # Replaying #1
+    deserializer = ModelObjectDeserializer()
     sto1 = get_storage(cls="memory")
     replayer1 = JournalClient(
         brokers=kafka_server,
         group_id=f"{kafka_consumer_group}-1",
         prefix=prefix1,
         stop_on_eof=True,
+        value_deserializer=deserializer.convert,
     )
     worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
     replayer1.process(worker_fn1)
```
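The worker-function pattern here is worth a note: `JournalClient.process` calls its argument with each batch of deserialized objects, and `functools.partial` pre-binds the destination storage. A condensed, self-contained sketch of that pattern; the in-memory storage mirrors the test, and the keyword-only `storage` parameter is inferred from this usage:

```python
import functools

from swh.storage import get_storage
from swh.storage.replay import process_replay_objects

# Build a throwaway in-memory destination storage, as the test does.
destination = get_storage(cls="memory")

# process_replay_objects receives the batch handed out by
# JournalClient.process plus a keyword-only `storage`; partial() pre-binds
# the destination so the client can call worker_fn(batch) with a single
# positional argument.
worker_fn = functools.partial(process_replay_objects, storage=destination)
```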
```diff
     # Replaying #2
     sto2 = get_storage(cls="memory")
     replayer2 = JournalClient(
         brokers=kafka_server,
         group_id=f"{kafka_consumer_group}-2",
         prefix=prefix2,
         stop_on_eof=True,
+        value_deserializer=deserializer.convert,
     )
     worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
     replayer2.process(worker_fn2)

     # Compare storages
     assert isinstance(sto1, InMemoryStorage)  # needed to help mypy
     assert isinstance(sto2, InMemoryStorage)
     check_replayed(sto1, sto2)

     for record in caplog.records:
         assert (
             "this should not happen" not in record.message
         ), "Replayer ignored some message types, see captured logging"
```