Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_backfill.py
Show All 10 Lines | |||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.tests.journal_data import TEST_OBJECTS | from swh.journal.tests.journal_data import TEST_OBJECTS | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.backfill import PARTITION_KEY, JournalBackfiller, compute_query | from swh.storage.backfill import PARTITION_KEY, JournalBackfiller, compute_query | ||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import process_replay_objects | ||||
from swh.storage.tests.test_replay import check_replayed | from swh.storage.tests.test_replay import check_replayed | ||||
TEST_CONFIG = { | TEST_CONFIG = { | ||||
"journal_writer": { | |||||
"brokers": ["localhost"], | "brokers": ["localhost"], | ||||
"prefix": "swh.tmp_journal.new", | "prefix": "swh.tmp_journal.new", | ||||
"client_id": "swh.journal.client.test", | "client_id": "swh.journal.client.test", | ||||
}, | |||||
"storage_dbconn": "service=swh-dev", | "storage_dbconn": "service=swh-dev", | ||||
} | } | ||||
def test_config_ko_missing_mandatory_key(): | def test_config_ko_missing_mandatory_key(): | ||||
"""Missing configuration key will make the initialization fail | """Missing configuration key will make the initialization fail | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | ): | ||||
# fill the storage and the journal (under prefix1) | # fill the storage and the journal (under prefix1) | ||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
method = getattr(storage, object_type + "_add") | method = getattr(storage, object_type + "_add") | ||||
method(objects) | method(objects) | ||||
# now apply the backfiller on the storage to fill the journal under prefix2 | # now apply the backfiller on the storage to fill the journal under prefix2 | ||||
backfiller_config = { | backfiller_config = { | ||||
"journal_writer": { | |||||
"brokers": [kafka_server], | "brokers": [kafka_server], | ||||
"client_id": "kafka_writer-2", | "client_id": "kafka_writer-2", | ||||
"prefix": prefix2, | "prefix": prefix2, | ||||
}, | |||||
"storage_dbconn": swh_storage_backend_config["db"], | "storage_dbconn": swh_storage_backend_config["db"], | ||||
} | } | ||||
# Backfilling | # Backfilling | ||||
backfiller = JournalBackfiller(backfiller_config) | backfiller = JournalBackfiller(backfiller_config) | ||||
for object_type in TEST_OBJECTS: | for object_type in TEST_OBJECTS: | ||||
backfiller.run(object_type, None, None) | backfiller.run(object_type, None, None) | ||||
Show All 26 Lines |