Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437663
test_cli.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
22 KB
Subscribers
None
test_cli.py
View Options
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
datetime
from
itertools
import
islice
import
logging
import
random
import
re
import
tempfile
from
unittest.mock
import
patch
from
click.testing
import
CliRunner
import
pytest
from
swh.core.api.classes
import
stream_results
from
swh.model.model
import
Origin
from
swh.scheduler.cli
import
cli
from
swh.scheduler.utils
import
create_task_dict
,
utcnow
# Minimal YAML configuration that ``invoke`` writes to a temporary file and
# hands to the CLI through the ``-C`` option.
# NOTE(review): as shown here ``cls`` and ``args`` sit at the same level as
# ``scheduler``; presumably they are meant to be nested under it -- confirm
# that the indentation inside this literal was not lost.
CLI_CONFIG
=
"""
scheduler:
cls: foo
args: {}
"""
def invoke(scheduler, catch_exceptions, args):
    """Run the scheduler CLI in-process and return the click result.

    ``swh.scheduler.get_scheduler`` is patched to return ``scheduler``, and
    ``CLI_CONFIG`` is written to a temporary ``.yml`` file whose path is passed
    to the CLI with ``-C`` ahead of ``args``.

    When ``catch_exceptions`` is false and the command raised, the captured
    output is printed and the exception is re-raised.
    """
    cli_runner = CliRunner()
    with patch(
        "swh.scheduler.get_scheduler"
    ) as mocked_get_scheduler, tempfile.NamedTemporaryFile(
        "a", suffix=".yml"
    ) as config_fd:
        config_fd.write(CLI_CONFIG)
        # Rewind after writing, before the file name is handed to the CLI.
        config_fd.seek(0)
        mocked_get_scheduler.return_value = scheduler
        full_args = [f"-C{config_fd.name}"] + args
        result = cli_runner.invoke(
            cli, full_args, obj={"log_level": logging.WARNING}
        )

    if catch_exceptions or not result.exception:
        return result

    # Surface the CLI output before propagating the failure.
    print(result.output)
    raise result.exception
def test_schedule_tasks(swh_scheduler):
    """``task schedule`` creates one task per row of a ``;``-delimited CSV file."""
    csv_rows = [
        b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
        + utcnow().isoformat().encode(),
        b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
        + utcnow().isoformat().encode(),
    ]
    csv_payload = b"".join(row + b"\n" for row in csv_rows)

    with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
        csv_fd.write(csv_payload)
        # Rewind after writing, before the file name is handed to the CLI.
        csv_fd.seek(0)
        cli_args = ["task", "schedule", "-d", ";", csv_fd.name]
        outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg1', 'arg2'\]
Keyword args:
key: 'value'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg3', 'arg4'\]
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_schedule_tasks_columns(swh_scheduler):
    """``task schedule`` honours an explicit column list given with ``-c``."""
    csv_line = b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n'
    column_options = []
    for column in ("type", "policy", "args", "kwargs"):
        column_options += ["-c", column]

    with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
        csv_fd.write(csv_line)
        # Rewind after writing, before the file name is handed to the CLI.
        csv_fd.seek(0)
        cli_args = ["task", "schedule", *column_options, "-d", ";", csv_fd.name]
        outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_schedule_task(swh_scheduler):
    """``task add`` creates a single task from positional and ``key=value`` args."""
    cli_args = ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_none(swh_scheduler):
    """``task list-pending`` reports zero tasks when none were created."""
    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks(swh_scheduler):
    """``task list-pending`` lists the due task, then nothing once it is grabbed."""
    due_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    later_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    # Push the second task one day ahead; it is not expected in the listing.
    later_task["next_run"] += datetime.timedelta(days=1)
    swh_scheduler.create_tasks([due_task, later_task])

    cli_args = ["task", "list-pending", "swh-test-ping"]

    outcome = invoke(swh_scheduler, False, cli_args)
    pattern = r"""
Found 1 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output

    # After grabbing the ready tasks, the same command must report none.
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    outcome = invoke(swh_scheduler, False, cli_args)
    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_filter(swh_scheduler):
    """``task list-pending`` ignores tasks of another task type."""
    other_type_task = create_task_dict("swh-test-multiping", "oneshot", key="value")
    swh_scheduler.create_tasks([other_type_task])

    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_filter_2(swh_scheduler):
    """``task list-pending`` only shows the task matching the requested type."""
    multiping_task = create_task_dict("swh-test-multiping", "oneshot", key="value")
    ping_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    swh_scheduler.create_tasks([multiping_task, ping_task])

    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
# Fails because "task list-pending --limit 3" only returns 2 tasks, because
# of how compute_nb_tasks_from works.
@pytest.mark.xfail(
    reason='"task list-pending --limit 3" currently only returns 2 tasks'
)
def test_list_pending_tasks_limit(swh_scheduler):
    """``task list-pending --limit 3`` should list exactly three pending tasks.

    Marked as an expected failure: see the comment above the decorator.
    """
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key=f"value{i}")
            for i in range(10)
        ]
    )

    result = invoke(
        swh_scheduler,
        False,
        ["task", "list-pending", "swh-test-ping", "--limit", "3"],
    )

    # The header count must agree with the three tasks listed below it; with
    # "Found 2" the pattern could never match any output, fixed CLI or not.
    expected = r"""
