Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
# Copyright (C) 2017-2018 The Software Heritage developers | # Copyright (C) 2017-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import os | import os | ||||
import random | import random | ||||
import unittest | import unittest | ||||
import uuid | import uuid | ||||
from collections import defaultdict | |||||
import psycopg2 | |||||
from arrow import utcnow | from arrow import utcnow | ||||
from collections import defaultdict | |||||
from nose.plugins.attrib import attr | from nose.plugins.attrib import attr | ||||
from nose.tools import istest | |||||
import psycopg2 | |||||
from swh.core.tests.db_testing import SingleDbTestFixture | from swh.core.tests.db_testing import SingleDbTestFixture | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from . import DATA_DIR | from . import DATA_DIR | ||||
@attr('db') | @attr('db') | ||||
class CommonSchedulerTest(SingleDbTestFixture): | class CommonSchedulerTest(SingleDbTestFixture): | ||||
TEST_DB_NAME = 'softwareheritage-scheduler-test' | TEST_DB_NAME = 'softwareheritage-scheduler-test' | ||||
TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler.sql') | TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler.sql') | ||||
TEST_DB_DUMP_TYPE = 'psql' | TEST_DB_DUMP_TYPE = 'psql' | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | def empty_tables(self, whitelist=["priority_ratio"]): | ||||
self.cursor.execute(query, ('public', )) | self.cursor.execute(query, ('public', )) | ||||
tables = set(table for (table,) in self.cursor.fetchall()) | tables = set(table for (table,) in self.cursor.fetchall()) | ||||
for table in tables: | for table in tables: | ||||
self.cursor.execute('truncate table %s cascade' % table) | self.cursor.execute('truncate table %s cascade' % table) | ||||
self.conn.commit() | self.conn.commit() | ||||
@istest | def test_add_task_type(self): | ||||
def add_task_type(self): | |||||
tt, tt2 = self.task_types.values() | tt, tt2 = self.task_types.values() | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | ||||
with self.assertRaisesRegex(psycopg2.IntegrityError, | with self.assertRaisesRegex(psycopg2.IntegrityError, | ||||
r'\(type\)=\(%s\)' % tt['type']): | r'\(type\)=\(%s\)' % tt['type']): | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
self.backend.create_task_type(tt2) | self.backend.create_task_type(tt2) | ||||
self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | self.assertEqual(tt, self.backend.get_task_type(tt['type'])) | ||||
self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) | self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) | ||||
@istest | def test_get_task_types(self): | ||||
def get_task_types(self): | |||||
tt, tt2 = self.task_types.values() | tt, tt2 = self.task_types.values() | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
self.backend.create_task_type(tt2) | self.backend.create_task_type(tt2) | ||||
self.assertCountEqual([tt2, tt], self.backend.get_task_types()) | self.assertCountEqual([tt2, tt], self.backend.get_task_types()) | ||||
@staticmethod | @staticmethod | ||||
def _task_from_template(template, next_run, priority, *args, **kwargs): | def _task_from_template(template, next_run, priority, *args, **kwargs): | ||||
ret = copy.deepcopy(template) | ret = copy.deepcopy(template) | ||||
Show All 34 Lines | def _tasks_from_template(self, template, max_timestamp, num, | ||||
**{'kwarg%03d' % i: 'bogus-kwarg'} | **{'kwarg%03d' % i: 'bogus-kwarg'} | ||||
)) | )) | ||||
return tasks | return tasks | ||||
def _create_task_types(self): | def _create_task_types(self): | ||||
for tt in self.task_types.values(): | for tt in self.task_types.values(): | ||||
self.backend.create_task_type(tt) | self.backend.create_task_type(tt) | ||||
@istest | def test_create_tasks(self): | ||||
def create_tasks(self): | |||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) | tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) | ||||
tasks_2 = self._tasks_from_template( | tasks_2 = self._tasks_from_template( | ||||
self.task2_template, utcnow(), 100, | self.task2_template, utcnow(), 100, | ||||
num_tasks_priority, priorities=priority_ratio) | num_tasks_priority, priorities=priority_ratio) | ||||
tasks = tasks_1 + tasks_2 | tasks = tasks_1 + tasks_2 | ||||
Show All 39 Lines | def test_create_tasks(self): | ||||
del task['priority'] | del task['priority'] | ||||
self.assertEqual(task, orig_task) | self.assertEqual(task, orig_task) | ||||
self.assertEqual(dict(actual_priorities), { | self.assertEqual(dict(actual_priorities), { | ||||
priority: int(ratio * num_tasks_priority) | priority: int(ratio * num_tasks_priority) | ||||
for priority, ratio in priority_ratio.items() | for priority, ratio in priority_ratio.items() | ||||
}) | }) | ||||
@istest | def test_peek_ready_tasks_no_priority(self): | ||||
def peek_ready_tasks_no_priority(self): | |||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = self.task1_template['type'] | ||||
tasks = self._tasks_from_template(self.task1_template, t, 100) | tasks = self._tasks_from_template(self.task1_template, t, 100) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
self.backend.create_tasks(tasks) | self.backend.create_tasks(tasks) | ||||
ready_tasks = self.backend.peek_ready_tasks(task_type) | ready_tasks = self.backend.peek_ready_tasks(task_type) | ||||
Show All 34 Lines | class CommonSchedulerTest(SingleDbTestFixture): | ||||
def _priority_ratio(self): | def _priority_ratio(self): | ||||
self.cursor.execute('select id, ratio from priority_ratio') | self.cursor.execute('select id, ratio from priority_ratio') | ||||
priority_ratio = {} | priority_ratio = {} | ||||
for row in self.cursor.fetchall(): | for row in self.cursor.fetchall(): | ||||
priority_ratio[row[0]] = row[1] | priority_ratio[row[0]] = row[1] | ||||
return priority_ratio | return priority_ratio | ||||
@istest | def test_peek_ready_tasks_mixed_priorities(self): | ||||
def peek_ready_tasks_mixed_priorities(self): | |||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = self.task1_template['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = self._tasks_from_template( | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def test_peek_ready_tasks_mixed_priorities(self): | ||||
expected_count = math.ceil(ratio * num_tasks_priority) | expected_count = math.ceil(ratio * num_tasks_priority) | ||||
actual_prio = count_tasks_per_priority[priority] | actual_prio = count_tasks_per_priority[priority] | ||||
self.assertTrue( | self.assertTrue( | ||||
actual_prio == expected_count or | actual_prio == expected_count or | ||||
actual_prio == expected_count + 1) | actual_prio == expected_count + 1) | ||||
self.assertEqual(count_tasks_per_priority[None], num_tasks) | self.assertEqual(count_tasks_per_priority[None], num_tasks) | ||||
@istest | def test_grab_ready_tasks(self): | ||||
def grab_ready_tasks(self): | |||||
priority_ratio = self._priority_ratio() | priority_ratio = self._priority_ratio() | ||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
task_type = self.task1_template['type'] | task_type = self.task1_template['type'] | ||||
num_tasks_priority = 100 | num_tasks_priority = 100 | ||||
num_tasks_no_priority = 100 | num_tasks_no_priority = 100 | ||||
# Create tasks with and without priorities | # Create tasks with and without priorities | ||||
tasks = self._tasks_from_template( | tasks = self._tasks_from_template( | ||||
Show All 12 Lines | def test_grab_ready_tasks(self): | ||||
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): | ||||
self.assertEqual(peeked['status'], 'next_run_not_scheduled') | self.assertEqual(peeked['status'], 'next_run_not_scheduled') | ||||
del peeked['status'] | del peeked['status'] | ||||
self.assertEqual(grabbed['status'], 'next_run_scheduled') | self.assertEqual(grabbed['status'], 'next_run_scheduled') | ||||
del grabbed['status'] | del grabbed['status'] | ||||
self.assertEqual(peeked, grabbed) | self.assertEqual(peeked, grabbed) | ||||
self.assertEqual(peeked['priority'], grabbed['priority']) | self.assertEqual(peeked['priority'], grabbed['priority']) | ||||
@istest | def test_get_tasks(self): | ||||
def get_tasks(self): | |||||
self._create_task_types() | self._create_task_types() | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(self.task1_template, t, 100) | tasks = self._tasks_from_template(self.task1_template, t, 100) | ||||
tasks = self.backend.create_tasks(tasks) | tasks = self.backend.create_tasks(tasks) | ||||
random.shuffle(tasks) | random.shuffle(tasks) | ||||
while len(tasks) > 1: | while len(tasks) > 1: | ||||
length = random.randrange(1, len(tasks)) | length = random.randrange(1, len(tasks)) | ||||
cur_tasks = tasks[:length] | cur_tasks = tasks[:length] | ||||
tasks[:length] = [] | tasks[:length] = [] | ||||
ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ret = self.backend.get_tasks(task['id'] for task in cur_tasks) | ||||
self.assertCountEqual(ret, cur_tasks) | self.assertCountEqual(ret, cur_tasks) | ||||
@istest | def test_filter_task_to_archive(self): | ||||
def filter_task_to_archive(self): | |||||
"""Filtering only list disabled recurring or completed oneshot tasks | """Filtering only list disabled recurring or completed oneshot tasks | ||||
""" | """ | ||||
self._create_task_types() | self._create_task_types() | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template(self.task1_template, _time, 12) | recurring = self._tasks_from_template(self.task1_template, _time, 12) | ||||
oneshots = self._tasks_from_template(self.task2_template, _time, 12) | oneshots = self._tasks_from_template(self.task2_template, _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | def test_filter_task_to_archive(self): | ||||
self.assertEqual(len(tasks_to_archive), total_tasks_filtered) | self.assertEqual(len(tasks_to_archive), total_tasks_filtered) | ||||
actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} | actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} | ||||
for task in tasks_to_archive: | for task in tasks_to_archive: | ||||
actual_filtered_per_status[task['task_policy']] += 1 | actual_filtered_per_status[task['task_policy']] += 1 | ||||
self.assertEqual(actual_filtered_per_status, status_per_policy) | self.assertEqual(actual_filtered_per_status, status_per_policy) | ||||
@istest | def test_delete_archived_tasks(self): | ||||
def delete_archived_tasks(self): | |||||
self._create_task_types() | self._create_task_types() | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = self._tasks_from_template( | ||||
self.task1_template, _time, 12) | self.task1_template, _time, 12) | ||||
oneshots = self._tasks_from_template( | oneshots = self._tasks_from_template( | ||||
self.task2_template, _time, 12) | self.task2_template, _time, 12) | ||||
total_tasks = len(recurring) + len(oneshots) | total_tasks = len(recurring) + len(oneshots) | ||||
pending_tasks = self.backend.create_tasks(recurring + oneshots) | pending_tasks = self.backend.create_tasks(recurring + oneshots) | ||||
Show All 33 Lines |