diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
index 29a416d..b2873e8 100644
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -1,178 +1,194 @@
 # Copyright (C) 2016-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging

 import click

 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None

 from swh.objstorage.cli import objstorage_cli_group


 @objstorage_cli_group.command("replay")
 @click.option(
     "--stop-after-objects",
     "-n",
     default=None,
     type=int,
     help="Stop after processing this many objects. Default is to run forever.",
 )
 @click.option(
     "--exclude-sha1-file",
     default=None,
     type=click.File("rb"),
     help="File containing a sorted array of hashes to be excluded.",
 )
 @click.option(
     "--check-dst/--no-check-dst",
     default=True,
     help="Check whether the destination contains the object before copying.",
 )
+@click.option(
+    "--concurrency",
+    default=4,
+    help=(
+        "Number of concurrent threads doing the actual copy of blobs between "
+        "the source and destination objstorages."
+    ),
+)
 @click.pass_context
-def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst):
+def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency):
     """Fill a destination Object Storage using a journal stream.

     This is typically used for a mirror configuration, by reading a Journal
     and retrieving objects from an existing source ObjStorage.

     There can be several 'replayers' filling a given ObjStorage as long as they
     use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID``
     environment variable to use KIP-345 static group membership.

     This service retrieves object ids to copy from the 'content' topic. It will
     only copy object's content if the object's description in the kafka
     message has the status:visible set.

     ``--exclude-sha1-file`` may be used to exclude some hashes to speed-up the
     replay in case many of the contents are already in the destination
     objstorage. It must contain a concatenation of all (sha1) hashes, and it
     must be sorted. This file will not be fully loaded into memory at any
     given time, so it can be arbitrarily large.

     ``--check-dst`` sets whether the replayer should check in the destination
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.

+    ``--concurrency N`` sets the number of threads in charge of copying blob
+    objects from the source objstorage to the destination one. Using a large
+    concurrency value makes sense if both the source and destination
+    objstorages support highly parallel workloads. Make sure not to set the
+    ``batch_size`` configuration option too low for the concurrency to be
+    actually useful (each batch of kafka messages is dispatched among the threads).
+
     The expected configuration file should have 3 sections:

     - objstorage: the source object storage from which to retrieve objects to
       copy; this objstorage can (and should) be a read-only objstorage,
       https://docs.softwareheritage.org/devel/apidoc/swh.objstorage.html

     - objstorage_dst: the destination objstorage in which objects will be
       written into,

     - journal_client: the configuration of the kafka journal from which the
       `content` topic will be consumed to get the list of content objects to
       copy from the source objstorage to the destination one.
       https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html

     In addition to these 3 mandatory sections, an optional 'replayer' section
     can be provided with an 'error_reporter' config entry allowing to specify
     a Redis connection parameter set that will be used to report objects that
     could not be copied, eg.::

       objstorage:
         [...]
       objstorage_dst:
         [...]
       journal_client:
         [...]
       replayer:
         error_reporter:
           host: redis.local
           port: 6379
           db: 1
     """
     import functools
     import mmap

     from swh.journal.client import get_journal_client
     from swh.model.model import SHA1_SIZE
     from swh.objstorage.factory import get_objstorage
     from swh.objstorage.replayer.replay import (
         is_hash_in_bytearray,
         process_replay_objects_content,
     )

     conf = ctx.obj["config"]
     try:
         objstorage_src = get_objstorage(**conf.pop("objstorage"))
     except KeyError:
         ctx.fail("You must have a source objstorage configured in " "your config file.")
     try:
         objstorage_dst = get_objstorage(**conf.pop("objstorage_dst"))
     except KeyError:
         ctx.fail(
             "You must have a destination objstorage configured " "in your config file."
         )

     if exclude_sha1_file:
         map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
         if map_.size() % SHA1_SIZE != 0:
             ctx.fail(
                 "--exclude-sha1 must link to a file whose size is an "
                 "exact multiple of %d bytes." % SHA1_SIZE
             )
         nb_excluded_hashes = int(map_.size() / SHA1_SIZE)

         def exclude_fn(obj):
             return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)

     else:
         exclude_fn = None

     journal_cfg = conf.pop("journal_client")
     replayer_cfg = conf.pop("replayer", {})
     if "error_reporter" in replayer_cfg:
         from redis import Redis

         from swh.objstorage.replayer import replay

         replay.REPORTER = Redis(**replayer_cfg.get("error_reporter")).set

     client = get_journal_client(
         **journal_cfg,
         stop_after_objects=stop_after_objects,
         object_types=("content",),
     )
     worker_fn = functools.partial(
         process_replay_objects_content,
         src=objstorage_src,
         dst=objstorage_dst,
         exclude_fn=exclude_fn,
         check_dst=check_dst,
+        concurrency=concurrency,
     )

     if notify:
         notify("READY=1")

     try:
         client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
         if notify:
             notify("STOPPING=1")
         client.close()


 def main():
     logging.basicConfig()
     return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE")


 if __name__ == "__main__":
     main()
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
index d71d8ff..5e42b2b 100644
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -1,331 +1,348 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
+from concurrent.futures import wait as futures_wait
 import logging
 from time import time
 from typing import Callable, Dict, List, Optional

 import msgpack
 from sentry_sdk import capture_exception, push_scope

 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None

 from tenacity import (
     retry,
     retry_if_exception_type,
     stop_after_attempt,
     wait_random_exponential,
 )
 from tenacity.retry import retry_base

 from swh.core.statsd import statsd
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import SHA1_SIZE
 from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage

 logger = logging.getLogger(__name__)

 REPORTER = None

 CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
 CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
 CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
 CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"


 def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
     """
     Checks if the given hash is in the provided `array`. The array must be
     a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its
     size must be `nb_hashes*hash_size` bytes).

     Args:
       hash_ (bytes): the hash to look for
       array (bytes): a sorted concatenated array of hashes (may be of any
         type supporting slice indexing, eg. :class:`mmap.mmap`)
       nb_hashes (int): number of hashes in the array
       hash_size (int): size of a hash (defaults to 20, for SHA1)

     Example:

     >>> import os
     >>> hash1 = os.urandom(20)
     >>> hash2 = os.urandom(20)
     >>> hash3 = os.urandom(20)
     >>> array = b''.join(sorted([hash1, hash2]))
     >>> is_hash_in_bytearray(hash1, array, 2)
     True
     >>> is_hash_in_bytearray(hash2, array, 2)
     True
     >>> is_hash_in_bytearray(hash3, array, 2)
     False
     """
     if len(hash_) != hash_size:
         raise ValueError("hash_ does not match the provided hash_size.")

     def get_hash(position):
         return array[position * hash_size : (position + 1) * hash_size]

     # Regular dichotomy:
     left = 0
     right = nb_hashes
     while left < right - 1:
         middle = int((right + left) / 2)
         pivot = get_hash(middle)
         if pivot == hash_:
             return True
         elif pivot < hash_:
             left = middle
         else:
             right = middle
     return get_hash(left) == hash_


 class ReplayError(Exception):
     """An error occurred during the replay of an object"""

     def __init__(self, *, obj_id, exc):
         self.obj_id = hash_to_hex(obj_id)
         self.exc = exc

     def __str__(self):
         return "ReplayError(%s, %s)" % (self.obj_id, self.exc)


 def log_replay_retry(retry_state, sleep=None, last_result=None):
     """Log a retry of the content replayer"""
     exc = retry_state.outcome.exception()
     operation = retry_state.fn.__name__
     logger.debug(
         "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
         {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
     )


 def log_replay_error(retry_state):
     """Log a replay error to sentry"""
     exc = retry_state.outcome.exception()
     with push_scope() as scope:
         scope.set_tag("operation", retry_state.fn.__name__)
         scope.set_extra("obj_id", exc.obj_id)
         capture_exception(exc.exc)

     error_context = {
         "obj_id": exc.obj_id,
         "operation": retry_state.fn.__name__,
         "exc": str(exc.exc),
         "retries": retry_state.attempt_number,
     }

     logger.error(
         "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
         " retries: %(exc)s",
         error_context,
     )

     # if we have a global error (redis) reporter
     if REPORTER is not None:
         oid = f"blob:{exc.obj_id}"
         msg = msgpack.dumps(error_context)
         REPORTER(oid, msg)

     return None


 CONTENT_REPLAY_RETRIES = 3


 class retry_log_if_success(retry_base):
     """Log in statsd the number of attempts required to succeed"""

     def __call__(self, retry_state):
         if not retry_state.outcome.failed:
             statsd.increment(
                 CONTENT_RETRY_METRIC,
                 tags={
                     "operation": retry_state.fn.__name__,
                     "attempt": str(retry_state.attempt_number),
                 },
             )
         return False


 content_replay_retry = retry(
     retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
     stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
     wait=wait_random_exponential(multiplier=1, max=60),
     before_sleep=log_replay_retry,
     retry_error_callback=log_replay_error,
 )


 @content_replay_retry
 def get_object(objstorage, obj_id):
     try:
         with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
             obj = objstorage.get(obj_id)
             logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
         return obj
     except ObjNotFoundError:
         logger.error(
             "Failed to retrieve %(obj_id)s: object not found",
             {"obj_id": hash_to_hex(obj_id)},
         )
         raise
     except Exception as exc:
         raise ReplayError(obj_id=obj_id, exc=exc) from None


 @content_replay_retry
 def put_object(objstorage, obj_id, obj):
     try:
         with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
             obj = objstorage.add(obj, obj_id, check_presence=False)
             logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
     except Exception as exc:
         raise ReplayError(obj_id=obj_id, exc=exc) from None


 def copy_object(obj_id, src, dst):
     obj = get_object(src, obj_id)
     if obj is not None:
         put_object(dst, obj_id, obj)
         statsd.increment(CONTENT_BYTES_METRIC, len(obj))
         return len(obj)
     return 0


 @content_replay_retry
 def obj_in_objstorage(obj_id, dst):
     """Check if an object is already in an objstorage, tenaciously"""
     try:
         return obj_id in dst
     except Exception as exc:
         raise ReplayError(obj_id=obj_id, exc=exc) from None


 def process_replay_objects_content(
     all_objects: Dict[str, List[dict]],
     *,
     src: ObjStorage,
     dst: ObjStorage,
     exclude_fn: Optional[Callable[[dict], bool]] = None,
     check_dst: bool = True,
+    concurrency: int = 16,
 ):
     """
     Takes a list of records from Kafka (see
     :py:func:`swh.journal.client.JournalClient.process`) and copies them
     from the `src` objstorage to the `dst` objstorage, if:

     * `obj['status']` is `'visible'`
     * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
     * `obj['sha1'] not in dst` (if `check_dst` is True)

     Args:
         all_objects: Objects passed by the Kafka client. Most importantly,
             `all_objects['content'][*]['sha1']` is the sha1 hash of each content.
         src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
         dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
         exclude_fn: Determines whether an object should be copied.
         check_dst: Determines whether we should check the destination
             objstorage before copying.

     Example:

     >>> from swh.objstorage.factory import get_objstorage
     >>> src = get_objstorage('memory')
     >>> dst = get_objstorage('memory')
     >>> id1 = src.add(b'foo bar')
     >>> id2 = src.add(b'baz qux')

     >>> kafka_partitions = {
     ...     'content': [
     ...         {
     ...             'sha1': id1,
     ...             'status': 'visible',
     ...         },
     ...         {
     ...             'sha1': id2,
     ...             'status': 'visible',
     ...         },
     ...     ]
     ... }
     >>> process_replay_objects_content(
     ...     kafka_partitions, src=src, dst=dst,
     ...     exclude_fn=lambda obj: obj['sha1'] == id1)
     >>> id1 in dst
     False
     >>> id2 in dst
     True
     """
     vol = []
     nb_skipped = 0
     nb_failures = 0
     t0 = time()

-    for (object_type, objects) in all_objects.items():
-        if object_type != "content":
-            logger.warning(
-                "Received a series of %s, this should not happen", object_type
+    def _copy_object(obj):
+        nonlocal nb_skipped
+        nonlocal nb_failures
+
+        obj_id = obj[ID_HASH_ALGO]
+        if obj["status"] != "visible":
+            nb_skipped += 1
+            logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
+            statsd.increment(
+                CONTENT_OPERATIONS_METRIC,
+                tags={"decision": "skipped", "status": obj["status"]},
             )
-            continue
-        for obj in objects:
-            obj_id = obj[ID_HASH_ALGO]
-            if obj["status"] != "visible":
-                nb_skipped += 1
-                logger.debug(
-                    "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
-                )
-                statsd.increment(
-                    CONTENT_OPERATIONS_METRIC,
-                    tags={"decision": "skipped", "status": obj["status"]},
-                )
-            elif exclude_fn and exclude_fn(obj):
+        elif exclude_fn and exclude_fn(obj):
+            nb_skipped += 1
+            logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
+        elif check_dst and obj_in_objstorage(obj_id, dst):
+            nb_skipped += 1
+            logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
+            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+        else:
+            try:
+                copied = copy_object(obj_id, src, dst)
+            except ObjNotFoundError:
                 nb_skipped += 1
-                logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
                 statsd.increment(
-                    CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
+                    CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
                 )
-            elif check_dst and obj_in_objstorage(obj_id, dst):
-                nb_skipped += 1
-                logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
-                statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
             else:
-                try:
-                    copied = copy_object(obj_id, src, dst)
-                except ObjNotFoundError:
-                    nb_skipped += 1
+                if copied is None:
+                    nb_failures += 1
                     statsd.increment(
-                        CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
+                        CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
                     )
                 else:
-                    if copied is None:
-                        nb_failures += 1
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
-                        )
-                    else:
-                        vol.append(copied)
-                        statsd.increment(
-                            CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
-                        )
+                    vol.append(copied)
+                    statsd.increment(
+                        CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
+                    )
+
+    with ThreadPoolExecutor(max_workers=concurrency) as pool:
+        futures = []
+        for (object_type, objects) in all_objects.items():
+            if object_type != "content":
+                logger.warning(
+                    "Received a series of %s, this should not happen", object_type
+                )
+                continue
+            for obj in objects:
+                futures.append(pool.submit(_copy_object, obj=obj))
+
+        futures_wait(futures, return_when=FIRST_EXCEPTION)
+        for f in futures:
+            if f.running():
+                continue
+            exc = f.exception()
+            if exc:
+                pool.shutdown(wait=False)
+                f.result()
+                raise exc

     dt = time() - t0
     logger.info(
         "processed %s content objects in %.1fsec "
         "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
         len(vol),
         dt,
         len(vol) / dt,
         sum(vol) / 1024 / 1024 / dt,
         nb_failures,
         nb_skipped,
     )
     if notify:
         notify("WATCHDOG=1")