diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index c395e8c..5e0c21d 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,36 +1,36 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from swh.core.tests.server_testing import ServerTestFixture from swh.scheduler import get_scheduler -from swh.scheduler.tests.test_scheduler import CommonSchedulerTest from swh.scheduler.api.server import app +from swh.scheduler.tests.test_scheduler import CommonSchedulerTest class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, unittest.TestCase): """Test the remote scheduler API. This class doesn't define any tests as we want identical functionality between local and remote scheduler. All the tests are therefore defined in CommonSchedulerTest. """ def setUp(self): self.config = { 'scheduler': { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=%s' % self.TEST_DB_NAME, } } } self.app = app # this will setup the local scheduler... super().setUp() # accessible through a remote scheduler accessible on the # given port self.backend = get_scheduler('remote', {'url': self.url()}) diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 443ec62..1a44fca 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,484 +1,475 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os import random import unittest import uuid +from collections import defaultdict +import psycopg2 from arrow import utcnow -from collections import defaultdict from nose.plugins.attrib import attr -from nose.tools import istest -import psycopg2 from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler import get_scheduler + from . import DATA_DIR @attr('db') class CommonSchedulerTest(SingleDbTestFixture): TEST_DB_NAME = 'softwareheritage-scheduler-test' TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler.sql') TEST_DB_DUMP_TYPE = 'psql' def setUp(self): super().setUp() tt = { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), } tt2 = tt.copy() tt2['type'] = 'update-hg' tt2['description'] = 'Update a mercurial repository' tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository' tt2['max_queue_length'] = 42 tt2['num_retries'] = None tt2['retry_delay'] = None self.task_types = { tt['type']: tt, tt2['type']: tt2, } self.task1_template = t1_template = { 'type': tt['type'], 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, } self.task2_template = t2_template = copy.deepcopy(t1_template) t2_template['type'] = tt2['type'] t2_template['policy'] = 'oneshot' def tearDown(self): self.backend.close_connection() self.empty_tables() super().tearDown() def empty_tables(self, whitelist=["priority_ratio"]): query = """SELECT table_name FROM information_schema.tables WHERE table_schema = %%s and table_name not in (%s) """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) self.cursor.execute(query, ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() - @istest - def add_task_type(self): + def test_add_task_type(self): tt, tt2 = self.task_types.values() self.backend.create_task_type(tt) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) with self.assertRaisesRegex(psycopg2.IntegrityError, r'\(type\)=\(%s\)' % tt['type']): self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) - @istest - def get_task_types(self): + def test_get_task_types(self): tt, tt2 = self.task_types.values() self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertCountEqual([tt2, tt], self.backend.get_task_types()) @staticmethod def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret['next_run'] = next_run if priority: ret['priority'] = priority if args: ret['arguments']['args'] = list(args) if kwargs: ret['arguments']['kwargs'] = kwargs return ret def _pop_priority(self, priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None def _tasks_from_template(self, template, max_timestamp, num, num_priority=0, priorities=None): if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = self._pop_priority(priorities) tasks.append(self._task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, 'argument-%03d' % i, **{'kwarg%03d' % i: 'bogus-kwarg'} )) return tasks def _create_task_types(self): for tt in self.task_types.values(): self.backend.create_task_type(tt) - @istest - def create_tasks(self): + def test_create_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() num_tasks_priority = 100 tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) tasks_2 = self._tasks_from_template( self.task2_template, utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids ret = self.backend.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results self.assertEqual(set_ret, set_ret1) self.assertEqual(len(ret), len(ret1)) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = self.task_types[orig_task['type']] self.assertNotIn(task['id'], ids) self.assertEqual(task['status'], 'next_run_not_scheduled') self.assertEqual(task['current_interval'], task_type['default_interval']) self.assertEqual(task['policy'], orig_task.get('policy', 'recurring')) priority = task.get('priority') if priority: actual_priorities[priority] += 1 self.assertEqual(task['retries_left'], task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] self.assertEqual(task, orig_task) self.assertEqual(dict(actual_priorities), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) - @istest - def peek_ready_tasks_no_priority(self): + def test_peek_ready_tasks_no_priority(self): self._create_task_types() t = utcnow() task_type = self.task1_template['type'] tasks = self._tasks_from_template(self.task1_template, t, 100) random.shuffle(tasks) self.backend.create_tasks(tasks) ready_tasks = self.backend.peek_ready_tasks(task_type) self.assertEqual(len(ready_tasks), len(tasks)) for i in range(len(ready_tasks) - 1): self.assertLessEqual(ready_tasks[i]['next_run'], ready_tasks[i+1]['next_run']) # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=limit) self.assertEqual(len(ready_tasks_limited), limit) self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) # Limit by timestamp max_ts = tasks[limit-1]['next_run'] ready_tasks_timestamped = self.backend.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: self.assertLessEqual(ready_task['next_run'], max_ts) # Make sure we get proper behavior for the first ready tasks self.assertCountEqual( ready_tasks[:len(ready_tasks_timestamped)], ready_tasks_timestamped, ) # Limit by both ready_tasks_both = self.backend.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) self.assertLessEqual(len(ready_tasks_both), limit//3) for ready_task in ready_tasks_both: self.assertLessEqual(ready_task['next_run'], max_ts) self.assertIn(ready_task, ready_tasks[:limit//3]) def _priority_ratio(self): self.cursor.execute('select id, ratio from priority_ratio') priority_ratio = {} for row in self.cursor.fetchall(): priority_ratio[row[0]] = row[1] return priority_ratio - @istest - def peek_ready_tasks_mixed_priorities(self): + def test_peek_ready_tasks_mixed_priorities(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = self.task1_template['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( self.task1_template, t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) # take all available tasks ready_tasks = self.backend.peek_ready_tasks( task_type) self.assertEqual(len(ready_tasks), len(tasks)) self.assertEqual(num_tasks_priority + num_tasks_no_priority, len(ready_tasks)) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 self.assertEqual(dict(count_tasks_per_priority), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] self.assertTrue( actual_prio == expected_count or actual_prio == expected_count + 1) self.assertEqual(count_tasks_per_priority[None], num_tasks) - @istest - def grab_ready_tasks(self): + def test_grab_ready_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = self.task1_template['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( self.task1_template, t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) first_ready_tasks = self.backend.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) grabbed_tasks = self.backend.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): self.assertEqual(peeked['status'], 'next_run_not_scheduled') del peeked['status'] self.assertEqual(grabbed['status'], 'next_run_scheduled') del grabbed['status'] self.assertEqual(peeked, grabbed) self.assertEqual(peeked['priority'], grabbed['priority']) - @istest - def get_tasks(self): + def test_get_tasks(self): self._create_task_types() t = utcnow() tasks = self._tasks_from_template(self.task1_template, t, 100) tasks = self.backend.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = tasks[:length] tasks[:length] = [] ret = self.backend.get_tasks(task['id'] for task in cur_tasks) self.assertCountEqual(ret, cur_tasks) - @istest - def filter_task_to_archive(self): + def test_filter_task_to_archive(self): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types() _time = utcnow() recurring = self._tasks_from_template(self.task1_template, _time, 12) oneshots = self._tasks_from_template(self.task2_template, _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) self.backend.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled self.backend.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update self.backend.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') tasks_to_archive = list(self.backend.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) self.assertEqual(len(tasks_to_archive), total_tasks_filtered) actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 self.assertEqual(actual_filtered_per_status, status_per_policy) - @istest - def delete_archived_tasks(self): + def test_delete_archived_tasks(self): self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( self.task1_template, _time, 12) oneshots = self._tasks_from_template( self.task2_template, _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) self.backend.delete_archived_tasks(_tasks) self.cursor.execute('select count(*) from task') tasks_count = self.cursor.fetchone() self.cursor.execute('select count(*) from task_run') tasks_run_count = self.cursor.fetchone() self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME} self.backend = get_scheduler('local', self.config) diff --git a/swh/scheduler/tests/test_task.py b/swh/scheduler/tests/test_task.py index 10131e9..7e2130e 100644 --- a/swh/scheduler/tests/test_task.py +++ b/swh/scheduler/tests/test_task.py @@ -1,32 +1,28 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest - from swh.scheduler import task from .celery_testing import CeleryTestFixture class Task(CeleryTestFixture, unittest.TestCase): - @istest - def not_implemented_task(self): + def test_not_implemented_task(self): class NotImplementedTask(task.Task): pass with self.assertRaises(NotImplementedError): NotImplementedTask().run() - @istest - def add_task(self): + def test_add_task(self): class AddTask(task.Task): def run_task(self, x, y): return x + y r = AddTask().apply([2, 3]) self.assertTrue(r.successful()) self.assertEqual(r.result, 5) diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py index a539cea..192be61 100644 --- a/swh/scheduler/tests/test_utils.py +++ b/swh/scheduler/tests/test_utils.py @@ -1,60 +1,56 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest - from datetime import timezone -from nose.tools import istest from unittest.mock import patch from swh.scheduler import utils class UtilsTest(unittest.TestCase): - @istest @patch('swh.scheduler.utils.datetime') - def create_oneshot_task_dict_simple(self, mock_datetime): + def test_create_oneshot_task_dict_simple(self, mock_datetime): mock_datetime.now.return_value = 'some-date' actual_task = utils.create_oneshot_task_dict('some-task-type') expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-date', 'arguments': { 'args': [], 'kwargs': {}, }, 'priority': None, } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) - @istest @patch('swh.scheduler.utils.datetime') - def create_oneshot_task_dict_other_call(self, mock_datetime): + def test_create_oneshot_task_dict_other_call(self, mock_datetime): mock_datetime.now.return_value = 'some-other-date' actual_task = utils.create_oneshot_task_dict( 'some-task-type', 'arg0', 'arg1', priority='high', other_stuff='normal' ) expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-other-date', 'arguments': { 'args': ('arg0', 'arg1'), 'kwargs': {'other_stuff': 'normal'}, }, 'priority': 'high', } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index 2e3f70e..68683d5 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,68 +1,66 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow -from nose.plugins.attrib import attr -from nose.tools import istest from hypothesis import given from hypothesis.strategies import sets +from nose.plugins.attrib import attr from swh.core.tests.db_testing import SingleDbTestFixture +from swh.scheduler.tests import DATA_DIR from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.tests import DATA_DIR from . import from_regex @attr('db') class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler-updater.sql') TEST_DB_DUMP_TYPE = 'psql' def setUp(self): super().setUp() config = { 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): self.backend.close_connection() self._empty_tables() super().tearDown() - @istest @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) - def cache_read(self, urls): + def test_cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, 'type': 'create', 'origin_type': 'git', }) self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index 0944e48..463a05c 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,199 +1,194 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +from itertools import chain from hypothesis import given -from hypothesis.strategies import sampled_from, lists, tuples, text +from hypothesis.strategies import lists, sampled_from, text, tuples -from itertools import chain -from nose.tools import istest +from swh.scheduler.updater.consumer import UpdaterConsumer +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from . import UpdaterTestUtil, from_regex -from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS -from swh.scheduler.updater.consumer import UpdaterConsumer - class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): def __init__(self, backend_class=FakeSchedulerUpdaterBackend): super().__init__(backend_class=backend_class) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerRaise() - @istest - def running_raise(self): + def test_running_raise(self): """Raising during run should finish fine. """ # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerNoEvent() - @istest - def running_does_not_consume(self): + def test_running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): def __init__(self, messages): super().__init__() self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], 'last_seen': event['created_at'], 'origin_type': event['origin_type'], } return SWHEvent(e) class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): - @istest @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(text(), # event type from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text(), # origin type sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) - def running(self, events, uninteresting_events, incomplete_events): + def test_running(self, events, uninteresting_events, incomplete_events): """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) updater = FakeUpdaterConsumer(list(chain( ready_events, ready_incomplete_events, ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py index cb7489e..2f00bd7 100644 --- a/swh/scheduler/tests/updater/test_events.py +++ b/swh/scheduler/tests/updater/test_events.py @@ -1,48 +1,44 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given -from hypothesis.strategies import text, sampled_from -from nose.tools import istest +from hypothesis.strategies import sampled_from, text -from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.ghtorrent import events from . import UpdaterTestUtil def event_values_ko(): return set(events['evt']).union( set(events['ent'])).difference( set(LISTENED_EVENTS)) WRONG_EVENTS = sorted(list(event_values_ko())) class EventTest(UpdaterTestUtil, unittest.TestCase): - @istest @given(sampled_from(LISTENED_EVENTS), text(), text()) - def is_interesting_ok(self, event_type, name, origin_type): + def test_is_interesting_ok(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertTrue(SWHEvent(evt).is_interesting()) - @istest @given(text(), text(), text()) - def is_interested_with_noisy_event_should_be_ko( + def test_is_interested_with_noisy_event_should_be_ko( self, event_type, name, origin_type): if event_type in LISTENED_EVENTS: # just in case something good is generated, skip it return evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) - @istest @given(sampled_from(WRONG_EVENTS), text(), text()) - def is_interesting_ko(self, event_type, name, origin_type): + def test_is_interesting_ko(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index bfeecf2..b87e345 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,171 +1,164 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +from unittest.mock import patch from hypothesis import given from hypothesis.strategies import sampled_from -from nose.tools import istest -from unittest.mock import patch from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.updater.ghtorrent import ( - events, GHTorrentConsumer, INTERESTING_EVENT_KEYS) +from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, + GHTorrentConsumer, events) -from . import from_regex, UpdaterTestUtil +from . import UpdaterTestUtil, from_regex def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): self.fake_config = { 'conn': { 'url': 'amqp://u:p@https://somewhere:9807', }, 'debug': True, 'batch_cache_write': 10, 'rabbitmq_prefetch_read': 100, } self.consumer = GHTorrentConsumer(self.fake_config, _connection_class=FakeConnection) - @istest def test_init(self): # given # check init is ok self.assertEqual(self.consumer.debug, self.fake_config['debug']) self.assertEqual(self.consumer.batch, self.fake_config['batch_cache_write']) self.assertEqual(self.consumer.prefetch_read, self.fake_config['rabbitmq_prefetch_read']) self.assertEqual(self.consumer.config, self.fake_config) - @istest def test_has_events(self): self.assertTrue(self.consumer.has_events()) - @istest def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, self.fake_config['conn']['url']) self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) - @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) - def convert_event_ok(self, event_type, name): + def test_convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'cnt': 1, 'origin_type': 'git', } self.assertEqual(event, expected_event) - @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(INTERESTING_EVENT_KEYS)) - def convert_event_ko(self, event_type, name, missing_data_key): + def test_convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) actual_converted_event = self.consumer.convert_event(input_event) self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') - @istest - def consume_events(self, mock_collect_replies): + def test_consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ self._make_event('PushEvent', 'user/some-repo', 'git'), self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, limit=self.fake_config['rabbitmq_prefetch_read'] ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index 305cd35..2f226d9 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,163 +1,158 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest - from nose.plugins.attrib import attr -from nose.tools import istest from swh.core.tests.db_testing import DbTestFixture -from swh.scheduler.updater.events import SWHEvent +from swh.scheduler.tests import DATA_DIR +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.writer import UpdaterWriter -from swh.scheduler.updater.events import LISTENED_EVENTS -from swh.scheduler.tests import DATA_DIR from . import UpdaterTestUtil @attr('db') class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' TEST_SCHED_DUMP = os.path.join( DATA_DIR, 'dumps/swh-scheduler.sql') TEST_SCHED_DUMP_TYPE = 'psql' TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' TEST_SCHED_UPDATER_DUMP = os.path.join( DATA_DIR, 'dumps/swh-scheduler-updater.sql') TEST_SCHED_UPDATER_DUMP_TYPE = 'psql' @classmethod def setUpClass(cls): cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP, cls.TEST_SCHED_DUMP_TYPE) cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP, cls.TEST_SCHED_UPDATER_DUMP_TYPE) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { 'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-test', 'cache_read_limit': 5, }, 'pause': 0.1, 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend def tearDown(self): self.scheduler_backend.close_connection() self.scheduler_updater_backend.close_connection() super().tearDown() - @istest - def run_ko(self): + def test_run_ko(self): """Only git tasks are supported for now, other types are dismissed. """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) - @istest - def run_ok(self): + def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEquals(t['type'], 'origin-update-git') self.assertEquals(t['priority'], 'normal') self.assertEquals(t['policy'], 'oneshot') self.assertEquals(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length)