diff --git a/swh/storage/retry.py b/swh/storage/retry.py
index 67158cd..d68868e 100644
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -1,119 +1,123 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import psycopg2
 import traceback
 
 from datetime import datetime
 from typing import Dict, List, Optional, Union
 
 from requests.exceptions import ConnectionError
 from tenacity import (
     retry, stop_after_attempt, wait_random_exponential,
     retry_if_exception_type
 )
 
 from swh.storage import get_storage, HashCollision
 
 
 logger = logging.getLogger(__name__)
 
 
 RETRY_EXCEPTIONS = [
     # raised when two parallel insertions insert the same data
     psycopg2.IntegrityError,
     HashCollision,
     # when the server is restarting
     ConnectionError,
 ]
 
 
 def should_retry_adding(error: Exception) -> bool:
     """Retry if the error/exception is one of the RETRY_EXCEPTIONS types.
 
     """
     for exc in RETRY_EXCEPTIONS:
         if retry_if_exception_type(exc)(error):
             error_name = error.__module__ + '.' + error.__class__.__name__
             logger.warning('Retry adding a batch', exc_info=False, extra={
                 'swh_type': 'storage_retry',
                 'swh_exception_type': error_name,
                 'swh_exception': traceback.format_exc(),
             })
             return True
     return False
 
 
 swh_retry = retry(retry=should_retry_adding,
                   wait=wait_random_exponential(multiplier=1, max=10),
                   stop=stop_after_attempt(3))
 
 
 class RetryingProxyStorage:
     """Storage implementation which retries adding objects when it
        specifically fails (hash collision, integrity error).
 
     """
     def __init__(self, storage):
         self.storage = get_storage(**storage)
 
     def __getattr__(self, key):
         return getattr(self.storage, key)
 
     @swh_retry
     def content_add(self, content: List[Dict]) -> Dict:
         return self.storage.content_add(content)
 
+    @swh_retry
+    def content_add_metadata(self, content: List[Dict]) -> Dict:
+        return self.storage.content_add_metadata(content)
+
     @swh_retry
     def origin_add_one(self, origin: Dict) -> str:
         return self.storage.origin_add_one(origin)
 
     @swh_retry
     def origin_visit_add(self, origin: Dict,
                          date: Union[datetime, str], type: str) -> Dict:
         return self.storage.origin_visit_add(origin, date, type)
 
     @swh_retry
     def origin_visit_update(
             self, origin: str, visit_id: int, status: Optional[str] = None,
             metadata: Optional[Dict] = None,
             snapshot: Optional[Dict] = None) -> Dict:
         return self.storage.origin_visit_update(
             origin, visit_id, status=status,
             metadata=metadata, snapshot=snapshot)
 
     @swh_retry
     def tool_add(self, tools: List[Dict]) -> List[Dict]:
         return self.storage.tool_add(tools)
 
     @swh_retry
     def metadata_provider_add(
             self, provider_name: str, provider_type: str, provider_url: str,
             metadata: Dict) -> Union[str, int]:
         return self.storage.metadata_provider_add(
             provider_name, provider_type, provider_url, metadata)
 
     @swh_retry
     def origin_metadata_add(
             self, origin_url: str, ts: Union[str, datetime],
             provider_id: int, tool_id: int, metadata: Dict) -> None:
         return self.storage.origin_metadata_add(
             origin_url, ts, provider_id, tool_id, metadata)
 
     @swh_retry
     def directory_add(self, directories: List[Dict]) -> Dict:
         return self.storage.directory_add(directories)
 
     @swh_retry
     def revision_add(self, revisions: List[Dict]) -> Dict:
         return self.storage.revision_add(revisions)
 
     @swh_retry
     def release_add(self, releases: List[Dict]) -> Dict:
         return self.storage.release_add(releases)
 
     @swh_retry
     def snapshot_add(self, snapshot: List[Dict]) -> Dict:
         return self.storage.snapshot_add(snapshot)
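
Note: as a minimal sketch (not part of the patch), this is how the proxy above is meant to be wired up; the in-memory backend and the content dict mirror the test fixture and storage_data below. Any method decorated with `@swh_retry` is re-attempted up to 3 times, with random exponential backoff capped at 10s, when it raises one of the RETRY_EXCEPTIONS; any other exception propagates immediately.

```python
# Minimal usage sketch, assuming swh.storage and its in-memory backend
# are installed; the content dict is `cont` from storage_data.py below.
from swh.model.hashutil import hash_to_bytes
from swh.storage.retry import RetryingProxyStorage

storage = RetryingProxyStorage(storage={'cls': 'memory'})
summary = storage.content_add([{
    'data': b'42\n',
    'length': 3,
    'sha1': hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
    'sha1_git': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
    'sha256': hash_to_bytes(
        '673650f936cb3b0a2f93ce09d81be10748b1b203c19e8176b4eefc1964a0cf3a'),
    'blake2s256': hash_to_bytes(
        'd5fe1939576527e42cfd76a9455a2432fe7f56669564577dd93c4280e76d661d'),
    'status': 'visible',
}])
assert summary['content:add'] == 1
```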
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
index 8bb4cc5..a364115 100644
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -1,229 +1,230 @@
 # Copyright (C) 2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import glob
 import pytest
 
 from typing import Dict, Union
 
 from pytest_postgresql import factories
 from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version
 
 from os import path, environ
 from hypothesis import settings
 
 import swh.storage
 
 from swh.core.utils import numfile_sortkey as sortkey
 from swh.model.tests.generate_testdata import gen_contents, gen_origins
 
 SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql')
 
 environ['LC_ALL'] = 'C.UTF-8'
 
 DUMP_FILES = path.join(SQL_DIR, '*.sql')
 
 # define tests profile. Full documentation is at:
 # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
 settings.register_profile("fast", max_examples=5, deadline=5000)
 settings.register_profile("slow", max_examples=20, deadline=5000)
 
 
 @pytest.fixture
 def swh_storage(postgresql_proc, swh_storage_postgresql):
     storage_config = {
         'cls': 'local',
         'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
             host=postgresql_proc.host,
             port=postgresql_proc.port,
             user='postgres',
             dbname='tests'),
         'objstorage': {
             'cls': 'memory',
             'args': {}
         },
         'journal_writer': {
             'cls': 'memory',
         },
     }
     storage = swh.storage.get_storage(**storage_config)
     return storage
 
 
 @pytest.fixture
 def swh_contents(swh_storage):
     contents = gen_contents(n=20)
     swh_storage.content_add(contents)
     return contents
 
 
 @pytest.fixture
 def swh_origins(swh_storage):
     origins = gen_origins(n=100)
     swh_storage.origin_add(origins)
     return origins
 
 
 # the postgres_fact factory fixture below is mostly a copy of the code
 # from pytest-postgresql. We need a custom version here to be able to
 # specify our version of the DBJanitor we use.
 def postgresql_fact(process_fixture_name, db_name=None,
                     dump_files=DUMP_FILES):
     @pytest.fixture
     def postgresql_factory(request):
         """Fixture factory for PostgreSQL.
 
         :param FixtureRequest request: fixture request object
         :rtype: psycopg2.connection
         :returns: postgresql client
         """
         config = factories.get_config(request)
         if not psycopg2:
             raise ImportError('No module named psycopg2. Please install it.')
         proc_fixture = request.getfixturevalue(process_fixture_name)
         # _, config = try_import('psycopg2', request)
 
         pg_host = proc_fixture.host
         pg_port = proc_fixture.port
         pg_user = proc_fixture.user
         pg_options = proc_fixture.options
         pg_db = db_name or config['dbname']
 
         with SwhDatabaseJanitor(
                 pg_user, pg_host, pg_port, pg_db,
                 proc_fixture.version,
                 dump_files=dump_files
         ):
             connection = psycopg2.connect(
                 dbname=pg_db,
                 user=pg_user,
                 host=pg_host,
                 port=pg_port,
                 options=pg_options
             )
             yield connection
             connection.close()
 
     return postgresql_factory
 
 
 swh_storage_postgresql = postgresql_fact('postgresql_proc')
 
 
 # This version of the DatabaseJanitor implements a different setup/teardown
 # behavior than the stock one: instead of dropping, creating and
 # initializing the database for each test, it creates and initializes the
 # db only once, then truncates the tables. This is needed to have
 # acceptable test performance.
 class SwhDatabaseJanitor(DatabaseJanitor):
     def __init__(
             self,
             user: str,
             host: str,
             port: str,
             db_name: str,
             version: Union[str, float, Version],
             dump_files: str = DUMP_FILES
     ) -> None:
         super().__init__(user, host, port, db_name, version)
         self.dump_files = sorted(glob.glob(dump_files), key=sortkey)
 
     def db_setup(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 for fname in self.dump_files:
                     with open(fname) as fobj:
                         sql = fobj.read().replace('concurrently', '').strip()
                         if sql:
                             cur.execute(sql)
             cnx.commit()
 
     def db_reset(self):
         with psycopg2.connect(
             dbname=self.db_name,
             user=self.user,
             host=self.host,
             port=self.port,
         ) as cnx:
             with cnx.cursor() as cur:
                 cur.execute(
                     "SELECT table_name FROM information_schema.tables "
                     "WHERE table_schema = %s", ('public',))
                 tables = set(table for (table,) in cur.fetchall())
                 for table in tables:
                     cur.execute('truncate table %s cascade' % table)
 
                 cur.execute(
                     "SELECT sequence_name FROM information_schema.sequences "
                     "WHERE sequence_schema = %s", ('public',))
                 seqs = set(seq for (seq,) in cur.fetchall())
                 for seq in seqs:
                     cur.execute('ALTER SEQUENCE %s RESTART;' % seq)
             cnx.commit()
 
     def init(self):
         with self.cursor() as cur:
             cur.execute(
                 "SELECT COUNT(1) FROM pg_database WHERE datname=%s;",
                 (self.db_name,))
             db_exists = cur.fetchone()[0] == 1
             if db_exists:
                 cur.execute(
                     'UPDATE pg_database SET datallowconn=true '
                     'WHERE datname = %s;',
                     (self.db_name,))
 
         if db_exists:
             self.db_reset()
         else:
             with self.cursor() as cur:
                 cur.execute('CREATE DATABASE "{}";'.format(self.db_name))
             self.db_setup()
 
     def drop(self):
         pid_column = 'pid'
         with self.cursor() as cur:
             cur.execute(
                 'UPDATE pg_database SET datallowconn=false '
                 'WHERE datname = %s;', (self.db_name,))
             cur.execute(
                 'SELECT pg_terminate_backend(pg_stat_activity.{})'
                 'FROM pg_stat_activity '
                 'WHERE pg_stat_activity.datname = %s;'.format(pid_column),
                 (self.db_name,))
 
 
 @pytest.fixture
 def sample_data() -> Dict:
     """Pre-defined sample storage object data to manipulate
 
     Returns:
         Dict of data (keys: content, directory, revision, release, person,
         origin)
 
     """
     from .storage_data import data
     return {
         'content': [data.cont, data.cont2],
+        'content_metadata': [data.cont3],
         'person': [data.person],
         'directory': [data.dir2, data.dir],
         'revision': [data.revision],
         'release': [data.release, data.release2, data.release3],
         'snapshot': [data.snapshot],
         'origin': [data.origin, data.origin2],
         'tool': [data.metadata_tool],
         'provider': [data.provider],
         'origin_metadata': [data.origin_metadata, data.origin_metadata2],
     }
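
Note: a hypothetical example (not part of the patch) of how the fixtures above combine in a test; `swh_storage` provides a storage backed by the janitor-managed database, and `sample_data` provides the canned objects defined in storage_data.py below.

```python
# Hypothetical test consuming the fixtures defined above; the test
# function name is illustrative only.
def test_example_content_roundtrip(swh_storage, sample_data):
    content = sample_data['content'][0]
    summary = swh_storage.content_add([content])
    assert summary['content:add'] == 1
```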
diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py
index 76a051c..906abf7 100644
--- a/swh/storage/tests/storage_data.py
+++ b/swh/storage/tests/storage_data.py
@@ -1,525 +1,526 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 from swh.model.hashutil import hash_to_bytes
 from swh.model import from_disk
 
 
 class StorageData:
     def __getattr__(self, key):
         try:
             v = globals()[key]
         except KeyError as e:
             raise AttributeError(e.args[0])
         if hasattr(v, 'copy'):
             return v.copy()
         return v
 
 
 data = StorageData()
 
 
 cont = {
     'data': b'42\n',
     'length': 3,
     'sha1': hash_to_bytes(
         '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
     'sha1_git': hash_to_bytes(
         'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
     'sha256': hash_to_bytes(
         '673650f936cb3b0a2f93ce09d81be10748b1b203c19e8176b4eefc1964a0cf3a'),
     'blake2s256': hash_to_bytes(
         'd5fe1939576527e42cfd76a9455a2432fe7f56669564577dd93c4280e76d661d'),
     'status': 'visible',
 }
 
 cont2 = {
     'data': b'4242\n',
     'length': 5,
     'sha1': hash_to_bytes(
         '61c2b3a30496d329e21af70dd2d7e097046d07b7'),
     'sha1_git': hash_to_bytes(
         '36fade77193cb6d2bd826161a0979d64c28ab4fa'),
     'sha256': hash_to_bytes(
         '859f0b154fdb2d630f45e1ecae4a862915435e663248bb8461d914696fc047cd'),
     'blake2s256': hash_to_bytes(
         '849c20fad132b7c2d62c15de310adfe87be94a379941bed295e8141c6219810d'),
     'status': 'visible',
 }
 
 cont3 = {
     'data': b'424242\n',
     'length': 7,
     'sha1': hash_to_bytes(
         '3e21cc4942a4234c9e5edd8a9cacd1670fe59f13'),
     'sha1_git': hash_to_bytes(
         'c932c7649c6dfa4b82327d121215116909eb3bea'),
     'sha256': hash_to_bytes(
         '92fb72daf8c6818288a35137b72155f507e5de8d892712ab96277aaed8cf8a36'),
     'blake2s256': hash_to_bytes(
         '76d0346f44e5a27f6bafdd9c2befd304aff83780f93121d801ab6a1d4769db11'),
     'status': 'visible',
+    'ctime': '2019-12-01',
 }
 
 contents = (cont, cont2, cont3)
 
 missing_cont = {
     'data': b'missing\n',
     'length': 8,
     'sha1': hash_to_bytes(
         'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'),
     'sha1_git': hash_to_bytes(
         '33e45d56f88993aae6a0198013efa80716fd8919'),
     'sha256': hash_to_bytes(
         '6bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a'),
     'blake2s256': hash_to_bytes(
         '306856b8fd879edb7b6f1aeaaf8db9bbecc993cd7f776c333ac3a782fa5c6eba'),
     'status': 'absent',
 }
 
 skipped_cont = {
     'length': 1024 * 1024 * 200,
     'sha1_git': hash_to_bytes(
         '33e45d56f88993aae6a0198013efa80716fd8920'),
     'sha1': hash_to_bytes(
         '43e45d56f88993aae6a0198013efa80716fd8920'),
     'sha256': hash_to_bytes(
         '7bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a'),
     'blake2s256': hash_to_bytes(
         'ade18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b'),
     'reason': 'Content too long',
     'status': 'absent',
     'origin': 'file:///dev/zero',
 }
 
 skipped_cont2 = {
     'length': 1024 * 1024 * 300,
     'sha1_git': hash_to_bytes(
         '44e45d56f88993aae6a0198013efa80716fd8921'),
     'sha1': hash_to_bytes(
         '54e45d56f88993aae6a0198013efa80716fd8920'),
     'sha256': hash_to_bytes(
         '8cbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a'),
     'blake2s256': hash_to_bytes(
         '9ce18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b'),
     'reason': 'Content too long',
     'status': 'absent',
 }
 
 dir = {
     'id': hash_to_bytes(
         '340133423253310030f531e632a733ff37c3a930'),
     'entries': [
         {
             'name': b'foo',
             'type': 'file',
             'target': hash_to_bytes(  # cont
                 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
             'perms': from_disk.DentryPerms.content,
         },
         {
             'name': b'bar\xc3',
             'type': 'dir',
             'target': b'12345678901234567890',
             'perms': from_disk.DentryPerms.directory,
         },
     ],
 }
 
 dir2 = {
     'id': hash_to_bytes(
         '340133423253310030f531e632a733ff37c3a935'),
     'entries': [
         {
             'name': b'oof',
             'type': 'file',
             'target': hash_to_bytes(  # cont2
                 '36fade77193cb6d2bd826161a0979d64c28ab4fa'),
             'perms': from_disk.DentryPerms.content,
         }
     ],
 }
 
 dir3 = {
     'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8921'),
     'entries': [
         {
             'name': b'foo',
             'type': 'file',
             'target': hash_to_bytes(  # cont
                 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
             'perms': from_disk.DentryPerms.content,
         },
         {
             'name': b'subdir',
             'type': 'dir',
             'target': hash_to_bytes(  # dir
                 '340133423253310030f531e632a733ff37c3a930'),
             'perms': from_disk.DentryPerms.directory,
         },
         {
             'name': b'hello',
             'type': 'file',
             'target': b'12345678901234567890',
             'perms': from_disk.DentryPerms.content,
         },
     ],
 }
 dir4 = {
     'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8922'),
     'entries': [
         {
             'name': b'subdir1',
             'type': 'dir',
             'target': hash_to_bytes(
                 '33e45d56f88993aae6a0198013efa80716fd8921'),  # dir3
             'perms': from_disk.DentryPerms.directory,
         },
     ]
 }
 
 directories = (dir, dir2, dir3, dir4)
 
 minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
 plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
 
 revision = {
     'id': b'56789012345678901234',
     'message': b'hello',
     'author': {
         'name': b'Nicolas Dandrimont',
         'email': b'nicolas@example.com',
         'fullname': b'Nicolas Dandrimont <nicolas@example.com>',
     },
     'date': {
         'timestamp': 1234567890,
         'offset': 120,
         'negative_utc': None,
     },
     'committer': {
         'name': b'St\xc3fano Zacchiroli',
         'email': b'stefano@example.com',
         'fullname': b'St\xc3fano Zacchiroli <stefano@example.com>',
     },
     'committer_date': {
         'timestamp': 1123456789,
         'offset': 0,
         'negative_utc': True,
     },
     'parents': [b'01234567890123456789', b'23434512345123456789'],
     'type': 'git',
     'directory': hash_to_bytes(  # dir
         '340133423253310030f531e632a733ff37c3a930'),
     'metadata': {
         'checksums': {
             'sha1': 'tarball-sha1',
             'sha256': 'tarball-sha256',
         },
         'signed-off-by': 'some-dude',
         'extra_headers': [
             ['gpgsig', b'test123'],
             ['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']],
         ],
     },
     'synthetic': True
 }
 
 revision2 = {
     'id': b'87659012345678904321',
     'message': b'hello again',
     'author': {
         'name': b'Roberto Dicosmo',
         'email': b'roberto@example.com',
         'fullname': b'Roberto Dicosmo <roberto@example.com>',
     },
     'date': {
         'timestamp': {
             'seconds': 1234567843,
             'microseconds': 220000,
         },
         'offset': -720,
         'negative_utc': None,
     },
     'committer': {
         'name': b'tony',
         'email': b'ar@dumont.fr',
         'fullname': b'tony <ar@dumont.fr>',
     },
     'committer_date': {
         'timestamp': 1123456789,
         'offset': 0,
         'negative_utc': False,
     },
     'parents': [b'01234567890123456789'],
     'type': 'git',
     'directory': hash_to_bytes(  # dir2
         '340133423253310030f531e632a733ff37c3a935'),
     'metadata': None,
     'synthetic': False
 }
 
 revision3 = {
     'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
     'message': b'a simple revision with no parents this time',
     'author': {
         'name': b'Roberto Dicosmo',
         'email': b'roberto@example.com',
         'fullname': b'Roberto Dicosmo <roberto@example.com>',
     },
     'date': {
         'timestamp': {
             'seconds': 1234567843,
             'microseconds': 220000,
         },
         'offset': -720,
         'negative_utc': None,
     },
     'committer': {
         'name': b'tony',
         'email': b'ar@dumont.fr',
         'fullname': b'tony <ar@dumont.fr>',
     },
     'committer_date': {
         'timestamp': 1127351742,
         'offset': 0,
         'negative_utc': False,
     },
     'parents': [],
     'type': 'git',
     'directory': hash_to_bytes(  # dir2
         '340133423253310030f531e632a733ff37c3a935'),
     'metadata': None,
     'synthetic': True
 }
 
 revision4 = {
     'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
     'message': b'parent of self.revision2',
     'author': {
         'name': b'me',
         'email': b'me@soft.heri',
         'fullname': b'me <me@soft.heri>',
     },
     'date': {
         'timestamp': {
             'seconds': 1244567843,
             'microseconds': 220000,
         },
         'offset': -720,
         'negative_utc': None,
     },
     'committer': {
         'name': b'committer-dude',
         'email': b'committer@dude.com',
         'fullname': b'committer-dude <committer@dude.com>',
     },
     'committer_date': {
         'timestamp': {
             'seconds': 1244567843,
             'microseconds': 220000,
         },
         'offset': -720,
         'negative_utc': None,
     },
     'parents': [hash_to_bytes(  # revision3
         '7026b7c1a2af56521e951c01ed20f255fa054238')],
     'type': 'git',
     'directory': hash_to_bytes(  # dir
         '340133423253310030f531e632a733ff37c3a930'),
     'metadata': None,
     'synthetic': False
 }
 
 revisions = (revision, revision2, revision3, revision4)
 
 origin = {
     'url': 'file:///dev/null',
 }
 
 origin2 = {
     'url': 'file:///dev/zero',
 }
 
 origins = (origin, origin2)
 
 provider = {
     'name': 'hal',
     'type': 'deposit-client',
     'url': 'http:///hal/inria',
     'metadata': {
         'location': 'France'
     }
 }
 metadata_tool = {
     'name': 'swh-deposit',
     'version': '0.0.1',
     'configuration': {
         'sword_version': '2'
     }
 }
 
 date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0,
                                 tzinfo=datetime.timezone.utc)
 type_visit1 = 'git'
 
 date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0,
                                 tzinfo=datetime.timezone.utc)
 type_visit2 = 'hg'
 
 date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0,
                                 tzinfo=datetime.timezone.utc)
 type_visit3 = 'deb'
 
 release = {
     'id': b'87659012345678901234',
     'name': b'v0.0.1',
     'author': {
         'name': b'olasd',
         'email': b'nic@olasd.fr',
         'fullname': b'olasd <nic@olasd.fr>',
     },
     'date': {
         'timestamp': 1234567890,
         'offset': 42,
         'negative_utc': None,
     },
     'target': b'43210987654321098765',
     'target_type': 'revision',
     'message': b'synthetic release',
     'synthetic': True,
 }
 
 release2 = {
     'id': b'56789012348765901234',
     'name': b'v0.0.2',
     'author': {
         'name': b'tony',
         'email': b'ar@dumont.fr',
         'fullname': b'tony <ar@dumont.fr>',
     },
     'date': {
         'timestamp': 1634366813,
         'offset': -120,
         'negative_utc': None,
     },
     'target': b'432109\xa9765432\xc309\x00765',
     'target_type': 'revision',
     'message': b'v0.0.2\nMisc performance improvements + bug fixes',
     'synthetic': False
 }
 
 release3 = {
     'id': b'87659012345678904321',
     'name': b'v0.0.2',
     'author': {
         'name': b'tony',
         'email': b'tony@ardumont.fr',
         'fullname': b'tony <tony@ardumont.fr>',
     },
     'date': {
         'timestamp': 1634336813,
         'offset': 0,
         'negative_utc': False,
     },
     'target': b'87659012345678904321',  # revision2
     'target_type': 'revision',
     'message': b'yet another synthetic release',
     'synthetic': True,
 }
 
 releases = (release, release2, release3)
 
 snapshot = {
     'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'),
     'branches': {
         b'master': {
             'target': b'56789012345678901234',  # revision
             'target_type': 'revision',
         },
     },
 }
 
 empty_snapshot = {
     'id': hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
     'branches': {},
 }
 
 complete_snapshot = {
     'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'),
     'branches': {
         b'directory': {
             'target': hash_to_bytes(
                 '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'),
             'target_type': 'directory',
         },
         b'directory2': {
             'target': hash_to_bytes(
                 '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'),
             'target_type': 'directory',
         },
         b'content': {
             'target': hash_to_bytes(
                 'fe95a46679d128ff167b7c55df5d02356c5a1ae1'),
             'target_type': 'content',
         },
         b'alias': {
             'target': b'revision',
             'target_type': 'alias',
         },
         b'revision': {
             'target': hash_to_bytes(
                 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'),
             'target_type': 'revision',
         },
         b'release': {
             'target': hash_to_bytes(
                 '7045404f3d1c54e6473c71bbb716529fbad4be24'),
             'target_type': 'release',
         },
         b'snapshot': {
             'target': hash_to_bytes(
                 '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
             'target_type': 'snapshot',
         },
         b'dangling': None,
     }
 }
 
 origin_metadata = {
     'origin': origin,
     'discovery_date': datetime.datetime(2015, 1, 1, 23, 0, 0,
                                         tzinfo=datetime.timezone.utc),
     'provider': provider,
     'tool': 'swh-deposit',
     'metadata': {
         'name': 'test_origin_metadata',
         'version': '0.0.1'
     }
 }
 
 origin_metadata2 = {
     'origin': origin,
     'discovery_date': datetime.datetime(2017, 1, 1, 23, 0, 0,
                                         tzinfo=datetime.timezone.utc),
     'provider': provider,
     'tool': 'swh-deposit',
     'metadata': {
         'name': 'test_origin_metadata',
         'version': '0.0.1'
     }
 }
 
 person = {
     'name': b'John Doe',
     'email': b'john.doe@institute.org',
     'fullname': b'John Doe <john.doe@institute.org>'
 }
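
Note: one subtlety in `StorageData.__getattr__` above is that every attribute access returns a fresh `.copy()` of the module-level dict, so a test can mutate what it gets back without polluting other tests (the copy is shallow, so nested dicts are still shared). A quick sketch:

```python
# data.cont looks up the module-level `cont` dict and returns a
# (shallow) copy, so top-level mutations do not leak between accesses.
from swh.storage.tests.storage_data import data

c1 = data.cont
c1['status'] = 'absent'          # mutate the copy
c2 = data.cont
assert c2['status'] == 'visible'  # the original is untouched
```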
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
index 343d54f..6843ab8 100644
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -1,815 +1,885 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import psycopg2
 import pytest
 
 from typing import Dict
 from unittest.mock import call
 
 from swh.storage import HashCollision
 from swh.storage.retry import RetryingProxyStorage
 
 
 @pytest.fixture
 def swh_storage():
     return RetryingProxyStorage(storage={'cls': 'memory'})
 
 
 def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
     """Standard content_add works as before
 
     """
     sample_content = sample_data['content'][0]
 
     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content
 
     s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': sample_content['length'],
         'skipped_content:add': 0
     }
 
     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert content['sha1'] == sample_content['sha1']
 
 
 def test_retrying_proxy_storage_content_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('content hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('content already inserted'),
         # ok then!
         {'content:add': 1}
     ]
 
     sample_content = sample_data['content'][0]
 
     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content
 
     s = swh_storage.content_add([sample_content])
     assert s == {'content:add': 1}
 
     mock_memory.assert_has_calls([
         call([sample_content]),
         call([sample_content]),
         call([sample_content]),
     ])
 
 
 def test_retrying_proxy_swh_storage_content_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
     mock_memory.side_effect = ValueError('Refuse to add content always!')
 
     sample_content = sample_data['content'][0]
 
     content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.content_add([sample_content])
 
     assert mock_memory.call_count == 1
+def test_retrying_proxy_storage_content_add_metadata(
+        swh_storage, sample_data):
+    """Standard content_add_metadata works as before
+
+    """
+    sample_content = sample_data['content_metadata'][0]
+
+    pk = sample_content['sha1']
+    content_metadata = swh_storage.content_get_metadata([pk])
+    assert not content_metadata[pk]
+
+    s = swh_storage.content_add_metadata([sample_content])
+    assert s == {
+        'content:add': 1,
+        'skipped_content:add': 0
+    }
+
+    content_metadata = swh_storage.content_get_metadata([pk])
+    assert len(content_metadata[pk]) == 1
+    assert content_metadata[pk][0]['sha1'] == pk
+
+
+def test_retrying_proxy_storage_content_add_metadata_with_retry(
+        swh_storage, sample_data, mocker):
+    """Multiple retries for hash collision and psycopg2 error but finally ok
+
+    """
+    mock_memory = mocker.patch(
+        'swh.storage.in_memory.Storage.content_add_metadata')
+    mock_memory.side_effect = [
+        # first try goes ko
+        HashCollision('content_metadata hash collision'),
+        # second try goes ko
+        psycopg2.IntegrityError('content_metadata already inserted'),
+        # ok then!
+        {'content:add': 1}
+    ]
+
+    sample_content = sample_data['content_metadata'][0]
+
+    s = swh_storage.content_add_metadata([sample_content])
+    assert s == {'content:add': 1}
+
+    mock_memory.assert_has_calls([
+        call([sample_content]),
+        call([sample_content]),
+        call([sample_content]),
+    ])
+
+
+def test_retrying_proxy_swh_storage_content_add_metadata_failure(
+        swh_storage, sample_data, mocker):
+    """Unfiltered errors are raised without retry
+
+    """
+    mock_memory = mocker.patch(
+        'swh.storage.in_memory.Storage.content_add_metadata')
+    mock_memory.side_effect = ValueError('Refuse to add content_metadata!')
+
+    sample_content = sample_data['content_metadata'][0]
+    pk = sample_content['sha1']
+
+    content_metadata = swh_storage.content_get_metadata([pk])
+    assert not content_metadata[pk]
+
+    with pytest.raises(ValueError, match='Refuse to add'):
+        swh_storage.content_add_metadata([sample_content])
+
+    assert mock_memory.call_count == 1
+
+
 def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data):
     """Standard origin_add_one works as before
 
     """
     sample_origin = sample_data['origin'][0]
 
     origin = swh_storage.origin_get(sample_origin)
     assert not origin
 
     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']
 
     origin = swh_storage.origin_get(sample_origin)
     assert origin['url'] == sample_origin['url']
 def test_retrying_proxy_swh_storage_origin_add_one_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     sample_origin = sample_data['origin'][1]
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_add_one')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         sample_origin['url']
     ]
 
     origin = swh_storage.origin_get(sample_origin)
     assert not origin
 
     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin['url']
 
     mock_memory.assert_has_calls([
         call(sample_origin),
         call(sample_origin),
         call(sample_origin),
     ])
 
 
 def test_retrying_proxy_swh_storage_origin_add_one_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_add_one')
     mock_memory.side_effect = ValueError('Refuse to add origin always!')
 
     sample_origin = sample_data['origin'][0]
 
     origin = swh_storage.origin_get(sample_origin)
     assert not origin
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_add_one(sample_origin)
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_swh_storage_origin_visit_add(
         swh_storage, sample_data):
     """Standard origin_visit_add works as before
 
     """
     sample_origin = sample_data['origin'][0]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']
 
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
 
     origin_visit = swh_storage.origin_visit_add(
         origin_url, '2020-01-01', 'hg')
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)
 
     origin_visit = next(swh_storage.origin_visit_get(origin_url))
     assert origin_visit['origin'] == origin_url
     assert isinstance(origin_visit['visit'], int)
 def test_retrying_proxy_swh_storage_origin_visit_add_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     sample_origin = sample_data['origin'][1]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']
 
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         {'origin': origin_url, 'visit': 1}
     ]
 
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
 
     r = swh_storage.origin_visit_add(sample_origin, '2020-01-01', 'git')
     assert r == {'origin': origin_url, 'visit': 1}
 
     mock_memory.assert_has_calls([
         call(sample_origin, '2020-01-01', 'git'),
         call(sample_origin, '2020-01-01', 'git'),
         call(sample_origin, '2020-01-01', 'git')
     ])
 
 
 def test_retrying_proxy_swh_storage_origin_visit_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_add')
     mock_memory.side_effect = ValueError('Refuse to add origin always!')
 
     origin_url = sample_data['origin'][0]['url']
 
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_visit_add(origin_url, '2020-01-31', 'svn')
 
     mock_memory.assert_has_calls([
         call(origin_url, '2020-01-31', 'svn'),
     ])
 
 
 def test_retrying_proxy_storage_tool_add(swh_storage, sample_data):
     """Standard tool_add works as before
 
     """
     sample_tool = sample_data['tool'][0]
 
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
 
     tools = swh_storage.tool_add([sample_tool])
     assert tools
     tool = tools[0]
     tool.pop('id')
     assert tool == sample_tool
 
     tool = swh_storage.tool_get(sample_tool)
     tool.pop('id')
     assert tool == sample_tool
 def test_retrying_proxy_storage_tool_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     sample_tool = sample_data['tool'][0]
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('tool hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('tool already inserted'),
         # ok then!
         [sample_tool]
     ]
 
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
 
     tools = swh_storage.tool_add([sample_tool])
     assert tools == [sample_tool]
 
     mock_memory.assert_has_calls([
         call([sample_tool]),
         call([sample_tool]),
         call([sample_tool]),
     ])
 
 
 def test_retrying_proxy_swh_storage_tool_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
     mock_memory.side_effect = ValueError('Refuse to add tool always!')
 
     sample_tool = sample_data['tool'][0]
 
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.tool_add([sample_tool])
 
     assert mock_memory.call_count == 1
 
 
 def to_provider(provider: Dict) -> Dict:
     return {
         'provider_name': provider['name'],
         'provider_url': provider['url'],
         'provider_type': provider['type'],
         'metadata': provider['metadata'],
     }
 
 
 def test_retrying_proxy_storage_metadata_provider_add(
         swh_storage, sample_data):
     """Standard metadata_provider_add works as before
 
     """
     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)
 
     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider
 
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id
 
     actual_provider = swh_storage.metadata_provider_get(provider_id)
     assert actual_provider
     actual_provider_id = actual_provider.pop('id')
     assert actual_provider_id == provider_id
     assert actual_provider == provider_get
 def test_retrying_proxy_storage_metadata_provider_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)
 
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.metadata_provider_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('provider_id hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('provider_id already inserted'),
         # ok then!
         'provider_id',
     ]
 
     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider
 
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id == 'provider_id'
 
     # the proxy forwards the arguments positionally
     provider_args = (
         provider_get['provider_name'], provider_get['provider_type'],
         provider_get['provider_url'], provider_get['metadata'])
     mock_memory.assert_has_calls([
         call(*provider_args),
         call(*provider_args),
         call(*provider_args),
     ])
 
 
 def test_retrying_proxy_swh_storage_metadata_provider_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.metadata_provider_add')
     mock_memory.side_effect = ValueError('Refuse to add provider_id always!')
 
     provider = sample_data['provider'][0]
     provider_get = to_provider(provider)
 
     provider_id = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider_id
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.metadata_provider_add(**provider_get)
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_storage_origin_metadata_add(
         swh_storage, sample_data):
     """Standard origin_metadata_add works as before
 
     """
     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta['provider'])
     provider_id = swh_storage.metadata_provider_add(**provider_get)
 
     origin_metadata = swh_storage.origin_metadata_get_by(origin['url'])
     assert not origin_metadata
 
     swh_storage.origin_metadata_add(
         origin['url'], ori_meta['discovery_date'], provider_id,
         ori_meta['tool'], ori_meta['metadata'])
 
     origin_metadata = swh_storage.origin_metadata_get_by(origin['url'])
     assert origin_metadata
 def test_retrying_proxy_storage_origin_metadata_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta['provider'])
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_metadata_add')
 
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('provider_id hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('provider_id already inserted'),
         # ok then!
         None
     ]
 
     url = origin['url']
     ts = ori_meta['discovery_date']
     tool_id = ori_meta['tool']
     metadata = ori_meta['metadata']
 
     # No exception raised as insertion finally came through
     swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata)
 
     mock_memory.assert_has_calls([
         # 3 calls, the first two raised and were retried
         call(url, ts, provider_id, tool_id, metadata),
         call(url, ts, provider_id, tool_id, metadata),
         call(url, ts, provider_id, tool_id, metadata)
     ])
 
 
 def test_retrying_proxy_swh_storage_origin_metadata_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_metadata_add')
     mock_memory.side_effect = ValueError('Refuse to add always!')
 
     ori_meta = sample_data['origin_metadata'][0]
     origin = ori_meta['origin']
     swh_storage.origin_add_one(origin)
 
     url = origin['url']
     ts = ori_meta['discovery_date']
     provider_id = 'provider_id'
     tool_id = ori_meta['tool']
     metadata = ori_meta['metadata']
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_metadata_add(url, ts, provider_id,
                                         tool_id, metadata)
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_swh_storage_origin_visit_update(
         swh_storage, sample_data):
     """Standard origin_visit_update works as before
 
     """
     sample_origin = sample_data['origin'][0]
     swh_storage.origin_add_one(sample_origin)
     origin_url = sample_origin['url']
     origin_visit = swh_storage.origin_visit_add(
         origin_url, '2020-01-01', 'hg')
 
     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov['origin'] == origin_url
     assert ov['visit'] == origin_visit['visit']
     assert ov['status'] == 'ongoing'
     assert ov['snapshot'] is None
     assert ov['metadata'] is None
 
     swh_storage.origin_visit_update(origin_url, ov['visit'], status='full')
 
     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov['origin'] == origin_url
     assert ov['visit'] == origin_visit['visit']
     assert ov['status'] == 'full'
     assert ov['snapshot'] is None
     assert ov['metadata'] is None
 def test_retrying_proxy_swh_storage_origin_visit_update_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     sample_origin = sample_data['origin'][1]
     origin_url = sample_origin['url']
 
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_update')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('origin hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('origin already inserted'),
         # ok then!
         {'origin': origin_url, 'visit': 1}
     ]
 
     visit_id = 1
     swh_storage.origin_visit_update(origin_url, visit_id, status='full')
 
     # the proxy always forwards the metadata and snapshot keyword arguments
     mock_memory.assert_has_calls([
         call(origin_url, visit_id, status='full',
              metadata=None, snapshot=None),
         call(origin_url, visit_id, status='full',
              metadata=None, snapshot=None),
         call(origin_url, visit_id, status='full',
              metadata=None, snapshot=None),
     ])
 
 
 def test_retrying_proxy_swh_storage_origin_visit_update_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch(
         'swh.storage.in_memory.Storage.origin_visit_update')
     mock_memory.side_effect = ValueError('Refuse to add origin always!')
     origin_url = sample_data['origin'][0]['url']
     visit_id = 9
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.origin_visit_update(origin_url, visit_id, 'partial')
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
     """Standard directory_add works as before
 
     """
     sample_dir = sample_data['directory'][0]
 
     directory = swh_storage.directory_get_random()  # no directory
     assert not directory
 
     s = swh_storage.directory_add([sample_dir])
     assert s == {
         'directory:add': 1,
     }
 
     directory_id = swh_storage.directory_get_random()  # only 1
     assert directory_id == sample_dir['id']
 
 
 def test_retrying_proxy_storage_directory_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('directory hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('directory already inserted'),
         # ok then!
         {'directory:add': 1}
     ]
 
     sample_dir = sample_data['directory'][1]
 
     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id
 
     s = swh_storage.directory_add([sample_dir])
     assert s == {
         'directory:add': 1,
     }
 
     mock_memory.assert_has_calls([
         call([sample_dir]),
         call([sample_dir]),
         call([sample_dir]),
     ])
 
 
 def test_retrying_proxy_swh_storage_directory_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add')
     mock_memory.side_effect = ValueError('Refuse to add directory always!')
 
     sample_dir = sample_data['directory'][0]
 
     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.directory_add([sample_dir])
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
     """Standard revision_add works as before
 
     """
     sample_rev = sample_data['revision'][0]
 
     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision
 
     s = swh_storage.revision_add([sample_rev])
     assert s == {
         'revision:add': 1,
     }
 
     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert revision['id'] == sample_rev['id']
 def test_retrying_proxy_storage_revision_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('revision hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('revision already inserted'),
         # ok then!
         {'revision:add': 1}
     ]
 
     sample_rev = sample_data['revision'][0]
 
     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision
 
     s = swh_storage.revision_add([sample_rev])
     assert s == {
         'revision:add': 1,
     }
 
     mock_memory.assert_has_calls([
         call([sample_rev]),
         call([sample_rev]),
         call([sample_rev]),
     ])
 
 
 def test_retrying_proxy_swh_storage_revision_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.revision_add')
     mock_memory.side_effect = ValueError('Refuse to add revision always!')
 
     sample_rev = sample_data['revision'][0]
 
     revision = next(swh_storage.revision_get([sample_rev['id']]))
     assert not revision
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.revision_add([sample_rev])
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
     """Standard release_add works as before
 
     """
     sample_rel = sample_data['release'][0]
 
     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release
 
     s = swh_storage.release_add([sample_rel])
     assert s == {
         'release:add': 1,
     }
 
     release = next(swh_storage.release_get([sample_rel['id']]))
     assert release['id'] == sample_rel['id']
 
 
 def test_retrying_proxy_storage_release_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('release hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('release already inserted'),
         # ok then!
         {'release:add': 1}
     ]
 
     sample_rel = sample_data['release'][0]
 
     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release
 
     s = swh_storage.release_add([sample_rel])
     assert s == {
         'release:add': 1,
     }
 
     mock_memory.assert_has_calls([
         call([sample_rel]),
         call([sample_rel]),
         call([sample_rel]),
     ])
 
 
 def test_retrying_proxy_swh_storage_release_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.release_add')
     mock_memory.side_effect = ValueError('Refuse to add release always!')
 
     sample_rel = sample_data['release'][0]
 
     release = next(swh_storage.release_get([sample_rel['id']]))
     assert not release
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.release_add([sample_rel])
 
     assert mock_memory.call_count == 1
 
 
 def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
     """Standard snapshot_add works as before
 
     """
     sample_snap = sample_data['snapshot'][0]
 
     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot
 
     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         'snapshot:add': 1,
     }
 
     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert snapshot['id'] == sample_snap['id']
 def test_retrying_proxy_storage_snapshot_add_with_retry(
         swh_storage, sample_data, mocker):
     """Multiple retries for hash collision and psycopg2 error but finally ok
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add')
     mock_memory.side_effect = [
         # first try goes ko
         HashCollision('snapshot hash collision'),
         # second try goes ko
         psycopg2.IntegrityError('snapshot already inserted'),
         # ok then!
         {'snapshot:add': 1}
     ]
 
     sample_snap = sample_data['snapshot'][0]
 
     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot
 
     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         'snapshot:add': 1,
     }
 
     mock_memory.assert_has_calls([
         call([sample_snap]),
         call([sample_snap]),
         call([sample_snap]),
     ])
 
 
 def test_retrying_proxy_swh_storage_snapshot_add_failure(
         swh_storage, sample_data, mocker):
     """Unfiltered errors are raised without retry
 
     """
     mock_memory = mocker.patch('swh.storage.in_memory.Storage.snapshot_add')
     mock_memory.side_effect = ValueError('Refuse to add snapshot always!')
 
     sample_snap = sample_data['snapshot'][0]
 
     snapshot = swh_storage.snapshot_get(sample_snap['id'])
     assert not snapshot
 
     with pytest.raises(ValueError, match='Refuse to add'):
         swh_storage.snapshot_add([sample_snap])
 
     assert mock_memory.call_count == 1
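
Note: the retry tests above all rely on the same mocking idiom: a `side_effect` list makes the patched method fail a fixed number of times before succeeding, which is exactly the sequence the retrying proxy needs to eventually return a result. A self-contained sketch of that pattern (plain `unittest.mock`, no storage involved):

```python
# The first two calls raise, the third returns a value; with the mock
# patched into the proxy's backend, the @swh_retry decorator absorbs
# the two failures and surfaces the third result.
from unittest.mock import Mock, call

mock = Mock(side_effect=[ValueError('ko'), ValueError('ko again'), 'ok'])

results = []
for _ in range(3):
    try:
        results.append(mock('batch'))
    except ValueError:
        pass

assert results == ['ok']
mock.assert_has_calls([call('batch'), call('batch'), call('batch')])
```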