Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_retry.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | |||||
from typing import Dict | from typing import Dict | ||||
from unittest.mock import call | from unittest.mock import call | ||||
import psycopg2 | import psycopg2 | ||||
import pytest | import pytest | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Snapshot, | Snapshot, | ||||
SkippedContent, | SkippedContent, | ||||
Origin, | Origin, | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_hex | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.exc import HashCollision, StorageArgumentException | from swh.storage.exc import HashCollision, StorageArgumentException | ||||
from swh.storage.utils import content_hex_hashes | |||||
from .storage_data import date_visit1 | from .storage_data import date_visit1 | ||||
@pytest.fixture | @pytest.fixture | ||||
def fake_hash_collision(sample_data): | def fake_hash_collision(sample_data): | ||||
return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", []) | return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", []) | ||||
@pytest.fixture | @pytest.fixture | ||||
def swh_storage(): | def swh_storage(): | ||||
storage_config = { | storage_config = { | ||||
"cls": "pipeline", | "cls": "pipeline", | ||||
"steps": [{"cls": "validate"}, {"cls": "retry"}, {"cls": "memory"},], | "steps": [{"cls": "validate"}, {"cls": "retry"}, {"cls": "memory"},], | ||||
} | } | ||||
return get_storage(**storage_config) | return get_storage(**storage_config) | ||||
@pytest.fixture | |||||
def swh_pgstorage(swh_storage_backend_config): | |||||
"""Pg storage with retry ability | |||||
""" | |||||
storage_config = { | |||||
"cls": "pipeline", | |||||
"steps": [{"cls": "validate"}, {"cls": "retry"}, swh_storage_backend_config], | |||||
} | |||||
return get_storage(**storage_config) | |||||
def test_retrying_proxy_storage_content_add(swh_storage, sample_data): | def test_retrying_proxy_storage_content_add(swh_storage, sample_data): | ||||
"""Standard content_add works as before | """Standard content_add works as before | ||||
""" | """ | ||||
sample_content = sample_data["content"][0] | sample_content = sample_data["content"][0] | ||||
content = next(swh_storage.content_get([sample_content["sha1"]])) | content = next(swh_storage.content_get([sample_content["sha1"]])) | ||||
assert not content | assert not content | ||||
s = swh_storage.content_add([sample_content]) | s = swh_storage.content_add([sample_content]) | ||||
assert s == { | assert s == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": sample_content["length"], | "content:add:bytes": sample_content["length"], | ||||
} | } | ||||
content = next(swh_storage.content_get([sample_content["sha1"]])) | content = next(swh_storage.content_get([sample_content["sha1"]])) | ||||
assert content["sha1"] == sample_content["sha1"] | assert content["sha1"] == sample_content["sha1"] | ||||
vlorentz: Could you duplicate this test, to check what happens if you insert `[cont, cont2, cont3… | |||||
Done Inline Actionssure ardumont: sure | |||||
def test_retrying_proxy_storage_content_add_with_hash_collision( | |||||
swh_pgstorage, sample_data, caplog | |||||
): | |||||
"""Insert everything but hash collision contents if any | |||||
Note: Use the pg storage to simulate the transaction rollback (when hash collisions | |||||
is raised, no content is written to storage). | |||||
""" | |||||
storage = swh_pgstorage | |||||
caplog.set_level(logging.ERROR, "swh.storage.retry") | |||||
contents = sample_data["content"] | |||||
cont = contents[0] | |||||
cont2 = contents[1] | |||||
cont3 = contents[2] | |||||
# craft a hash collision between cont and cont2 | |||||
assert cont != cont2 | |||||
cont2["sha1"] = cont["sha1"] | |||||
assert len(cont["data"]) != len(cont2["data"]) | |||||
# same sha1 so only fetch one, which does not exist | |||||
content = next(storage.content_get([cont["sha1"]])) | |||||
assert not content | |||||
summary = storage.content_add([cont]) | |||||
# so summary is: | |||||
assert summary["content:add"] == 1 | |||||
actual_content = next(storage.content_get([cont["sha1"]])) | |||||
assert actual_content == {"sha1": cont["sha1"], "data": cont["data"]} | |||||
assert cont3["sha1"] != cont["sha1"] | |||||
content3 = next(storage.content_get([cont3["sha1"]])) | |||||
assert not content3 | |||||
# cont2 will collide, cont3 will be stored | |||||
summary2 = storage.content_add([cont3, cont2]) | |||||
content3 = next(storage.content_get([cont3["sha1"]])) | |||||
assert content3 == {"sha1": content3["sha1"], "data": cont3["data"]} | |||||
# so summary is: | |||||
assert summary2["content:add"] == 1 | |||||
collision = 0 | |||||
actual_collision: Dict | |||||
Not Done Inline ActionsYou should check the length of caplog.records, it will give a better error vlorentz: You should check the length of `caplog.records`, it will give a better error | |||||
Done Inline ActionsExcept i've no idea how long the length should be. ardumont: Except i've no idea how long the length should be.
I'm only interested in the collision one. | |||||
for record in caplog.records: | |||||
logtext = record.getMessage() | |||||
if "Collision detected:" in logtext: | |||||
collision += 1 | |||||
actual_collision = record.args["collision"] | |||||
assert collision == 1, "1 collision should be detected" | |||||
algo = "sha1" | |||||
assert actual_collision["algo"] == algo | |||||
expected_colliding_hash = hash_to_hex(cont2["sha1"]) | |||||
assert actual_collision["hash"] == expected_colliding_hash | |||||
actual_colliding_hashes = actual_collision["objects"] | |||||
assert len(actual_colliding_hashes) == 1 | |||||
expected_colliding_hashes = Content.from_dict(cont2).hashes() | |||||
assert actual_colliding_hashes[0] == content_hex_hashes(expected_colliding_hashes) | |||||
def test_retrying_proxy_storage_content_add_with_retry( | def test_retrying_proxy_storage_content_add_with_retry( | ||||
swh_storage, sample_data, mocker, fake_hash_collision | swh_storage, sample_data, mocker, fake_hash_collision | ||||
): | ): | ||||
"""Multiple retries for hash collision and psycopg2 error but finally ok | """Multiple retries on psycopg2 error but finally ok | ||||
""" | """ | ||||
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") | mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") | ||||
mock_memory.side_effect = [ | mock_memory.side_effect = [ | ||||
# first try goes ko | # 1st & 2nd tries goes ko | ||||
fake_hash_collision, | psycopg2.IntegrityError("content already inserted"), | ||||
Not Done Inline Actionsnit: 1st & 2nd tries go ko vlorentz: nit: `1st & 2nd tries go ko` | |||||
# second try goes ko | |||||
psycopg2.IntegrityError("content already inserted"), | psycopg2.IntegrityError("content already inserted"), | ||||
# ok then! | # ok then! | ||||
{"content:add": 1}, | {"content:add": 1}, | ||||
] | ] | ||||
mock_sleep = mocker.patch( | mock_sleep = mocker.patch( | ||||
"swh.storage.retry.RetryingProxyStorage" ".content_add.retry.sleep" | "swh.storage.retry.RetryingProxyStorage" ".content_add.retry.sleep" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data): | ||||
content_metadata = swh_storage.content_get_metadata([pk]) | content_metadata = swh_storage.content_get_metadata([pk]) | ||||
assert len(content_metadata[pk]) == 1 | assert len(content_metadata[pk]) == 1 | ||||
assert content_metadata[pk][0]["sha1"] == pk | assert content_metadata[pk][0]["sha1"] == pk | ||||
def test_retrying_proxy_storage_content_add_metadata_with_retry( | def test_retrying_proxy_storage_content_add_metadata_with_retry( | ||||
swh_storage, sample_data, mocker, fake_hash_collision | swh_storage, sample_data, mocker, fake_hash_collision | ||||
): | ): | ||||
"""Multiple retries for hash collision and psycopg2 error but finally ok | """Multiple retries on psycopg2 error but finally ok | ||||
""" | """ | ||||
mock_memory = mocker.patch( | mock_memory = mocker.patch( | ||||
"swh.storage.in_memory.InMemoryStorage.content_add_metadata" | "swh.storage.in_memory.InMemoryStorage.content_add_metadata" | ||||
) | ) | ||||
mock_memory.side_effect = [ | mock_memory.side_effect = [ | ||||
# first try goes ko | # 1st & 2nd tries goes ko | ||||
fake_hash_collision, | psycopg2.IntegrityError("content_metadata already inserted"), | ||||
Not Done Inline Actionssame vlorentz: same | |||||
# second try goes ko | |||||
psycopg2.IntegrityError("content_metadata already inserted"), | psycopg2.IntegrityError("content_metadata already inserted"), | ||||
# ok then! | # ok then! | ||||
{"content:add": 1}, | {"content:add": 1}, | ||||
] | ] | ||||
mock_sleep = mocker.patch( | mock_sleep = mocker.patch( | ||||
"swh.storage.retry.RetryingProxyStorage" ".content_add_metadata.retry.sleep" | "swh.storage.retry.RetryingProxyStorage" ".content_add_metadata.retry.sleep" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data): | ||||
skipped_content = list(swh_storage.skipped_content_missing([sample_content])) | skipped_content = list(swh_storage.skipped_content_missing([sample_content])) | ||||
assert len(skipped_content) == 0 | assert len(skipped_content) == 0 | ||||
def test_retrying_proxy_storage_skipped_content_add_with_retry( | def test_retrying_proxy_storage_skipped_content_add_with_retry( | ||||
swh_storage, sample_data, mocker, fake_hash_collision | swh_storage, sample_data, mocker, fake_hash_collision | ||||
): | ): | ||||
"""Multiple retries for hash collision and psycopg2 error but finally ok | """Multiple retries on psycopg2 error but finally ok | ||||
""" | """ | ||||
mock_memory = mocker.patch( | mock_memory = mocker.patch( | ||||
"swh.storage.in_memory.InMemoryStorage.skipped_content_add" | "swh.storage.in_memory.InMemoryStorage.skipped_content_add" | ||||
) | ) | ||||
mock_memory.side_effect = [ | mock_memory.side_effect = [ | ||||
# 1st & 2nd try goes ko | # 1st & 2nd tries goes ko | ||||
fake_hash_collision, | psycopg2.IntegrityError("skipped_content already inserted"), | ||||
psycopg2.IntegrityError("skipped_content already inserted"), | psycopg2.IntegrityError("skipped_content already inserted"), | ||||
# ok then! | # ok then! | ||||
{"skipped_content:add": 1}, | {"skipped_content:add": 1}, | ||||
] | ] | ||||
mock_sleep = mocker.patch( | mock_sleep = mocker.patch( | ||||
"swh.storage.retry.RetryingProxyStorage" ".skipped_content_add.retry.sleep" | "swh.storage.retry.RetryingProxyStorage" ".skipped_content_add.retry.sleep" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 846 Lines • Show Last 20 Lines |
Could you duplicate this test, to check what happens if you insert [cont, cont2, cont3, cont2]?