Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7122780
D8939.id32292.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
D8939.id32292.diff
View Options
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,6 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+humanize
redis
+tenacity
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -1,14 +1,16 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait as futures_wait
import logging
from time import time
+from traceback import format_tb
from typing import Callable, Dict, List, Optional
+from humanize import naturaldelta, naturalsize
import msgpack
from sentry_sdk import capture_exception, push_scope
@@ -28,7 +30,9 @@
from swh.core.statsd import statsd
from swh.model.hashutil import hash_to_hex
from swh.model.model import SHA1_SIZE
-from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage
+from swh.objstorage.constants import ID_HASH_ALGO
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.objstorage import ObjStorage
logger = logging.getLogger(__name__)
REPORTER = None
@@ -95,7 +99,11 @@
self.exc = exc
def __str__(self):
- return "ReplayError(%s, %s)" % (self.obj_id, self.exc)
+ return "ReplayError(%s, %r, %s)" % (
+ self.obj_id,
+ self.exc,
+ format_tb(self.exc.__traceback__),
+ )
def log_replay_retry(retry_state, sleep=None, last_result=None):
@@ -120,13 +128,13 @@
error_context = {
"obj_id": exc.obj_id,
"operation": retry_state.fn.__name__,
- "exc": str(exc.exc),
+ "exc": str(exc),
"retries": retry_state.attempt_number,
}
logger.error(
"Failed operation %(operation)s on %(obj_id)s after %(retries)s"
- " retries: %(exc)s",
+ " retries; last exception: %(exc)s",
error_context,
)
@@ -136,7 +144,7 @@
msg = msgpack.dumps(error_context)
REPORTER(oid, msg)
- return None
+ raise exc
CONTENT_REPLAY_RETRIES = 3
@@ -271,50 +279,54 @@
>>> id2 in dst
True
"""
- vol = []
- nb_skipped = 0
- nb_failures = 0
- t0 = time()
def _copy_object(obj):
- nonlocal nb_skipped
- nonlocal nb_failures
obj_id = obj[ID_HASH_ALGO]
if obj["status"] != "visible":
- nb_skipped += 1
logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
statsd.increment(
CONTENT_OPERATIONS_METRIC,
tags={"decision": "skipped", "status": obj["status"]},
)
+ return "skipped", None
elif exclude_fn and exclude_fn(obj):
- nb_skipped += 1
logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
+ return "excluded", None
elif check_dst and obj_in_objstorage(obj_id, dst):
- nb_skipped += 1
logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+ return "in_dst", None
else:
try:
- copied = copy_object(obj_id, src, dst)
+ copied_bytes = copy_object(obj_id, src, dst)
except ObjNotFoundError:
- nb_skipped += 1
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
)
+ return "not_found", None
+ except Exception:
+ statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"})
+ return "failed", None
else:
- if copied is None:
- nb_failures += 1
+ if copied_bytes is None:
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
)
+ return "failed", None
else:
- vol.append(copied)
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
)
+ return "copied", copied_bytes
+ return "failed", None
+
+ vol = 0
+ stats = dict.fromkeys(
+ ["skipped", "excluded", "not_found", "failed", "copied", "in_dst"], 0
+ )
+ t0 = time()
with ThreadPoolExecutor(max_workers=concurrency) as pool:
futures = []
@@ -327,26 +339,33 @@
for obj in objects:
futures.append(pool.submit(_copy_object, obj=obj))
- futures_wait(futures, return_when=FIRST_EXCEPTION)
+ futures_wait(futures)
for f in futures:
- if f.running():
- continue
exc = f.exception()
if exc:
- pool.shutdown(wait=False)
- f.result()
+ # XXX this should not happen, so it is probably wrong...
raise exc
+ else:
+ state, nbytes = f.result()
+ if nbytes is not None:
+ vol += nbytes
+ stats[state] += 1
dt = time() - t0
logger.info(
- "processed %s content objects in %.1fsec "
- "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
- len(vol),
- dt,
- len(vol) / dt,
- sum(vol) / 1024 / 1024 / dt,
- nb_failures,
- nb_skipped,
+ "processed %s content objects (%s) in %s "
+ "(%.1f obj/sec, %s/sec) - %d copied - %d skipped - %d excluded - "
+ "%d not found - %d failed",
+ len(futures),
+ naturalsize(vol),
+ naturaldelta(dt),
+ len(futures) / dt,
+ naturalsize(vol / dt),
+ stats["copied"],
+ stats["skipped"],
+ stats["excluded"],
+ stats["not_found"],
+ stats["failed"],
)
if notify:
diff --git a/swh/objstorage/replayer/tests/test_replay_errors.py b/swh/objstorage/replayer/tests/test_replay_errors.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_replay_errors.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+import functools
+from queue import Queue
+
+from swh.journal.client import JournalClient
+from swh.journal.writer import get_journal_writer
+from swh.model.model import Content
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter
+from swh.objstorage.replayer import replay
+from swh.objstorage.replayer.replay import copy_object # needed for MonkeyPatch
+
+CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [
+ Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10)
+]
+
+
+class FailingObjstorage(ObjStorageFilter):
+ def __init__(self, storage):
+ super().__init__(storage)
+ self.calls = defaultdict(lambda: 0)
+ self.rate = 3
+
+ def get(self, obj_id, *args, **kwargs):
+ self.calls[obj_id] += 1
+ if (self.calls[obj_id] % self.rate) == 0:
+ return self.storage.get(obj_id, *args, **kwargs)
+ raise Exception("Nope")
+
+
+class NotFoundObjstorage(ObjStorageFilter):
+ def get(self, obj_id, *args, **kwargs):
+ raise ObjNotFoundError(obj_id)
+
+
+def prepare_test(kafka_server, kafka_prefix, kafka_consumer_group):
+ src_objstorage = get_objstorage(cls="memory")
+
+ writer = get_journal_writer(
+ cls="kafka",
+ brokers=[kafka_server],
+ client_id="kafka_writer",
+ prefix=kafka_prefix,
+ anonymize=False,
+ )
+
+ for content in CONTENTS:
+ src_objstorage.add(content.data, obj_id=content.sha1)
+ writer.write_addition("content", content)
+
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_on_eof=True,
+ )
+
+ return replayer, src_objstorage
+
+
+def copy_object_q(q):
+ """Wrap the original copy_object function to capture (thread-local) tenacity
+ stats and puch them in a queue suitable for checking in a test session"""
+
+ def wrap(obj_id, src, dst):
+ try:
+ ret = copy_object(obj_id, src, dst)
+ return ret
+ finally:
+ q.put(("get", obj_id, replay.get_object.retry.statistics.copy()))
+ q.put(("put", obj_id, replay.put_object.retry.statistics.copy()))
+
+ return wrap
+
+
+def test_replay_content_with_transient_errors(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = FailingObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # only content with status visible will be copied in storage2
+ expected_objstorage_state = {
+ c.sha1: c.data for c in CONTENTS if c.status == "visible"
+ }
+ assert expected_objstorage_state == dst_objstorage.state
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj_id in expected_objstorage_state:
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put.get("attempt_number") == 1
+ assert put.get("start_time") > 0
+ assert put.get("idle_for") == 0
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ assert get.get("attempt_number") == 3
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") > 0
+ assert get.get("delay_since_first_attempt") > 0
+
+
+def test_replay_content_with_errors(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = FailingObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+ monkeypatch.setattr(replay.get_object.retry.stop, "max_attempt_number", 2)
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # no object could be replicated
+ assert dst_objstorage.state == {}
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj in CONTENTS:
+ if obj.status != "visible":
+ continue
+
+ obj_id = obj.sha1
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put == {}
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ assert get.get("attempt_number") == 2
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") > 0
+ assert get.get("delay_since_first_attempt") > 0
+
+
+def test_replay_content_not_found(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = NotFoundObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # no object could be replicated
+ assert dst_objstorage.state == {}
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj in CONTENTS:
+ if obj.status != "visible":
+ continue
+
+ obj_id = obj.sha1
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put == {}
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ # ObjectNotFound should not be retried several times...
+ assert get.get("attempt_number") == 1
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") == 0
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Dec 16, 11:52 PM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214899
Attached To
D8939: Rework the replaying exception handling
Event Timeline
Log In to Comment