Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show First 20 Lines • Show All 447 Lines • ▼ Show 20 Lines | |||||
def _format_range_bound(bound): | def _format_range_bound(bound): | ||||
if isinstance(bound, bytes): | if isinstance(bound, bytes): | ||||
return bound.hex() | return bound.hex() | ||||
else: | else: | ||||
return str(bound) | return str(bound) | ||||
MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] | MANDATORY_KEYS = ["storage_dbconn", "journal_writer"] | ||||
class JournalBackfiller: | class JournalBackfiller: | ||||
"""Class in charge of reading the storage's objects and sends those | """Class in charge of reading the storage's objects and sends those | ||||
back to the journal's topics. | back to the journal's topics. | ||||
This is designed to be run periodically. | This is designed to be run periodically. | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
journal's reading topic. | journal's reading topic. | ||||
""" | """ | ||||
start_object, end_object = self.parse_arguments( | start_object, end_object = self.parse_arguments( | ||||
object_type, start_object, end_object | object_type, start_object, end_object | ||||
) | ) | ||||
db = BaseDb.connect(self.config["storage_dbconn"]) | db = BaseDb.connect(self.config["storage_dbconn"]) | ||||
writer = KafkaJournalWriter( | writer = KafkaJournalWriter(**self.config["journal_writer"]) | ||||
brokers=self.config["brokers"], | |||||
prefix=self.config["prefix"], | |||||
client_id=self.config["client_id"], | |||||
) | |||||
for range_start, range_end in RANGE_GENERATORS[object_type]( | for range_start, range_end in RANGE_GENERATORS[object_type]( | ||||
start_object, end_object | start_object, end_object | ||||
): | ): | ||||
logger.info( | logger.info( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
object_type, | object_type, | ||||
_format_range_bound(range_start), | _format_range_bound(range_start), | ||||
_format_range_bound(range_end), | _format_range_bound(range_end), | ||||
Show All 12 Lines |