diff --git a/swh/storage/retry.py b/swh/storage/retry.py index 895340c..40dd7c7 100644 --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -1,81 +1,88 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import psycopg2 import traceback from datetime import datetime from typing import Dict, List, Union from retrying import retry from swh.storage import get_storage, HashCollision logger = logging.getLogger(__name__) RETRY_EXCEPTIONS = [ # raised when two parallel insertions insert the same data psycopg2.IntegrityError, HashCollision, ] def should_retry_adding(error: Exception) -> bool: """Retry policy when some kind of failures occur (database integrity error, hash collision, etc...) """ retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS) if retry: error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry adding a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return retry class RetryingProxyStorage: """Storage implementation which retries adding objects when it specifically fails (hash collision, integrity error). """ def __init__(self, storage): self.storage = get_storage(**storage) def __getattr__(self, key): return getattr(self.storage, key) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def content_add(self, content: List[Dict]) -> Dict: return self.storage.content_add(content) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def origin_add_one(self, origin: Dict) -> str: return self.storage.origin_add_one(origin) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def origin_visit_add(self, origin: Dict, date: Union[datetime, str], type: str) -> Dict: return self.storage.origin_visit_add(origin, date, type) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def tool_add(self, tools: List[Dict]) -> List[Dict]: return self.storage.tool_add(tools) @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) def metadata_provider_add( self, provider_name: str, provider_type: str, provider_url: str, metadata: Dict) -> Union[str, int]: return self.storage.metadata_provider_add( provider_name, provider_type, provider_url, metadata) + + @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3) + def origin_metadata_add( + self, origin_url: str, ts: Union[str, datetime], + provider_id: int, tool_id: int, metadata: Dict) -> None: + return self.storage.origin_metadata_add( + origin_url, ts, provider_id, tool_id, metadata) diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py index f2fa096..dd2dffc 100644 --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -1,227 +1,228 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob import pytest from typing import Union from pytest_postgresql import factories from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version from os import path, environ from hypothesis import settings from typing import Dict import swh.storage from swh.core.utils import numfile_sortkey as sortkey from swh.model.tests.generate_testdata import gen_contents, gen_origins SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql') environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = path.join(SQL_DIR, '*.sql') # define tests profile. Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles settings.register_profile("fast", max_examples=5, deadline=5000) settings.register_profile("slow", max_examples=20, deadline=5000) @pytest.fixture def swh_storage(postgresql_proc, swh_storage_postgresql): storage_config = { 'cls': 'local', 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( host=postgresql_proc.host, port=postgresql_proc.port, user='postgres', dbname='tests'), 'objstorage': { 'cls': 'memory', 'args': {} }, 'journal_writer': { 'cls': 'memory', }, } storage = swh.storage.get_storage(**storage_config) return storage @pytest.fixture def swh_contents(swh_storage): contents = gen_contents(n=20) swh_storage.content_add(contents) return contents @pytest.fixture def swh_origins(swh_storage): origins = gen_origins(n=100) swh_storage.origin_add(origins) return origins # the postgres_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES): @pytest.fixture def postgresql_factory(request): """ Fixture factory for PostgreSQL. :param FixtureRequest request: fixture request object :rtype: psycopg2.connection :returns: postgresql client """ config = factories.get_config(request) if not psycopg2: raise ImportError( 'No module named psycopg2. Please install it.' ) proc_fixture = request.getfixturevalue(process_fixture_name) # _, config = try_import('psycopg2', request) pg_host = proc_fixture.host pg_port = proc_fixture.port pg_user = proc_fixture.user pg_options = proc_fixture.options pg_db = db_name or config['dbname'] with SwhDatabaseJanitor( pg_user, pg_host, pg_port, pg_db, proc_fixture.version, dump_files=dump_files ): connection = psycopg2.connect( dbname=pg_db, user=pg_user, host=pg_host, port=pg_port, options=pg_options ) yield connection connection.close() return postgresql_factory swh_storage_postgresql = postgresql_fact('postgresql_proc') # This version of the DatabaseJanitor implement a different setup/teardown # behavior than than the stock one: instead of dropping, creating and # initializing the database for each test, it create and initialize the db only # once, then it truncate the tables. This is needed to have acceptable test # performances. class SwhDatabaseJanitor(DatabaseJanitor): def __init__( self, user: str, host: str, port: str, db_name: str, version: Union[str, float, Version], dump_files: str = DUMP_FILES ) -> None: super().__init__(user, host, port, db_name, version) self.dump_files = sorted( glob.glob(dump_files), key=sortkey) def db_setup(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: for fname in self.dump_files: with open(fname) as fobj: sql = fobj.read().replace('concurrently', '').strip() if sql: cur.execute(sql) cnx.commit() def db_reset(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: cur.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = %s", ('public',)) tables = set(table for (table,) in cur.fetchall()) for table in tables: cur.execute('truncate table %s cascade' % table) cur.execute( "SELECT sequence_name FROM information_schema.sequences " "WHERE sequence_schema = %s", ('public',)) seqs = set(seq for (seq,) in cur.fetchall()) for seq in seqs: cur.execute('ALTER SEQUENCE %s RESTART;' % seq) cnx.commit() def init(self): with self.cursor() as cur: cur.execute( "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.db_name,)) db_exists = cur.fetchone()[0] == 1 if db_exists: cur.execute( 'UPDATE pg_database SET datallowconn=true ' 'WHERE datname = %s;', (self.db_name,)) if db_exists: self.db_reset() else: with self.cursor() as cur: cur.execute('CREATE DATABASE "{}";'.format(self.db_name)) self.db_setup() def drop(self): pid_column = 'pid' with self.cursor() as cur: cur.execute( 'UPDATE pg_database SET datallowconn=false ' 'WHERE datname = %s;', (self.db_name,)) cur.execute( 'SELECT pg_terminate_backend(pg_stat_activity.{})' 'FROM pg_stat_activity ' 'WHERE pg_stat_activity.datname = %s;'.format(pid_column), (self.db_name,)) @pytest.fixture def sample_data() -> Dict: """Pre-defined sample storage object data to manipulate Returns: Dict of data (keys: content, directory, revision, release, person, origin) """ from .storage_data import data return { 'content': [data.cont, data.cont2], 'person': [data.person], 'directory': [data.dir2], 'revision': [data.revision], 'release': [data.release, data.release2, data.release3], 'origin': [data.origin, data.origin2], 'tool': [data.metadata_tool], 'provider': [data.provider], + 'origin_metadata': [data.origin_metadata, data.origin_metadata2], } diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index 55363de..db4d1c1 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,367 +1,442 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 import pytest +from typing import Dict + +from unittest.mock import call + from swh.storage import HashCollision from swh.storage.retry import ( RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS ) def test_should_retry_adding(): """Specific exceptions should be elected for retrial """ for exc in RETRY_EXCEPTIONS: assert should_retry_adding(exc('error')) is True def test_should_retry_adding_no_retry(): """Unspecific exceptions should raise as usual """ for exc in [ValueError, Exception]: assert should_retry_adding(exc('fail!')) is False @pytest.fixture def swh_storage(): return RetryingProxyStorage(storage={'cls': 'memory'}) def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, 'content:add:bytes': sample_content['length'], 'skipped_content:add': 0 } def test_retrying_proxy_storage_content_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = [ # first try goes ko HashCollision('content hash collision'), # second try goes ko psycopg2.IntegrityError('content already inserted'), # ok then! {'content:add': 1} ] sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content s = swh_storage.content_add([sample_content]) assert s == { 'content:add': 1, } def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add') mock_memory.side_effect = ValueError('Refuse to add content always!') sample_content = sample_data['content'][0] content = next(swh_storage.content_get([sample_content['sha1']])) assert not content with pytest.raises(ValueError, match='Refuse to add'): swh_storage.content_add([sample_content]) content = next(swh_storage.content_get([sample_content['sha1']])) assert not content def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data): """Standard content_add works as before """ sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] origin = swh_storage.origin_get(sample_origin) assert origin['url'] == sample_origin['url'] def test_retrying_proxy_swh_storage_origin_add_one_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! sample_origin['url'] ] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin['url'] def test_retrying_proxy_swh_storage_origin_add_one_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one') mock_memory.side_effect = ValueError('Refuse to add origin always!') sample_origin = sample_data['origin'][0] origin = swh_storage.origin_get(sample_origin) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_add_one([sample_origin]) origin = swh_storage.origin_get(sample_origin) assert not origin def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data): """Standard content_add works as before """ sample_origin = sample_data['origin'][0] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin origin_visit = swh_storage.origin_visit_add( origin_url, '2020-01-01', 'hg') assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) origin_visit = next(swh_storage.origin_visit_get(origin_url)) assert origin_visit['origin'] == origin_url assert isinstance(origin_visit['visit'], int) def test_retrying_proxy_swh_storage_origin_visit_add_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data['origin'][1] swh_storage.origin_add_one(sample_origin) origin_url = sample_origin['url'] mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = [ # first try goes ko HashCollision('origin hash collision'), # second try goes ko psycopg2.IntegrityError('origin already inserted'), # ok then! {'origin': origin_url, 'visit': 1} ] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git') assert r == {'origin': origin_url, 'visit': 1} def test_retrying_proxy_swh_storage_origin_visit_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.origin_visit_add') mock_memory.side_effect = ValueError('Refuse to add origin always!') origin_url = sample_data['origin'][0]['url'] origin = list(swh_storage.origin_visit_get(origin_url)) assert not origin with pytest.raises(ValueError, match='Refuse to add'): swh_storage.origin_visit_add(origin_url, '2020-01-01', 'svn') def test_retrying_proxy_storage_tool_add(swh_storage, sample_data): """Standard tool_add works as before """ sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools tool = tools[0] tool.pop('id') assert tool == sample_tool tool = swh_storage.tool_get(sample_tool) tool.pop('id') assert tool == sample_tool def test_retrying_proxy_storage_tool_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_tool = sample_data['tool'][0] mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = [ # first try goes ko HashCollision('tool hash collision'), # second try goes ko psycopg2.IntegrityError('tool already inserted'), # ok then! [sample_tool] ] tool = swh_storage.tool_get(sample_tool) assert not tool tools = swh_storage.tool_add([sample_tool]) assert tools == [sample_tool] def test_retrying_proxy_swh_storage_tool_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add') mock_memory.side_effect = ValueError('Refuse to add tool always!') sample_tool = sample_data['tool'][0] tool = swh_storage.tool_get(sample_tool) assert not tool with pytest.raises(ValueError, match='Refuse to add'): swh_storage.tool_add([sample_tool]) tool = swh_storage.tool_get(sample_tool) assert not tool +def to_provider(provider: Dict) -> Dict: + return { + 'provider_name': provider['name'], + 'provider_url': provider['url'], + 'provider_type': provider['type'], + 'metadata': provider['metadata'], + } + + def test_retrying_proxy_storage_metadata_provider_add( swh_storage, sample_data): """Standard metadata_provider_add works as before """ provider = sample_data['provider'][0] - provider_get = { - 'provider_name': provider['name'], - 'provider_url': provider['url'], - 'provider_type': provider['type'], - 'metadata': provider['metadata'], - } + provider_get = to_provider(provider) provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id actual_provider = swh_storage.metadata_provider_get(provider_id) assert actual_provider actual_provider_id = actual_provider.pop('id') assert actual_provider_id == provider_id assert actual_provider == provider_get def test_retrying_proxy_storage_metadata_provider_add_with_retry( swh_storage, sample_data, mocker): """Multiple retries for hash collision and psycopg2 error but finally ok """ provider = sample_data['provider'][0] - provider_get = { - 'provider_name': provider['name'], - 'provider_url': provider['url'], - 'provider_type': provider['type'], - 'metadata': provider['metadata'], - } + provider_get = to_provider(provider) + mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = [ # first try goes ko HashCollision('provider_id hash collision'), # second try goes ko psycopg2.IntegrityError('provider_id already inserted'), # ok then! 'provider_id', ] provider = swh_storage.metadata_provider_get_by(provider_get) assert not provider provider_id = swh_storage.metadata_provider_add(**provider_get) assert provider_id == 'provider_id' def test_retrying_proxy_swh_storage_metadata_provider_add_failure( swh_storage, sample_data, mocker): """Other errors are raising as usual """ mock_memory = mocker.patch( 'swh.storage.in_memory.Storage.metadata_provider_add') mock_memory.side_effect = ValueError('Refuse to add provider_id always!') provider = sample_data['provider'][0] - provider_get = { - 'provider_name': provider['name'], - 'provider_url': provider['url'], - 'provider_type': provider['type'], - 'metadata': provider['metadata'], - } + provider_get = to_provider(provider) provider_id = swh_storage.metadata_provider_get_by(provider_get) assert not provider_id with pytest.raises(ValueError, match='Refuse to add'): swh_storage.metadata_provider_add(**provider_get) - provider_id = swh_storage.metadata_provider_get_by(provider_get) - assert not provider_id + +def test_retrying_proxy_storage_origin_metadata_add( + swh_storage, sample_data): + """Standard origin_metadata_add works as before + + """ + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + provider_get = to_provider(ori_meta['provider']) + provider_id = swh_storage.metadata_provider_add(**provider_get) + + origin_metadata = swh_storage.origin_metadata_get_by(origin['url']) + assert not origin_metadata + + swh_storage.origin_metadata_add( + origin['url'], ori_meta['discovery_date'], + provider_id, ori_meta['tool'], ori_meta['metadata']) + + origin_metadata = swh_storage.origin_metadata_get_by( + origin['url']) + assert origin_metadata + + +def test_retrying_proxy_storage_origin_metadata_add_with_retry( + swh_storage, sample_data, mocker): + """Multiple retries for hash collision and psycopg2 error but finally ok + + """ + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + provider_get = to_provider(ori_meta['provider']) + provider_id = swh_storage.metadata_provider_add(**provider_get) + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_metadata_add') + + mock_memory.side_effect = [ + # first try goes ko + HashCollision('provider_id hash collision'), + # second try goes ko + psycopg2.IntegrityError('provider_id already inserted'), + # ok then! + None + ] + + url = origin['url'] + ts = ori_meta['discovery_date'] + tool_id = ori_meta['tool'] + metadata = ori_meta['metadata'] + + # No exception raised as insertion finally came through + swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata) + + mock_memory.assert_has_calls([ # 3 calls, as long as error raised + call(url, ts, provider_id, tool_id, metadata), + call(url, ts, provider_id, tool_id, metadata), + call(url, ts, provider_id, tool_id, metadata) + ]) + + +def test_retrying_proxy_swh_storage_origin_metadata_add_failure( + swh_storage, sample_data, mocker): + """Other errors are raising as usual + + """ + mock_memory = mocker.patch( + 'swh.storage.in_memory.Storage.origin_metadata_add') + mock_memory.side_effect = ValueError('Refuse to add always!') + + ori_meta = sample_data['origin_metadata'][0] + origin = ori_meta['origin'] + swh_storage.origin_add_one(origin) + + with pytest.raises(ValueError, match='Refuse to add'): + swh_storage.origin_metadata_add( + origin['url'], ori_meta['discovery_date'], + 'provider_id', ori_meta['tool'], ori_meta['metadata'])