replayer_storage_and_client = (<swh.storage.in_memory.InMemoryStorage object at 0x7f9a7d0da320>, <swh.journal.client.JournalClient object at 0x7f9a7d0da5f8>)
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f9a7d0f2898>
def test_storage_play_with_collision(replayer_storage_and_client, caplog):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
method = getattr(src, object_type + "_add")
method(objects)
if object_type == "origin_visit":
nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
nb_sent += len(objects)
# Create collision in input data
# These should not be written in the destination
producer = src.journal_writer.journal.producer
prefix = src.journal_writer.journal._prefix
for content in DUPLICATE_CONTENTS:
topic = f"{prefix}.content"
key = content.sha1
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
> nb_inserted = replayer.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:135:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:264: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:291: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:61: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:123: in _insert_objects
collision_aware_content_add(storage.content_add_metadata, contents)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:85: in collision_aware_content_add
content_add_fn(contents)
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:189: in content_add_metadata
return self._content_add(content, with_data=False)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f9a7d208780>
contents = [Content(sha1=b"\xaa\xad\xd9Iw\xb8\xfb\xf3\xf6\xfb\t\xfc;\xbb\xc9\xed\xbd\xfa\x84'", sha1_git=b'\x05\xa5\x14s\x8e\xfd\...me=datetime.datetime(2020, 8, 12, 16, 39, 35, 688225, tzinfo=datetime.timezone(datetime.timedelta(0), '+00:00'))), ...]
with_data = False
def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
# Filter-out content already in the database.
contents = [
c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict())
]
self.journal_writer.content_add(contents)
if with_data:
# First insert to the objstorage, if the endpoint is
# `content_add` (as opposed to `content_add_metadata`).
# TODO: this should probably be done in concurrently to inserting
# in index tables (but still before the main table; so an entry is
# only added to the main table after everything else was
# successfully inserted.
summary = self.objstorage.content_add(
c for c in contents if c.status != "absent"
)
content_add_bytes = summary["content:add:bytes"]
content_add = 0
for content in contents:
content_add += 1
# Check for sha1 or sha1_git collisions. This test is not atomic
# with the insertion, so it won't detect a collision if both
# contents are inserted at the same time, but it's good enough.
#
# The proper way to do it would probably be a BATCH, but this
# would be inefficient because of the number of partitions we
# need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
for algo in {"sha1", "sha1_git"}:
collisions = []
# Get tokens of 'content' rows with the same value for
# sha1/sha1_git
rows = self._content_get_from_hash(algo, content.get_hash(algo))
for row in rows:
if getattr(row, algo) != content.get_hash(algo):
# collision of token(partition key), ignore this
# row
continue
for algo in HASH_ALGORITHMS:
if getattr(row, algo) != content.get_hash(algo):
# This hash didn't match; discard the row.
collisions.append(
{algo: getattr(row, algo) for algo in HASH_ALGORITHMS}
)
if collisions:
collisions.append(content.hashes())
raise HashCollision(algo, content.get_hash(algo), collisions)
(token, insertion_finalizer) = self._cql_runner.content_add_prepare(
> ContentRow(**remove_keys(content.to_dict(), ("data",)))
)
E TypeError: __init__() missing 1 required positional argument: 'ctime'
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:158: TypeError
TEST RESULT
TEST RESULT
- Run At
- Aug 12 2020, 6:40 PM