diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -50,25 +50,14 @@
     ctx.obj['config'] = conf
 
 
-def get_journal_client(ctx, brokers, prefix, group_id, object_types=None):
-    conf = ctx.obj['config']
-    if not brokers:
-        brokers = conf.get('journal', {}).get('brokers')
-    if not brokers:
+def get_journal_client(ctx, **kwargs):
+    conf = ctx.obj['config'].get('journal', {})
+    conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
+    if not conf.get('brokers'):
         ctx.fail('You must specify at least one kafka broker.')
-    if not isinstance(brokers, (list, tuple)):
-        brokers = [brokers]
-
-    if prefix is None:
-        prefix = conf.get('journal', {}).get('prefix')
-
-    if group_id is None:
-        group_id = conf.get('journal', {}).get('group_id')
-
-    kwargs = dict(brokers=brokers, group_id=group_id, prefix=prefix)
-    if object_types:
-        kwargs['object_types'] = object_types
-    return JournalClient(**kwargs)
+    if not isinstance(conf['brokers'], (list, tuple)):
+        conf['brokers'] = [conf['brokers']]
+    return JournalClient(**conf)
 
 
 @cli.command()
@@ -98,7 +87,8 @@
     except KeyError:
         ctx.fail('You must have a storage configured in your config file.')
 
-    client = get_journal_client(ctx, brokers, prefix, group_id)
+    client = get_journal_client(
+        ctx, brokers=brokers, prefix=prefix, group_id=group_id)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
 
     try:
@@ -147,6 +137,9 @@
 
 
 @cli.command()
+@click.option('--concurrency', type=int,
+              default=8,
+              help='Concurrency level.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               hidden=True,  # prefer config file
               help='Kafka broker to connect to.')
@@ -157,7 +150,7 @@
               hidden=True,  # prefer config file
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
-def content_replay(ctx, brokers, prefix, group_id):
+def content_replay(ctx, concurrency, brokers, prefix, group_id):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
@@ -181,11 +174,13 @@
         ctx.fail('You must have a destination objstorage configured '
                  'in your config file.')
 
-    client = get_journal_client(ctx, brokers, prefix, group_id,
-                                object_types=('content',))
+    client = get_journal_client(
+        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
+        object_types=('content',))
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
-                                  dst=objstorage_dst)
+                                  dst=objstorage_dst,
+                                  concurrency=concurrency)
 
     try:
         nb_messages = 0
@@ -195,7 +190,7 @@
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
-        print('Done.')
+        logger.info('Done.')
 
 
 def main():
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -46,10 +46,12 @@
     Messages are processed by the `process_objects` method in batches
     of maximum `max_messages`.
 
+    Any other named argument is passed directly to KafkaConsumer().
+ """ def __init__( self, brokers, group_id, prefix=None, object_types=None, - max_messages=0, auto_offset_reset='earliest'): + max_messages=0, auto_offset_reset='earliest', **kwargs): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: @@ -72,7 +74,7 @@ auto_offset_reset=auto_offset_reset, enable_auto_commit=False, group_id=group_id, - ) + **kwargs) self.consumer.subscribe( topics=['%s.%s' % (prefix, object_type) diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -3,12 +3,14 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from time import time import logging +from concurrent.futures import ThreadPoolExecutor from swh.storage import HashCollision from swh.model.hashutil import hash_to_hex from swh.objstorage.objstorage import ID_HASH_ALGO - +from swh.core.statsd import statsd logger = logging.getLogger(__name__) @@ -44,22 +46,45 @@ object_type) -def process_replay_objects_content(all_objects, *, src, dst): - for (object_type, objects) in all_objects.items(): - if object_type != 'content': - logger.warning('Received a series of %s, this should not happen', - object_type) - continue - logger.info('processing %s content objects', len(objects)) - for obj in objects: - obj_id = obj[ID_HASH_ALGO] - if obj['status'] == 'visible': - try: - obj = src.get(obj_id) - dst.add(obj, obj_id=obj_id) - logger.debug('copied %s', hash_to_hex(obj_id)) - except Exception: - logger.exception('Failed to copy %s', hash_to_hex(obj_id)) - else: - logger.debug('skipped %s (%s)', - hash_to_hex(obj_id), obj['status']) +def copy_object(obj_id, src, dst): + statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' + try: + with statsd.timed(statsd_name % 'get'): + obj = src.get(obj_id) + with statsd.timed(statsd_name % 'put'): + dst.add(obj, obj_id=obj_id, check_presence=False) + logger.debug('copied %s', hash_to_hex(obj_id)) + statsd.increment( + 'swh_journal_content_replayer_bytes_total', + len(obj)) + except Exception: + obj = '' + logger.exception('Failed to copy %s', hash_to_hex(obj_id)) + return len(obj) + + +def process_replay_objects_content(all_objects, *, src, dst, concurrency=8): + vol = [] + t0 = time() + with ThreadPoolExecutor(max_workers=concurrency) as executor: + for (object_type, objects) in all_objects.items(): + if object_type != 'content': + logger.warning( + 'Received a series of %s, this should not happen', + object_type) + continue + for obj in objects: + obj_id = obj[ID_HASH_ALGO] + if obj['status'] == 'visible': + fut = executor.submit(copy_object, obj_id, src, dst) + fut.add_done_callback(lambda fn: vol.append(fn.result())) + else: + logger.debug('skipped %s (%s)', + hash_to_hex(obj_id), obj['status']) + dt = time() - t0 + logger.info( + 'processed %s content objects in %.1fsec ' + '(%.1f obj/sec, %.1fMB/sec) - %s failures', + len(vol), dt, + len(vol)/dt, + sum(vol)/1024/1024/dt, len([x for x in vol if not x]))