Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show First 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | @click.option('--consumer-id', type=str, | ||||
help='Name of the consumer/group id for reading from Kafka.') | help='Name of the consumer/group id for reading from Kafka.') | ||||
@click.pass_context | @click.pass_context | ||||
def replay(ctx, brokers, prefix, consumer_id, max_messages): | def replay(ctx, brokers, prefix, consumer_id, max_messages): | ||||
"""Fill a new storage by reading a journal. | """Fill a new storage by reading a journal. | ||||
""" | """ | ||||
conf = ctx.obj['config'] | conf = ctx.obj['config'] | ||||
storage = get_storage(**conf.pop('storage')) | storage = get_storage(**conf.pop('storage')) | ||||
replayer = StorageReplayer(brokers, prefix, consumer_id) | replayer = StorageReplayer(brokers, prefix, consumer_id, | ||||
storage=storage, max_messages=max_messages) | |||||
try: | try: | ||||
replayer.fill(storage, max_messages=max_messages) | replayer.process() | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print('Done.') | print('Done.') | ||||
@cli.command() | @cli.command() | ||||
@click.argument('object_type') | @click.argument('object_type') | ||||
Show All 25 Lines |