
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
index 2bdec24..d71d8ff 100644
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -1,331 +1,331 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from time import time
from typing import Callable, Dict, List, Optional

import msgpack
from sentry_sdk import capture_exception, push_scope

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

from tenacity import (
    retry,
-    retry_base,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)
+from tenacity.retry import retry_base
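# NB: `retry_base` is now imported from the `tenacity.retry` submodule;
# presumably newer tenacity releases no longer re-export it from the package
# top level, which is what this change accommodates.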

from swh.core.statsd import statsd
from swh.model.hashutil import hash_to_hex
from swh.model.model import SHA1_SIZE
from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage

logger = logging.getLogger(__name__)

REPORTER = None

CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total"
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"


def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE):
    """
    Checks if the given hash is in the provided `array`. The array must be
    a *sorted* concatenation of sha1 hashes, and contain `nb_hashes` hashes
    (so its size must be `nb_hashes*hash_size` bytes).

    Args:
        hash_ (bytes): the hash to look for
        array (bytes): a sorted concatenated array of hashes (may be of
            any type supporting slice indexing, eg. :class:`mmap.mmap`)
        nb_hashes (int): number of hashes in the array
        hash_size (int): size of a hash (defaults to 20, for SHA1)

    Example:

    >>> import os
    >>> hash1 = os.urandom(20)
    >>> hash2 = os.urandom(20)
    >>> hash3 = os.urandom(20)
    >>> array = b''.join(sorted([hash1, hash2]))
    >>> is_hash_in_bytearray(hash1, array, 2)
    True
    >>> is_hash_in_bytearray(hash2, array, 2)
    True
    >>> is_hash_in_bytearray(hash3, array, 2)
    False
    """
    if len(hash_) != hash_size:
        raise ValueError("hash_ does not match the provided hash_size.")

    def get_hash(position):
        return array[position * hash_size : (position + 1) * hash_size]

    # Regular dichotomy:
    left = 0
    right = nb_hashes
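    # Invariant: if hash_ is present in the array, its index lies in [left, right).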
    while left < right - 1:
        middle = int((right + left) / 2)
        pivot = get_hash(middle)
        if pivot == hash_:
            return True
        elif pivot < hash_:
            left = middle
        else:
            right = middle
    return get_hash(left) == hash_


class ReplayError(Exception):
    """An error occurred during the replay of an object"""

    def __init__(self, *, obj_id, exc):
        self.obj_id = hash_to_hex(obj_id)
        self.exc = exc

    def __str__(self):
        return "ReplayError(%s, %s)" % (self.obj_id, self.exc)


def log_replay_retry(retry_state, sleep=None, last_result=None):
    """Log a retry of the content replayer"""
    exc = retry_state.outcome.exception()
    operation = retry_state.fn.__name__
    logger.debug(
        "Retry operation %(operation)s on %(obj_id)s: %(exc)s",
        {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
    )


def log_replay_error(retry_state):
    """Log a replay error to sentry"""
    exc = retry_state.outcome.exception()
    with push_scope() as scope:
        scope.set_tag("operation", retry_state.fn.__name__)
        scope.set_extra("obj_id", exc.obj_id)
        capture_exception(exc.exc)

    error_context = {
        "obj_id": exc.obj_id,
        "operation": retry_state.fn.__name__,
        "exc": str(exc.exc),
        "retries": retry_state.attempt_number,
    }

    logger.error(
        "Failed operation %(operation)s on %(obj_id)s after %(retries)s"
        " retries: %(exc)s",
        error_context,
    )

    # if we have a global error (redis) reporter
    if REPORTER is not None:
        oid = f"blob:{exc.obj_id}"
        msg = msgpack.dumps(error_context)
        REPORTER(oid, msg)

    return None


CONTENT_REPLAY_RETRIES = 3
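# Used with stop_after_attempt() below: tenacity counts the initial call as
# attempt 1, so each operation is tried at most three times.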


class retry_log_if_success(retry_base):
    """Log in statsd the number of attempts required to succeed"""

    def __call__(self, retry_state):
        if not retry_state.outcome.failed:
            statsd.increment(
                CONTENT_RETRY_METRIC,
                tags={
                    "operation": retry_state.fn.__name__,
                    "attempt": str(retry_state.attempt_number),
                },
            )
        return False


content_replay_retry = retry(
    retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
    stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
    wait=wait_random_exponential(multiplier=1, max=60),
    before_sleep=log_replay_retry,
    retry_error_callback=log_replay_error,
)
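
# Composition note: ORing `retry_if_exception_type(ReplayError)` with
# `retry_log_if_success()` yields a tenacity `retry_any` predicate. The left
# side requests a retry whenever a ReplayError is raised; the right side never
# triggers a retry (it always returns False) but piggybacks on the predicate
# call to record in statsd how many attempts a successful operation needed.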


@content_replay_retry
def get_object(objstorage, obj_id):
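    """Get obj_id from objstorage, wrapped by content_replay_retry.

    ObjNotFoundError is re-raised unchanged (a missing object is not worth
    retrying), while any other exception is wrapped in ReplayError so the
    retry decorator will try again.
    """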
    try:
        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
            obj = objstorage.get(obj_id)
            logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
        return obj
    except ObjNotFoundError:
        logger.error(
            "Failed to retrieve %(obj_id)s: object not found",
            {"obj_id": hash_to_hex(obj_id)},
        )
        raise
    except Exception as exc:
        raise ReplayError(obj_id=obj_id, exc=exc) from None


@content_replay_retry
def put_object(objstorage, obj_id, obj):
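    """Store obj as obj_id in objstorage, wrapped by content_replay_retry.

    Presence is deliberately not checked (check_presence=False); any failure
    is wrapped in ReplayError so the retry decorator will try again.
    """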
    try:
        with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
            obj = objstorage.add(obj, obj_id, check_presence=False)
            logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
    except Exception as exc:
        raise ReplayError(obj_id=obj_id, exc=exc) from None


def copy_object(obj_id, src, dst):
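    """Copy the object obj_id from src to dst; return the copied size in
    bytes, or 0 when the object could not be fetched (get_object returns
    None once its retries are exhausted)."""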
    obj = get_object(src, obj_id)
    if obj is not None:
        put_object(dst, obj_id, obj)
        statsd.increment(CONTENT_BYTES_METRIC, len(obj))
        return len(obj)
    return 0


@content_replay_retry
def obj_in_objstorage(obj_id, dst):
"""Check if an object is already in an objstorage, tenaciously"""
try:
return obj_id in dst
except Exception as exc:
raise ReplayError(obj_id=obj_id, exc=exc) from None


def process_replay_objects_content(
    all_objects: Dict[str, List[dict]],
    *,
    src: ObjStorage,
    dst: ObjStorage,
    exclude_fn: Optional[Callable[[dict], bool]] = None,
    check_dst: bool = True,
):
"""
Takes a list of records from Kafka (see
:py:func:`swh.journal.client.JournalClient.process`) and copies them
from the `src` objstorage to the `dst` objstorage, if:
* `obj['status']` is `'visible'`
* `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
* `obj['sha1'] not in dst` (if `check_dst` is True)
Args:
all_objects: Objects passed by the Kafka client. Most importantly,
`all_objects['content'][*]['sha1']` is the sha1 hash of each
content.
src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
exclude_fn: Determines whether an object should be copied.
check_dst: Determines whether we should check the destination
objstorage before copying.
Example:
>>> from swh.objstorage.factory import get_objstorage
>>> src = get_objstorage('memory')
>>> dst = get_objstorage('memory')
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
... 'content': [
... {
... 'sha1': id1,
... 'status': 'visible',
... },
... {
... 'sha1': id2,
... 'status': 'visible',
... },
... ]
... }
>>> process_replay_objects_content(
... kafka_partitions, src=src, dst=dst,
... exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True
"""
    vol = []
    nb_skipped = 0
    nb_failures = 0
    t0 = time()

    for (object_type, objects) in all_objects.items():
        if object_type != "content":
            logger.warning(
                "Received a series of %s, this should not happen", object_type
            )
            continue
        for obj in objects:
            obj_id = obj[ID_HASH_ALGO]
            if obj["status"] != "visible":
                nb_skipped += 1
                logger.debug(
                    "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"]
                )
                statsd.increment(
                    CONTENT_OPERATIONS_METRIC,
                    tags={"decision": "skipped", "status": obj["status"]},
                )
            elif exclude_fn and exclude_fn(obj):
                nb_skipped += 1
                logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
                statsd.increment(
                    CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}
                )
            elif check_dst and obj_in_objstorage(obj_id, dst):
                nb_skipped += 1
                logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
                statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
            else:
                try:
                    copied = copy_object(obj_id, src, dst)
                except ObjNotFoundError:
                    nb_skipped += 1
                    statsd.increment(
                        CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
                    )
                else:
                    if copied is None:
                        nb_failures += 1
                        statsd.increment(
                            CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
                        )
                    else:
                        vol.append(copied)
                        statsd.increment(
                            CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
                        )

    dt = time() - t0
    logger.info(
        "processed %s content objects in %.1fsec "
        "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
        len(vol),
        dt,
        len(vol) / dt,
        sum(vol) / 1024 / 1024 / dt,
        nb_failures,
        nb_skipped,
    )
    if notify:
        notify("WATCHDOG=1")
