diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -3,6 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
+from concurrent.futures import wait as futures_wait
 import logging
 from time import time
 from typing import Callable, Dict, List, Optional
@@ -216,6 +218,7 @@
     dst: ObjStorage,
     exclude_fn: Optional[Callable[[dict], bool]] = None,
     check_dst: bool = True,
+    concurrency: int = 16,
 ):
     """
     Takes a list of records from Kafka (see
@@ -268,52 +271,75 @@
     nb_failures = 0
     t0 = time()
 
-    for (object_type, objects) in all_objects.items():
-        if object_type != "content":
-            logger.warning(
-                "Received a series of %s, this should not happen", object_type
-            )
-            continue
-        for obj in objects:
-            obj_id = obj[ID_HASH_ALGO]
-            if obj["status"] != "visible":
-                nb_skipped += 1
-                logger.debug(
-                    "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
-                )
-                statsd.increment(
-                    CONTENT_OPERATIONS_METRIC,
-                    tags={"decision": "skipped", "status": obj["status"]},
-                )
-            elif exclude_fn and exclude_fn(obj):
-                nb_skipped += 1
-                logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
-                statsd.increment(
-                    CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
-                )
-            elif check_dst and obj_in_objstorage(obj_id, dst):
-                nb_skipped += 1
-                logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
-                statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
-            else:
-                try:
-                    copied = copy_object(obj_id, src, dst)
-                except ObjNotFoundError:
-                    nb_skipped += 1
-                    statsd.increment(
-                        CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
-                    )
-                else:
-                    if copied is None:
-                        nb_failures += 1
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
-                        )
-                    else:
-                        vol.append(copied)
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
-                        )
+    def _copy_object(obj):
+        """Replicate one content object from ``src`` to ``dst``.
+
+        This runs in a worker thread, so it must not mutate the counters of
+        the enclosing function (``+=`` on a shared int is not atomic across
+        threads). Instead it returns a ``(decision, copied)`` pair, where
+        ``decision`` is ``"skipped"``, ``"failed"`` or ``"copied"`` and
+        ``copied`` is the value returned by ``copy_object`` (only set when
+        ``decision == "copied"``); the caller tallies the results.
+        """
+        obj_id = obj[ID_HASH_ALGO]
+        if obj["status"] != "visible":
+            logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
+            statsd.increment(
+                CONTENT_OPERATIONS_METRIC,
+                tags={"decision": "skipped", "status": obj["status"]},
+            )
+            return "skipped", None
+        if exclude_fn and exclude_fn(obj):
+            logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
+            return "skipped", None
+        if check_dst and obj_in_objstorage(obj_id, dst):
+            logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+            return "skipped", None
+        try:
+            copied = copy_object(obj_id, src, dst)
+        except ObjNotFoundError:
+            statsd.increment(
+                CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+            )
+            return "skipped", None
+        if copied is None:
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"})
+            return "failed", None
+        statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"})
+        return "copied", copied
+
+    with ThreadPoolExecutor(max_workers=concurrency) as pool:
+        futures = []
+        for (object_type, objects) in all_objects.items():
+            if object_type != "content":
+                logger.warning(
+                    "Received a series of %s, this should not happen", object_type
+                )
+                continue
+            for obj in objects:
+                futures.append(pool.submit(_copy_object, obj=obj))
+
+        done, not_done = futures_wait(futures, return_when=FIRST_EXCEPTION)
+        for f in done:
+            if f.exception() is not None:
+                # Fail fast: cancel the tasks that have not started yet so
+                # that leaving the ``with`` block only waits for the running
+                # ones, then re-raise the worker's exception.
+                for pending in not_done:
+                    pending.cancel()
+                f.result()
+
+    # Tally in the calling thread: the workers only report their decision.
+    for f in futures:
+        decision, copied = f.result()
+        if decision == "skipped":
+            nb_skipped += 1
+        elif decision == "failed":
+            nb_failures += 1
+        else:
+            vol.append(copied)
 
     dt = time() - t0
     logger.info(