self = <Retrying object at 0x7f001dc1b668 (stop=<tenacity.stop.stop_after_attempt object at 0x7f0245561278>, wait=<tenacity.w...0x7f02455b9510>, before=<function before_nothing at 0x7f02464ebb70>, after=<function after_nothing at 0x7f0246502048>)>
fn = <function retry_function.<locals>.newf at 0x7f003b6e4bf8>
args = ([Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x99y/\xa9\xc3\xfeF\x89', sha1_git=b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\x...99We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", length=3, status='visible', data=b'42\n', ctime=None)],)
kwargs = {}
retry_state = <RetryCallState 139638895734288: attempt #3; slept for 1.53; last result: failed (AttributeError 'InMemoryCqlRunner' object has no attribute 'content_get_from_tokens')>
do = <tenacity.DoAttempt object at 0x7f0036c478d0>
def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT:
self.begin()
retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
while True:
do = self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
try:
> result = fn(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:407:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = ([Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x99y/\xa9\xc3\xfeF\x89', sha1_git=b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\x...99We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", length=3, status='visible', data=b'42\n', ctime=None)],)
kwargs = {}
@swh_retry
def newf(*args, **kwargs):
> return getattr(storage, attribute_name)(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/storage/proxies/retry.py:67:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = (<swh.storage.in_memory.InMemoryStorage object at 0x7f001dc1b0f0>, [Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x...199We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", length=3, status='visible', data=b'42\n', ctime=None)])
kw = {}
@wraps(f)
def d(*a, **kw):
with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
> return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:24:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = (<swh.storage.in_memory.InMemoryStorage object at 0x7f001dc1b0f0>, [Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x...199We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", length=3, status='visible', data=b'42\n', ctime=None)])
kw = {}
@wraps(f)
def d(*a, **kw):
> r = f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/storage/metrics.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f001dc1b0f0>
content = [Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x99y/\xa9\xc3\xfeF\x89', sha1_git=b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\xf...x199We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", length=3, status='visible', data=b'42\n', ctime=None)]
@timed
@process_metrics
def content_add(self, content: List[Content]) -> Dict[str, int]:
to_add = {
(c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content
}.values()
contents = [attr.evolve(c, ctime=now()) for c in to_add]
> return self._content_add(list(contents), with_data=True)
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:265:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f001dc1b0f0>
contents = [Content(sha1=b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x99y/\xa9\xc3\xfeF\x89', sha1_git=b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\xf...status='visible', data=b'42\n', ctime=datetime.datetime(2021, 9, 15, 13, 19, 4, 575541, tzinfo=datetime.timezone.utc))]
with_data = True
def _content_add(self, contents: List[Content], with_data: bool) -> Dict[str, int]:
# Filter-out content already in the database.
if not self._allow_overwrite:
contents = [
c
for c in contents
if not self._cql_runner.content_get_from_pk(c.to_dict())
]
if with_data:
# First insert to the objstorage, if the endpoint is
# `content_add` (as opposed to `content_add_metadata`).
# Must add to the objstorage before the DB and journal. Otherwise:
# 1. in case of a crash the DB may "believe" we have the content, but
# we didn't have time to write to the objstorage before the crash
# 2. the objstorage mirroring, which reads from the journal, may attempt to
# read from the objstorage before we finished writing it
summary = self.objstorage.content_add(
c for c in contents if c.status != "absent"
)
content_add_bytes = summary["content:add:bytes"]
self.journal_writer.content_add(contents)
content_add = 0
for content in contents:
content_add += 1
# Check for sha1 or sha1_git collisions. This test is not atomic
# with the insertion, so it won't detect a collision if both
# contents are inserted at the same time, but it's good enough.
#
# The proper way to do it would probably be a BATCH, but this
# would be inefficient because of the number of partitions we
# need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
if not self._allow_overwrite:
for algo in {"sha1", "sha1_git"}:
collisions = []
# Get tokens of 'content' rows with the same value for
# sha1/sha1_git
# TODO: batch these requests, instead of sending them one by one
rows = self._content_get_from_hashes(algo, [content.get_hash(algo)])
> for row in rows:
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:221:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f001dc1b0f0>
algo = 'sha1_git'
hashes = [b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\xfd[\x92\n\x84S\xe1\xe0qW\xb6\xcd']
def _content_get_from_hashes(self, algo, hashes: List[bytes]) -> Iterable:
"""From the name of a hash algorithm and a value of that hash,
looks up the "hash -> token" secondary table (content_by_{algo})
to get tokens.
Then, looks up the main table (content) to get all contents with
that token, and filters out contents whose hash doesn't match."""
found_tokens = list(
self._cql_runner.content_get_tokens_from_single_algo(algo, hashes)
)
assert all(isinstance(token, int) for token in found_tokens)
# Query the main table ('content').
> rows = self._cql_runner.content_get_from_tokens(found_tokens)
E AttributeError: 'InMemoryCqlRunner' object has no attribute 'content_get_from_tokens'
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:172: AttributeError
The above exception was the direct cause of the following exception:
swh_storage = <swh.storage.proxies.retry.RetryingProxyStorage object at 0x7f001dc1b748>
sample_data = <swh.storage.tests.storage_data.StorageData object at 0x7f0036c6e7f0>
def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
"""Standard content_add works as before
"""
sample_content = sample_data.content
content = swh_storage.content_get_data(sample_content.sha1)
assert content is None
> s = swh_storage.content_add([sample_content])
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_retry.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:324: in wrapped_f
return self(f, *args, **kw)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:404: in __call__
do = self.iter(retry_state=retry_state)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Retrying object at 0x7f001dc1b668 (stop=<tenacity.stop.stop_after_attempt object at 0x7f0245561278>, wait=<tenacity.w...0x7f02455b9510>, before=<function before_nothing at 0x7f02464ebb70>, after=<function after_nothing at 0x7f0246502048>)>
retry_state = <RetryCallState 139638895734288: attempt #3; slept for 1.53; last result: failed (AttributeError 'InMemoryCqlRunner' object has no attribute 'content_get_from_tokens')>
def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]: # noqa
fut = retry_state.outcome
if fut is None:
if self.before is not None:
self.before(retry_state)
return DoAttempt()
is_explicit_retry = retry_state.outcome.failed and isinstance(retry_state.outcome.exception(), TryAgain)
if not (is_explicit_retry or self.retry(retry_state=retry_state)):
return fut.result()
if self.after is not None:
self.after(retry_state)
self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
if self.stop(retry_state=retry_state):
if self.retry_error_callback:
return self.retry_error_callback(retry_state)
retry_exc = self.retry_error_cls(fut)
if self.reraise:
raise retry_exc.reraise()
> raise retry_exc from fut.exception()
E tenacity.RetryError: RetryError[<Future at 0x7f0036c479b0 state=finished raised AttributeError>]
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:361: RetryError
TEST RESULT
TEST RESULT
- Run At
- Sep 15 2021, 3:20 PM