diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -36,8 +36,16 @@
     default=True,
     help="Check whether the destination contains the object before copying.",
 )
+@click.option(
+    "--concurrency",
+    default=4,
+    help=(
+        "Number of concurrent threads doing the actual copy of blobs between "
+        "the source and destination objstorages."
+    ),
+)
 @click.pass_context
-def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
+def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency):
     """Fill a destination Object Storage using a journal stream.
 
     This is typically used for a mirror configuration, by reading a Journal
@@ -62,6 +70,13 @@
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.
 
+    ``--concurrency N`` sets the number of threads in charge of copying blob
+    objects from the source objstorage to the destination one. Using a large
+    concurrency value makes sense if both the source and destination objstorages
+    support highly parallel workloads. Make sure not to set the ``batch_size``
+    configuration option too low for the concurrency to be actually useful
+    (each batch of Kafka messages is dispatched among the threads).
+
     The expected configuration file should have 3 sections:
 
     - objstorage: the source object storage from which to retrieve objects to
@@ -152,6 +167,7 @@
         dst=objstorage_dst,
         exclude_fn=exclude_fn,
         check_dst=check_dst,
+        concurrency=concurrency,
     )
 
     if notify:
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -3,6 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
+from concurrent.futures import wait as futures_wait
 import logging
 from time import time
 from typing import Callable, Dict, List, Optional
@@ -216,6 +218,7 @@
     dst: ObjStorage,
     exclude_fn: Optional[Callable[[dict], bool]] = None,
     check_dst: bool = True,
+    concurrency: int = 16,
 ):
     """
     Takes a list of records from Kafka (see
@@ -268,52 +271,66 @@
     nb_failures = 0
     t0 = time()
 
-    for (object_type, objects) in all_objects.items():
-        if object_type != "content":
-            logger.warning(
-                "Received a series of %s, this should not happen", object_type
+    def _copy_object(obj):
+        nonlocal nb_skipped
+        nonlocal nb_failures
+
+        obj_id = obj[ID_HASH_ALGO]
+        if obj["status"] != "visible":
+            nb_skipped += 1
+            logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
+            statsd.increment(
+                CONTENT_OPERATIONS_METRIC,
+                tags={"decision": "skipped", "status": obj["status"]},
             )
-            continue
-        for obj in objects:
-            obj_id = obj[ID_HASH_ALGO]
-            if obj["status"] != "visible":
-                nb_skipped += 1
-                logger.debug(
-                    "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
-                )
-                statsd.increment(
-                    CONTENT_OPERATIONS_METRIC,
-                    tags={"decision": "skipped", "status": obj["status"]},
-                )
-            elif exclude_fn and exclude_fn(obj):
+        elif exclude_fn and exclude_fn(obj):
+            nb_skipped += 1
+            logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
+        elif check_dst and obj_in_objstorage(obj_id, dst):
+            nb_skipped += 1
+            logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+        else:
+            try:
+                copied = copy_object(obj_id, src, dst)
+            except ObjNotFoundError:
                 nb_skipped += 1
-                logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
                 statsd.increment(
-                    CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
+                    CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
                 )
-            elif check_dst and obj_in_objstorage(obj_id, dst):
-                nb_skipped += 1
-                logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
-                statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
             else:
-                try:
-                    copied = copy_object(obj_id, src, dst)
-                except ObjNotFoundError:
-                    nb_skipped += 1
+                if copied is None:
+                    nb_failures += 1
                     statsd.increment(
-                        CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+                        CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
                     )
                 else:
-                    if copied is None:
-                        nb_failures += 1
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
-                        )
-                    else:
-                        vol.append(copied)
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
-                        )
+                    vol.append(copied)
+                    statsd.increment(
+                        CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
+                    )
+
+    with ThreadPoolExecutor(max_workers=concurrency) as pool:
+        futures = []
+        for (object_type, objects) in all_objects.items():
+            if object_type != "content":
+                logger.warning(
+                    "Received a series of %s, this should not happen", object_type
+                )
+                continue
+            for obj in objects:
+                futures.append(pool.submit(_copy_object, obj=obj))
+
+        futures_wait(futures, return_when=FIRST_EXCEPTION)
+        for f in futures:
+            if f.running():
+                continue
+            exc = f.exception()
+            if exc:
+                pool.shutdown(wait=False)
+                f.result()
+                raise exc
 
     dt = time() - t0
     logger.info(
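
Note: the heart of this change is a fan-out / fail-fast pattern around concurrent.futures. Below is a minimal, self-contained sketch of that pattern under the same assumptions as the patch (the names process_batch, handle, and items are illustrative, not part of the patch): each batch is dispatched to a thread pool, we wait until either all tasks finish or one of them raises, and the first exception is propagated so the caller never treats a partially processed batch as successful.

from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
from concurrent.futures import wait as futures_wait


def process_batch(items, handle, concurrency=4):
    # Fan the batch out to `concurrency` worker threads.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(handle, item) for item in items]
        # Return as soon as every task is done OR any task raises,
        # instead of draining the whole batch first.
        futures_wait(futures, return_when=FIRST_EXCEPTION)
        for f in futures:
            if f.running():
                # A still-running task has no exception to report yet,
                # and Future.exception() would block on it.
                continue
            exc = f.exception()
            if exc is not None:
                # Stop handing out queued work and propagate the failure.
                pool.shutdown(wait=False)
                raise exc


if __name__ == "__main__":
    # Copies nothing; just demonstrates the dispatch/fail-fast behaviour.
    process_batch(range(8), print, concurrency=2)

The same wait-then-rescan logic appears verbatim in the replay.py hunk above; checking f.running() before f.exception() avoids blocking on tasks that had not yet completed when wait() returned early.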