Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/replay.py
Show All 10 Lines | |||||
try: | try: | ||||
from systemd.daemon import notify | from systemd.daemon import notify | ||||
except ImportError: | except ImportError: | ||||
notify = None | notify = None | ||||
from tenacity import ( | from tenacity import ( | ||||
retry, | retry, | ||||
retry_base, | |||||
retry_if_exception_type, | retry_if_exception_type, | ||||
stop_after_attempt, | stop_after_attempt, | ||||
wait_random_exponential, | wait_random_exponential, | ||||
) | ) | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | while left < right - 1: | ||||
else: | else: | ||||
right = middle | right = middle | ||||
return get_hash(left) == hash_ | return get_hash(left) == hash_ | ||||
class ReplayError(Exception): | class ReplayError(Exception): | ||||
"""An error occurred during the replay of an object""" | """An error occurred during the replay of an object""" | ||||
def __init__(self, operation, *, obj_id, exc): | def __init__(self, *, obj_id, exc): | ||||
self.operation = operation | |||||
self.obj_id = hash_to_hex(obj_id) | self.obj_id = hash_to_hex(obj_id) | ||||
self.exc = exc | self.exc = exc | ||||
def __str__(self): | def __str__(self): | ||||
return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) | return "ReplayError(%s, %s)" % (self.obj_id, self.exc) | ||||
def log_replay_retry(retry_state, sleep=None, last_result=None): | def log_replay_retry(retry_state, sleep=None, last_result=None): | ||||
"""Log a retry of the content replayer""" | """Log a retry of the content replayer""" | ||||
try: | |||||
exc = retry_state.outcome.exception() | exc = retry_state.outcome.exception() | ||||
attempt_number = retry_state.attempt_number | operation = retry_state.fn.__name__ | ||||
except AttributeError: | |||||
# tenacity < 5.0 | |||||
exc = last_result.exception() | |||||
attempt_number = retry_state.statistics["attempt_number"] | |||||
logger.debug( | logger.debug( | ||||
"Retry operation %(operation)s on %(obj_id)s: %(exc)s", | "Retry operation %(operation)s on %(obj_id)s: %(exc)s", | ||||
{"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, | {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, | ||||
) | |||||
statsd.increment( | |||||
CONTENT_RETRY_METRIC, | |||||
tags={"operation": exc.operation, "attempt": str(attempt_number),}, | |||||
) | ) | ||||
def log_replay_error(retry_state): | def log_replay_error(retry_state): | ||||
"""Log a replay error to sentry""" | """Log a replay error to sentry""" | ||||
try: | |||||
exc = retry_state.outcome.exception() | exc = retry_state.outcome.exception() | ||||
except AttributeError: | |||||
# tenacity < 5.0 | |||||
exc = retry_state.exception() | |||||
with push_scope() as scope: | with push_scope() as scope: | ||||
scope.set_tag("operation", exc.operation) | scope.set_tag("operation", retry_state.fn.__name__) | ||||
scope.set_extra("obj_id", exc.obj_id) | scope.set_extra("obj_id", exc.obj_id) | ||||
capture_exception(exc.exc) | capture_exception(exc.exc) | ||||
logger.error( | error_context = { | ||||
"Failed operation %(operation)s on %(obj_id)s after %(retries)s" | |||||
" retries: %(exc)s", | |||||
{ | |||||
"obj_id": exc.obj_id, | "obj_id": exc.obj_id, | ||||
"operation": exc.operation, | "operation": retry_state.fn.__name__, | ||||
"exc": str(exc.exc), | "exc": str(exc.exc), | ||||
"retries": retry_state.attempt_number, | "retries": retry_state.attempt_number, | ||||
}, | } | ||||
logger.error( | |||||
"Failed operation %(operation)s on %(obj_id)s after %(retries)s" | |||||
" retries: %(exc)s", | |||||
error_context, | |||||
) | ) | ||||
return None | return None | ||||
CONTENT_REPLAY_RETRIES = 3 | CONTENT_REPLAY_RETRIES = 3 | ||||
class retry_log_if_success(retry_base): | |||||
"""Log in statsd the number of attempts required to succeed""" | |||||
def __call__(self, retry_state): | |||||
if not retry_state.outcome.failed: | |||||
statsd.increment( | |||||
CONTENT_RETRY_METRIC, | |||||
tags={ | |||||
"operation": retry_state.fn.__name__, | |||||
"attempt": str(retry_state.attempt_number), | |||||
}, | |||||
) | |||||
return False | |||||
content_replay_retry = retry( | content_replay_retry = retry( | ||||
retry=retry_if_exception_type(ReplayError), | retry=retry_if_exception_type(ReplayError) | retry_log_if_success(), | ||||
stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), | stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), | ||||
wait=wait_random_exponential(multiplier=1, max=60), | wait=wait_random_exponential(multiplier=1, max=60), | ||||
before_sleep=log_replay_retry, | before_sleep=log_replay_retry, | ||||
retry_error_callback=log_replay_error, | retry_error_callback=log_replay_error, | ||||
) | ) | ||||
@content_replay_retry | @content_replay_retry | ||||
def copy_object(obj_id, src, dst): | def get_object(objstorage, obj_id): | ||||
hex_obj_id = hash_to_hex(obj_id) | |||||
obj = "" | |||||
try: | try: | ||||
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): | with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): | ||||
obj = src.get(obj_id) | obj = objstorage.get(obj_id) | ||||
logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) | logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) | ||||
return obj | |||||
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): | |||||
dst.add(obj, obj_id=obj_id, check_presence=False) | |||||
logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) | |||||
statsd.increment(CONTENT_BYTES_METRIC, len(obj)) | |||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
logger.error( | logger.error( | ||||
"Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} | "Failed to retrieve %(obj_id)s: object not found", | ||||
{"obj_id": hash_to_hex(obj_id)}, | |||||
) | ) | ||||
raise | raise | ||||
except Exception as exc: | except Exception as exc: | ||||
raise ReplayError("copy", obj_id=obj_id, exc=exc) from None | raise ReplayError(obj_id=obj_id, exc=exc) from None | ||||
@content_replay_retry | |||||
def put_object(objstorage, obj_id, obj): | |||||
try: | |||||
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): | |||||
obj = objstorage.add(obj, obj_id, check_presence=False) | |||||
logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)}) | |||||
except Exception as exc: | |||||
raise ReplayError(obj_id=obj_id, exc=exc) from None | |||||
def copy_object(obj_id, src, dst): | |||||
obj = get_object(src, obj_id) | |||||
if obj is not None: | |||||
put_object(dst, obj_id, obj) | |||||
statsd.increment(CONTENT_BYTES_METRIC, len(obj)) | |||||
return len(obj) | return len(obj) | ||||
return 0 | |||||
@content_replay_retry | @content_replay_retry | ||||
def obj_in_objstorage(obj_id, dst): | def obj_in_objstorage(obj_id, dst): | ||||
"""Check if an object is already in an objstorage, tenaciously""" | """Check if an object is already in an objstorage, tenaciously""" | ||||
try: | try: | ||||
return obj_id in dst | return obj_id in dst | ||||
except Exception as exc: | except Exception as exc: | ||||
raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None | raise ReplayError(obj_id=obj_id, exc=exc) from None | ||||
def process_replay_objects_content( | def process_replay_objects_content( | ||||
all_objects: Dict[str, List[dict]], | all_objects: Dict[str, List[dict]], | ||||
*, | *, | ||||
src: ObjStorage, | src: ObjStorage, | ||||
dst: ObjStorage, | dst: ObjStorage, | ||||
exclude_fn: Optional[Callable[[dict], bool]] = None, | exclude_fn: Optional[Callable[[dict], bool]] = None, | ||||
▲ Show 20 Lines • Show All 114 Lines • Show Last 20 Lines |