diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -149,9 +149,6 @@
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
-@click.option('--concurrency', type=int,
-              default=8,
-              help='Concurrentcy level.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to.'
                    '(deprecated, use the config file instead)')
@@ -164,7 +161,7 @@
 @click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
               help='File containing a sorted array of hashes to be excluded.')
 @click.pass_context
-def content_replay(ctx, max_messages, concurrency,
+def content_replay(ctx, max_messages,
                    brokers, prefix, group_id, exclude_sha1_file):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
@@ -214,7 +211,6 @@
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
                                   dst=objstorage_dst,
-                                  concurrency=concurrency,
                                   exclude_fn=exclude_fn)
 
     try:
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -5,7 +5,6 @@
 
 from time import time
 import logging
-from concurrent.futures import ThreadPoolExecutor
 
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
@@ -259,11 +258,12 @@
                     len(obj))
     except Exception:
         obj = ''
-        logger.exception('Failed to copy %s', hash_to_hex(obj_id))
+        logger.error('Failed to copy %s', hash_to_hex(obj_id))
+        raise
     return len(obj)
 
 
-def process_replay_objects_content(all_objects, *, src, dst, concurrency=8,
+def process_replay_objects_content(all_objects, *, src, dst,
                                    exclude_fn=None):
     """
     Takes a list of records from Kafka (see
@@ -312,26 +312,26 @@
     vol = []
     nb_skipped = 0
     t0 = time()
-    with ThreadPoolExecutor(max_workers=concurrency) as executor:
-        for (object_type, objects) in all_objects.items():
-            if object_type != 'content':
-                logger.warning(
-                    'Received a series of %s, this should not happen',
-                    object_type)
-                continue
-            for obj in objects:
-                obj_id = obj[ID_HASH_ALGO]
-                if obj['status'] != 'visible':
-                    nb_skipped += 1
-                    logger.debug('skipped %s (status=%s)',
-                                 hash_to_hex(obj_id), obj['status'])
-                elif exclude_fn and exclude_fn(obj):
-                    nb_skipped += 1
-                    logger.debug('skipped %s (manually excluded)',
-                                 hash_to_hex(obj_id))
-                else:
-                    fut = executor.submit(copy_object, obj_id, src, dst)
-                    fut.add_done_callback(lambda fn: vol.append(fn.result()))
+
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.warning(
+                'Received a series of %s, this should not happen',
+                object_type)
+            continue
+        for obj in objects:
+            obj_id = obj[ID_HASH_ALGO]
+            if obj['status'] != 'visible':
+                nb_skipped += 1
+                logger.debug('skipped %s (status=%s)',
+                             hash_to_hex(obj_id), obj['status'])
+            elif exclude_fn and exclude_fn(obj):
+                nb_skipped += 1
+                logger.debug('skipped %s (manually excluded)',
+                             hash_to_hex(obj_id))
+            else:
+                vol.append(copy_object(obj_id, src, dst))
+
     dt = time() - t0
     logger.info(
         'processed %s content objects in %.1fsec '