Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import logging | import logging | ||||
from typing import Any, Container, Dict, Optional | from typing import Any, Container, Dict, Optional | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS | ||||
from swh.model.model import Content | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.in_memory import InMemoryStorage | from swh.storage.in_memory import InMemoryStorage | ||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import process_replay_objects | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | for object_type, objects in TEST_OBJECTS.items(): | ||||
nb_sent += len(objects) | nb_sent += len(objects) | ||||
# Create collision in input data | # Create collision in input data | ||||
# These should not be written in the destination | # These should not be written in the destination | ||||
producer = src.journal_writer.journal.producer | producer = src.journal_writer.journal.producer | ||||
prefix = src.journal_writer.journal._prefix | prefix = src.journal_writer.journal._prefix | ||||
for content in DUPLICATE_CONTENTS: | for content in DUPLICATE_CONTENTS: | ||||
topic = f"{prefix}.content" | topic = f"{prefix}.content" | ||||
key = content["sha1"] | key = content.sha1 | ||||
producer.produce( | producer.produce( | ||||
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), | topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()), | ||||
) | ) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
producer.flush() | producer.flush() | ||||
caplog.set_level(logging.ERROR, "swh.journal.replay") | caplog.set_level(logging.ERROR, "swh.journal.replay") | ||||
# Fill the destination storage from Kafka | # Fill the destination storage from Kafka | ||||
Show All 10 Lines | for record in caplog.records: | ||||
if "Collision detected:" in logtext: | if "Collision detected:" in logtext: | ||||
nb_collisions += 1 | nb_collisions += 1 | ||||
actual_collision = record.args["collision"] | actual_collision = record.args["collision"] | ||||
assert nb_collisions == 1, "1 collision should be detected" | assert nb_collisions == 1, "1 collision should be detected" | ||||
algo = "sha1" | algo = "sha1" | ||||
assert actual_collision["algo"] == algo | assert actual_collision["algo"] == algo | ||||
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) | expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0].get_hash(algo)) | ||||
assert actual_collision["hash"] == expected_colliding_hash | assert actual_collision["hash"] == expected_colliding_hash | ||||
actual_colliding_hashes = actual_collision["objects"] | actual_colliding_hashes = actual_collision["objects"] | ||||
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) | assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) | ||||
for content in DUPLICATE_CONTENTS: | for content in DUPLICATE_CONTENTS: | ||||
expected_content_hashes = { | expected_content_hashes = { | ||||
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() | k: hash_to_hex(v) for k, v in content.hashes().items() | ||||
} | } | ||||
assert expected_content_hashes in actual_colliding_hashes | assert expected_content_hashes in actual_colliding_hashes | ||||
# all objects from the src should exist in the dst storage | # all objects from the src should exist in the dst storage | ||||
_check_replayed(src, dst, exclude=["contents"]) | _check_replayed(src, dst, exclude=["contents"]) | ||||
# but the dst has one more content (one of the 2 colliding ones) | # but the dst has one more content (one of the 2 colliding ones) | ||||
assert len(src._contents) == len(dst._contents) - 1 | assert len(src._contents) == len(dst._contents) - 1 | ||||
▲ Show 20 Lines • Show All 192 Lines • Show Last 20 Lines |