diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -453,7 +453,7 @@ return str(bound) -MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] +MANDATORY_KEYS = ["storage_dbconn", "journal_writer"] class JournalBackfiller: @@ -520,11 +520,7 @@ ) db = BaseDb.connect(self.config["storage_dbconn"]) - writer = KafkaJournalWriter( - brokers=self.config["brokers"], - prefix=self.config["prefix"], - client_id=self.config["client_id"], - ) + writer = KafkaJournalWriter(**self.config["journal_writer"]) for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object ): diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -16,9 +16,11 @@ from swh.storage.tests.test_replay import check_replayed TEST_CONFIG = { - "brokers": ["localhost"], - "prefix": "swh.tmp_journal.new", - "client_id": "swh.journal.client.test", + "journal_writer": { + "brokers": ["localhost"], + "prefix": "swh.tmp_journal.new", + "client_id": "swh.journal.client.test", + }, "storage_dbconn": "service=swh-dev", } @@ -205,9 +207,11 @@ # now apply the backfiller on the storage to fill the journal under prefix2 backfiller_config = { - "brokers": [kafka_server], - "client_id": "kafka_writer-2", - "prefix": prefix2, + "journal_writer": { + "brokers": [kafka_server], + "client_id": "kafka_writer-2", + "prefix": prefix2, + }, "storage_dbconn": swh_storage_backend_config["db"], }