Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/backfill.py
Show First 20 Lines • Show All 360 Lines • ▼ Show 20 Lines | with db.cursor() as cursor: | ||||
record = dict(zip(column_aliases, row)) | record = dict(zip(column_aliases, row)) | ||||
if converter: | if converter: | ||||
record = converter(db, record) | record = converter(db, record) | ||||
logger.debug('record: %s' % record) | logger.debug('record: %s' % record) | ||||
yield record | yield record | ||||
MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id'] | MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id'] | ||||
vlorentz: This is probably outside the scope of this diff, but `storage_dbconn` could be renamed… | |||||
Done Inline ActionsI've noticed this, but going this way, I would rather use the "standard" swh config structure for services, aka: storage: cls: local args: db: <dsn> douardda: I've noticed this, but going this way, I would rather use the "standard" swh config structure… | |||||
class JournalBackfiller: | class JournalBackfiller: | ||||
"""Class in charge of reading the storage's objects and sends those | """Class in charge of reading the storage's objects and sends those | ||||
back to the journal's topics. | back to the journal's topics. | ||||
This is designed to be run periodically. | This is designed to be run periodically. | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | def run(self, object_type, start_object, end_object, dry_run=False): | ||||
""" | """ | ||||
start_object, end_object = self.parse_arguments( | start_object, end_object = self.parse_arguments( | ||||
object_type, start_object, end_object) | object_type, start_object, end_object) | ||||
db = BaseDb.connect(self.config['storage_dbconn']) | db = BaseDb.connect(self.config['storage_dbconn']) | ||||
writer = DirectKafkaWriter( | writer = DirectKafkaWriter( | ||||
brokers=self.config['brokers'], | brokers=self.config['brokers'], | ||||
prefix=self.config['final_prefix'], | prefix=self.config['prefix'], | ||||
client_id=self.config['client_id'] | client_id=self.config['client_id'] | ||||
) | ) | ||||
for range_start, range_end in RANGE_GENERATORS[object_type]( | for range_start, range_end in RANGE_GENERATORS[object_type]( | ||||
start_object, end_object): | start_object, end_object): | ||||
logger.info('Processing %s range %s to %s', object_type, | logger.info('Processing %s range %s to %s', object_type, | ||||
range_start, range_end) | range_start, range_end) | ||||
for obj in fetch( | for obj in fetch( | ||||
Show All 12 Lines |
This is probably outside the scope of this diff, but storage_dbconn could be renamed storage_dsn, to match psycopg2/libpq's terminology.