Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/backfill.py
Show First 20 Lines • Show All 447 Lines • ▼ Show 20 Lines | |||||
def _format_range_bound(bound): | def _format_range_bound(bound): | ||||
if isinstance(bound, bytes): | if isinstance(bound, bytes): | ||||
return bound.hex() | return bound.hex() | ||||
else: | else: | ||||
return str(bound) | return str(bound) | ||||
MANDATORY_KEYS = ["storage_dbconn", "journal_writer"] | MANDATORY_KEYS = ["storage", "journal_writer"] | ||||
class JournalBackfiller: | class JournalBackfiller: | ||||
"""Class in charge of reading the storage's objects and sends those | """Class in charge of reading the storage's objects and sends those | ||||
back to the journal's topics. | back to the journal's topics. | ||||
This is designed to be run periodically. | This is designed to be run periodically. | ||||
Show All 10 Lines | def check_config(self, config): | ||||
missing_keys.append(key) | missing_keys.append(key) | ||||
if missing_keys: | if missing_keys: | ||||
raise ValueError( | raise ValueError( | ||||
"Configuration error: The following keys must be" | "Configuration error: The following keys must be" | ||||
" provided: %s" % (",".join(missing_keys),) | " provided: %s" % (",".join(missing_keys),) | ||||
) | ) | ||||
if "cls" not in config["storage"] or config["storage"]["cls"] != "local": | |||||
raise ValueError( | |||||
"swh storage backfiller must be configured to use a local" | |||||
" (PostgreSQL) storage" | |||||
) | |||||
def parse_arguments(self, object_type, start_object, end_object): | def parse_arguments(self, object_type, start_object, end_object): | ||||
"""Parse arguments | """Parse arguments | ||||
Raises: | Raises: | ||||
ValueError for unsupported object type | ValueError for unsupported object type | ||||
ValueError if object ids are not parseable | ValueError if object ids are not parseable | ||||
Returns: | Returns: | ||||
Show All 23 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
"""Reads storage's subscribed object types and send them to the | """Reads storage's subscribed object types and send them to the | ||||
journal's reading topic. | journal's reading topic. | ||||
""" | """ | ||||
start_object, end_object = self.parse_arguments( | start_object, end_object = self.parse_arguments( | ||||
object_type, start_object, end_object | object_type, start_object, end_object | ||||
) | ) | ||||
db = BaseDb.connect(self.config["storage_dbconn"]) | db = BaseDb.connect(self.config["storage"]["db"]) | ||||
writer = KafkaJournalWriter(**self.config["journal_writer"]) | writer = KafkaJournalWriter(**self.config["journal_writer"]) | ||||
for range_start, range_end in RANGE_GENERATORS[object_type]( | for range_start, range_end in RANGE_GENERATORS[object_type]( | ||||
start_object, end_object | start_object, end_object | ||||
): | ): | ||||
logger.info( | logger.info( | ||||
"Processing %s range %s to %s", | "Processing %s range %s to %s", | ||||
object_type, | object_type, | ||||
_format_range_bound(range_start), | _format_range_bound(range_start), | ||||
Show All 13 Lines |