diff --git a/swh/storage/retry.py b/swh/storage/retry.py
index 01c73ff3..f2cb7df3 100644
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -1,70 +1,74 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import psycopg2
 import traceback

 from datetime import datetime
 from typing import Dict, List, Union

 from retrying import retry

 from swh.storage import get_storage, HashCollision

 logger = logging.getLogger(__name__)


 RETRY_EXCEPTIONS = [
     # raised when two parallel insertions insert the same data
     psycopg2.IntegrityError,
     HashCollision,
 ]


 def should_retry_adding(error: Exception) -> bool:
     """Retry policy when certain kinds of failures occur (database
        integrity error, hash collision, etc.)

     """
     retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
     if retry:
         error_name = error.__module__ + '.' + error.__class__.__name__
         logger.warning('Retry adding a batch', exc_info=False, extra={
             'swh_type': 'storage_retry',
             'swh_exception_type': error_name,
             'swh_exception': traceback.format_exception(
                 error.__class__,
                 error,
                 error.__traceback__,
             ),
         })
     return retry


 class RetryingProxyStorage:
     """Storage implementation which retries adding objects when the addition
        fails with a retryable error (hash collision, integrity error).

     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)

     def __getattr__(self, key):
         return getattr(self.storage, key)

     @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
     def content_add(self, content: List[Dict]) -> Dict:
         return self.storage.content_add(content)

     @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
     def origin_add_one(self, origin: Dict) -> str:
         return self.storage.origin_add_one(origin)

     @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
     def origin_visit_add(self, origin: Dict, date: Union[datetime, str],
                          type: str) -> Dict:
         return self.storage.origin_visit_add(origin, date, type)
+
+    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
+    def tool_add(self, tools: List[Dict]) -> List[Dict]:
+        return self.storage.tool_add(tools)
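A minimal usage sketch of the proxy above, wrapping the in-memory backend the same way the test fixture does; the keys of the tool dict are illustrative only, not fixture data:

# Sketch only: wrap the in-memory backend and add a tool through the proxy.
from swh.storage.retry import RetryingProxyStorage

storage = RetryingProxyStorage(storage={'cls': 'memory'})
tool = {
    'name': 'some-metadata-detector',  # illustrative values
    'version': '0.0.1',
    'configuration': {},
}
# tool_add is wrapped by @retry: a HashCollision or psycopg2.IntegrityError
# raised by the backend triggers up to 3 attempts; other exceptions propagate.
tools = storage.tool_add([tool])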
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
index d3a9208a..adfdc335 100644
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -1,225 +1,226 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import glob
 import pytest

 from typing import Union

 from pytest_postgresql import factories
 from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version

 from os import path, environ
 from hypothesis import settings
 from typing import Dict

 import swh.storage

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.model.tests.generate_testdata import gen_contents, gen_origins


 SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql')

 environ['LC_ALL'] = 'C.UTF-8'

 DUMP_FILES = path.join(SQL_DIR, '*.sql')

 # Define the tests profile. Full documentation is at:
 # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
 settings.register_profile("fast", max_examples=5, deadline=5000)
 settings.register_profile("slow", max_examples=20, deadline=5000)


 @pytest.fixture
 def swh_storage(postgresql_proc, swh_storage_postgresql):
     storage_config = {
         'cls': 'local',
         'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
             host=postgresql_proc.host,
             port=postgresql_proc.port,
             user='postgres',
             dbname='tests'),
         'objstorage': {
             'cls': 'memory',
             'args': {}
         },
         'journal_writer': {
             'cls': 'memory',
         },
     }
     storage = swh.storage.get_storage(**storage_config)
     return storage


 @pytest.fixture
 def swh_contents(swh_storage):
     contents = gen_contents(n=20)
     swh_storage.content_add(contents)
     return contents


 @pytest.fixture
 def swh_origins(swh_storage):
     origins = gen_origins(n=100)
     swh_storage.origin_add(origins)
     return origins


 # The postgresql_fact factory fixture below is mostly a copy of the code
 # from pytest-postgresql. We need a custom version here to be able to
 # specify our version of the DBJanitor we use.
 def postgresql_fact(process_fixture_name, db_name=None,
                     dump_files=DUMP_FILES):
     @pytest.fixture
     def postgresql_factory(request):
         """
         Fixture factory for PostgreSQL.

         :param FixtureRequest request: fixture request object
         :rtype: psycopg2.connection
         :returns: postgresql client
         """
         config = factories.get_config(request)
         if not psycopg2:
             raise ImportError(
                 'No module named psycopg2. Please install it.'
             )
         proc_fixture = request.getfixturevalue(process_fixture_name)

         # _, config = try_import('psycopg2', request)
         pg_host = proc_fixture.host
         pg_port = proc_fixture.port
         pg_user = proc_fixture.user
         pg_options = proc_fixture.options
         pg_db = db_name or config['dbname']

         with SwhDatabaseJanitor(
                 pg_user, pg_host, pg_port, pg_db, proc_fixture.version,
                 dump_files=dump_files
         ):
             connection = psycopg2.connect(
                 dbname=pg_db,
                 user=pg_user,
                 host=pg_host,
                 port=pg_port,
                 options=pg_options
             )
             yield connection
             connection.close()

     return postgresql_factory


 swh_storage_postgresql = postgresql_fact('postgresql_proc')


 # This version of the DatabaseJanitor implements a different setup/teardown
 # behavior than the stock one: instead of dropping, creating and initializing
 # the database for each test, it creates and initializes the db only once,
 # then truncates the tables between tests. This is needed to have acceptable
 # test performance.
 class SwhDatabaseJanitor(DatabaseJanitor):
     def __init__(
             self,
             user: str,
             host: str,
             port: str,
             db_name: str,
             version: Union[str, float, Version],
             dump_files: str = DUMP_FILES
     ) -> None:
         super().__init__(user, host, port, db_name, version)
         self.dump_files = sorted(
             glob.glob(dump_files), key=sortkey)

     def db_setup(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 for fname in self.dump_files:
                     with open(fname) as fobj:
                         sql = fobj.read().replace('concurrently', '').strip()
                         if sql:
                             cur.execute(sql)
             cnx.commit()

     def db_reset(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 cur.execute(
                     "SELECT table_name FROM information_schema.tables "
                     "WHERE table_schema = %s", ('public',))
                 tables = set(table for (table,) in cur.fetchall())
                 for table in tables:
                     cur.execute('truncate table %s cascade' % table)

                 cur.execute(
                     "SELECT sequence_name FROM information_schema.sequences "
                     "WHERE sequence_schema = %s", ('public',))
                 seqs = set(seq for (seq,) in cur.fetchall())
                 for seq in seqs:
                     cur.execute('ALTER SEQUENCE %s RESTART;' % seq)
             cnx.commit()

     def init(self):
         with self.cursor() as cur:
             cur.execute(
                 "SELECT COUNT(1) FROM pg_database WHERE datname=%s;",
                 (self.db_name,))
             db_exists = cur.fetchone()[0] == 1
             if db_exists:
                 cur.execute(
                     'UPDATE pg_database SET datallowconn=true '
                     'WHERE datname = %s;',
                     (self.db_name,))

         if db_exists:
             self.db_reset()
         else:
             with self.cursor() as cur:
                 cur.execute('CREATE DATABASE "{}";'.format(self.db_name))
             self.db_setup()

     def drop(self):
         pid_column = 'pid'
         with self.cursor() as cur:
             cur.execute(
                 'UPDATE pg_database SET datallowconn=false '
                 'WHERE datname = %s;', (self.db_name,))
             cur.execute(
                 'SELECT pg_terminate_backend(pg_stat_activity.{})'
                 'FROM pg_stat_activity '
                 'WHERE pg_stat_activity.datname = %s;'.format(pid_column),
                 (self.db_name,))


 @pytest.fixture
 def sample_data() -> Dict:
     """Pre-defined sample storage object data to manipulate

     Returns:
         Dict of data (keys: content, directory, revision, release, person,
         origin)

     """
     from .storage_data import data
     return {
         'content': [data.cont, data.cont2],
         'person': [data.person],
         'directory': [data.dir2],
         'revision': [data.revision],
         'release': [data.release, data.release2, data.release3],
         'origin': [data.origin, data.origin2],
+        'tool': [data.metadata_tool],
     }
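The retry tests below all follow the same pattern: the patched backend method raises a retryable exception on the first two calls and succeeds on the third. A standalone sketch of that mechanism, assuming only retrying and swh.storage are importable:

# Sketch: the decorator retries on exceptions accepted by should_retry_adding
# and stops after 3 attempts, which is what the *_with_retry tests exercise.
from retrying import retry
from swh.storage import HashCollision
from swh.storage.retry import should_retry_adding

calls = []

@retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
def flaky_add():
    calls.append(1)
    if len(calls) < 3:
        raise HashCollision('transient collision')  # retried
    return {'content:add': 1}  # third attempt succeeds

assert flaky_add() == {'content:add': 1}
assert len(calls) == 3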
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
index 5a2632fa..ef4d16f6 100644
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -1,221 +1,284 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import psycopg2
 import pytest

 from swh.storage import HashCollision
 from swh.storage.retry import (
     RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS
 )


 def test_should_retry_adding():
     """Specific exceptions should be selected for retry

     """
     for exc in RETRY_EXCEPTIONS:
         assert should_retry_adding(exc('error')) is True


 def test_should_retry_adding_no_retry():
     """Other exceptions should be raised as usual

     """
     for exc in [ValueError, Exception]:
         assert should_retry_adding(exc('fail!')) is False


 @pytest.fixture
 def swh_storage():
     return RetryingProxyStorage(storage={'cls': 'memory'})


 def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
     """Standard content_add works as before

     """
     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': sample_content['length'],
         'skipped_content:add': 0
     }


 def test_retrying_proxy_storage_content_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('content hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('content already inserted'),
         # ok then!
         {'content:add': 1}
     ]

     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
     }


 def test_retrying_proxy_swh_storage_content_add_failure(
         swh_storage, sample_data, mocker):
     """Other errors are raised as usual

     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
     mock_memory.side_effect = ValueError('Refuse to add content always!')

     sample_content = sample_data['content'][0]

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content

     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.content_add([sample_content])

     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content


 def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data):
     """Standard origin_add_one works as before

     """
     sample_origin = sample_data['origin'][0]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']

     origin = swh_storage.origin_get(sample_origin)
     assert origin['url'] == sample_origin['url']


 def test_retrying_proxy_swh_storage_origin_add_one_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_origin = sample_data['origin'][1]
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         sample_origin['url']
     ]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']


 def test_retrying_proxy_swh_storage_origin_add_one_failure(
         swh_storage, sample_data, mocker):
     """Other errors are raised as usual

     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one')
     mock_memory.side_effect = ValueError('Refuse to add origin always!')

     sample_origin = sample_data['origin'][0]

     origin = swh_storage.origin_get(sample_origin)
     assert not origin

     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_add_one([sample_origin])

     origin = swh_storage.origin_get(sample_origin)
     assert not origin


 def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
     """Standard origin_visit_add works as before

     """
     sample_origin = sample_data['origin'][0]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     origin_visit = swh_storage.origin_visit_add(
         origin_url, '2020-01-01', 'hg')
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)

     origin_visit = next(swh_storage.origin_visit_get(origin_url))
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)


 def test_retrying_proxy_swh_storage_origin_visit_add_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok

     """
     sample_origin = sample_data['origin'][1]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']

     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         {'origin': origin_url, 'visit': 1}
     ]

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git')
     assert r == {'origin': origin_url, 'visit': 1}


 def test_retrying_proxy_swh_storage_origin_visit_add_failure(
         swh_storage, sample_data, mocker):
     """Other errors are raised as usual

     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_add')
     mock_memory.side_effect = ValueError('Refuse to add origin always!')

     origin_url = sample_data['origin'][0]['url']

     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin

     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_visit_add(origin_url, '2020-01-01', 'svn')
+
+
+def test_retrying_proxy_storage_tool_add(swh_storage, sample_data):
+    """Standard tool_add works as before
+
+    """
+    sample_tool = sample_data['tool'][0]
+
+    tool = swh_storage.tool_get(sample_tool)
+    assert not tool
+
+    tools = swh_storage.tool_add([sample_tool])
+    assert tools
+    tool = tools[0]
+    tool.pop('id')
+    assert tool == sample_tool
+
+    tool = swh_storage.tool_get(sample_tool)
+    tool.pop('id')
+    assert tool == sample_tool
+
+
+def test_retrying_proxy_storage_tool_add_with_retry(
+        swh_storage, sample_data, mocker):
+    """Multiple retries for hash collision and psycopg2 error but finally ok
+
+    """
+    sample_tool = sample_data['tool'][0]
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
+    mock_memory.side_effect = [
+        # first try goes ko
+        HashCollision('tool hash collision'),
+        # second try goes ko
+        psycopg2.IntegrityError('tool already inserted'),
+        # ok then!
+        [sample_tool]
+    ]
+
+    tool = swh_storage.tool_get(sample_tool)
+    assert not tool
+
+    tools = swh_storage.tool_add([sample_tool])
+    assert tools == [sample_tool]
+
+
+def test_retrying_proxy_swh_storage_tool_add_failure(
+        swh_storage, sample_data, mocker):
+    """Other errors are raised as usual
+
+    """
+    mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
+    mock_memory.side_effect = ValueError('Refuse to add tool always!')
+
+    sample_tool = sample_data['tool'][0]
+
+    tool = swh_storage.tool_get(sample_tool)
+    assert not tool
+
+    with pytest.raises(ValueError, match='Refuse to add'):
+        swh_storage.tool_add([sample_tool])
+
+    tool = swh_storage.tool_get(sample_tool)
+    assert not tool
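To exercise just the new tool_add coverage, running the retry test module with a keyword filter should be enough, assuming pytest and pytest-mock are installed in the development environment:

    pytest swh/storage/tests/test_retry.py -k tool_add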