Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
Show All 24 Lines | from .common import ( | ||||
LISTERS, | LISTERS, | ||||
TASK_TYPES, | TASK_TYPES, | ||||
TEMPLATES, | TEMPLATES, | ||||
tasks_from_template, | tasks_from_template, | ||||
tasks_with_priority_from_template, | tasks_with_priority_from_template, | ||||
) | ) | ||||
ONEDAY = datetime.timedelta(days=1) | ONEDAY = datetime.timedelta(days=1) | ||||
vlorentz: existing
and what test code is it? why isn't it updated too? | |||||
Done Inline Actionsit is updated. Check below. The code that creates tasks (with and without priority) or the ones that read tasks (the ones read tasks with no priority check that they don't have any priority). ardumont: it is updated. Check below.
The code that creates tasks (with and without priority) or the… | |||||
Done Inline Actions
thx, fixed. ardumont: > existing
thx, fixed. | |||||
# for compatibility purpose with existing test code | |||||
PRIORITY_RATIO = {"high": 0.6, "normal": 0.3, "low": 0.2} | |||||
def subdict(d, keys=None, excl=()): | def subdict(d, keys=None, excl=()): | ||||
if keys is None: | if keys is None: | ||||
keys = [k for k in d.keys()] | keys = [k for k in d.keys()] | ||||
return {k: d[k] for k in keys if k not in excl} | return {k: d[k] for k in keys if k not in excl} | ||||
def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: | def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: | ||||
Show All 30 Lines | def test_interface(self, swh_scheduler): | ||||
expected_signature = inspect.signature(interface_meth) | expected_signature = inspect.signature(interface_meth) | ||||
actual_signature = inspect.signature(concrete_meth) | actual_signature = inspect.signature(concrete_meth) | ||||
assert expected_signature == actual_signature, meth_name | assert expected_signature == actual_signature, meth_name | ||||
assert missing_methods == [] | assert missing_methods == [] | ||||
def test_get_priority_ratios(self, swh_scheduler): | |||||
assert swh_scheduler.get_priority_ratios() == { | |||||
"high": 0.5, | |||||
"normal": 0.3, | |||||
"low": 0.2, | |||||
} | |||||
def test_add_task_type(self, swh_scheduler): | def test_add_task_type(self, swh_scheduler): | ||||
tt = TASK_TYPES["git"] | tt = TASK_TYPES["git"] | ||||
swh_scheduler.create_task_type(tt) | swh_scheduler.create_task_type(tt) | ||||
assert tt == swh_scheduler.get_task_type(tt["type"]) | assert tt == swh_scheduler.get_task_type(tt["type"]) | ||||
tt2 = TASK_TYPES["hg"] | tt2 = TASK_TYPES["hg"] | ||||
swh_scheduler.create_task_type(tt2) | swh_scheduler.create_task_type(tt2) | ||||
assert tt == swh_scheduler.get_task_type(tt["type"]) | assert tt == swh_scheduler.get_task_type(tt["type"]) | ||||
assert tt2 == swh_scheduler.get_task_type(tt2["type"]) | assert tt2 == swh_scheduler.get_task_type(tt2["type"]) | ||||
def test_create_task_type_idempotence(self, swh_scheduler): | def test_create_task_type_idempotence(self, swh_scheduler): | ||||
tt = TASK_TYPES["git"] | tt = TASK_TYPES["git"] | ||||
swh_scheduler.create_task_type(tt) | swh_scheduler.create_task_type(tt) | ||||
swh_scheduler.create_task_type(tt) | swh_scheduler.create_task_type(tt) | ||||
assert tt == swh_scheduler.get_task_type(tt["type"]) | assert tt == swh_scheduler.get_task_type(tt["type"]) | ||||
def test_get_task_types(self, swh_scheduler): | def test_get_task_types(self, swh_scheduler): | ||||
tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] | tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] | ||||
swh_scheduler.create_task_type(tt) | swh_scheduler.create_task_type(tt) | ||||
swh_scheduler.create_task_type(tt2) | swh_scheduler.create_task_type(tt2) | ||||
actual_task_types = swh_scheduler.get_task_types() | actual_task_types = swh_scheduler.get_task_types() | ||||
assert tt in actual_task_types | assert tt in actual_task_types | ||||
assert tt2 in actual_task_types | assert tt2 in actual_task_types | ||||
def test_create_tasks(self, swh_scheduler): | def test_create_tasks(self, swh_scheduler): | ||||
priority_ratio = self._priority_ratio(swh_scheduler) | |||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100) | tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100) | ||||
tasks_2 = tasks_from_template( | tasks_2 = tasks_from_template( | ||||
TEMPLATES["hg"], | TEMPLATES["hg"], | ||||
utcnow(), | utcnow(), | ||||
100, | 100, | ||||
num_tasks_priority, | num_tasks_priority, | ||||
priorities=priority_ratio, | priorities=PRIORITY_RATIO, | ||||
) | ) | ||||
tasks = tasks_1 + tasks_2 | tasks = tasks_1 + tasks_2 | ||||
# tasks are returned only once with their ids | # tasks are returned only once with their ids | ||||
ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) | ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) | ||||
set_ret1 = set([t["id"] for t in ret1]) | set_ret1 = set([t["id"] for t in ret1]) | ||||
# creating the same set result in the same ids | # creating the same set result in the same ids | ||||
Show All 27 Lines | def test_create_tasks(self, swh_scheduler): | ||||
if "policy" not in orig_task: | if "policy" not in orig_task: | ||||
del task["policy"] | del task["policy"] | ||||
if "priority" not in orig_task: | if "priority" not in orig_task: | ||||
del task["priority"] | del task["priority"] | ||||
assert task == orig_task | assert task == orig_task | ||||
assert dict(actual_priorities) == { | assert dict(actual_priorities) == { | ||||
priority: int(ratio * num_tasks_priority) | priority: int(ratio * num_tasks_priority) | ||||
for priority, ratio in priority_ratio.items() | for priority, ratio in PRIORITY_RATIO.items() | ||||
} | } | ||||
def test_peek_ready_tasks_no_priority(self, swh_scheduler): | def test_peek_ready_tasks_no_priority(self, swh_scheduler): | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES["git"]["type"] | task_type = TEMPLATES["git"]["type"] | ||||
tasks = tasks_from_template(TEMPLATES["git"], t, 100) | tasks = tasks_from_template(TEMPLATES["git"], t, 100) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
Show All 27 Lines | def test_peek_ready_tasks_no_priority(self, swh_scheduler): | ||||
ready_tasks_both = swh_scheduler.peek_ready_tasks( | ready_tasks_both = swh_scheduler.peek_ready_tasks( | ||||
task_type, timestamp=max_ts, num_tasks=limit // 3 | task_type, timestamp=max_ts, num_tasks=limit // 3 | ||||
) | ) | ||||
assert len(ready_tasks_both) <= limit // 3 | assert len(ready_tasks_both) <= limit // 3 | ||||
for ready_task in ready_tasks_both: | for ready_task in ready_tasks_both: | ||||
assert ready_task["next_run"] <= max_ts | assert ready_task["next_run"] <= max_ts | ||||
assert ready_task in ready_tasks[: limit // 3] | assert ready_task in ready_tasks[: limit // 3] | ||||
def _priority_ratio(self, swh_scheduler): | def test_peek_ready_tasks_returns_only_no_priority_tasks(self, swh_scheduler): | ||||
return swh_scheduler.get_priority_ratios() | """Peek ready tasks only return standard tasks (no priority)""" | ||||
def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): | |||||
priority_ratio = self._priority_ratio(swh_scheduler) | |||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES["git"]["type"] | task_type = TEMPLATES["git"]["type"] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = tasks_from_template( | tasks = tasks_from_template( | ||||
TEMPLATES["git"], | TEMPLATES["git"], | ||||
t, | t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio, | priorities=PRIORITY_RATIO, | ||||
) | ) | ||||
count_priority = 0 | |||||
for task in tasks: | |||||
count_priority += 0 if task.get("priority") is None else 1 | |||||
assert count_priority > 0, "Some created tasks should have some priority" | |||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
swh_scheduler.create_tasks(tasks) | swh_scheduler.create_tasks(tasks) | ||||
# take all available tasks | # take all available no priority tasks | ||||
ready_tasks = swh_scheduler.peek_ready_tasks(task_type) | ready_tasks = swh_scheduler.peek_ready_tasks(task_type) | ||||
assert len(ready_tasks) == len(tasks) | assert len(ready_tasks) == len(tasks) - count_priority | ||||
assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks) | |||||
count_tasks_per_priority = defaultdict(int) | # No read task should have any priority | ||||
for task in ready_tasks: | for task in ready_tasks: | ||||
priority = task.get("priority") | assert task.get("priority") is None | ||||
if priority: | |||||
count_tasks_per_priority[priority] += 1 | |||||
assert dict(count_tasks_per_priority) == { | |||||
priority: int(ratio * num_tasks_priority) | |||||
for priority, ratio in priority_ratio.items() | |||||
} | |||||
# Only get some ready tasks | |||||
num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2) | |||||
num_tasks_priority = random.randrange(5, num_tasks_priority // 2) | |||||
ready_tasks_limited = swh_scheduler.peek_ready_tasks( | |||||
task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority | |||||
) | |||||
count_tasks_per_priority = defaultdict(int) | |||||
for task in ready_tasks_limited: | |||||
priority = task.get("priority") | |||||
count_tasks_per_priority[priority] += 1 | |||||
import math | |||||
for priority, ratio in priority_ratio.items(): | |||||
expected_count = math.ceil(ratio * num_tasks_priority) | |||||
actual_prio = count_tasks_per_priority[priority] | |||||
assert actual_prio == expected_count or actual_prio == expected_count + 1 | |||||
assert count_tasks_per_priority[None] == num_tasks | |||||
def test_grab_ready_tasks(self, swh_scheduler): | def test_grab_ready_tasks(self, swh_scheduler): | ||||
priority_ratio = self._priority_ratio(swh_scheduler) | |||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES["git"]["type"] | task_type = TEMPLATES["git"]["type"] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = tasks_from_template( | tasks = tasks_from_template( | ||||
TEMPLATES["git"], | TEMPLATES["git"], | ||||
t, | t, | ||||
num=num_tasks_no_priority, | num=num_tasks_no_priority, | ||||
num_priority=num_tasks_priority, | num_priority=num_tasks_priority, | ||||
priorities=priority_ratio, | priorities=PRIORITY_RATIO, | ||||
) | ) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
swh_scheduler.create_tasks(tasks) | swh_scheduler.create_tasks(tasks) | ||||
first_ready_tasks = swh_scheduler.peek_ready_tasks( | first_ready_tasks = swh_scheduler.peek_ready_tasks(task_type, num_tasks=50) | ||||
task_type, num_tasks=10, num_tasks_priority=10 | grabbed_tasks = swh_scheduler.grab_ready_tasks(task_type, num_tasks=50) | ||||
) | |||||
grabbed_tasks = swh_scheduler.grab_ready_tasks( | |||||
task_type, num_tasks=10, num_tasks_priority=10 | |||||
) | |||||
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | ||||
assert peeked["status"] == "next_run_not_scheduled" | assert peeked["status"] == "next_run_not_scheduled" | ||||
del peeked["status"] | del peeked["status"] | ||||
assert grabbed["status"] == "next_run_scheduled" | assert grabbed["status"] == "next_run_scheduled" | ||||
del grabbed["status"] | del grabbed["status"] | ||||
assert peeked == grabbed | assert peeked == grabbed | ||||
assert peeked["priority"] == grabbed["priority"] | priority = grabbed["priority"] | ||||
assert priority == peeked["priority"] | |||||
assert priority is None | |||||
def test_grab_ready_priority_tasks(self, swh_scheduler): | def test_grab_ready_priority_tasks(self, swh_scheduler): | ||||
"""check the grab and peek priority tasks endpoint behave as expected""" | """check the grab and peek priority tasks endpoint behave as expected""" | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
task_type = TEMPLATES["git"]["type"] | task_type = TEMPLATES["git"]["type"] | ||||
num_tasks = 100 | num_tasks = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
▲ Show 20 Lines • Show All 993 Lines • Show Last 20 Lines |
existing
and what test code is it? why isn't it updated too?