deposit_config_path = '/tmp/pytest-of-jenkins/pytest-0/test_convert_status_detail_20/deposit.yml'
swh_scheduler_config = {'db': "user=postgres password=xxx dbname=tests host=127.0.0.1 port=17789 options=''"}
@pytest.fixture(autouse=True)
def deposit_autoconfig(deposit_config_path, swh_scheduler_config):
"""Enforce config for deposit classes inherited from APIConfig."""
cfg = read(deposit_config_path)
# scheduler setup: require the load-deposit tasks (already existing in production)
scheduler = get_scheduler(**cfg["scheduler"])
task_type = {
"type": "load-deposit",
"backend_name": "swh.loader.packages.deposit.tasks.LoadDeposit",
"description": "Load deposit task",
}
scheduler.create_task_type(task_type)
# storage setup: require the metadata authority swh (already existing in production)
> storage = get_storage(**cfg["storage"])
.tox/py3/lib/python3.7/site-packages/swh/deposit/tests/conftest.py:101:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = 'local'
kwargs = {'db': 'postgresql://postgres@127.0.0.1:17789/tests', 'objstorage': {'args': {}, 'cls': 'memory'}}
class_path = '.postgresql.storage.Storage', module_path = '.postgresql.storage'
class_name = 'Storage'
module = <module 'swh.storage.postgresql.storage' from '/var/lib/jenkins/workspace/DDEP/tests-on-diff/.tox/py3/lib/python3.7/site-packages/swh/storage/postgresql/storage.py'>
Storage = <class 'swh.storage.postgresql.storage.Storage'>
check_config = {'check_write': True}
storage = <swh.storage.postgresql.storage.Storage object at 0x7f85b307a080>
def get_storage(cls: str, **kwargs) -> "StorageInterface":
"""Get a storage object of class `storage_class` with arguments
`storage_args`.
Args:
storage (dict): dictionary with keys:
- cls (str): storage's class, either local, remote, memory, filter,
buffer
- args (dict): dictionary with keys
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
if cls == "pipeline":
return get_storage_pipeline(**kwargs)
class_path = STORAGE_IMPLEMENTATIONS.get(cls)
if class_path is None:
raise ValueError(
"Unknown storage class `%s`. Supported: %s"
% (cls, ", ".join(STORAGE_IMPLEMENTATIONS))
)
(module_path, class_name) = class_path.rsplit(".", 1)
module = importlib.import_module(module_path, package=__package__)
Storage = getattr(module, class_name)
check_config = kwargs.pop("check_config", {})
storage = Storage(**kwargs)
if check_config:
if not storage.check_config(**check_config):
> raise EnvironmentError("storage check config failed")
E OSError: storage check config failed
.tox/py3/lib/python3.7/site-packages/swh/storage/__init__.py:67: OSError
TEST RESULT
TEST RESULT
- Run At
- Sep 23 2020, 2:52 PM