Page Menu | Home | Software Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_replay::test_storage_play_with_collision
Failed

TEST RESULT

Run At
Aug 12 2020, 6:31 PM
Details
replayer_storage_and_client = (<swh.storage.in_memory.InMemoryStorage object at 0x7f71790d4160>, <swh.journal.client.JournalClient object at 0x7f71790d48d0>) caplog = <_pytest.logging.LogCaptureFixture object at 0x7f717885f908> def test_storage_play_with_collision(replayer_storage_and_client, caplog): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) if object_type == "origin_visit": nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well nb_sent += len(objects) # Create collision in input data # These should not be written in the destination producer = src.journal_writer.journal.producer prefix = src.journal_writer.journal._prefix for content in DUPLICATE_CONTENTS: topic = f"{prefix}.content" key = content.sha1 producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) > nb_inserted = replayer.process(worker_fn) .tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:135: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .tox/py3/lib/python3.7/site-packages/swh/journal/client.py:264: in process batch_processed, at_eof = self.handle_messages(messages, worker_fn) .tox/py3/lib/python3.7/site-packages/swh/journal/client.py:291: in handle_messages worker_fn(dict(objects)) .tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:61: in process_replay_objects 
_insert_objects(object_type, objects, storage) .tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:123: in _insert_objects collision_aware_content_add(storage.content_add_metadata, contents) .tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:85: in collision_aware_content_add content_add_fn(contents) .tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:189: in content_add_metadata return self._content_add(content, with_data=False) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.in_memory.InMemoryStorage object at 0x7f717452f1d0> contents = [Content(sha1=b"\xaa\xad\xd9Iw\xb8\xfb\xf3\xf6\xfb\t\xfc;\xbb\xc9\xed\xbd\xfa\x84'", sha1_git=b'\x05\xa5\x14s\x8e\xfd\...me=datetime.datetime(2020, 8, 12, 16, 30, 46, 265460, tzinfo=datetime.timezone(datetime.timedelta(0), '+00:00'))), ...] with_data = False def _content_add(self, contents: List[Content], with_data: bool) -> Dict: # Filter-out content already in the database. contents = [ c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict()) ] self.journal_writer.content_add(contents) if with_data: # First insert to the objstorage, if the endpoint is # `content_add` (as opposed to `content_add_metadata`). # TODO: this should probably be done in concurrently to inserting # in index tables (but still before the main table; so an entry is # only added to the main table after everything else was # successfully inserted. summary = self.objstorage.content_add( c for c in contents if c.status != "absent" ) content_add_bytes = summary["content:add:bytes"] content_add = 0 for content in contents: content_add += 1 # Check for sha1 or sha1_git collisions. This test is not atomic # with the insertion, so it won't detect a collision if both # contents are inserted at the same time, but it's good enough. 
# # The proper way to do it would probably be a BATCH, but this # would be inefficient because of the number of partitions we # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) for algo in {"sha1", "sha1_git"}: collisions = [] # Get tokens of 'content' rows with the same value for # sha1/sha1_git rows = self._content_get_from_hash(algo, content.get_hash(algo)) for row in rows: if getattr(row, algo) != content.get_hash(algo): # collision of token(partition key), ignore this # row continue for algo in HASH_ALGORITHMS: if getattr(row, algo) != content.get_hash(algo): # This hash didn't match; discard the row. collisions.append( {algo: getattr(row, algo) for algo in HASH_ALGORITHMS} ) if collisions: collisions.append(content.hashes()) raise HashCollision(algo, content.get_hash(algo), collisions) (token, insertion_finalizer) = self._cql_runner.content_add_prepare( > ContentRow(**remove_keys(content.to_dict(), ("data",))) ) E TypeError: __init__() missing 1 required positional argument: 'ctime' .tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:158: TypeError