Page Menu | Home | Software Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.journal.tests.test_replay::test_storage_play_with_collision
Failed

TEST RESULT

Run At
Apr 11 2020, 12:50 PM
Details
self = <Retrying object at 0x7fce29c29f60 (stop=<tenacity.stop.stop_after_attempt object at 0x7fce29c297b8>, wait=<tenacity.w...0x7fce29c4cf28>, before=<function before_nothing at 0x7fce2a1b6510>, after=<function after_nothing at 0x7fce2a1c5ea0>)> fn = <function RetryingProxyStorage.content_add_metadata at 0x7fce29ce46a8> args = (<swh.storage.retry.RetryingProxyStorage object at 0x7fce29c40160>, [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]...=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)]) kwargs = {}, retry_state = <tenacity.RetryCallState object at 0x7fce280ffbe0> do = <tenacity.DoAttempt object at 0x7fce280ff8d0> def call(self, fn, *args, **kwargs): self.begin(fn) retry_state = RetryCallState( retry_object=self, fn=fn, args=args, kwargs=kwargs) while True: do = self.iter(retry_state=retry_state) if isinstance(do, DoAttempt): try: > result = fn(*args, **kwargs) .tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:394: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.retry.RetryingProxyStorage object at 0x7fce29c40160> content = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)] @swh_retry def content_add_metadata(self, content: Iterable[Content]) -> Dict: > return self.storage.content_add_metadata(content) .tox/py3/lib/python3.7/site-packages/swh/storage/retry.py:100: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.in_memory.InMemoryStorage object at 0x7fce29c40b00> content = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', 
data=None, ctime=None)] def content_add_metadata(self, content: Iterable[Content]) -> Dict: > return self._content_add(content, with_data=False) .tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:153: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.in_memory.InMemoryStorage object at 0x7fce29c40b00> contents = [Content(sha1=b'\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', sha1_git=b'\x19\x10(\x15f=#\xf8\xb7ZG\x...t=b'bnother-foo', sha256=b'another-baz', blake2s256=b'another-bar', length=4, status='visible', data=None, ctime=None)] with_data = False def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: self.journal_writer.content_add(contents) content_add = 0 content_add_bytes = 0 if with_data: summary = self.objstorage.content_add( c for c in contents if c.status != "absent" ) content_add_bytes = summary["content:add:bytes"] for content in contents: key = self._content_key(content) if key in self._contents: continue for algorithm in DEFAULT_ALGORITHMS: hash_ = content.get_hash(algorithm) if hash_ in self._content_indexes[algorithm] and ( algorithm not in {"blake2s256", "sha256"} ): colliding_content_hashes = [] # Add the already stored contents for content_hashes_set in self._content_indexes[algorithm][hash_]: hashes = dict(content_hashes_set) colliding_content_hashes.append(hashes) # Add the new colliding content colliding_content_hashes.append(content.hashes()) > raise HashCollision(algorithm, hash_, colliding_content_hashes) E swh.storage.exc.HashCollision: ('sha1', '44973274ccef6ab4dfaaf86599792fa9c3fe4689', [{'sha1_git': '616e6f746865722d666f6f', 'sha256': '616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}, {'sha1_git': '626e6f746865722d666f6f', 'sha256': '616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}]) 
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:108: HashCollision The above exception was the direct cause of the following exception: kafka_prefix = 'enovkcjsqy.swh.journal.objects' kafka_consumer_group = 'test-consumer-enovkcjsqy' kafka_server = (<subprocess.Popen object at 0x7fce29e7e978>, 16748) caplog = <_pytest.logging.LogCaptureFixture object at 0x7fce29c407f0> def test_storage_play_with_collision( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, ): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { "bootstrap.servers": "localhost:{}".format(port), "client.id": "test producer", "enable.idempotence": "true", } ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: topic = make_topic(kafka_prefix, "content") producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( brokers="localhost:%d" % kafka_server[1], group_id=kafka_consumer_group, 
prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: > nb_inserted += replayer.process(worker_fn) .tox/py3/lib/python3.7/site-packages/swh/journal/tests/test_replay.py:210: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .tox/py3/lib/python3.7/site-packages/swh/journal/client.py:234: in process batch_processed, at_eof = self.handle_messages(messages, worker_fn) .tox/py3/lib/python3.7/site-packages/swh/journal/client.py:263: in handle_messages worker_fn(dict(objects)) .tox/py3/lib/python3.7/site-packages/swh/journal/replay.py:73: in process_replay_objects _insert_objects(object_type, objects, storage) .tox/py3/lib/python3.7/site-packages/swh/journal/replay.py:98: in _insert_objects storage.content_add_metadata(contents) .tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:311: in wrapped_f return self.call(f, *args, **kw) .tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:391: in call do = self.iter(retry_state=retry_state) .tox/py3/lib/python3.7/site-packages/tenacity/__init__.py:351: in iter six.raise_from(retry_exc, fut.exception()) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ value = None from_value = HashCollision('sha1', '44973274ccef6ab4dfaaf86599792fa9c3fe4689', [{'sha1_git': '616e6f746865722d666f6f', 'sha256': '6...'616e6f746865722d62617a', 'blake2s256': '616e6f746865722d626172', 'sha1': '44973274ccef6ab4dfaaf86599792fa9c3fe4689'}]) > ??? E tenacity.RetryError: RetryError[<Future at 0x7fce280ffac8 state=finished raised HashCollision>] <string>:3: RetryError