Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show First 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def backfiller(ctx, object_type, start_object, end_object, dry_run): | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@cli.command() | @cli.command() | ||||
@click.option('--max-messages', '-m', default=None, type=int, | @click.option('--max-messages', '-m', default=None, type=int, | ||||
help='Maximum number of objects to replay. Default is to ' | help='Maximum number of objects to replay. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--concurrency', type=int, | |||||
default=8, | |||||
help='Concurrentcy level.') | |||||
@click.option('--broker', 'brokers', type=str, multiple=True, | @click.option('--broker', 'brokers', type=str, multiple=True, | ||||
help='Kafka broker to connect to.' | help='Kafka broker to connect to.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.option('--prefix', type=str, default=None, | @click.option('--prefix', type=str, default=None, | ||||
help='Prefix of Kafka topic names to read from.' | help='Prefix of Kafka topic names to read from.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.option('--group-id', type=str, | @click.option('--group-id', type=str, | ||||
help='Name of the group id for reading from Kafka.' | help='Name of the group id for reading from Kafka.' | ||||
'(deprecated, use the config file instead)') | '(deprecated, use the config file instead)') | ||||
@click.option('--exclude-sha1-file', default=None, type=click.File('rb'), | @click.option('--exclude-sha1-file', default=None, type=click.File('rb'), | ||||
help='File containing a sorted array of hashes to be excluded.') | help='File containing a sorted array of hashes to be excluded.') | ||||
@click.pass_context | @click.pass_context | ||||
def content_replay(ctx, max_messages, concurrency, | def content_replay(ctx, max_messages, | ||||
brokers, prefix, group_id, exclude_sha1_file): | brokers, prefix, group_id, exclude_sha1_file): | ||||
"""Fill a destination Object Storage (typically a mirror) by reading a Journal | """Fill a destination Object Storage (typically a mirror) by reading a Journal | ||||
and retrieving objects from an existing source ObjStorage. | and retrieving objects from an existing source ObjStorage. | ||||
There can be several 'replayers' filling a given ObjStorage as long as they | There can be several 'replayers' filling a given ObjStorage as long as they | ||||
use the same `group-id`. | use the same `group-id`. | ||||
This service retrieves object ids to copy from the 'content' topic. It will | This service retrieves object ids to copy from the 'content' topic. It will | ||||
Show All 33 Lines | else: | ||||
exclude_fn = None | exclude_fn = None | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ||||
object_types=('content',)) | object_types=('content',)) | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst, | dst=objstorage_dst, | ||||
concurrency=concurrency, | |||||
exclude_fn=exclude_fn) | exclude_fn=exclude_fn) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
last_log_time = 0 | last_log_time = 0 | ||||
while not max_messages or nb_messages < max_messages: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
if time.time() - last_log_time >= 60: | if time.time() - last_log_time >= 60: | ||||
Show All 16 Lines |