Found 3 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value0'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_before(swh_scheduler):
    """``task list-pending --before`` keeps only tasks due before the cutoff."""
    far_task = create_task_dict("swh-test-ping", "oneshot", key="value")
    near_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    far_task["next_run"] += datetime.timedelta(days=3)
    near_task["next_run"] += datetime.timedelta(days=1)
    swh_scheduler.create_tasks([far_task, near_task])

    # Cutoff two days from today: between the two tasks' next runs.
    cutoff = (datetime.date.today() + datetime.timedelta(days=2)).isoformat()
    cli_args = ["task", "list-pending", "swh-test-ping", "--before", cutoff]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: tomorrow \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks(swh_scheduler):
    """``task list`` shows every task along with its status and priority."""
    future_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    due_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    future_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([future_task, due_task])

    # Expected output below shows the due task as scheduled after this call.
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    outcome = invoke(swh_scheduler, False, ["task", "list"])

    pattern = r"""
Found 2 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_id(swh_scheduler):
    """``task list --task-id`` restricts the listing to the requested task."""
    new_tasks = [
        create_task_dict("swh-test-ping", "oneshot", key=f"value{number}")
        for number in (1, 2, 3)
    ]
    swh_scheduler.create_tasks(new_tasks)

    cli_args = ["task", "list", "--task-id", "2"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_id_2(swh_scheduler):
    """``task list`` accepts several ``--task-id`` options at once."""
    new_tasks = [
        create_task_dict("swh-test-ping", "oneshot", key=f"value{number}")
        for number in (1, 2, 3)
    ]
    swh_scheduler.create_tasks(new_tasks)

    cli_args = ["task", "list", "--task-id", "2", "--task-id", "3"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 2 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_type(swh_scheduler):
    """``task list --task-type`` only lists tasks of the requested type."""
    task_specs = [
        ("swh-test-ping", "value1"),
        ("swh-test-multiping", "value2"),
        ("swh-test-ping", "value3"),
    ]
    new_tasks = [
        create_task_dict(task_type, "oneshot", key=key_value)
        for task_type, key_value in task_specs
    ]
    swh_scheduler.create_tasks(new_tasks)

    cli_args = ["task", "list", "--task-type", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_limit(swh_scheduler):
    """``task list --limit 2`` lists only the first two of three tasks."""
    new_tasks = [
        create_task_dict("swh-test-ping", "oneshot", key=f"value{number}")
        for number in (1, 2, 3)
    ]
    swh_scheduler.create_tasks(new_tasks)

    cli_args = ["task", "list", "--limit", "2"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_before(swh_scheduler):
    """``task list --before`` keeps only the task due before the cutoff date."""
    future_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    due_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    future_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([future_task, due_task])
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    # Cutoff two days from today: earlier than the postponed task's next run.
    cutoff = (datetime.date.today() + datetime.timedelta(days=2)).isoformat()
    cli_args = ["task", "list", "--before", cutoff]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_after(swh_scheduler):
    """``task list --after`` keeps only the task due after the cutoff date."""
    future_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    due_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    future_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([future_task, due_task])
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    # Cutoff two days from today: earlier than the postponed task's next run.
    cutoff = (datetime.date.today() + datetime.timedelta(days=2)).isoformat()
    cli_args = ["task", "list", "--after", cutoff]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def _fill_storage_with_origins(storage, nb_origins):
    """Add ``nb_origins`` origins to ``storage`` and return them as a list.

    Origins get the URLs ``http://example.com/0`` ... ``http://example.com/<n-1>``.
    """
    created = []
    for index in range(nb_origins):
        created.append(Origin(url=f"http://example.com/{index}"))
    storage.origin_add(created)
    return created
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
    """Dry-run scheduling reports its progress but creates no task.

    Covers the case where origin_batch_size*task_batch_size is a divisor of
    nb_origins.
    """
    _fill_storage_with_origins(storage, 90)

    cli_args = ["task", "schedule_origins", "--dry-run", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    # The command output must report each batch
    pattern = r"""
