Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9125038
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
30 KB
Subscribers
None
View Options
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
index f96f7a9..6245212 100644
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -1,97 +1,101 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import psycopg2
import traceback
from datetime import datetime
from typing import Dict, List, Optional, Union
from retrying import retry
from swh.storage import get_storage, HashCollision
logger = logging.getLogger(__name__)


# Exception types elected for retrial by should_retry_adding(); anything
# else propagates immediately to the caller.
RETRY_EXCEPTIONS = [
    # raised when two parallel insertions insert the same data
    psycopg2.IntegrityError,
    HashCollision,
]
def should_retry_adding(error: Exception) -> bool:
    """Retry policy when some kind of failures occur (database integrity error,
    hash collision, etc...)

    Args:
        error: the exception raised by a failed ``*_add`` call

    Returns:
        True when `error` is an instance of one of RETRY_EXCEPTIONS (the
        batch addition should be retried), False otherwise.

    """
    # Renamed from `retry`: the previous local name shadowed the imported
    # `retrying.retry` decorator used elsewhere in this module.
    should_retry = any(isinstance(error, exc) for exc in RETRY_EXCEPTIONS)
    if should_retry:
        error_name = error.__module__ + '.' + error.__class__.__name__
        # Log the full formatted traceback in `extra` for structured logging
        logger.warning('Retry adding a batch', exc_info=False, extra={
            'swh_type': 'storage_retry',
            'swh_exception_type': error_name,
            'swh_exception': traceback.format_exception(
                error.__class__,
                error,
                error.__traceback__,
            ),
        })
    return should_retry
class RetryingProxyStorage:
    """Storage implementation which retries adding objects when it specifically
    fails (hash collision, integrity error).

    Every ``*_add`` method below delegates to the wrapped storage and is
    retried up to 3 times when the raised exception is elected for retrial
    by :func:`should_retry_adding`; any other exception propagates
    immediately. All other storage endpoints are forwarded unchanged via
    ``__getattr__``.

    """
    def __init__(self, storage):
        # `storage` is a configuration dict (e.g. {'cls': 'memory'}) passed
        # through to the storage factory.
        self.storage = get_storage(**storage)

    def __getattr__(self, key):
        # Transparently delegate any endpoint not wrapped below (reads,
        # gets, ...) to the underlying storage.
        return getattr(self.storage, key)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def content_add(self, content: List[Dict]) -> Dict:
        return self.storage.content_add(content)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def origin_add_one(self, origin: Dict) -> str:
        return self.storage.origin_add_one(origin)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def origin_visit_add(self, origin: Dict,
                         date: Union[datetime, str], type: str) -> Dict:
        return self.storage.origin_visit_add(origin, date, type)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def origin_visit_update(
            self, origin: str, visit_id: int, status: Optional[str] = None,
            metadata: Optional[Dict] = None,
            snapshot: Optional[Dict] = None) -> Dict:
        # NOTE: positional/keyword shape matters here — the underlying
        # storage always receives status/metadata/snapshot as keywords.
        return self.storage.origin_visit_update(
            origin, visit_id, status=status,
            metadata=metadata, snapshot=snapshot)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def tool_add(self, tools: List[Dict]) -> List[Dict]:
        return self.storage.tool_add(tools)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def metadata_provider_add(
            self, provider_name: str, provider_type: str, provider_url: str,
            metadata: Dict) -> Union[str, int]:
        return self.storage.metadata_provider_add(
            provider_name, provider_type, provider_url, metadata)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def origin_metadata_add(
            self, origin_url: str, ts: Union[str, datetime],
            provider_id: int, tool_id: int, metadata: Dict) -> None:
        return self.storage.origin_metadata_add(
            origin_url, ts, provider_id, tool_id, metadata)

    @retry(retry_on_exception=should_retry_adding, stop_max_attempt_number=3)
    def directory_add(self, directories: List[Dict]) -> Dict:
        return self.storage.directory_add(directories)
diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py
index dd2dffc..f21331a 100644
--- a/swh/storage/tests/conftest.py
+++ b/swh/storage/tests/conftest.py
@@ -1,228 +1,228 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import glob
import pytest
from typing import Union
from pytest_postgresql import factories
from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version
from os import path, environ
from hypothesis import settings
from typing import Dict
import swh.storage
from swh.core.utils import numfile_sortkey as sortkey
from swh.model.tests.generate_testdata import gen_contents, gen_origins
# Directory holding the storage schema dump files shipped with swh.storage
SQL_DIR = path.join(path.dirname(swh.storage.__file__), 'sql')

environ['LC_ALL'] = 'C.UTF-8'

DUMP_FILES = path.join(SQL_DIR, '*.sql')

# define tests profile. Full documentation is at:
# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
settings.register_profile("fast", max_examples=5, deadline=5000)
settings.register_profile("slow", max_examples=20, deadline=5000)
@pytest.fixture
def swh_storage(postgresql_proc, swh_storage_postgresql):
    """Local storage instance plugged on the test postgresql process, with
    an in-memory objstorage and journal writer."""
    db_url = 'postgresql://{user}@{host}:{port}/{dbname}'.format(
        host=postgresql_proc.host,
        port=postgresql_proc.port,
        user='postgres',
        dbname='tests')
    return swh.storage.get_storage(
        cls='local',
        db=db_url,
        objstorage={'cls': 'memory', 'args': {}},
        journal_writer={'cls': 'memory'},
    )
@pytest.fixture
def swh_contents(swh_storage):
    """Generate 20 random contents, insert them into the storage and hand
    them to the test."""
    generated = gen_contents(n=20)
    swh_storage.content_add(generated)
    return generated
@pytest.fixture
def swh_origins(swh_storage):
    """Generate 100 random origins, insert them into the storage and hand
    them to the test."""
    generated = gen_origins(n=100)
    swh_storage.origin_add(generated)
    return generated
# the postgres_fact factory fixture below is mostly a copy of the code
# from pytest-postgresql. We need a custom version here to be able to
# specify our version of the DBJanitor we use.
def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES):
    """Build a psycopg2-connection fixture bound to the given postgresql
    process fixture, with the database lifecycle managed by
    SwhDatabaseJanitor (schema loaded once from `dump_files`, tables
    truncated between tests).

    """
    @pytest.fixture
    def postgresql_factory(request):
        """
        Fixture factory for PostgreSQL.

        :param FixtureRequest request: fixture request object
        :rtype: psycopg2.connection
        :returns: postgresql client
        """
        config = factories.get_config(request)
        if not psycopg2:
            raise ImportError(
                'No module named psycopg2. Please install it.'
            )
        proc_fixture = request.getfixturevalue(process_fixture_name)

        pg_host = proc_fixture.host
        pg_port = proc_fixture.port
        pg_user = proc_fixture.user
        pg_options = proc_fixture.options
        # explicit db_name overrides the one from the pytest-postgresql config
        pg_db = db_name or config['dbname']

        # the janitor creates/initializes (or resets) the database around
        # the lifetime of the connection
        with SwhDatabaseJanitor(
            pg_user, pg_host, pg_port, pg_db, proc_fixture.version,
            dump_files=dump_files
        ):
            connection = psycopg2.connect(
                dbname=pg_db,
                user=pg_user,
                host=pg_host,
                port=pg_port,
                options=pg_options
            )
            yield connection
            connection.close()

    return postgresql_factory
