Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show First 20 Lines • Show All 131 Lines • ▼ Show 20 Lines | try: | ||||
object_type=object_type, | object_type=object_type, | ||||
start_object=start_object, end_object=end_object, | start_object=start_object, end_object=end_object, | ||||
dry_run=dry_run) | dry_run=dry_run) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@cli.command() | @cli.command() | ||||
@click.option('--max-messages', '-m', default=None, type=int, | |||||
help='Maximum number of objects to replay. Default is to ' | |||||
'run forever.') | |||||
@click.option('--concurrency', type=int, | @click.option('--concurrency', type=int, | ||||
default=8, | default=8, | ||||
help='Concurrentcy level.') | help='Concurrentcy level.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | @click.option('--broker', 'brokers', type=str, multiple=True, | ||||
help='Kafka broker to connect to.' | help='Kafka broker to connect to.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.option('--prefix', type=str, default=None, | @click.option('--prefix', type=str, default=None, | ||||
help='Prefix of Kafka topic names to read from.' | help='Prefix of Kafka topic names to read from.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.option('--group-id', type=str, | @click.option('--group-id', type=str, | ||||
help='Name of the group id for reading from Kafka.' | help='Name of the group id for reading from Kafka.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.pass_context | @click.pass_context | ||||
def content_replay(ctx, concurrency, brokers, prefix, group_id): | def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id): | ||||
"""Fill a destination Object Storage (typically a mirror) by reading a Journal | """Fill a destination Object Storage (typically a mirror) by reading a Journal | ||||
and retrieving objects from an existing source ObjStorage. | and retrieving objects from an existing source ObjStorage. | ||||
There can be several 'replayers' filling a given ObjStorage as long as they | There can be several 'replayers' filling a given ObjStorage as long as they | ||||
use the same `group-id`. | use the same `group-id`. | ||||
This service retrieves object ids to copy from the 'content' topic. It will | This service retrieves object ids to copy from the 'content' topic. It will | ||||
only copy object's content if the object's description in the kafka | only copy object's content if the object's description in the kafka | ||||
Show All 17 Lines | client = get_journal_client( | ||||
object_types=('content',)) | object_types=('content',)) | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst, | dst=objstorage_dst, | ||||
concurrency=concurrency) | concurrency=concurrency) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while True: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
logger.info('Processed %d messages.' % nb_messages) | logger.info('Processed %d messages.' % nb_messages) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
logger.info('Done.') | print('Done.') | ||||
def main(): | def main(): | ||||
logging.basicConfig() | logging.basicConfig() | ||||
return cli(auto_envvar_prefix='SWH_JOURNAL') | return cli(auto_envvar_prefix='SWH_JOURNAL') | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |