Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show First 20 Lines • Show All 592 Lines • ▼ Show 20 Lines | class JournalBackfiller: | ||||
This is designed to be run periodically. | This is designed to be run periodically. | ||||
""" | """ | ||||
def __init__(self, config=None): | def __init__(self, config=None): | ||||
self.config = config | self.config = config | ||||
self.check_config(config) | self.check_config(config) | ||||
self._db = None | |||||
self.writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) | |||||
assert self.writer.journal is not None | |||||
@property | |||||
def db(self): | |||||
if self._db is None: | |||||
self._db = BaseDb.connect(self.config["storage"]["db"]) | |||||
return self._db | |||||
def check_config(self, config): | def check_config(self, config): | ||||
missing_keys = [] | missing_keys = [] | ||||
for key in MANDATORY_KEYS: | for key in MANDATORY_KEYS: | ||||
if not config.get(key): | if not config.get(key): | ||||
missing_keys.append(key) | missing_keys.append(key) | ||||
if missing_keys: | if missing_keys: | ||||
Show All 39 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
"""Reads storage's subscribed object types and send them to the | """Reads storage's subscribed object types and send them to the | ||||
journal's reading topic. | journal's reading topic. | ||||
""" | """ | ||||
start_object, end_object = self.parse_arguments( | start_object, end_object = self.parse_arguments( | ||||
object_type, start_object, end_object | object_type, start_object, end_object | ||||
) | ) | ||||
db = BaseDb.connect(self.config["storage"]["db"]) | |||||
writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) | |||||
assert writer.journal is not None | |||||
for range_start, range_end in RANGE_GENERATORS[object_type]( | for range_start, range_end in RANGE_GENERATORS[object_type]( | ||||
start_object, end_object | start_object, end_object | ||||
): | ): | ||||
logger.info( | logger.info( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
object_type, | object_type, | ||||
_format_range_bound(range_start), | _format_range_bound(range_start), | ||||
_format_range_bound(range_end), | _format_range_bound(range_end), | ||||
) | ) | ||||
objects = fetch(db, object_type, start=range_start, end=range_end) | objects = fetch(self.db, object_type, start=range_start, end=range_end) | ||||
if not dry_run: | if not dry_run: | ||||
writer.write_additions(object_type, objects) | self.writer.write_additions(object_type, objects) | ||||
else: | else: | ||||
# only consume the objects iterator to check for any potential | # only consume the objects iterator to check for any potential | ||||
# decoding/encoding errors | # decoding/encoding errors | ||||
for obj in objects: | for obj in objects: | ||||
pass | pass | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
print('Please use the "swh-journal backfiller run" command') | print('Please use the "swh-journal backfiller run" command') |