diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -18,6 +18,7 @@
 
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
+        logger.debug("Inserting %s %s objects", len(objects), object_type)
         _insert_objects(object_type, objects, storage)
 
 
@@ -245,14 +246,46 @@
     return get_hash(left) == hash_
 
 
-def copy_object(obj_id, src, dst):
+def retry(max_retries, func, *args, **kwargs):
+    """Call ``func(*args, **kwargs)``, retrying when it raises.
+
+    A generator wrapped by ``contextlib.contextmanager`` may only
+    yield once, so a context manager cannot re-run the body of its
+    ``with`` block; the operation is therefore passed as a callable.
+
+    Args:
+        max_retries (int): maximum number of attempts; at least one
+            attempt is always made.
+        func (callable): the operation to attempt.
+
+    Returns:
+        the value returned by the first successful call of ``func``.
+
+    Raises:
+        Exception: whatever the final attempt raised.
+    """
+    attempts = max(1, max_retries)
+    for attempt in range(1, attempts + 1):
+        try:
+            return func(*args, **kwargs)
+        except Exception:
+            if attempt == attempts:
+                raise
+            logger.debug('attempt %s/%s failed, retrying',
+                         attempt, attempts, exc_info=True)
+
+
+def copy_object(obj_id, src, dst, max_retries=3):
     statsd_name = 'swh_journal_content_replayer_%s_duration_seconds'
     try:
         with statsd.timed(statsd_name % 'get'):
-            obj = src.get(obj_id)
+            obj = retry(max_retries, src.get, obj_id)
+            logger.debug('retrieved %s', hash_to_hex(obj_id))
+
         with statsd.timed(statsd_name % 'put'):
-            dst.add(obj, obj_id=obj_id, check_presence=False)
+            retry(max_retries, dst.add, obj, obj_id=obj_id,
+                  check_presence=False)
             logger.debug('copied %s', hash_to_hex(obj_id))
         statsd.increment(
             'swh_journal_content_replayer_bytes_total',
             len(obj))