Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9349258
test_celery_tasks.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
test_celery_tasks.py
View Options
from
time
import
sleep
from
celery.result
import
GroupResult
from
celery.result
import
AsyncResult
import
pytest
from
swh.scheduler.utils
import
create_task_dict
from
swh.scheduler.celery_backend.runner
import
run_ready_tasks
def test_ping(swh_app, celery_session_worker):
    """Send the ping test task through the app and check it answers 'OK'."""
    ping_result = swh_app.send_task('swh.scheduler.tests.tasks.ping')
    assert ping_result

    # Wait for the task to be processed before inspecting its outcome.
    ping_result.wait()

    assert ping_result.successful()
    assert ping_result.result == 'OK'
def test_multiping(swh_app, celery_session_worker):
    """Test that a task that spawns subtasks (group) works.

    The multiping task is sent with ``n=5``; its result is used as the id of
    a saved ``GroupResult``, which is restored and polled until the subtasks
    are done. Each of the five expected ping answers must then be present in
    the collected subtask results.
    """
    # Task arguments have to be passed through ``kwargs``: any other keyword
    # given to ``send_task`` is handled as a message option and would never
    # reach the task itself.
    res = swh_app.send_task(
        'swh.scheduler.tests.tasks.multiping', kwargs={'n': 5})
    assert res

    res.wait()
    assert res.successful()

    # retrieve the GroupResult for this task and wait for all the subtasks
    # to complete
    promise_id = res.result
    assert promise_id
    promise = GroupResult.restore(promise_id, app=swh_app)

    # Poll up to 5 times, one second apart; the ``get()`` calls below still
    # wait for any subtask that has not finished by then.
    for _ in range(5):
        if promise.ready():
            break
        sleep(1)

    results = [x.get() for x in promise.results]
    for i in range(5):
        assert ("OK (kw={'i': %s})" % i) in results
@pytest.mark.db
def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler):
    """Test that the scheduler fixture works properly.

    Looks up the 'swh-test-ping' task type, creates a oneshot task of that
    type, runs the ready tasks and waits for each of them.
    """
    ping_type = swh_scheduler.get_task_type('swh-test-ping')
    assert ping_type
    assert ping_type['backend_name'] == 'swh.scheduler.tests.tasks.ping'

    oneshot_ping = create_task_dict('swh-test-ping', 'oneshot')
    swh_scheduler.create_tasks([oneshot_ping])

    scheduled = run_ready_tasks(swh_scheduler, swh_app)
    assert scheduled

    for entry in scheduled:
        # Make sure the task completed
        pending = AsyncResult(id=entry['backend_id'])
        pending.get()
@pytest.mark.db
def test_task_return_value(swh_app, celery_session_worker, swh_scheduler):
    """Check that the value returned by a scheduled task can be retrieved.

    A oneshot 'swh-test-add' task is created with the arguments 12 and 30;
    the result fetched for it is expected to be 42.
    """
    add_type = swh_scheduler.get_task_type('swh-test-add')
    assert add_type
    assert add_type['backend_name'] == 'swh.scheduler.tests.tasks.add'

    oneshot_add = create_task_dict('swh-test-add', 'oneshot', 12, 30)
    swh_scheduler.create_tasks([oneshot_add])

    scheduled = run_ready_tasks(swh_scheduler, swh_app)
    assert len(scheduled) == 1

    # Exactly one task was scheduled; fetch its result from the backend.
    backend_id = scheduled[0]['backend_id']
    outcome = AsyncResult(id=backend_id)
    assert outcome.get() == 42
@pytest.mark.db
def test_task_exception(swh_app, celery_session_worker, swh_scheduler):
    """Check that an exception raised by a task surfaces when getting its result.

    A oneshot 'swh-test-error' task is created and run; fetching its result
    is expected to raise NotImplementedError.
    """
    error_type = swh_scheduler.get_task_type('swh-test-error')
    assert error_type
    assert error_type['backend_name'] == 'swh.scheduler.tests.tasks.error'

    oneshot_error = create_task_dict('swh-test-error', 'oneshot')
    swh_scheduler.create_tasks([oneshot_error])

    scheduled = run_ready_tasks(swh_scheduler, swh_app)
    assert len(scheduled) == 1

    failing = AsyncResult(id=scheduled[0]['backend_id'])
    with pytest.raises(NotImplementedError):
        failing.get()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 7:13 PM (6 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3286472
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment