self = <Retrying object at 0x7fce29c29f60 (stop=<tenacity.stop.stop_after_attempt object at 0x7fce29c297b8>, wait=<tenacity.w...0x7fce29c4cf28>, before=<function before_nothing at 0x7fce2a1b6510>, after=<function after_nothing at 0x7fce2a1c5ea0>)>
fn = <function RetryingProxyStorage.content_add_metadata at 0x7fce29ce46a8>
args = (<swh.storage.retry.RetryingProxyStorage object at 0x7fce29c40160>, [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]...=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)])
kwargs = {}, retry_state = <tenacity.RetryCallState object at 0x7fce280ffbe0>
do = <tenacity.DoAttempt object at 0x7fce280ff8d0>
def call(self, fn, *args, **kwargs):
self.begin(fn)
retry_state = RetryCallState(
retry_object=self, fn=fn, args=args, kwargs=kwargs)
while True:
do = self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
try:
> result = fn(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:394:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.retry.RetryingProxyStorage object at 0x7fce29c40160>
content = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)]
@swh_retry
def content_add_metadata(self, content: Iterable[Content]) -> Dict:
> return self.storage.content_add_metadata(content)
.tox/py3/lib/python3.7/site-packages/swh/storage/retry.py:100:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7fce29c40b00>
content = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)]
def content_add_metadata(self, content: Iterable[Content]) -> Dict:
> return self._content_add(content, with_data=False)
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7fce29c40b00>
contents = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)]
with_data = False
def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict:
self.journal_writer.content_add(contents)
content_add = 0
content_add_bytes = 0
if with_data:
summary = self.objstorage.content_add(
c for c in contents if c.status != "absent"
)
content_add_bytes = summary["content:add:bytes"]
for content in contents:
key = self._content_key(content)
if key in self._contents:
continue
for algorithm in DEFAULT_ALGORITHMS:
hash_ = content.get_hash(algorithm)
if hash_ in self._content_indexes[algorithm] and (
algorithm not in {"blake2s256", "sha256"}
):
colliding_content_hashes = []
# Add the already stored contents
for content_hashes_set in self._content_indexes[algorithm][hash_]:
hashes = dict(content_hashes_set)
colliding_content_hashes.append(hashes)
# Add the new colliding content
colliding_content_hashes.append(content.hashes())
> raise HashCollision(algorithm, hash_, colliding_content_hashes)
E swh.storage.exc.HashCollision: ('sha1', '44973274ccef6ab4dfaaf86599792fa9c3fe4689', [{'sha1_git': '616e6f746865722d666f6f', 'sha256': '616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}, {'sha1_git': '626e6f746865722d666f6f', 'sha256': '616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}])
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:108: HashCollision
The above exception was the direct cause of the following exception:
kafka_prefix = 'enovkcjsqy.swh.journal.objects'
kafka_consumer_group = 'test-consumer-enovkcjsqy'
kafka_server = (<subprocess.Popen object at 0x7fce29e7e978>, 16748)
caplog = <_pytest.logging.LogCaptureFixture object at 0x7fce29c407f0>
def test_storage_play_with_collision(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
(_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
"bootstrap.servers": "localhost:{}".format(port),
"client.id": "test producer",
"enable.idempotence": "true",
}
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
# Create collision in input data
# They are not written in the destination
for content in DUPLICATE_CONTENTS:
topic = make_topic(kafka_prefix, "content")
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
brokers="localhost:%d" % kafka_server[1],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
> nb_inserted += replayer.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/tests/test_replay.py:210:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:234: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:263: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/journal/replay.py:73: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/journal/replay.py:98: in _insert_objects
storage.content_add_metadata(contents)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:311: in wrapped_f
return self.call(f, *args, **kw)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:391: in call
do = self.iter(retry_state=retry_state)
.tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:351: in iter
six.raise_from(retry_exc, fut.exception())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
value = None
from_value = HashCollision('sha1', '44973274ccef6ab4dfaaf86599792fa9c3fe4689', [{'sha1_git': '616e6f746865722d666f6f', 'sha256': '6...'616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}])
> ???
E tenacity.RetryError: RetryError[<Future at 0x7fce280ffac8 state=finished raised HashCollision>]
<string>:3: RetryError
TEST RESULT
TEST RESULT
- Run At
- Apr 11 2020, 12:50 PM