# Connection fixture on the default test database, reset (truncate-based)
# between tests by SwhDatabaseJanitor below.
swh_storage_postgresql = postgresql_fact('postgresql_proc')
# This version of the DatabaseJanitor implement a different setup/teardown
# behavior than than the stock one: instead of dropping, creating and
# initializing the database for each test, it create and initialize the db only
# once, then it truncate the tables. This is needed to have acceptable test
# performances.
class SwhDatabaseJanitor(DatabaseJanitor):
    """DatabaseJanitor with a truncate-based reset instead of drop/recreate.

    The database is created and initialized only once (``db_setup``); between
    tests the tables are truncated and the sequences restarted
    (``db_reset``), which keeps test performance acceptable.

    """
    def __init__(
            self,
            user: str,
            host: str,
            port: str,
            db_name: str,
            version: Union[str, float, Version],
            dump_files: str = DUMP_FILES
    ) -> None:
        super().__init__(user, host, port, db_name, version)
        # dump files are applied in numeric filename order
        self.dump_files = sorted(
            glob.glob(dump_files), key=sortkey)

    def db_setup(self):
        """Create the schema by replaying every dump file once."""
        with psycopg2.connect(
            dbname=self.db_name,
            user=self.user,
            host=self.host,
            port=self.port,
        ) as cnx:
            with cnx.cursor() as cur:
                for fname in self.dump_files:
                    with open(fname) as fobj:
                        # 'create index concurrently' cannot run inside a
                        # transaction block, so the keyword is stripped
                        sql = fobj.read().replace('concurrently', '').strip()
                        if sql:
                            cur.execute(sql)
            cnx.commit()

    def db_reset(self):
        """Truncate every public table and restart every public sequence."""
        with psycopg2.connect(
            dbname=self.db_name,
            user=self.user,
            host=self.host,
            port=self.port,
        ) as cnx:
            with cnx.cursor() as cur:
                cur.execute(
                    "SELECT table_name FROM information_schema.tables "
                    "WHERE table_schema = %s", ('public',))
                tables = set(table for (table,) in cur.fetchall())
                for table in tables:
                    # identifiers cannot be bound as query parameters; the
                    # names come from information_schema, not user input
                    cur.execute('truncate table %s cascade' % table)

                cur.execute(
                    "SELECT sequence_name FROM information_schema.sequences "
                    "WHERE sequence_schema = %s", ('public',))
                seqs = set(seq for (seq,) in cur.fetchall())
                for seq in seqs:
                    cur.execute('ALTER SEQUENCE %s RESTART;' % seq)
            cnx.commit()

    def init(self):
        """Reset the database when it already exists, create and set it up
        otherwise (instead of the stock drop-and-recreate behavior)."""
        with self.cursor() as cur:
            cur.execute(
                "SELECT COUNT(1) FROM pg_database WHERE datname=%s;",
                (self.db_name,))
            db_exists = cur.fetchone()[0] == 1
            if db_exists:
                # re-enable connections, drop() disabled them
                cur.execute(
                    'UPDATE pg_database SET datallowconn=true '
                    'WHERE datname = %s;',
                    (self.db_name,))

        if db_exists:
            self.db_reset()
        else:
            with self.cursor() as cur:
                cur.execute('CREATE DATABASE "{}";'.format(self.db_name))
            self.db_setup()

    def drop(self):
        """Keep the database; only forbid new connections and terminate the
        current clients so the next init() can reset cleanly."""
        pid_column = 'pid'
        with self.cursor() as cur:
            cur.execute(
                'UPDATE pg_database SET datallowconn=false '
                'WHERE datname = %s;', (self.db_name,))
            cur.execute(
                'SELECT pg_terminate_backend(pg_stat_activity.{})'
                'FROM pg_stat_activity '
                'WHERE pg_stat_activity.datname = %s;'.format(pid_column),
                (self.db_name,))
