Page MenuHomeSoftware Heritage

D8939.id32292.diff
No OneTemporary

D8939.id32292.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,6 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+humanize
redis
+tenacity
diff --git a/swh/objstorage/replayer/replay.py b/swh/objstorage/replayer/replay.py
--- a/swh/objstorage/replayer/replay.py
+++ b/swh/objstorage/replayer/replay.py
@@ -1,14 +1,16 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait as futures_wait
import logging
from time import time
+from traceback import format_tb
from typing import Callable, Dict, List, Optional
+from humanize import naturaldelta, naturalsize
import msgpack
from sentry_sdk import capture_exception, push_scope
@@ -28,7 +30,9 @@
from swh.core.statsd import statsd
from swh.model.hashutil import hash_to_hex
from swh.model.model import SHA1_SIZE
-from swh.objstorage.objstorage import ID_HASH_ALGO, ObjNotFoundError, ObjStorage
+from swh.objstorage.constants import ID_HASH_ALGO
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.objstorage import ObjStorage
logger = logging.getLogger(__name__)
REPORTER = None
@@ -95,7 +99,11 @@
self.exc = exc
def __str__(self):
- return "ReplayError(%s, %s)" % (self.obj_id, self.exc)
+ return "ReplayError(%s, %r, %s)" % (
+ self.obj_id,
+ self.exc,
+ format_tb(self.exc.__traceback__),
+ )
def log_replay_retry(retry_state, sleep=None, last_result=None):
@@ -120,13 +128,13 @@
error_context = {
"obj_id": exc.obj_id,
"operation": retry_state.fn.__name__,
- "exc": str(exc.exc),
+ "exc": str(exc),
"retries": retry_state.attempt_number,
}
logger.error(
"Failed operation %(operation)s on %(obj_id)s after %(retries)s"
- " retries: %(exc)s",
+ " retries; last exception: %(exc)s",
error_context,
)
@@ -136,7 +144,7 @@
msg = msgpack.dumps(error_context)
REPORTER(oid, msg)
- return None
+ raise exc
CONTENT_REPLAY_RETRIES = 3
@@ -271,50 +279,54 @@
>>> id2 in dst
True
"""
- vol = []
- nb_skipped = 0
- nb_failures = 0
- t0 = time()
def _copy_object(obj):
- nonlocal nb_skipped
- nonlocal nb_failures
obj_id = obj[ID_HASH_ALGO]
if obj["status"] != "visible":
- nb_skipped += 1
logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
statsd.increment(
CONTENT_OPERATIONS_METRIC,
tags={"decision": "skipped", "status": obj["status"]},
)
+ return "skipped", None
elif exclude_fn and exclude_fn(obj):
- nb_skipped += 1
logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
+ return "excluded", None
elif check_dst and obj_in_objstorage(obj_id, dst):
- nb_skipped += 1
logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
+ return "in_dst", None
else:
try:
- copied = copy_object(obj_id, src, dst)
+ copied_bytes = copy_object(obj_id, src, dst)
except ObjNotFoundError:
- nb_skipped += 1
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
)
+ return "not_found", None
+ except Exception:
+ statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"})
+ return "failed", None
else:
- if copied is None:
- nb_failures += 1
+ if copied_bytes is None:
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
)
+ return "failed", None
else:
- vol.append(copied)
statsd.increment(
CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
)
+ return "copied", copied_bytes
+ return "failed", None
+
+ vol = 0
+ stats = dict.fromkeys(
+ ["skipped", "excluded", "not_found", "failed", "copied", "in_dst"], 0
+ )
+ t0 = time()
with ThreadPoolExecutor(max_workers=concurrency) as pool:
futures = []
@@ -327,26 +339,33 @@
for obj in objects:
futures.append(pool.submit(_copy_object, obj=obj))
- futures_wait(futures, return_when=FIRST_EXCEPTION)
+ futures_wait(futures)
for f in futures:
- if f.running():
- continue
exc = f.exception()
if exc:
- pool.shutdown(wait=False)
- f.result()
+ # XXX unexpected: _copy_object catches Exception from copy_object(), so this comes from elsewhere (e.g. exclude_fn or the dst check)
raise exc
+ else:
+ state, nbytes = f.result()
+ if nbytes is not None:
+ vol += nbytes
+ stats[state] += 1
dt = time() - t0
logger.info(
- "processed %s content objects in %.1fsec "
- "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
- len(vol),
- dt,
- len(vol) / dt,
- sum(vol) / 1024 / 1024 / dt,
- nb_failures,
- nb_skipped,
+ "processed %s content objects (%s) in %s "
+ "(%.1f obj/sec, %s/sec) - %d copied - %d skipped - %d excluded - "
+ "%d not found - %d failed",
+ len(futures),
+ naturalsize(vol),
+ naturaldelta(dt),
+ len(futures) / dt,
+ naturalsize(vol / dt),
+ stats["copied"],
+ stats["skipped"],
+ stats["excluded"],
+ stats["not_found"],
+ stats["failed"],
)
if notify:
diff --git a/swh/objstorage/replayer/tests/test_replay_errors.py b/swh/objstorage/replayer/tests/test_replay_errors.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/replayer/tests/test_replay_errors.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+import functools
+from queue import Queue
+
+from swh.journal.client import JournalClient
+from swh.journal.writer import get_journal_writer
+from swh.model.model import Content
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.factory import get_objstorage
+from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter
+from swh.objstorage.replayer import replay
+from swh.objstorage.replayer.replay import copy_object # needed for MonkeyPatch
+
+CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [
+ Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10)
+]
+
+
+class FailingObjstorage(ObjStorageFilter):
+ def __init__(self, storage):
+ super().__init__(storage)
+ self.calls = defaultdict(lambda: 0)
+ self.rate = 3
+
+ def get(self, obj_id, *args, **kwargs):
+ self.calls[obj_id] += 1
+ if (self.calls[obj_id] % self.rate) == 0:
+ return self.storage.get(obj_id, *args, **kwargs)
+ raise Exception("Nope")
+
+
+class NotFoundObjstorage(ObjStorageFilter):
+ def get(self, obj_id, *args, **kwargs):
+ raise ObjNotFoundError(obj_id)
+
+
+def prepare_test(kafka_server, kafka_prefix, kafka_consumer_group):
+ src_objstorage = get_objstorage(cls="memory")
+
+ writer = get_journal_writer(
+ cls="kafka",
+ brokers=[kafka_server],
+ client_id="kafka_writer",
+ prefix=kafka_prefix,
+ anonymize=False,
+ )
+
+ for content in CONTENTS:
+ src_objstorage.add(content.data, obj_id=content.sha1)
+ writer.write_addition("content", content)
+
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_on_eof=True,
+ )
+
+ return replayer, src_objstorage
+
+
+def copy_object_q(q):
+ """Wrap the original copy_object function to capture (thread-local) tenacity
+ stats and push them in a queue suitable for checking in a test session"""
+
+ def wrap(obj_id, src, dst):
+ try:
+ ret = copy_object(obj_id, src, dst)
+ return ret
+ finally:
+ q.put(("get", obj_id, replay.get_object.retry.statistics.copy()))
+ q.put(("put", obj_id, replay.put_object.retry.statistics.copy()))
+
+ return wrap
+
+
+def test_replay_content_with_transient_errors(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = FailingObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # only content with status visible will be copied to dst_objstorage
+ expected_objstorage_state = {
+ c.sha1: c.data for c in CONTENTS if c.status == "visible"
+ }
+ assert expected_objstorage_state == dst_objstorage.state
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj_id in expected_objstorage_state:
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put.get("attempt_number") == 1
+ assert put.get("start_time") > 0
+ assert put.get("idle_for") == 0
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ assert get.get("attempt_number") == 3
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") > 0
+ assert get.get("delay_since_first_attempt") > 0
+
+
+def test_replay_content_with_errors(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = FailingObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+ monkeypatch.setattr(replay.get_object.retry.stop, "max_attempt_number", 2)
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # no object could be replicated
+ assert dst_objstorage.state == {}
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj in CONTENTS:
+ if obj.status != "visible":
+ continue
+
+ obj_id = obj.sha1
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put == {}
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ assert get.get("attempt_number") == 2
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") > 0
+ assert get.get("delay_since_first_attempt") > 0
+
+
+def test_replay_content_not_found(
+ kafka_server, kafka_prefix, kafka_consumer_group, monkeypatch
+):
+ replayer, src_objstorage = prepare_test(
+ kafka_server, kafka_prefix, kafka_consumer_group
+ )
+ dst_objstorage = get_objstorage(cls="memory")
+ src_objstorage = NotFoundObjstorage(src_objstorage)
+
+ q = Queue()
+ monkeypatch.setattr(replay, "copy_object", copy_object_q(q))
+
+ worker_fn = functools.partial(
+ replay.process_replay_objects_content,
+ src=src_objstorage,
+ dst=dst_objstorage,
+ )
+ replayer.process(worker_fn)
+
+ # no object could be replicated
+ assert dst_objstorage.state == {}
+
+ stats = [q.get_nowait() for i in range(q.qsize())]
+ for obj in CONTENTS:
+ if obj.status != "visible":
+ continue
+
+ obj_id = obj.sha1
+ put = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "put"
+ )
+ assert put == {}
+
+ get = next(
+ stat for (meth, oid, stat) in stats if oid == obj_id and meth == "get"
+ )
+ # ObjNotFoundError should not be retried: a single attempt is expected
+ assert get.get("attempt_number") == 1
+ assert get.get("start_time") > 0
+ assert get.get("idle_for") == 0

File Metadata

Mime Type
text/plain
Expires
Dec 16 2024, 11:52 PM (4 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214899

Event Timeline