diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -6,8 +6,9 @@
 import logging
 import traceback
 
+from collections import defaultdict
 from datetime import datetime
-from typing import Dict, Iterable, List, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 from tenacity import (
     retry,
@@ -16,6 +17,7 @@
 )
 
 from swh.model.model import (
+    BaseContent,
     Content,
     SkippedContent,
     Directory,
@@ -27,7 +29,7 @@
 )
 
 from swh.storage import get_storage
-from swh.storage.exc import StorageArgumentException
+from swh.storage.exc import StorageArgumentException, HashCollision
 
 
 logger = logging.getLogger(__name__)
@@ -70,6 +72,59 @@
     return False
 
 
+def collision_aware_content_add(
+    content_add_fn: Callable[[Iterable[BaseContent]], Dict], contents: List[BaseContent]
+) -> Dict[str, int]:
+    """Add contents to storage, skipping those involved in a hash collision.
+
+    Each time content_add_fn raises a HashCollision, the collision is logged as
+    an error, the colliding contents are dropped from the batch and the
+    remaining contents are submitted again.
+
+    Args:
+        content_add_fn: Storage content callable which returns a summary of
+            added objects
+        contents: List of BaseContents to add to storage through content_add_fn
+
+    Returns:
+        The summary of contents added to the storage (empty when contents is
+        empty)
+
+    Raises:
+        HashCollision: when a collision is reported but none of the submitted
+            contents matches the colliding hashes, so the batch cannot be
+            reduced any further
+
+    """
+    if not contents:
+        return {}
+    global_summary: Dict[str, int] = defaultdict(int)
+    while True:
+        try:
+            summary = content_add_fn(contents)
+        except HashCollision as e:
+            collision: Dict[str, Any] = {
+                "algo": e.algo,
+                "hash": e.hash_id,  # hex hash id
+                "objects": e.colliding_contents,  # hex hashes
+            }
+            logger.error("Collision detected: %(collision)s", {"collision": collision})
+            colliding_hashes = e.colliding_content_hashes()
+            # Drop the colliding contents from the transaction
+            remaining = [c for c in contents if c.hashes() not in colliding_hashes]
+            if len(remaining) == len(contents):
+                # Nothing was dropped: submitting the very same batch again
+                # would never make progress, so propagate the error
+                raise
+            contents = remaining
+        else:
+            # Successfully added contents, we are done
+            for key, value in summary.items():
+                global_summary[key] += value
+            break
+    return dict(global_summary)
+
+
 swh_retry = retry(
     retry=should_retry_adding,
     wait=wait_random_exponential(multiplier=1, max=10),
@@ -93,15 +148,18 @@
 
     @swh_retry
     def content_add(self, content: Iterable[Content]) -> Dict:
-        return self.storage.content_add(content)
+        contents: List[BaseContent] = list(content)
+        return collision_aware_content_add(self.storage.content_add, contents)
 
     @swh_retry
     def content_add_metadata(self, content: Iterable[Content]) -> Dict:
-        return self.storage.content_add_metadata(content)
+        contents: List[BaseContent] = list(content)
+        return collision_aware_content_add(self.storage.content_add_metadata, contents)
 
     @swh_retry
     def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict:
-        return self.storage.skipped_content_add(content)
+        contents: List[BaseContent] = list(content)
+        return collision_aware_content_add(self.storage.skipped_content_add, contents)
 
@swh_retry def origin_add_one(self, origin: Origin) -> str: diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -220,7 +220,7 @@ from .storage_data import data return { - "content": [data.cont, data.cont2], + "content": [data.cont, data.cont2, data.cont3], "content_metadata": [data.cont3], "skipped_content": [data.skipped_cont, data.skipped_cont2], "person": [data.person], diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -3,6 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging + from typing import Dict from unittest.mock import call @@ -18,9 +20,11 @@ SkippedContent, Origin, ) +from swh.model.hashutil import hash_to_hex from swh.storage import get_storage from swh.storage.exc import HashCollision, StorageArgumentException +from swh.storage.utils import content_hex_hashes from .storage_data import date_visit1 @@ -39,6 +43,18 @@ return get_storage(**storage_config) +@pytest.fixture +def swh_pgstorage(swh_storage_backend_config): + """Pg storage with retry ability + + """ + storage_config = { + "cls": "pipeline", + "steps": [{"cls": "validate"}, {"cls": "retry"}, swh_storage_backend_config], + } + return get_storage(**storage_config) + + def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before @@ -58,17 +74,82 @@ assert content["sha1"] == sample_content["sha1"] +def test_retrying_proxy_storage_content_add_with_hash_collision( + swh_pgstorage, sample_data, caplog +): + """Insert everything but hash collision contents if any + + Note: Use the pg storage to simulate the transaction rollback (when hash collisions + is raised, no content is written to storage). 
+ + """ + storage = swh_pgstorage + caplog.set_level(logging.ERROR, "swh.storage.retry") + + contents = sample_data["content"] + cont = contents[0] + cont2 = contents[1] + cont3 = contents[2] + + # craft a hash collision between cont and cont2 + assert cont != cont2 + cont2["sha1"] = cont["sha1"] + assert len(cont["data"]) != len(cont2["data"]) + + # same sha1 so only fetch one, which does not exist + content = next(storage.content_get([cont["sha1"]])) + assert not content + summary = storage.content_add([cont]) + # so summary is: + assert summary["content:add"] == 1 + + actual_content = next(storage.content_get([cont["sha1"]])) + assert actual_content == {"sha1": cont["sha1"], "data": cont["data"]} + + assert cont3["sha1"] != cont["sha1"] + content3 = next(storage.content_get([cont3["sha1"]])) + assert not content3 + + # cont2 will collide, cont3 will be stored + summary2 = storage.content_add([cont3, cont2]) + + content3 = next(storage.content_get([cont3["sha1"]])) + assert content3 == {"sha1": content3["sha1"], "data": cont3["data"]} + + # so summary is: + assert summary2["content:add"] == 1 + + collision = 0 + actual_collision: Dict + for record in caplog.records: + logtext = record.getMessage() + if "Collision detected:" in logtext: + collision += 1 + actual_collision = record.args["collision"] + + assert collision == 1, "1 collision should be detected" + + algo = "sha1" + assert actual_collision["algo"] == algo + expected_colliding_hash = hash_to_hex(cont2["sha1"]) + assert actual_collision["hash"] == expected_colliding_hash + + actual_colliding_hashes = actual_collision["objects"] + assert len(actual_colliding_hashes) == 1 + expected_colliding_hashes = Content.from_dict(cont2).hashes() + assert actual_colliding_hashes[0] == content_hex_hashes(expected_colliding_hashes) + + def test_retrying_proxy_storage_content_add_with_retry( swh_storage, sample_data, mocker, fake_hash_collision ): - """Multiple retries for hash collision and psycopg2 error but finally ok 
+ """Multiple retries on psycopg2 error but finally ok """ mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko + # 1st & 2nd try goes ko + psycopg2.IntegrityError("content already inserted"), psycopg2.IntegrityError("content already inserted"), # ok then! {"content:add": 1}, @@ -138,16 +219,15 @@ def test_retrying_proxy_storage_content_add_metadata_with_retry( swh_storage, sample_data, mocker, fake_hash_collision ): - """Multiple retries for hash collision and psycopg2 error but finally ok + """Multiple retries on psycopg2 error but finally ok """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = [ - # first try goes ko - fake_hash_collision, - # second try goes ko + # 1st & 2nd try goes ko + psycopg2.IntegrityError("content_metadata already inserted"), psycopg2.IntegrityError("content_metadata already inserted"), # ok then! {"content:add": 1}, @@ -217,7 +297,7 @@ def test_retrying_proxy_storage_skipped_content_add_with_retry( swh_storage, sample_data, mocker, fake_hash_collision ): - """Multiple retries for hash collision and psycopg2 error but finally ok + """Multiple retries on psycopg2 error but finally ok """ mock_memory = mocker.patch( @@ -225,7 +305,7 @@ ) mock_memory.side_effect = [ # 1st & 2nd try goes ko - fake_hash_collision, + psycopg2.IntegrityError("skipped_content already inserted"), psycopg2.IntegrityError("skipped_content already inserted"), # ok then! {"skipped_content:add": 1},