Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show First 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | if not isinstance(conf['brokers'], (list, tuple)): | ||||
conf['brokers'] = [conf['brokers']] | conf['brokers'] = [conf['brokers']] | ||||
return JournalClient(**conf) | return JournalClient(**conf) | ||||
@cli.command() | @cli.command() | ||||
@click.option('--stop-after-objects', '-n', default=None, type=int, | @click.option('--stop-after-objects', '-n', default=None, type=int, | ||||
help='Stop after processing this many objects. Default is to ' | help='Stop after processing this many objects. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | |||||
help='Kafka broker to connect to. ' | |||||
'(deprecated, use the config file instead)') | |||||
@click.option('--prefix', type=str, default=None, | |||||
help='Prefix of Kafka topic names to read from. ' | |||||
'(deprecated, use the config file instead)') | |||||
@click.option('--group-id', type=str, | |||||
help='Name of the group id for reading from Kafka. ' | |||||
'(deprecated, use the config file instead)') | |||||
@click.pass_context | @click.pass_context | ||||
def replay(ctx, brokers, prefix, group_id, stop_after_objects): | def replay(ctx, stop_after_objects): | ||||
"""Fill a Storage by reading a Journal. | """Fill a Storage by reading a Journal. | ||||
There can be several 'replayers' filling a Storage as long as they use | There can be several 'replayers' filling a Storage as long as they use | ||||
the same `group-id`. | the same `group-id`. | ||||
""" | """ | ||||
conf = ctx.obj['config'] | conf = ctx.obj['config'] | ||||
try: | try: | ||||
storage = get_storage(**conf.pop('storage')) | storage = get_storage(**conf.pop('storage')) | ||||
except KeyError: | except KeyError: | ||||
ctx.fail('You must have a storage configured in your config file.') | ctx.fail('You must have a storage configured in your config file.') | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ctx, stop_after_objects=stop_after_objects) | ||||
stop_after_objects=stop_after_objects) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
if notify: | if notify: | ||||
notify('READY=1') | notify('READY=1') | ||||
try: | try: | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | except KeyboardInterrupt: | ||||
notify('STOPPING=1') | notify('STOPPING=1') | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@cli.command('content-replay') | @cli.command('content-replay') | ||||
@click.option('--stop-after-objects', '-n', default=None, type=int, | @click.option('--stop-after-objects', '-n', default=None, type=int, | ||||
help='Stop after processing this many objects. Default is to ' | help='Stop after processing this many objects. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | |||||
help='Kafka broker to connect to.' | |||||
'(deprecated, use the config file instead)') | |||||
@click.option('--prefix', type=str, default=None, | |||||
help='Prefix of Kafka topic names to read from.' | |||||
'(deprecated, use the config file instead)') | |||||
@click.option('--group-id', type=str, | |||||
help='Name of the group id for reading from Kafka.' | |||||
'(deprecated, use the config file instead)') | |||||
@click.option('--exclude-sha1-file', default=None, type=click.File('rb'), | @click.option('--exclude-sha1-file', default=None, type=click.File('rb'), | ||||
help='File containing a sorted array of hashes to be excluded.') | help='File containing a sorted array of hashes to be excluded.') | ||||
@click.option('--check-dst/--no-check-dst', default=True, | @click.option('--check-dst/--no-check-dst', default=True, | ||||
help='Check whether the destination contains the object before ' | help='Check whether the destination contains the object before ' | ||||
'copying.') | 'copying.') | ||||
@click.pass_context | @click.pass_context | ||||
def content_replay(ctx, stop_after_objects, | def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): | ||||
brokers, prefix, group_id, exclude_sha1_file, check_dst): | |||||
"""Fill a destination Object Storage (typically a mirror) by reading a Journal | """Fill a destination Object Storage (typically a mirror) by reading a Journal | ||||
and retrieving objects from an existing source ObjStorage. | and retrieving objects from an existing source ObjStorage. | ||||
There can be several 'replayers' filling a given ObjStorage as long as they | There can be several 'replayers' filling a given ObjStorage as long as they | ||||
use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` | use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` | ||||
environment variable to use KIP-345 static group membership. | environment variable to use KIP-345 static group membership. | ||||
This service retrieves object ids to copy from the 'content' topic. It will | This service retrieves object ids to copy from the 'content' topic. It will | ||||
Show All 31 Lines | if exclude_sha1_file: | ||||
nb_excluded_hashes = int(map_.size()/SHA1_SIZE) | nb_excluded_hashes = int(map_.size()/SHA1_SIZE) | ||||
def exclude_fn(obj): | def exclude_fn(obj): | ||||
return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) | return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) | ||||
else: | else: | ||||
exclude_fn = None | exclude_fn = None | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ctx, stop_after_objects=stop_after_objects, object_types=('content',)) | ||||
stop_after_objects=stop_after_objects, object_types=('content',)) | |||||
worker_fn = functools.partial( | worker_fn = functools.partial( | ||||
process_replay_objects_content, | process_replay_objects_content, | ||||
src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, | src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, | ||||
check_dst=check_dst) | check_dst=check_dst) | ||||
if notify: | if notify: | ||||
notify('READY=1') | notify('READY=1') | ||||
Show All 19 Lines |