Scheduled 3 tasks \(30 origins\).
Scheduled 6 tasks \(60 origins\).
Scheduled 9 tasks \(90 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # ... yet nothing must have been recorded in the scheduler
    recorded_tasks = swh_scheduler.search_tasks()
    assert len(recorded_tasks) == 0
def
_assert_origin_tasks_contraints
(
tasks
,
max_tasks
,
max_task_size
,
expected_origins
):
# check there are not too many tasks
assert
len
(
tasks
)
<=
max_tasks
# check tasks are not too large
assert
all
(
len
(
task
[
"arguments"
][
"args"
][
0
])
<=
max_task_size
for
task
in
tasks
)
# check the tasks are exhaustive
assert
sum
([
len
(
task
[
"arguments"
][
"args"
][
0
])
for
task
in
tasks
])
==
len
(
expected_origins
)
assert
set
.
union
(
*
(
set
(
task
[
"arguments"
][
"args"
][
0
])
for
task
in
tasks
))
==
{
origin
.
url
for
origin
in
expected_origins
}
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins(swh_scheduler, storage):
    """Schedule origins when the batch sizes do not divide the origin count.

    Covers the case where neither origin_batch_size nor task_batch_size is a
    divisor of nb_origins.
    """
    stored_origins = _fill_storage_with_origins(storage, 70)

    cli_args = ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20"]
    outcome = invoke(swh_scheduler, False, cli_args)

    # The command output must report each batch
    pattern = r"""
Scheduled 3 tasks \(60 origins\).
Scheduled 4 tasks \(70 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # The recorded tasks must cover all origins and carry no keyword argument
    recorded_tasks = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(recorded_tasks, 4, 20, stored_origins)
    assert all(
        recorded["arguments"]["kwargs"] == {} for recorded in recorded_tasks
    )
def test_task_schedule_origins_kwargs(swh_scheduler, storage):
    """Extra ``key=value`` CLI arguments end up as keyword arguments of the tasks."""
    stored_origins = _fill_storage_with_origins(storage, 30)

    cli_args = [
        "task",
        "schedule_origins",
        "swh-test-ping",
        "--batch-size",
        "20",
        'key1="value1"',
        'key2="value2"',
    ]
    outcome = invoke(swh_scheduler, False, cli_args)

    # The command output must report the scheduling
    pattern = r"""
Scheduled 2 tasks \(30 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # The recorded tasks must cover all origins and carry the extra kwargs
    recorded_tasks = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(recorded_tasks, 2, 20, stored_origins)
    wanted_kwargs = {"key1": "value1", "key2": "value2"}
    assert all(
        recorded["arguments"]["kwargs"] == wanted_kwargs
        for recorded in recorded_tasks
    )
def test_task_schedule_origins_with_limit(swh_scheduler, storage):
    """Tests that ``--limit`` caps the number of origins that get scheduled."""
    _fill_storage_with_origins(storage, 50)

    limit = 20
    # The first `limit` origins listed by the storage are the ones expected
    # to be scheduled.
    expected_origins = list(islice(stream_results(storage.origin_list), limit))
    nb_origins = len(expected_origins)

    assert nb_origins == limit
    max_task_size = 5
    nb_tasks, remainder = divmod(nb_origins, max_task_size)
    assert remainder == 0  # made the numbers go round

    result = invoke(
        swh_scheduler,
        False,
        [
            "task",
            "schedule_origins",
            "swh-test-ping",
            "--batch-size",
            # command-line arguments are strings
            str(max_task_size),
            "--limit",
            str(limit),
        ],
    )

    # Check the output
    expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)

    # Check tasks: at most nb_tasks tasks, each holding at most max_task_size
    # origins (the helper's parameters are max_tasks, then max_task_size).
    tasks = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(tasks, nb_tasks, max_task_size, expected_origins)
def test_task_schedule_origins_with_page_token(swh_scheduler, storage):
    """Tests that ``--page-token`` skips the origins listed before that token."""
    nb_total_origins = 50
    origins = _fill_storage_with_origins(storage, nb_total_origins)

    # prepare page_token and origins result expectancy
    page_result = storage.origin_list(limit=10)
    assert len(page_result.results) == 10
    page_token = page_result.next_page_token
    assert page_token is not None

    # remove the first 10 origins listed as we won't see those in tasks
    expected_origins = [o for o in origins if o not in page_result.results]
    nb_origins = len(expected_origins)
    assert nb_origins == nb_total_origins - len(page_result.results)

    max_task_size = 10
    nb_tasks, remainder = divmod(nb_origins, max_task_size)
    assert remainder == 0

    result = invoke(
        swh_scheduler,
        False,
        [
            "task",
            "schedule_origins",
            "swh-test-ping",
            "--batch-size",
            # command-line arguments are strings
            str(max_task_size),
            "--page-token",
            page_token,
        ],
    )

    # Check the output
    expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)

    # Check tasks: at most nb_tasks tasks, each holding at most max_task_size
    # origins (the helper's parameters are max_tasks, then max_task_size).
    tasks = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(tasks, nb_tasks, max_task_size, expected_origins)