@pytest.fixture
def sample_data() -> Dict:
    """Pre-defined sample storage object data to manipulate

    Returns:
        Dict of data (keys: content, directory, revision, release, person,
        origin)

    """
    from .storage_data import data
    return dict(
        content=[data.cont, data.cont2],
        person=[data.person],
        directory=[data.dir2, data.dir],
        revision=[data.revision],
        release=[data.release, data.release2, data.release3],
        origin=[data.origin, data.origin2],
        tool=[data.metadata_tool],
        provider=[data.provider],
        origin_metadata=[data.origin_metadata, data.origin_metadata2],
    )
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
index 5472aba..7ac40c7 100644
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -1,520 +1,593 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import psycopg2
import pytest
from typing import Dict
from unittest.mock import call
from swh.storage import HashCollision
from swh.storage.retry import (
RetryingProxyStorage, should_retry_adding, RETRY_EXCEPTIONS
)
def test_should_retry_adding():
    """Specific exceptions should be elected for retrial

    """
    # every exception class listed in RETRY_EXCEPTIONS triggers a retry
    for exc in RETRY_EXCEPTIONS:
        assert should_retry_adding(exc('error')) is True


def test_should_retry_adding_no_retry():
    """Unspecific exceptions should raise as usual

    """
    # anything outside RETRY_EXCEPTIONS must not be retried
    for exc in [ValueError, Exception]:
        assert should_retry_adding(exc('fail!')) is False
@pytest.fixture
def swh_storage():
    # In-memory backend wrapped in the retrying proxy under test
    return RetryingProxyStorage(storage={'cls': 'memory'})
def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
    """Standard content_add works as before

    """
    sample_content = sample_data['content'][0]

    content = next(swh_storage.content_get([sample_content['sha1']]))
    assert not content

    s = swh_storage.content_add([sample_content])
    assert s == {
        'content:add': 1,
        'content:add:bytes': sample_content['length'],
        'skipped_content:add': 0
    }


def test_retrying_proxy_storage_content_add_with_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('content hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('content already inserted'),
        # ok then!
        {'content:add': 1}
    ]

    sample_content = sample_data['content'][0]

    content = next(swh_storage.content_get([sample_content['sha1']]))
    assert not content

    s = swh_storage.content_add([sample_content])
    # the result of the third (successful) attempt is what the caller sees
    assert s == {
        'content:add': 1,
    }


def test_retrying_proxy_swh_storage_content_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.content_add')
    mock_memory.side_effect = ValueError('Refuse to add content always!')

    sample_content = sample_data['content'][0]

    content = next(swh_storage.content_get([sample_content['sha1']]))
    assert not content

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.content_add([sample_content])

    # nothing was inserted since the mocked add always failed
    content = next(swh_storage.content_get([sample_content['sha1']]))
    assert not content
def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data):
    """Standard origin_add_one works as before

    """
    sample_origin = sample_data['origin'][0]

    origin = swh_storage.origin_get(sample_origin)
    assert not origin

    r = swh_storage.origin_add_one(sample_origin)
    assert r == sample_origin['url']

    origin = swh_storage.origin_get(sample_origin)
    assert origin['url'] == sample_origin['url']


def test_retrying_proxy_swh_storage_origin_add_one_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    sample_origin = sample_data['origin'][1]
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('origin hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('origin already inserted'),
        # ok then!
        sample_origin['url']
    ]

    origin = swh_storage.origin_get(sample_origin)
    assert not origin

    r = swh_storage.origin_add_one(sample_origin)
    assert r == sample_origin['url']


def test_retrying_proxy_swh_storage_origin_add_one_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.origin_add_one')
    mock_memory.side_effect = ValueError('Refuse to add origin always!')

    sample_origin = sample_data['origin'][0]

    origin = swh_storage.origin_get(sample_origin)
    assert not origin

    with pytest.raises(ValueError, match='Refuse to add'):
        # fix: origin_add_one takes a single origin dict, not a list of
        # origins (the list was masked by the mock; cf. the calls above)
        swh_storage.origin_add_one(sample_origin)

    origin = swh_storage.origin_get(sample_origin)
    assert not origin
