D6692.id24429.diff

diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -16,6 +16,7 @@
from tenacity import (
retry,
+ retry_base,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
@@ -85,58 +86,44 @@
class ReplayError(Exception):
"""An error occurred during the replay of an object"""
- def __init__(self, operation, *, obj_id, exc):
- self.operation = operation
+ def __init__(self, *, obj_id, exc):
self.obj_id = hash_to_hex(obj_id)
self.exc = exc
def __str__(self):
- return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+ return "ReplayError(%s, %s)" % (self.obj_id, self.exc)
def log_replay_retry(retry_state, sleep=None, last_result=None):
"""Log a retry of the content replayer"""
- try:
- exc = retry_state.outcome.exception()
- attempt_number = retry_state.attempt_number
- except AttributeError:
- # tenacity < 5.0
- exc = last_result.exception()
- attempt_number = retry_state.statistics["attempt_number"]
-
+ exc = retry_state.outcome.exception()
+ operation = retry_state.fn.__name__
logger.debug(
"Retry operation %(operation)s on %(obj_id)s: %(exc)s",
- {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
- )
-
- statsd.increment(
- CONTENT_RETRY_METRIC,
- tags={"operation": exc.operation, "attempt": str(attempt_number),},
+ {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
)
def log_replay_error(retry_state):
"""Log a replay error to sentry"""
- try:
- exc = retry_state.outcome.exception()
- except AttributeError:
- # tenacity < 5.0
- exc = retry_state.exception()
+ exc = retry_state.outcome.exception()
with push_scope() as scope:
- scope.set_tag("operation", exc.operation)
+ scope.set_tag("operation", retry_state.fn.__name__)
scope.set_extra("obj_id", exc.obj_id)
capture_exception(exc.exc)
+ error_context = {
+ "obj_id": exc.obj_id,
+ "operation": retry_state.fn.__name__,
+ "exc": str(exc.exc),
+ "retries": retry_state.attempt_number,
+ }
+
logger.error(
"Failed operation %(operation)s on %(obj_id)s after %(retries)s"
" retries: %(exc)s",
- {
- "obj_id": exc.obj_id,
- "operation": exc.operation,
- "exc": str(exc.exc),
- "retries": retry_state.attempt_number,
- },
+ error_context,
)
return None
@@ -144,8 +131,24 @@
CONTENT_REPLAY_RETRIES = 3
+
+class retry_log_if_success(retry_base):
+ """Log in statsd the number of attempts required to succeed"""
+
+ def __call__(self, retry_state):
+ if not retry_state.outcome.failed:
+ statsd.increment(
+ CONTENT_RETRY_METRIC,
+ tags={
+ "operation": retry_state.fn.__name__,
+ "attempt": str(retry_state.attempt_number),
+ },
+ )
+ return False
+
+
content_replay_retry = retry(
- retry=retry_if_exception_type(ReplayError),
+ retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
wait=wait_random_exponential(multiplier=1, max=60),
before_sleep=log_replay_retry,
@@ -154,26 +157,39 @@
@content_replay_retry
-def copy_object(obj_id, src, dst):
- hex_obj_id = hash_to_hex(obj_id)
- obj = ""
+def get_object(objstorage, obj_id):
try:
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
- obj = src.get(obj_id)
- logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
-
- with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
- dst.add(obj, obj_id=obj_id, check_presence=False)
- logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
- statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ obj = objstorage.get(obj_id)
+ logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+ return obj
except ObjNotFoundError:
logger.error(
- "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+ "Failed to retrieve %(obj_id)s: object not found",
+ {"obj_id": hash_to_hex(obj_id)},
)
raise
except Exception as exc:
- raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
- return len(obj)
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+@content_replay_retry
+def put_object(objstorage, obj_id, obj):
+ try:
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+ obj = objstorage.add(obj, obj_id, check_presence=False)
+ logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+ except Exception as exc:
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+def copy_object(obj_id, src, dst):
+ obj = get_object(src, obj_id)
+ if obj is not None:
+ put_object(dst, obj_id, obj)
+ statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ return len(obj)
+ return 0
@content_replay_retry
@@ -182,7 +198,7 @@
try:
return obj_id in dst
except Exception as exc:
- raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
def process_replay_objects_content(
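
Note on the retry changes above: tenacity evaluates the 'retry' predicate
after every attempt, including successful ones. The new retry_log_if_success
subclass of retry_base piggybacks on that hook to report in statsd how many
attempts a success took, while always answering "do not retry"; composing it
with '|' keeps the existing retry-on-ReplayError behaviour. A minimal
standalone sketch of the technique, assuming tenacity >= 5.0 (illustrative
names; the real code increments CONTENT_RETRY_METRIC instead of printing):

from tenacity import retry, retry_base, retry_if_exception_type, stop_after_attempt

class TransientError(Exception):
    """Stand-in for ReplayError in this sketch."""

class log_if_success(retry_base):
    """Retry predicate that never retries, but observes successful outcomes."""

    def __call__(self, retry_state):
        if not retry_state.outcome.failed:
            # A success: report how many attempts it took.
            print("%s succeeded on attempt %d"
                  % (retry_state.fn.__name__, retry_state.attempt_number))
        return False

calls = {"n": 0}

@retry(
    retry=retry_if_exception_type(TransientError) | log_if_success(),
    stop=stop_after_attempt(3),
)
def fetch():
    calls["n"] += 1
    if calls["n"] < 2:
        raise TransientError("transient failure")
    return "ok"

fetch()  # prints: fetch succeeded on attempt 2

Note also that get_object re-raises ObjNotFoundError as-is rather than
wrapping it in ReplayError, so retry_if_exception_type(ReplayError) never
retries a missing object; the "object not found" test below relies on this.
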
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -35,9 +35,10 @@
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
- from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+ from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object
- monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(get_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(put_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
@@ -120,6 +121,7 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
+ """Check the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -153,6 +155,7 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the logs produced by the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -179,7 +182,7 @@
copied = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied.add(record.args["obj_id"])
assert (
@@ -195,6 +198,10 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the content replayer in normal conditions
+
+ with KAFKA_GROUP_INSTANCE_ID set
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -244,6 +251,10 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
+ """Check the content replayer in normal conditions
+
+    with an exclusion file (--exclude-sha1-file)
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -300,11 +311,19 @@
expected_in_dst: int,
caplog,
):
+ """Check the content replayer in normal conditions
+
+ with some objects already in the dst objstorage.
+
+    When check_dst is True, expect those to be neither retrieved from the
+    src objstorage nor pushed to the dst objstorage.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+    # add some objects to the dst objstorage
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
@@ -327,17 +346,22 @@
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
- copied = 0
+ retrieved = 0
+ stored = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
- copied += 1
+ if "retrieved" in logtext:
+ retrieved += 1
+ elif "stored" in logtext:
+ stored += 1
elif "in dst" in logtext:
in_dst += 1
assert (
- copied == expected_copied and in_dst == expected_in_dst
+ retrieved == expected_copied
+ and stored == expected_copied
+ and in_dst == expected_in_dst
), "Unexpected amount of objects copied, see the captured log for details"
for (sha1, content) in contents.items():
@@ -346,6 +370,20 @@
class FlakyObjStorage(InMemoryObjStorage):
+ """Flaky objstorage
+
+    Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail
+    according to the configured 'failures'.
+
+    'failures' is expected to be a dict whose keys are (operation, obj_id)
+    pairs and whose values are the number of times the operation 'operation'
+    is expected to fail for object 'obj_id' before succeeding.
+
+    An optional state ('state') can also be given as an argument (see
+    InMemoryObjStorage).
+
+ """
+
def __init__(self, *args, **kwargs):
state = kwargs.pop("state")
self.failures_left = Counter(kwargs.pop("failures"))
@@ -378,27 +416,33 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
monkeypatch_retry_sleep,
+ caplog,
):
+ """Check the content replayer with a flaky dst objstorage
+ for 'in' operations.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+
+ # build a flaky dst objstorage in which the 'in' operation for the first
+    # NUM_CONTENTS_DST objects will fail once
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
-
objstorages["dst"].add(content, obj_id=sha1)
failures["in", sha1] = 1
-
orig_dst = objstorages["dst"]
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
+ "--check-dst",
"--stop-after-objects",
str(NUM_CONTENTS),
- "--check-dst",
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
@@ -409,6 +453,17 @@
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+ # check that exactly NUM_CONTENTS_DST 'in' operations have failed once
+ failed_in = 0
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "Retry operation obj_in_objstorage" in logtext:
+ failed_in += 1
+ elif "Retry operation" in logtext:
+ assert False, "No other failure expected than 'in' operations"
+ assert failed_in == NUM_CONTENTS_DST
+
+ # in the end, the replay process should be OK
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@@ -423,7 +478,10 @@
caplog,
monkeypatch_retry_sleep,
):
+ """Check the content replayer with a flaky src and dst objstorages
+ for 'get' and 'add' operations.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
@@ -459,6 +517,9 @@
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
+ assert add_failures
+ assert get_failures
+ assert definitely_failed
objstorages["dst"] = FlakyObjStorage(
state=objstorages["dst"].state, failures=add_failures,
)
@@ -486,7 +547,7 @@
actually_failed = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied += 1
elif "Failed operation" in logtext:
assert record.levelno == logging.ERROR
@@ -514,18 +575,18 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the ContentNotFound is not considered a failure to retry"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+ # delete a few objects from the src objstorage
num_contents_deleted = 5
contents_deleted = set()
-
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
-
del objstorages["src"].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
@@ -549,12 +610,14 @@
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied += 1
elif "object not found" in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args["obj_id"])
+ elif "Retry operation" in logtext:
+ assert False, "Not found objects should not be retried"
assert (
copied == NUM_CONTENTS - num_contents_deleted
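
The FlakyObjStorage helper above injects transient failures keyed by
(operation, obj_id) pairs. A minimal standalone sketch of that
failure-injection pattern, with illustrative names and no swh test harness:

from collections import Counter

class FlakyStore:
    """Store whose operations fail a configured number of times, then succeed."""

    def __init__(self, state, failures):
        self.state = state                      # obj_id -> content
        self.failures_left = Counter(failures)  # (operation, obj_id) -> failures left

    def flaky(self, op, obj_id):
        if self.failures_left[op, obj_id] > 0:
            self.failures_left[op, obj_id] -= 1
            raise RuntimeError("flaky %s on %r" % (op, obj_id))

    def get(self, obj_id):
        self.flaky("get", obj_id)
        return self.state[obj_id]

store = FlakyStore(state={b"id1": b"data"}, failures={("get", b"id1"): 2})
for attempt in range(1, 4):
    try:
        print(attempt, store.get(b"id1"))  # fails twice, then prints b'data'
        break
    except RuntimeError as exc:
        print(attempt, exc)

Combined with a retry policy such as content_replay_retry, each injected
failure shows up as one "Retry operation" log record, which is what the
flaky objstorage tests above count.
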
diff --git a/swh/objstorage/replayer/tests/test_statsd.py b/swh/objstorage/replayer/tests/test_statsd.py
--- a/swh/objstorage/replayer/tests/test_statsd.py
+++ b/swh/objstorage/replayer/tests/test_statsd.py
@@ -80,14 +80,20 @@
# of each statsd message.
prefix = "swh_content_replayer"
expected_report_re = (
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:obj_in_objstorage$",
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:get_object$",
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:put_object$",
f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:get$",
f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:put$",
f"^{prefix}_bytes:4[|]c$",
)
expected_reports = (
- 2,
- 2,
- 2,
+ 4, # 2 for the copied objects + 2 for the in_dst ones
+ 2, # copied objects
+ 2, # "
+ 2, # "
+ 2, # "
+ 2, # "
)
decisions = ("copied", "skipped", "excluded", "in_dst", "not_in_src", "failed")
decision_re = (
