Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8392813
test_tasks.py
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
test_tasks.py
View Options
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
pytest
@pytest.mark.db
def test_task_check_eventful(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """A check reported as "verified" yields an eventful task result.

    The private API client's ``check`` method is mocked to return
    ``"verified"``. The task must then complete successfully with
    ``{"status": "eventful"}``, and the mock must have been called exactly
    once with the deposit's check URL.
    """
    collection = "collection"
    deposit_id = 42
    expected_url = f"/{collection}/{deposit_id}/check/"

    # Replace the private API client's check method with a mock.
    check_mock = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check",
        return_value="verified",
    )

    async_result = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk",
        args=[collection, deposit_id],
    )
    assert async_result

    # Block until the task has finished before inspecting its outcome.
    async_result.wait()
    assert async_result.successful()
    assert async_result.result == {"status": "eventful"}

    check_mock.assert_called_once_with(expected_url)
@pytest.mark.db
def test_task_check_failure(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """A check reported as "not-verified" yields a failed task result.

    The private API client's ``check`` method is mocked to return
    ``"not-verified"``. The task itself must still complete successfully,
    but its result must be ``{"status": "failed"}``, and the mock must have
    been called exactly once with the deposit's check URL.
    """
    collection = "collec"
    deposit_id = 666
    expected_url = f"/{collection}/{deposit_id}/check/"

    # Replace the private API client's check method with a mock; the
    # "not-verified" answer will make the status "failed".
    check_mock = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check",
        return_value="not-verified",
    )

    async_result = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk",
        args=[collection, deposit_id],
    )
    assert async_result

    # Block until the task has finished before inspecting its outcome.
    async_result.wait()
    assert async_result.successful()
    assert async_result.result == {"status": "failed"}

    check_mock.assert_called_once_with(expected_url)
@pytest.mark.db
def test_task_check_3(
    mocker, deposit_config_path, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
    """An unexpected error during the check yields a failed task result.

    The private API client's ``check`` method is mocked to raise a
    ``ValueError``. The task is still expected to complete successfully,
    with ``{"status": "failed"}`` as its result, and the mock must have been
    called exactly once with the deposit's check URL.
    """
    collection = "another-collection"
    deposit_id = 999
    expected_url = f"/{collection}/{deposit_id}/check/"

    # Replace the private API client's check method with a mock that raises.
    check_mock = mocker.patch(
        "swh.deposit.loader.checker.PrivateApiDepositClient.check",
        side_effect=ValueError("unexpected failure will make it fail"),
    )

    async_result = swh_scheduler_celery_app.send_task(
        "swh.deposit.loader.tasks.ChecksDepositTsk",
        args=[collection, deposit_id],
    )
    assert async_result

    # Block until the task has finished before inspecting its outcome.
    async_result.wait()
    assert async_result.successful()
    assert async_result.result == {"status": "failed"}

    check_mock.assert_called_once_with(expected_url)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Jun 4 2025, 7:03 PM (10 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3372386
Attached To
rDDEP Push deposit
Event Timeline
Log In to Comment