def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
    """Standard origin_visit_add works as before

    """
    sample_origin = sample_data['origin'][0]
    swh_storage.origin_add_one(sample_origin)
    origin_url = sample_origin['url']

    origin = list(swh_storage.origin_visit_get(origin_url))
    assert not origin

    origin_visit = swh_storage.origin_visit_add(
        origin_url, '2020-01-01', 'hg')
    assert origin_visit['origin'] == origin_url
    assert isinstance(origin_visit['visit'], int)

    origin_visit = next(swh_storage.origin_visit_get(origin_url))
    assert origin_visit['origin'] == origin_url
    assert isinstance(origin_visit['visit'], int)


def test_retrying_proxy_swh_storage_origin_visit_add_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    sample_origin = sample_data['origin'][1]
    swh_storage.origin_add_one(sample_origin)
    origin_url = sample_origin['url']

    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_visit_add')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('origin hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('origin already inserted'),
        # ok then!
        {'origin': origin_url, 'visit': 1}
    ]

    origin = list(swh_storage.origin_visit_get(origin_url))
    assert not origin

    # fix: origin_visit_add takes the origin url (cf. every sibling call),
    # not the origin dict — the wrong argument was masked by the mock
    r = swh_storage.origin_visit_add(origin_url, '2020-01-01', 'git')
    assert r == {'origin': origin_url, 'visit': 1}


def test_retrying_proxy_swh_storage_origin_visit_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_visit_add')
    mock_memory.side_effect = ValueError('Refuse to add origin always!')

    origin_url = sample_data['origin'][0]['url']

    origin = list(swh_storage.origin_visit_get(origin_url))
    assert not origin

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.origin_visit_add(origin_url, '2020-01-01', 'svn')
def test_retrying_proxy_storage_tool_add(swh_storage, sample_data):
    """Standard tool_add works as before

    """
    sample_tool = sample_data['tool'][0]

    tool = swh_storage.tool_get(sample_tool)
    assert not tool

    tools = swh_storage.tool_add([sample_tool])
    assert tools

    # the storage assigns an id; all other fields must round-trip
    tool = tools[0]
    tool.pop('id')
    assert tool == sample_tool

    tool = swh_storage.tool_get(sample_tool)
    tool.pop('id')
    assert tool == sample_tool


def test_retrying_proxy_storage_tool_add_with_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    sample_tool = sample_data['tool'][0]
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('tool hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('tool already inserted'),
        # ok then!
        [sample_tool]
    ]

    tool = swh_storage.tool_get(sample_tool)
    assert not tool

    tools = swh_storage.tool_add([sample_tool])
    assert tools == [sample_tool]


def test_retrying_proxy_swh_storage_tool_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.tool_add')
    mock_memory.side_effect = ValueError('Refuse to add tool always!')

    sample_tool = sample_data['tool'][0]

    tool = swh_storage.tool_get(sample_tool)
    assert not tool

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.tool_add([sample_tool])

    tool = swh_storage.tool_get(sample_tool)
    assert not tool
def to_provider(provider: Dict) -> Dict:
    """Map a sample provider dict onto the keyword-argument names expected
    by metadata_provider_add."""
    keymap = {
        'provider_name': 'name',
        'provider_url': 'url',
        'provider_type': 'type',
        'metadata': 'metadata',
    }
    return {target: provider[source] for target, source in keymap.items()}
def test_retrying_proxy_storage_metadata_provider_add(
        swh_storage, sample_data):
    """Standard metadata_provider_add works as before

    """
    provider = sample_data['provider'][0]
    provider_get = to_provider(provider)

    provider = swh_storage.metadata_provider_get_by(provider_get)
    assert not provider

    provider_id = swh_storage.metadata_provider_add(**provider_get)
    assert provider_id

    # the stored provider must round-trip (minus the assigned id)
    actual_provider = swh_storage.metadata_provider_get(provider_id)
    assert actual_provider
    actual_provider_id = actual_provider.pop('id')
    assert actual_provider_id == provider_id
    assert actual_provider == provider_get


def test_retrying_proxy_storage_metadata_provider_add_with_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    provider = sample_data['provider'][0]
    provider_get = to_provider(provider)

    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.metadata_provider_add')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('provider_id hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('provider_id already inserted'),
        # ok then!
        'provider_id',
    ]

    provider = swh_storage.metadata_provider_get_by(provider_get)
    assert not provider

    provider_id = swh_storage.metadata_provider_add(**provider_get)
    assert provider_id == 'provider_id'


def test_retrying_proxy_swh_storage_metadata_provider_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.metadata_provider_add')
    mock_memory.side_effect = ValueError('Refuse to add provider_id always!')

    provider = sample_data['provider'][0]
    provider_get = to_provider(provider)

    provider_id = swh_storage.metadata_provider_get_by(provider_get)
    assert not provider_id

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.metadata_provider_add(**provider_get)
def test_retrying_proxy_storage_origin_metadata_add(
        swh_storage, sample_data):
    """Standard origin_metadata_add works as before

    """
    ori_meta = sample_data['origin_metadata'][0]
    origin = ori_meta['origin']
    swh_storage.origin_add_one(origin)
    provider_get = to_provider(ori_meta['provider'])
    provider_id = swh_storage.metadata_provider_add(**provider_get)

    origin_metadata = swh_storage.origin_metadata_get_by(origin['url'])
    assert not origin_metadata

    swh_storage.origin_metadata_add(
        origin['url'], ori_meta['discovery_date'],
        provider_id, ori_meta['tool'], ori_meta['metadata'])

    origin_metadata = swh_storage.origin_metadata_get_by(
        origin['url'])
    assert origin_metadata


def test_retrying_proxy_storage_origin_metadata_add_with_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    ori_meta = sample_data['origin_metadata'][0]
    origin = ori_meta['origin']
    swh_storage.origin_add_one(origin)
    provider_get = to_provider(ori_meta['provider'])
    provider_id = swh_storage.metadata_provider_add(**provider_get)
    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_metadata_add')

    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('provider_id hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('provider_id already inserted'),
        # ok then!
        None
    ]

    url = origin['url']
    ts = ori_meta['discovery_date']
    tool_id = ori_meta['tool']
    metadata = ori_meta['metadata']

    # No exception raised as insertion finally came through
    swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata)

    mock_memory.assert_has_calls([  # 3 calls, as long as error raised
        call(url, ts, provider_id, tool_id, metadata),
        call(url, ts, provider_id, tool_id, metadata),
        call(url, ts, provider_id, tool_id, metadata)
    ])


def test_retrying_proxy_swh_storage_origin_metadata_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_metadata_add')
    mock_memory.side_effect = ValueError('Refuse to add always!')

    ori_meta = sample_data['origin_metadata'][0]
    origin = ori_meta['origin']
    swh_storage.origin_add_one(origin)

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.origin_metadata_add(
            origin['url'], ori_meta['discovery_date'],
            'provider_id', ori_meta['tool'], ori_meta['metadata'])
def test_retrying_proxy_swh_storage_origin_visit_update(
        swh_storage, sample_data):
    """Standard origin_visit_update works as before

    """
    sample_origin = sample_data['origin'][0]
    swh_storage.origin_add_one(sample_origin)
    origin_url = sample_origin['url']
    origin_visit = swh_storage.origin_visit_add(
        origin_url, '2020-01-01', 'hg')

    ov = next(swh_storage.origin_visit_get(origin_url))
    assert ov['origin'] == origin_url
    assert ov['visit'] == origin_visit['visit']
    assert ov['status'] == 'ongoing'
    assert ov['snapshot'] is None
    assert ov['metadata'] is None

    swh_storage.origin_visit_update(origin_url, ov['visit'], status='full')

    ov = next(swh_storage.origin_visit_get(origin_url))
    assert ov['origin'] == origin_url
    assert ov['visit'] == origin_visit['visit']
    assert ov['status'] == 'full'
    assert ov['snapshot'] is None
    assert ov['metadata'] is None