def test_cli_task_runner_unknown_task_types(swh_scheduler, storage):
    """The runner must fail as soon as one of the given task types is unknown."""
    known_names = [task_type["type"] for task_type in swh_scheduler.get_task_types()]
    known_task_type = random.choice(known_names)

    unknown_task_type = "unknown-task-type"
    assert unknown_task_type not in known_names

    cli_args = [
        "start-runner",
        "--task-type",
        known_task_type,
        "--task-type",
        unknown_task_type,
    ]
    with pytest.raises(ValueError, match="Unknown"):
        invoke(swh_scheduler, False, cli_args)
@pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"])
def test_cli_task_runner_with_known_tasks(
    swh_scheduler, storage, caplog, flag_priority
):
    """The runner exits cleanly when only known task types are requested."""
    known_names = [task_type["type"] for task_type in swh_scheduler.get_task_types()]
    first_type = random.choice(known_names)
    second_type = random.choice(known_names)

    # The runner will just iterate over the following known tasks and do noop.
    # We are just checking the runner does not explode here.
    cli_args = [
        "start-runner",
        flag_priority,
        "--task-type",
        first_type,
        "--task-type",
        second_type,
    ]
    outcome = invoke(swh_scheduler, False, cli_args)

    assert outcome.exit_code == 0, outcome.output
def test_cli_task_runner_no_task(swh_scheduler, storage):
    """The runner invoked without any parameter keeps running as before."""
    # The runner will just iterate over the existing tasks from the scheduler
    # and do noop. We are just checking the runner does not explode here.
    cli_args = ["start-runner"]
    outcome = invoke(swh_scheduler, False, cli_args)

    assert outcome.exit_code == 0, outcome.output
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (2 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3285970
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment