D6692: Rework the retry and reporting system in replay.py
D6692.id24429.diff

diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -16,6 +16,7 @@
from tenacity import (
retry,
+ retry_base,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
@@ -85,58 +86,44 @@
class ReplayError(Exception):
"""An error occurred during the replay of an object"""
- def __init__(self, operation, *, obj_id, exc):
- self.operation = operation
+ def __init__(self, *, obj_id, exc):
self.obj_id = hash_to_hex(obj_id)
self.exc = exc
def __str__(self):
- return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc)
+ return "ReplayError(%s, %s)" % (self.obj_id, self.exc)
def log_replay_retry(retry_state, sleep=None, last_result=None):
"""Log a retry of the content replayer"""
- try:
- exc = retry_state.outcome.exception()
- attempt_number = retry_state.attempt_number
- except AttributeError:
- # tenacity < 5.0
- exc = last_result.exception()
- attempt_number = retry_state.statistics["attempt_number"]
-
+ exc = retry_state.outcome.exception()
+ operation = retry_state.fn.__name__
logger.debug(
"Retry operation %(operation)s on %(obj_id)s: %(exc)s",
- {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
- )
-
- statsd.increment(
- CONTENT_RETRY_METRIC,
- tags={"operation": exc.operation, "attempt": str(attempt_number),},
+ {"operation": operation, "obj_id": exc.obj_id, "exc": str(exc.exc)},
)
def log_replay_error(retry_state):
"""Log a replay error to sentry"""
- try:
- exc = retry_state.outcome.exception()
- except AttributeError:
- # tenacity < 5.0
- exc = retry_state.exception()
+ exc = retry_state.outcome.exception()
with push_scope() as scope:
- scope.set_tag("operation", exc.operation)
+ scope.set_tag("operation", retry_state.fn.__name__)
scope.set_extra("obj_id", exc.obj_id)
capture_exception(exc.exc)
+ error_context = {
+ "obj_id": exc.obj_id,
+ "operation": retry_state.fn.__name__,
+ "exc": str(exc.exc),
+ "retries": retry_state.attempt_number,
+ }
+
logger.error(
"Failed operation %(operation)s on %(obj_id)s after %(retries)s"
" retries: %(exc)s",
- {
- "obj_id": exc.obj_id,
- "operation": exc.operation,
- "exc": str(exc.exc),
- "retries": retry_state.attempt_number,
- },
+ error_context,
)
return None
@@ -144,8 +131,24 @@
CONTENT_REPLAY_RETRIES = 3
+
+class retry_log_if_success(retry_base):
+ """Log in statsd the number of attempts required to succeed"""
+
+ def __call__(self, retry_state):
+ if not retry_state.outcome.failed:
+ statsd.increment(
+ CONTENT_RETRY_METRIC,
+ tags={
+ "operation": retry_state.fn.__name__,
+ "attempt": str(retry_state.attempt_number),
+ },
+ )
+ return False
+
+
content_replay_retry = retry(
- retry=retry_if_exception_type(ReplayError),
+ retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
stop=stop_after_attempt(CONTENT_REPLAY_RETRIES),
wait=wait_random_exponential(multiplier=1, max=60),
before_sleep=log_replay_retry,
@@ -154,26 +157,39 @@
@content_replay_retry
-def copy_object(obj_id, src, dst):
- hex_obj_id = hash_to_hex(obj_id)
- obj = ""
+def get_object(objstorage, obj_id):
try:
with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}):
- obj = src.get(obj_id)
- logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id})
-
- with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
- dst.add(obj, obj_id=obj_id, check_presence=False)
- logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id})
- statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ obj = objstorage.get(obj_id)
+ logger.debug("retrieved %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+ return obj
except ObjNotFoundError:
logger.error(
- "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id}
+ "Failed to retrieve %(obj_id)s: object not found",
+ {"obj_id": hash_to_hex(obj_id)},
)
raise
except Exception as exc:
- raise ReplayError("copy", obj_id=obj_id, exc=exc) from None
- return len(obj)
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+@content_replay_retry
+def put_object(objstorage, obj_id, obj):
+ try:
+ with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}):
+ obj = objstorage.add(obj, obj_id, check_presence=False)
+ logger.debug("stored %(obj_id)s", {"obj_id": hash_to_hex(obj_id)})
+ except Exception as exc:
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
+
+
+def copy_object(obj_id, src, dst):
+ obj = get_object(src, obj_id)
+ if obj is not None:
+ put_object(dst, obj_id, obj)
+ statsd.increment(CONTENT_BYTES_METRIC, len(obj))
+ return len(obj)
+ return 0
@content_replay_retry
@@ -182,7 +198,7 @@
try:
return obj_id in dst
except Exception as exc:
- raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None
+ raise ReplayError(obj_id=obj_id, exc=exc) from None
def process_replay_objects_content(
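
Note: the rework above composes tenacity's retry_if_exception_type with a
custom retry_base subclass, so the successful attempt is also observed (to
count retries in statsd) while only ReplayError triggers a new attempt. A
minimal, self-contained sketch of that composition pattern follows; the
print() call stands in for the statsd counter, and flaky() and _calls are
hypothetical example names, not part of the diff.

from tenacity import (
    retry,
    retry_base,
    retry_if_exception_type,
    stop_after_attempt,
)


class ReplayError(Exception):
    """Transient error that should be retried."""


class retry_log_if_success(retry_base):
    """Report the attempt count of a successful call; never ask for a retry."""

    def __call__(self, retry_state):
        if not retry_state.outcome.failed:
            # stand-in for statsd.increment(CONTENT_RETRY_METRIC, tags=...)
            print(
                f"{retry_state.fn.__name__} succeeded"
                f" on attempt {retry_state.attempt_number}"
            )
        return False  # a success never triggers another attempt


_calls = {"n": 0}


@retry(
    # on failure the left predicate short-circuits; on success only the
    # logging predicate runs, mirroring the content_replay_retry definition
    retry=retry_if_exception_type(ReplayError) | retry_log_if_success(),
    stop=stop_after_attempt(3),
)
def flaky():
    _calls["n"] += 1
    if _calls["n"] < 2:
        raise ReplayError("transient failure")
    return "ok"


assert flaky() == "ok"  # prints "flaky succeeded on attempt 2"
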
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -35,9 +35,10 @@
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
- from swh.objstorage.replayer.replay import copy_object, obj_in_objstorage
+ from swh.objstorage.replayer.replay import get_object, obj_in_objstorage, put_object
- monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(get_object.retry, "sleep", lambda x: None)
+ monkeypatch.setattr(put_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
@@ -120,6 +121,7 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
+ """Check the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -153,6 +155,7 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the logs produced by the content replayer in normal conditions"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -179,7 +182,7 @@
copied = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied.add(record.args["obj_id"])
assert (
@@ -195,6 +198,10 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the content replayer in normal conditions
+
+ with KAFKA_GROUP_INSTANCE_ID set
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -244,6 +251,10 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
):
+ """Check the content replayer in normal conditions
+
+ with an exclusion file (--exclude-sha1-file)
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
@@ -300,11 +311,19 @@
expected_in_dst: int,
caplog,
):
+ """Check the content replayer in normal conditions
+
+ with some objects already in the dst objstorage.
+
+ When check_dst is True, expect those to be neither retrieved from the
+ src objstorage nor pushed to the dst objstorage.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+ # add some objects to the dst objstorage
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
@@ -327,17 +346,22 @@
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
- copied = 0
+ retrieved = 0
+ stored = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
- copied += 1
+ if "retrieved" in logtext:
+ retrieved += 1
+ elif "stored" in logtext:
+ stored += 1
elif "in dst" in logtext:
in_dst += 1
assert (
- copied == expected_copied and in_dst == expected_in_dst
+ retrieved == expected_copied
+ and stored == expected_copied
+ and in_dst == expected_in_dst
), "Unexpected amount of objects copied, see the captured log for details"
for (sha1, content) in contents.items():
@@ -346,6 +370,20 @@
class FlakyObjStorage(InMemoryObjStorage):
+ """Flaky objstorage
+
+ Any 'get', 'add' or 'in' (i.e. '__contains__()') operation will fail
+ according to configured 'failures'.
+
+ 'failures' is expected to be a dict whose keys are pairs (operation,
+ obj_id) and whose values are the number of times the operation
+ 'operation' is expected to fail for object 'obj_id' before succeeding.
+
+ An optional state ('state') can also be given as an argument (see
+ InMemoryObjStorage).
+
+ """
+
def __init__(self, *args, **kwargs):
state = kwargs.pop("state")
self.failures_left = Counter(kwargs.pop("failures"))
@@ -378,27 +416,33 @@
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
monkeypatch_retry_sleep,
+ caplog,
):
+ """Check the content replayer with a flaky dst objstorage
+ for 'in' operations.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+
+ # build a flaky dst objstorage in which the 'in' operation for the first
+ # NUM_CONTENTS_DST objects will fail once
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
-
objstorages["dst"].add(content, obj_id=sha1)
failures["in", sha1] = 1
-
orig_dst = objstorages["dst"]
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
+ caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay")
result = invoke(
"replay",
+ "--check-dst",
"--stop-after-objects",
str(NUM_CONTENTS),
- "--check-dst",
journal_config={
"brokers": kafka_server,
"group_id": kafka_consumer_group,
@@ -409,6 +453,17 @@
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+ # check that exactly NUM_CONTENTS_DST 'in' operations have failed once
+ failed_in = 0
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if "Retry operation obj_in_objstorage" in logtext:
+ failed_in += 1
+ elif "Retry operation" in logtext:
+ assert False, "No other failure expected than 'in' operations"
+ assert failed_in == NUM_CONTENTS_DST
+
+ # in the end, the replay process should be OK
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@@ -423,7 +478,10 @@
caplog,
monkeypatch_retry_sleep,
):
+ """Check the content replayer with a flaky src and dst objstorages
+ for 'get' and 'add' operations.
+ """
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
@@ -459,6 +517,9 @@
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
+ assert add_failures
+ assert get_failures
+ assert definitely_failed
objstorages["dst"] = FlakyObjStorage(
state=objstorages["dst"].state, failures=add_failures,
)
@@ -486,7 +547,7 @@
actually_failed = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied += 1
elif "Failed operation" in logtext:
assert record.levelno == logging.ERROR
@@ -514,18 +575,18 @@
kafka_server: Tuple[Popen, int],
caplog,
):
+ """Check the ContentNotFound is not considered a failure to retry"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
+ # delete a few objects from the src objstorage
num_contents_deleted = 5
contents_deleted = set()
-
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
-
del objstorages["src"].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
@@ -549,12 +610,14 @@
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
- if "copied" in logtext:
+ if "stored" in logtext:
copied += 1
elif "object not found" in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args["obj_id"])
+ elif "Retry operation" in logtext:
+ assert False, "Not found objects should not be retried"
assert (
copied == NUM_CONTENTS - num_contents_deleted
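
Note: the tests above rely on FlakyObjStorage, an in-memory objstorage whose
operations fail a configured number of times before succeeding. A minimal
sketch of that failure-injection pattern follows; the dict-backed FlakyStore
class is a hypothetical stand-in for subclassing swh's InMemoryObjStorage,
and RuntimeError stands in for the real error types.

from collections import Counter


class FlakyStore:
    def __init__(self, state=None, failures=None):
        # obj_id -> content, as in an in-memory objstorage
        self.state = state if state is not None else {}
        # (operation, obj_id) -> number of failures left to inject
        self.failures_left = Counter(failures or {})

    def flaky_operation(self, op, obj_id):
        if self.failures_left[op, obj_id] > 0:
            self.failures_left[op, obj_id] -= 1
            raise RuntimeError(f"flaky {op} on {obj_id!r}")

    def get(self, obj_id):
        self.flaky_operation("get", obj_id)
        return self.state[obj_id]

    def add(self, content, obj_id, check_presence=True):
        self.flaky_operation("add", obj_id)
        self.state[obj_id] = content

    def __contains__(self, obj_id):
        self.flaky_operation("in", obj_id)
        return obj_id in self.state


# The first 'get' of b"id1" fails once, then the object is readable.
store = FlakyStore(failures={("get", b"id1"): 1})
store.add(b"payload", obj_id=b"id1")
try:
    store.get(b"id1")
except RuntimeError:
    pass
assert store.get(b"id1") == b"payload"
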
diff --git a/swh/objstorage/replayer/tests/test_statsd.py b/swh/objstorage/replayer/tests/test_statsd.py
--- a/swh/objstorage/replayer/tests/test_statsd.py
+++ b/swh/objstorage/replayer/tests/test_statsd.py
@@ -80,14 +80,20 @@
# of each statsd message.
prefix = "swh_content_replayer"
expected_report_re = (
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:obj_in_objstorage$",
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:get_object$",
+ f"^{prefix}_retries_total:1[|]c[|]#attempt:1,operation:put_object$",
f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:get$",
f"^{prefix}_duration_seconds:[0-9]+[.][0-9]+[|]ms[|]#request:put$",
f"^{prefix}_bytes:4[|]c$",
)
expected_reports = (
- 2,
- 2,
- 2,
+ 4, # 2 for the copied objects + 2 for the in_dst ones
+ 2, # copied objects
+ 2, # "
+ 2, # "
+ 2, # "
+ 2, # "
)
decisions = ("copied", "skipped", "excluded", "in_dst", "not_in_src", "failed")
decision_re = (
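
Note: the expected_report_re tuple above matches DogStatsD-style datagrams,
i.e. "<metric>:<value>|<type>|#<tag>:<value>,...". Below is a small sketch
of the wire format being asserted; the emit() helper is a hypothetical
stand-in for swh's statsd client, not part of the diff.

import re


def emit(metric, value, mtype, tags):
    # serialize a counter/timer datagram with sorted, comma-separated tags
    tag_str = ",".join(f"{k}:{v}" for k, v in sorted(tags.items()))
    return f"{metric}:{value}|{mtype}|#{tag_str}"


datagram = emit(
    "swh_content_replayer_retries_total",
    1,
    "c",
    {"attempt": "1", "operation": "get_object"},
)
pattern = (
    "^swh_content_replayer_retries_total:1[|]c"
    "[|]#attempt:1,operation:get_object$"
)
assert re.fullmatch(pattern, datagram)
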