replayer_storage_and_client = (<swh.storage.in_memory.InMemoryStorage object at 0x7f50fc589048>, <swh.journal.client.JournalClient object at 0x7f50f48be860>)
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f50fd5ec080>
def test_storage_play_with_collision(replayer_storage_and_client, caplog):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
method = getattr(src, object_type + "_add")
method(objects)
if object_type == "origin_visit":
nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
nb_sent += len(objects)
# Create collision in input data
# These should not be written in the destination
producer = src.journal_writer.journal.producer
prefix = src.journal_writer.journal._prefix
for content in DUPLICATE_CONTENTS:
topic = f"{prefix}.content"
key = content["sha1"]
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
# check the logs for the collision being properly detected
nb_collisions = 0
actual_collision: Dict
for record in caplog.records:
logtext = record.getMessage()
if "Collision detected:" in logtext:
nb_collisions += 1
actual_collision = record.args["collision"]
assert nb_collisions == 1, "1 collision should be detected"
algo = "sha1"
assert actual_collision["algo"] == algo
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
assert actual_collision["hash"] == expected_colliding_hash
actual_colliding_hashes = actual_collision["objects"]
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
for content in DUPLICATE_CONTENTS:
expected_content_hashes = {
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
}
assert expected_content_hashes in actual_colliding_hashes
# all objects from the src should exists in the dst storage
> _check_replayed(src, dst, exclude=["contents"])
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:165:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = <swh.storage.in_memory.InMemoryStorage object at 0x7f50fc589048>
dst = <swh.storage.in_memory.InMemoryStorage object at 0x7f50f0e9fa20>
exclude = ['contents']
def _check_replayed(
src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None
):
"""Simple utility function to compare the content of 2 in_memory storages
"""
expected_persons = set(src._persons.values())
got_persons = set(dst._persons.values())
assert got_persons == expected_persons
for attr in (
"contents",
"skipped_contents",
"directories",
"revisions",
"releases",
"snapshots",
"origins",
"origin_visits",
):
if exclude and attr in exclude:
continue
expected_objects = sorted(getattr(src, f"_{attr}").items())
got_objects = sorted(getattr(dst, f"_{attr}").items())
> assert got_objects == expected_objects, f"Mismatch object list for {attr}"
E AssertionError: Mismatch object list for origin_visits
E assert [('https://ov...}, visit=3)])] == [('https://ov...den/fox', [])]
E At index 0 diff: ('https://overtherainbow.org/fox/den', [OriginVisit(origin='https://overtherainbow.org/fox/den', date=datetime.datetime(2014, 11, 27, 17, 20, 39, tzinfo=tzlocal()), status='ongoing', type='hg', snapshot=None, metadata={'baz': 'qux'}, visit=1), OriginVisit(origin='https://overtherainbow.org/fox/den', date=datetime.datetime(2015, 11, 27, 17, 20, 39, tzinfo=tzlocal()), status='partial', type='hg', snapshot=b'\xec\xeeH9z\x92\xb0\xd04\xe9u*\x17E\x9f6\x91\xa7>\xf9', metadata={'something': 'wrong occurred'}, visit=2)]) != ('https://overtherainbow.org/fox/den',...
E
E ...Full output truncated (13 lines hidden), use '-vv' to show
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:209: AssertionError
TEST RESULT
TEST RESULT
- Run At: Jun 11 2020, 4:27 PM