def test_retrying_proxy_swh_storage_origin_visit_update_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    sample_origin = sample_data['origin'][1]
    origin_url = sample_origin['url']

    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_visit_update')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('origin hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('origin already inserted'),
        # ok then!
        {'origin': origin_url, 'visit': 1}
    ]

    visit_id = 1
    swh_storage.origin_visit_update(origin_url, visit_id, status='full')

    # fix: `mock.has_calls(...)` is an auto-created Mock attribute that is
    # always truthy, so the previous `assert mock_memory.has_calls(...)`
    # could never fail. Use the real assert_has_calls; the proxy forwards
    # status/metadata/snapshot as keywords, so the expected calls must too.
    mock_memory.assert_has_calls([
        call(origin_url, visit_id, status='full', metadata=None,
             snapshot=None),
    ] * 3)


def test_retrying_proxy_swh_storage_origin_visit_update_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch(
        'swh.storage.in_memory.Storage.origin_visit_update')
    mock_memory.side_effect = ValueError('Refuse to add origin always!')
    origin_url = sample_data['origin'][0]['url']
    visit_id = 9

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.origin_visit_update(origin_url, visit_id, 'partial')

    # fix: ValueError is not elected for retry, so exactly one attempt is
    # made (the previous `assert mock.has_calls(...)` was a no-op and
    # also expected 3 wrongly-shaped calls)
    mock_memory.assert_called_once_with(
        origin_url, visit_id, status='partial', metadata=None, snapshot=None)
+
+
def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
    """Standard directory_add works as before

    """
    sample_dir = sample_data['directory'][0]

    directory = swh_storage.directory_get_random()  # no directory
    assert not directory

    s = swh_storage.directory_add([sample_dir])
    assert s == {
        'directory:add': 1,
    }

    directory_id = swh_storage.directory_get_random()  # only 1
    assert directory_id == sample_dir['id']


def test_retrying_proxy_storage_directory_add_with_retry(
        swh_storage, sample_data, mocker):
    """Multiple retries for hash collision and psycopg2 error but finally ok

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add')
    mock_memory.side_effect = [
        # first try goes ko
        HashCollision('directory hash collision'),
        # second try goes ko
        psycopg2.IntegrityError('directory already inserted'),
        # ok then!
        {'directory:add': 1}
    ]

    sample_dir = sample_data['directory'][1]

    directory_id = swh_storage.directory_get_random()  # no directory
    assert not directory_id

    s = swh_storage.directory_add([sample_dir])
    assert s == {
        'directory:add': 1,
    }

    # fix: `mock.has_calls(...)` is an auto-created, always-truthy Mock
    # attribute, so the previous `assert mock_memory.has_calls(...)` could
    # never fail; use the real assert_has_calls and check the 3 attempts
    mock_memory.assert_has_calls([call([sample_dir])] * 3)
    assert mock_memory.call_count == 3


def test_retrying_proxy_swh_storage_directory_add_failure(
        swh_storage, sample_data, mocker):
    """Other errors are raising as usual

    """
    mock_memory = mocker.patch('swh.storage.in_memory.Storage.directory_add')
    mock_memory.side_effect = ValueError('Refuse to add directory always!')

    sample_dir = sample_data['directory'][0]

    directory_id = swh_storage.directory_get_random()  # no directory
    assert not directory_id

    with pytest.raises(ValueError, match='Refuse to add'):
        swh_storage.directory_add([sample_dir])

    # fix: ValueError is not elected for retry, so exactly one attempt is
    # made (the previous `assert mock.has_calls(...)` was a no-op)
    mock_memory.assert_called_once_with([sample_dir])
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 7:57 PM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3266080
Attached To
rDSTOC swh-storage-cassandra
Event Timeline
Log In to Comment