Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9349190
pytest_plugin.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
pytest_plugin.py
View Options
# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
os
from
typing
import
Any
,
Dict
import
pytest
import
yaml
from
swh.scheduler.model
import
ListedOrigin
,
Lister
from
swh.scheduler.utils
import
create_origin_task_dict
@pytest.fixture
def
swh_storage_backend_config
(
swh_storage_postgresql
)
->
Dict
[
str
,
Any
]:
return
{
"cls"
:
"retry"
,
"storage"
:
{
"cls"
:
"filter"
,
"storage"
:
{
"cls"
:
"buffer"
,
"storage"
:
{
"cls"
:
"postgresql"
,
"db"
:
swh_storage_postgresql
.
dsn
,
"objstorage"
:
{
"cls"
:
"memory"
},
},
},
},
}
@pytest.fixture
def
swh_loader_config
(
swh_storage_backend_config
)
->
Dict
[
str
,
Any
]:
return
{
"storage"
:
swh_storage_backend_config
,
}
@pytest.fixture
def
swh_config
(
swh_loader_config
,
monkeypatch
,
tmp_path
)
->
str
:
conffile
=
os
.
path
.
join
(
str
(
tmp_path
),
"loader.yml"
)
with
open
(
conffile
,
"w"
)
as
f
:
f
.
write
(
yaml
.
dump
(
swh_loader_config
))
monkeypatch
.
setenv
(
"SWH_CONFIG_FILENAME"
,
conffile
)
return
conffile
@pytest.fixture
(
autouse
=
True
,
scope
=
"session"
)
def
swh_proxy
():
"""Automatically inject this fixture in all tests to ensure no outside
connection takes place.
"""
os
.
environ
[
"http_proxy"
]
=
"http://localhost:999"
os
.
environ
[
"https_proxy"
]
=
"http://localhost:999"
@pytest.fixture
def
loading_task_creation_for_listed_origin_test
(
mocker
,
swh_scheduler_celery_app
,
swh_scheduler_celery_worker
,
swh_config
,
):
def
test_implementation
(
loader_class_name
:
str
,
task_function_name
:
str
,
lister
:
Lister
,
listed_origin
:
ListedOrigin
,
):
mock_load
=
mocker
.
patch
(
f
"{loader_class_name}.load"
)
mock_load
.
return_value
=
{
"status"
:
"eventful"
}
task_dict
=
create_origin_task_dict
(
listed_origin
,
lister
)
res
=
swh_scheduler_celery_app
.
send_task
(
task_function_name
,
kwargs
=
task_dict
[
"arguments"
][
"kwargs"
],
)
assert
res
res
.
wait
()
assert
res
.
successful
()
assert
mock_load
.
called
assert
res
.
result
==
{
"status"
:
"eventful"
}
return
test_implementation
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 7:09 PM (6 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3240136
Attached To
rDLDBASE Generic VCS/Package Loader
Event Timeline
Log In to Comment