
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
index c395e8c..5e0c21d 100644
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -1,36 +1,36 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.core.tests.server_testing import ServerTestFixture
from swh.scheduler import get_scheduler
-from swh.scheduler.tests.test_scheduler import CommonSchedulerTest
from swh.scheduler.api.server import app
+from swh.scheduler.tests.test_scheduler import CommonSchedulerTest
class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture,
unittest.TestCase):
"""Test the remote scheduler API.
This class doesn't define any tests as we want identical
functionality between local and remote scheduler. All the tests are
therefore defined in CommonSchedulerTest.
"""
def setUp(self):
self.config = {
'scheduler': {
'cls': 'local',
'args': {
'scheduling_db': 'dbname=%s' % self.TEST_DB_NAME,
}
}
}
self.app = app # this will setup the local scheduler...
super().setUp()
# accessible through a remote scheduler accessible on the
# given port
self.backend = get_scheduler('remote', {'url': self.url()})
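
Note: the hunk above only reorders imports; the structure worth calling out is that RemoteSchedulerTest defines no tests of its own. It inherits everything from CommonSchedulerTest and merely swaps in a remote backend. A minimal, self-contained sketch of that mixin pattern (all names here are hypothetical, not from swh-scheduler):

import unittest

class CommonBackendTest:
    # Test methods are defined once; self.backend comes from the subclass.
    # Not a TestCase itself, so unittest discovery never runs it directly.
    def test_roundtrip(self):
        self.backend.put('key', 'value')
        self.assertEqual(self.backend.get('key'), 'value')

class DictBackend:
    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

class LocalBackendTest(CommonBackendTest, unittest.TestCase):
    # Concrete TestCase: this is what discovery collects and runs.
    def setUp(self):
        self.backend = DictBackend()
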
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index 443ec62..1a44fca 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,484 +1,475 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import os
import random
import unittest
import uuid
+from collections import defaultdict
+import psycopg2
from arrow import utcnow
-from collections import defaultdict
from nose.plugins.attrib import attr
-from nose.tools import istest
-import psycopg2
from swh.core.tests.db_testing import SingleDbTestFixture
from swh.scheduler import get_scheduler
+
from . import DATA_DIR
@attr('db')
class CommonSchedulerTest(SingleDbTestFixture):
TEST_DB_NAME = 'softwareheritage-scheduler-test'
TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler.sql')
TEST_DB_DUMP_TYPE = 'psql'
def setUp(self):
super().setUp()
tt = {
'type': 'update-git',
'description': 'Update a git repository',
'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
'default_interval': datetime.timedelta(days=64),
'min_interval': datetime.timedelta(hours=12),
'max_interval': datetime.timedelta(days=64),
'backoff_factor': 2,
'max_queue_length': None,
'num_retries': 7,
'retry_delay': datetime.timedelta(hours=2),
}
tt2 = tt.copy()
tt2['type'] = 'update-hg'
tt2['description'] = 'Update a mercurial repository'
tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository'
tt2['max_queue_length'] = 42
tt2['num_retries'] = None
tt2['retry_delay'] = None
self.task_types = {
tt['type']: tt,
tt2['type']: tt2,
}
self.task1_template = t1_template = {
'type': tt['type'],
'arguments': {
'args': [],
'kwargs': {},
},
'next_run': None,
}
self.task2_template = t2_template = copy.deepcopy(t1_template)
t2_template['type'] = tt2['type']
t2_template['policy'] = 'oneshot'
def tearDown(self):
self.backend.close_connection()
self.empty_tables()
super().tearDown()
def empty_tables(self, whitelist=["priority_ratio"]):
query = """SELECT table_name FROM information_schema.tables
WHERE table_schema = %%s and
table_name not in (%s)
""" % ','.join(map(lambda t: "'%s'" % t, whitelist))
self.cursor.execute(query, ('public', ))
tables = set(table for (table,) in self.cursor.fetchall())
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.conn.commit()
- @istest
- def add_task_type(self):
+ def test_add_task_type(self):
tt, tt2 = self.task_types.values()
self.backend.create_task_type(tt)
self.assertEqual(tt, self.backend.get_task_type(tt['type']))
with self.assertRaisesRegex(psycopg2.IntegrityError,
r'\(type\)=\(%s\)' % tt['type']):
self.backend.create_task_type(tt)
self.backend.create_task_type(tt2)
self.assertEqual(tt, self.backend.get_task_type(tt['type']))
self.assertEqual(tt2, self.backend.get_task_type(tt2['type']))
- @istest
- def get_task_types(self):
+ def test_get_task_types(self):
tt, tt2 = self.task_types.values()
self.backend.create_task_type(tt)
self.backend.create_task_type(tt2)
self.assertCountEqual([tt2, tt], self.backend.get_task_types())
@staticmethod
def _task_from_template(template, next_run, priority, *args, **kwargs):
ret = copy.deepcopy(template)
ret['next_run'] = next_run
if priority:
ret['priority'] = priority
if args:
ret['arguments']['args'] = list(args)
if kwargs:
ret['arguments']['kwargs'] = kwargs
return ret
def _pop_priority(self, priorities):
if not priorities:
return None
for priority, remains in priorities.items():
if remains > 0:
priorities[priority] = remains - 1
return priority
return None
def _tasks_from_template(self, template, max_timestamp, num,
num_priority=0, priorities=None):
if num_priority and priorities:
priorities = {
priority: ratio * num_priority
for priority, ratio in priorities.items()
}
tasks = []
for i in range(num + num_priority):
priority = self._pop_priority(priorities)
tasks.append(self._task_from_template(
template,
max_timestamp - datetime.timedelta(microseconds=i),
priority,
'argument-%03d' % i,
**{'kwarg%03d' % i: 'bogus-kwarg'}
))
return tasks
def _create_task_types(self):
for tt in self.task_types.values():
self.backend.create_task_type(tt)
- @istest
- def create_tasks(self):
+ def test_create_tasks(self):
priority_ratio = self._priority_ratio()
self._create_task_types()
num_tasks_priority = 100
tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100)
tasks_2 = self._tasks_from_template(
self.task2_template, utcnow(), 100,
num_tasks_priority, priorities=priority_ratio)
tasks = tasks_1 + tasks_2
# tasks are returned only once with their ids
ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2)
set_ret1 = set([t['id'] for t in ret1])
# creating the same set result in the same ids
ret = self.backend.create_tasks(tasks)
set_ret = set([t['id'] for t in ret])
# Idempotence results
self.assertEqual(set_ret, set_ret1)
self.assertEqual(len(ret), len(ret1))
ids = set()
actual_priorities = defaultdict(int)
for task, orig_task in zip(ret, tasks):
task = copy.deepcopy(task)
task_type = self.task_types[orig_task['type']]
self.assertNotIn(task['id'], ids)
self.assertEqual(task['status'], 'next_run_not_scheduled')
self.assertEqual(task['current_interval'],
task_type['default_interval'])
self.assertEqual(task['policy'], orig_task.get('policy',
'recurring'))
priority = task.get('priority')
if priority:
actual_priorities[priority] += 1
self.assertEqual(task['retries_left'],
task_type['num_retries'] or 0)
ids.add(task['id'])
del task['id']
del task['status']
del task['current_interval']
del task['retries_left']
if 'policy' not in orig_task:
del task['policy']
if 'priority' not in orig_task:
del task['priority']
self.assertEqual(task, orig_task)
self.assertEqual(dict(actual_priorities), {
priority: int(ratio * num_tasks_priority)
for priority, ratio in priority_ratio.items()
})
- @istest
- def peek_ready_tasks_no_priority(self):
+ def test_peek_ready_tasks_no_priority(self):
self._create_task_types()
t = utcnow()
task_type = self.task1_template['type']
tasks = self._tasks_from_template(self.task1_template, t, 100)
random.shuffle(tasks)
self.backend.create_tasks(tasks)
ready_tasks = self.backend.peek_ready_tasks(task_type)
self.assertEqual(len(ready_tasks), len(tasks))
for i in range(len(ready_tasks) - 1):
self.assertLessEqual(ready_tasks[i]['next_run'],
ready_tasks[i+1]['next_run'])
# Only get the first few ready tasks
limit = random.randrange(5, 5 + len(tasks)//2)
ready_tasks_limited = self.backend.peek_ready_tasks(
task_type, num_tasks=limit)
self.assertEqual(len(ready_tasks_limited), limit)
self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit])
# Limit by timestamp
max_ts = tasks[limit-1]['next_run']
ready_tasks_timestamped = self.backend.peek_ready_tasks(
task_type, timestamp=max_ts)
for ready_task in ready_tasks_timestamped:
self.assertLessEqual(ready_task['next_run'], max_ts)
# Make sure we get proper behavior for the first ready tasks
self.assertCountEqual(
ready_tasks[:len(ready_tasks_timestamped)],
ready_tasks_timestamped,
)
# Limit by both
ready_tasks_both = self.backend.peek_ready_tasks(
task_type, timestamp=max_ts, num_tasks=limit//3)
self.assertLessEqual(len(ready_tasks_both), limit//3)
for ready_task in ready_tasks_both:
self.assertLessEqual(ready_task['next_run'], max_ts)
self.assertIn(ready_task, ready_tasks[:limit//3])
def _priority_ratio(self):
self.cursor.execute('select id, ratio from priority_ratio')
priority_ratio = {}
for row in self.cursor.fetchall():
priority_ratio[row[0]] = row[1]
return priority_ratio
- @istest
- def peek_ready_tasks_mixed_priorities(self):
+ def test_peek_ready_tasks_mixed_priorities(self):
priority_ratio = self._priority_ratio()
self._create_task_types()
t = utcnow()
task_type = self.task1_template['type']
num_tasks_priority = 100
num_tasks_no_priority = 100
# Create tasks with and without priorities
tasks = self._tasks_from_template(
self.task1_template, t,
num=num_tasks_no_priority,
num_priority=num_tasks_priority,
priorities=priority_ratio)
random.shuffle(tasks)
self.backend.create_tasks(tasks)
# take all available tasks
ready_tasks = self.backend.peek_ready_tasks(
task_type)
self.assertEqual(len(ready_tasks), len(tasks))
self.assertEqual(num_tasks_priority + num_tasks_no_priority,
len(ready_tasks))
count_tasks_per_priority = defaultdict(int)
for task in ready_tasks:
priority = task.get('priority')
if priority:
count_tasks_per_priority[priority] += 1
self.assertEqual(dict(count_tasks_per_priority), {
priority: int(ratio * num_tasks_priority)
for priority, ratio in priority_ratio.items()
})
# Only get some ready tasks
num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2)
num_tasks_priority = random.randrange(5, num_tasks_priority//2)
ready_tasks_limited = self.backend.peek_ready_tasks(
task_type, num_tasks=num_tasks,
num_tasks_priority=num_tasks_priority)
count_tasks_per_priority = defaultdict(int)
for task in ready_tasks_limited:
priority = task.get('priority')
count_tasks_per_priority[priority] += 1
import math
for priority, ratio in priority_ratio.items():
expected_count = math.ceil(ratio * num_tasks_priority)
actual_prio = count_tasks_per_priority[priority]
self.assertTrue(
actual_prio == expected_count or
actual_prio == expected_count + 1)
self.assertEqual(count_tasks_per_priority[None], num_tasks)
- @istest
- def grab_ready_tasks(self):
+ def test_grab_ready_tasks(self):
priority_ratio = self._priority_ratio()
self._create_task_types()
t = utcnow()
task_type = self.task1_template['type']
num_tasks_priority = 100
num_tasks_no_priority = 100
# Create tasks with and without priorities
tasks = self._tasks_from_template(
self.task1_template, t,
num=num_tasks_no_priority,
num_priority=num_tasks_priority,
priorities=priority_ratio)
random.shuffle(tasks)
self.backend.create_tasks(tasks)
first_ready_tasks = self.backend.peek_ready_tasks(
task_type, num_tasks=10, num_tasks_priority=10)
grabbed_tasks = self.backend.grab_ready_tasks(
task_type, num_tasks=10, num_tasks_priority=10)
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
self.assertEqual(peeked['status'], 'next_run_not_scheduled')
del peeked['status']
self.assertEqual(grabbed['status'], 'next_run_scheduled')
del grabbed['status']
self.assertEqual(peeked, grabbed)
self.assertEqual(peeked['priority'], grabbed['priority'])
- @istest
- def get_tasks(self):
+ def test_get_tasks(self):
self._create_task_types()
t = utcnow()
tasks = self._tasks_from_template(self.task1_template, t, 100)
tasks = self.backend.create_tasks(tasks)
random.shuffle(tasks)
while len(tasks) > 1:
length = random.randrange(1, len(tasks))
cur_tasks = tasks[:length]
tasks[:length] = []
ret = self.backend.get_tasks(task['id'] for task in cur_tasks)
self.assertCountEqual(ret, cur_tasks)
- @istest
- def filter_task_to_archive(self):
+ def test_filter_task_to_archive(self):
"""Filtering only list disabled recurring or completed oneshot tasks
"""
self._create_task_types()
_time = utcnow()
recurring = self._tasks_from_template(self.task1_template, _time, 12)
oneshots = self._tasks_from_template(self.task2_template, _time, 12)
total_tasks = len(recurring) + len(oneshots)
# simulate scheduling tasks
pending_tasks = self.backend.create_tasks(recurring + oneshots)
backend_tasks = [{
'task': task['id'],
'backend_id': str(uuid.uuid4()),
'scheduled': utcnow(),
} for task in pending_tasks]
self.backend.mass_schedule_task_runs(backend_tasks)
# we simulate the task are being done
_tasks = []
for task in backend_tasks:
t = self.backend.end_task_run(
task['backend_id'], status='eventful')
_tasks.append(t)
# Randomly update task's status per policy
status_per_policy = {'recurring': 0, 'oneshot': 0}
status_choice = {
# policy: [tuple (1-for-filtering, 'associated-status')]
'recurring': [(1, 'disabled'),
(0, 'completed'),
(0, 'next_run_not_scheduled')],
'oneshot': [(0, 'next_run_not_scheduled'),
(1, 'disabled'),
(1, 'completed')]
}
tasks_to_update = defaultdict(list)
_task_ids = defaultdict(list)
# randomize 'disabling' recurring task or 'complete' oneshot task
for task in pending_tasks:
policy = task['policy']
_task_ids[policy].append(task['id'])
status = random.choice(status_choice[policy])
if status[0] != 1:
continue
# elected for filtering
status_per_policy[policy] += status[0]
tasks_to_update[policy].append(task['id'])
self.backend.disable_tasks(tasks_to_update['recurring'])
# hack: change the status to something else than completed/disabled
self.backend.set_status_tasks(
_task_ids['oneshot'], status='next_run_not_scheduled')
# complete the tasks to update
self.backend.set_status_tasks(
tasks_to_update['oneshot'], status='completed')
total_tasks_filtered = (status_per_policy['recurring'] +
status_per_policy['oneshot'])
# retrieve tasks to archive
after = _time.shift(days=-1).format('YYYY-MM-DD')
before = utcnow().shift(days=1).format('YYYY-MM-DD')
tasks_to_archive = list(self.backend.filter_task_to_archive(
after_ts=after, before_ts=before, limit=total_tasks))
self.assertEqual(len(tasks_to_archive), total_tasks_filtered)
actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
for task in tasks_to_archive:
actual_filtered_per_status[task['task_policy']] += 1
self.assertEqual(actual_filtered_per_status, status_per_policy)
- @istest
- def delete_archived_tasks(self):
+ def test_delete_archived_tasks(self):
self._create_task_types()
_time = utcnow()
recurring = self._tasks_from_template(
self.task1_template, _time, 12)
oneshots = self._tasks_from_template(
self.task2_template, _time, 12)
total_tasks = len(recurring) + len(oneshots)
pending_tasks = self.backend.create_tasks(recurring + oneshots)
backend_tasks = [{
'task': task['id'],
'backend_id': str(uuid.uuid4()),
'scheduled': utcnow(),
} for task in pending_tasks]
self.backend.mass_schedule_task_runs(backend_tasks)
_tasks = []
percent = random.randint(0, 100) # random election removal boundary
for task in backend_tasks:
t = self.backend.end_task_run(
task['backend_id'], status='eventful')
c = random.randint(0, 100)
if c <= percent:
_tasks.append({'task_id': t['task'], 'task_run_id': t['id']})
self.backend.delete_archived_tasks(_tasks)
self.cursor.execute('select count(*) from task')
tasks_count = self.cursor.fetchone()
self.cursor.execute('select count(*) from task_run')
tasks_run_count = self.cursor.fetchone()
self.assertEqual(tasks_count[0], total_tasks - len(_tasks))
self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks))
class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase):
def setUp(self):
super().setUp()
self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
self.backend = get_scheduler('local', self.config)
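
Most hunks in this file are the same mechanical migration: nose's @istest decorator let arbitrarily named methods be collected as tests, whereas plain unittest discovery only picks up methods whose names start with test_. Dropping the nose dependency therefore forces the rename; schematically:

import unittest

# Before (requires nose to be collected):
#     from nose.tools import istest
#
#     class SchedulerTest(unittest.TestCase):
#         @istest
#         def add_task_type(self):
#             ...

# After (collected by unittest or pytest, no nose needed):
class SchedulerTest(unittest.TestCase):
    def test_add_task_type(self):
        self.assertTrue(True)  # placeholder body
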
diff --git a/swh/scheduler/tests/test_task.py b/swh/scheduler/tests/test_task.py
index 10131e9..7e2130e 100644
--- a/swh/scheduler/tests/test_task.py
+++ b/swh/scheduler/tests/test_task.py
@@ -1,32 +1,28 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
-from nose.tools import istest
-
from swh.scheduler import task
from .celery_testing import CeleryTestFixture
class Task(CeleryTestFixture, unittest.TestCase):
- @istest
- def not_implemented_task(self):
+ def test_not_implemented_task(self):
class NotImplementedTask(task.Task):
pass
with self.assertRaises(NotImplementedError):
NotImplementedTask().run()
- @istest
- def add_task(self):
+ def test_add_task(self):
class AddTask(task.Task):
def run_task(self, x, y):
return x + y
r = AddTask().apply([2, 3])
self.assertTrue(r.successful())
self.assertEqual(r.result, 5)
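
For context on the AddTask assertions above: Celery's Task.apply() executes the task eagerly in the current process, without a broker, and returns an EagerResult whose successful() and result the test inspects. A minimal sketch with a plain Celery app (names hypothetical):

from celery import Celery

app = Celery('demo')

@app.task
def add(x, y):
    return x + y

result = add.apply([2, 3])   # runs synchronously, in-process
assert result.successful()
assert result.result == 5
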
diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py
index a539cea..192be61 100644
--- a/swh/scheduler/tests/test_utils.py
+++ b/swh/scheduler/tests/test_utils.py
@@ -1,60 +1,56 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
-
from datetime import timezone
-from nose.tools import istest
from unittest.mock import patch
from swh.scheduler import utils
class UtilsTest(unittest.TestCase):
- @istest
@patch('swh.scheduler.utils.datetime')
- def create_oneshot_task_dict_simple(self, mock_datetime):
+ def test_create_oneshot_task_dict_simple(self, mock_datetime):
mock_datetime.now.return_value = 'some-date'
actual_task = utils.create_oneshot_task_dict('some-task-type')
expected_task = {
'policy': 'oneshot',
'type': 'some-task-type',
'next_run': 'some-date',
'arguments': {
'args': [],
'kwargs': {},
},
'priority': None,
}
self.assertEqual(actual_task, expected_task)
mock_datetime.now.assert_called_once_with(tz=timezone.utc)
- @istest
@patch('swh.scheduler.utils.datetime')
- def create_oneshot_task_dict_other_call(self, mock_datetime):
+ def test_create_oneshot_task_dict_other_call(self, mock_datetime):
mock_datetime.now.return_value = 'some-other-date'
actual_task = utils.create_oneshot_task_dict(
'some-task-type', 'arg0', 'arg1',
priority='high', other_stuff='normal'
)
expected_task = {
'policy': 'oneshot',
'type': 'some-task-type',
'next_run': 'some-other-date',
'arguments': {
'args': ('arg0', 'arg1'),
'kwargs': {'other_stuff': 'normal'},
},
'priority': 'high',
}
self.assertEqual(actual_task, expected_task)
mock_datetime.now.assert_called_once_with(tz=timezone.utc)
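
The two tests above rely on patching the datetime name inside swh.scheduler.utils, so that the module's own call to datetime.now(tz=timezone.utc) returns a sentinel string. A self-contained sketch of the same technique, patching where the name is looked up rather than where it is defined:

import unittest
from datetime import datetime, timezone
from unittest.mock import patch

def create_stamp():
    return {'next_run': datetime.now(tz=timezone.utc)}

class StampTest(unittest.TestCase):
    @patch('%s.datetime' % __name__)  # patch this module's datetime binding
    def test_create_stamp(self, mock_datetime):
        mock_datetime.now.return_value = 'some-date'
        self.assertEqual(create_stamp(), {'next_run': 'some-date'})
        mock_datetime.now.assert_called_once_with(tz=timezone.utc)
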
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
index 2e3f70e..68683d5 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -1,68 +1,66 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
from arrow import utcnow
-from nose.plugins.attrib import attr
-from nose.tools import istest
from hypothesis import given
from hypothesis.strategies import sets
+from nose.plugins.attrib import attr
from swh.core.tests.db_testing import SingleDbTestFixture
+from swh.scheduler.tests import DATA_DIR
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.tests import DATA_DIR
from . import from_regex
@attr('db')
class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase):
TEST_DB_NAME = 'softwareheritage-scheduler-updater-test'
TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler-updater.sql')
TEST_DB_DUMP_TYPE = 'psql'
def setUp(self):
super().setUp()
config = {
'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
'cache_read_limit': 1000,
}
self.backend = SchedulerUpdaterBackend(**config)
def _empty_tables(self):
self.cursor.execute(
"""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public', ))
tables = set(table for (table,) in self.cursor.fetchall())
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.conn.commit()
def tearDown(self):
self.backend.close_connection()
self._empty_tables()
super().tearDown()
- @istest
@given(sets(
from_regex(
r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
min_size=10, max_size=15))
- def cache_read(self, urls):
+ def test_cache_read(self, urls):
def gen_events(urls):
for url in urls:
yield SWHEvent({
'url': url,
'type': 'create',
'origin_type': 'git',
})
self.backend.cache_put(gen_events(urls))
r = self.backend.cache_read(timestamp=utcnow())
self.assertNotEqual(r, [])
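
test_cache_read above is property-based: Hypothesis generates a set of 10 to 15 URL-shaped strings per example (from_regex here comes from the local test helpers, presumably wrapping hypothesis.strategies.from_regex). A standalone sketch using the stock strategy, assuming a Hypothesis version with the fullmatch keyword:

import unittest
from hypothesis import given
from hypothesis.strategies import from_regex, sets

class UrlSetTest(unittest.TestCase):
    @given(sets(from_regex(r'https://example[.]org/[a-z0-9]{5,7}',
                           fullmatch=True),
                min_size=10, max_size=15))
    def test_generated_urls_are_distinct(self, urls):
        # sets() deduplicates, so min_size=10 guarantees 10 distinct URLs
        self.assertGreaterEqual(len(urls), 10)
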
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
index 0944e48..463a05c 100644
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ b/swh/scheduler/tests/updater/test_consumer.py
@@ -1,199 +1,194 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
+from itertools import chain
from hypothesis import given
-from hypothesis.strategies import sampled_from, lists, tuples, text
+from hypothesis.strategies import lists, sampled_from, text, tuples
-from itertools import chain
-from nose.tools import istest
+from swh.scheduler.updater.consumer import UpdaterConsumer
+from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
from . import UpdaterTestUtil, from_regex
-from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS
-from swh.scheduler.updater.consumer import UpdaterConsumer
-
class FakeSchedulerUpdaterBackend:
def __init__(self):
self.events = []
def cache_put(self, events):
self.events.append(events)
class FakeUpdaterConsumerBase(UpdaterConsumer):
def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
super().__init__(backend_class=backend_class)
self.connection_opened = False
self.connection_closed = False
self.consume_called = False
self.has_events_called = False
def open_connection(self):
self.connection_opened = True
def close_connection(self):
self.connection_closed = True
def convert_event(self, event):
pass
class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
def has_events(self):
self.has_events_called = True
return True
def consume_events(self):
self.consume_called = True
raise ValueError('Broken stuff')
class UpdaterConsumerRaisingTest(unittest.TestCase):
def setUp(self):
self.updater = FakeUpdaterConsumerRaise()
- @istest
- def running_raise(self):
+ def test_running_raise(self):
"""Raising during run should finish fine.
"""
# given
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
# when
with self.assertRaisesRegex(ValueError, 'Broken stuff'):
self.updater.run()
# then
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
self.assertTrue(self.updater.connection_opened)
self.assertTrue(self.updater.has_events_called)
self.assertTrue(self.updater.connection_closed)
self.assertTrue(self.updater.consume_called)
class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
def has_events(self):
self.has_events_called = True
return False
def consume_events(self):
self.consume_called = True
class UpdaterConsumerNoEventTest(unittest.TestCase):
def setUp(self):
self.updater = FakeUpdaterConsumerNoEvent()
- @istest
- def running_does_not_consume(self):
+ def test_running_does_not_consume(self):
"""Run with no events should do just fine"""
# given
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
# when
self.updater.run()
# then
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
self.assertTrue(self.updater.connection_opened)
self.assertTrue(self.updater.has_events_called)
self.assertTrue(self.updater.connection_closed)
self.assertFalse(self.updater.consume_called)
EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type']
class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
def __init__(self, messages):
super().__init__()
self.messages = messages
self.debug = False
def has_events(self):
self.has_events_called = True
return len(self.messages) > 0
def consume_events(self):
self.consume_called = True
for msg in self.messages:
yield msg
self.messages.pop()
def convert_event(self, event, keys=EVENT_KEYS):
for k in keys:
v = event.get(k)
if v is None:
return None
e = {
'type': event['type'],
'url': 'https://fake.url/%s' % event['repo']['name'],
'last_seen': event['created_at'],
'origin_type': event['origin_type'],
}
return SWHEvent(e)
class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase):
- @istest
@given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type
from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
text()), # origin type
min_size=3, max_size=10),
lists(tuples(text(), # event type
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name
text()), # origin type
min_size=3, max_size=10),
lists(tuples(sampled_from(LISTENED_EVENTS), # event type
from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
text(), # origin type
sampled_from(EVENT_KEYS)), # keys to drop
min_size=3, max_size=10))
- def running(self, events, uninteresting_events, incomplete_events):
+ def test_running(self, events, uninteresting_events, incomplete_events):
"""Interesting events are written to cache, others are dropped
"""
# given
ready_events = self._make_events(events)
ready_uninteresting_events = self._make_events(uninteresting_events)
ready_incomplete_events = self._make_incomplete_events(
incomplete_events)
updater = FakeUpdaterConsumer(list(chain(
ready_events, ready_incomplete_events,
ready_uninteresting_events)))
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
self.assertEqual(updater.events, [])
# when
updater.run()
# then
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
self.assertEqual(updater.events, [])
self.assertTrue(updater.connection_opened)
self.assertTrue(updater.has_events_called)
self.assertTrue(updater.connection_closed)
self.assertTrue(updater.consume_called)
self.assertEqual(updater.messages, [])
# uninteresting or incomplete events are dropped
self.assertTrue(len(updater.backend.events), len(events))
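
Two remarks on this file. First, the final assertion reads assertTrue(len(updater.backend.events), len(events)); assertTrue takes a single value plus an optional message, so this check cannot fail as written and assertEqual was probably intended. Second, the fakes exercise a template-method loop: the raising test asserts connection_closed is True even though the ValueError propagates, which implies run() closes the connection in a try/finally. A hedged reconstruction of that loop:

class BaseConsumer:
    def open_connection(self): pass       # overridden / flagged by fakes
    def close_connection(self): pass
    def has_events(self): return False
    def consume_events(self): return iter(())
    def process_event(self, event): pass

    def run(self):
        self.open_connection()
        try:
            while self.has_events():
                for event in self.consume_events():
                    self.process_event(event)
        finally:
            # runs whether consume_events finished or raised
            self.close_connection()
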
diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py
index cb7489e..2f00bd7 100644
--- a/swh/scheduler/tests/updater/test_events.py
+++ b/swh/scheduler/tests/updater/test_events.py
@@ -1,48 +1,44 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from hypothesis import given
-from hypothesis.strategies import text, sampled_from
-from nose.tools import istest
+from hypothesis.strategies import sampled_from, text
-from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS
+from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
from swh.scheduler.updater.ghtorrent import events
from . import UpdaterTestUtil
def event_values_ko():
return set(events['evt']).union(
set(events['ent'])).difference(
set(LISTENED_EVENTS))
WRONG_EVENTS = sorted(list(event_values_ko()))
class EventTest(UpdaterTestUtil, unittest.TestCase):
- @istest
@given(sampled_from(LISTENED_EVENTS), text(), text())
- def is_interesting_ok(self, event_type, name, origin_type):
+ def test_is_interesting_ok(self, event_type, name, origin_type):
evt = self._make_simple_event(event_type, name, origin_type)
self.assertTrue(SWHEvent(evt).is_interesting())
- @istest
@given(text(), text(), text())
- def is_interested_with_noisy_event_should_be_ko(
+ def test_is_interested_with_noisy_event_should_be_ko(
self, event_type, name, origin_type):
if event_type in LISTENED_EVENTS:
# just in case something good is generated, skip it
return
evt = self._make_simple_event(event_type, name, origin_type)
self.assertFalse(SWHEvent(evt).is_interesting())
- @istest
@given(sampled_from(WRONG_EVENTS), text(), text())
- def is_interesting_ko(self, event_type, name, origin_type):
+ def test_is_interesting_ko(self, event_type, name, origin_type):
evt = self._make_simple_event(event_type, name, origin_type)
self.assertFalse(SWHEvent(evt).is_interesting())
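
In test_is_interested_with_noisy_event_should_be_ko, the early return silently turns colliding examples into passes. Hypothesis's assume() is the idiomatic alternative: it discards the example and asks for another instead of counting it as a pass. A sketch:

import unittest
from hypothesis import assume, given
from hypothesis.strategies import text

KNOWN_EVENTS = {'create', 'push'}   # stand-in for LISTENED_EVENTS

class NoiseTest(unittest.TestCase):
    @given(text())
    def test_unknown_types_are_uninteresting(self, event_type):
        assume(event_type not in KNOWN_EVENTS)  # discard, don't fake-pass
        self.assertNotIn(event_type, KNOWN_EVENTS)
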
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index bfeecf2..b87e345 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,171 +1,164 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
+from unittest.mock import patch
from hypothesis import given
from hypothesis.strategies import sampled_from
-from nose.tools import istest
-from unittest.mock import patch
from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.ghtorrent import (
- events, GHTorrentConsumer, INTERESTING_EVENT_KEYS)
+from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS,
+ GHTorrentConsumer, events)
-from . import from_regex, UpdaterTestUtil
+from . import UpdaterTestUtil, from_regex
def event_values():
return set(events['evt']).union(set(events['ent']))
def ghtorrentize_event_name(event_name):
return '%sEvent' % event_name.capitalize()
EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
class FakeChannel:
"""Fake Channel (virtual connection inside a connection)
"""
def close(self):
self.close = True
class FakeConnection:
"""Fake Rabbitmq connection for test purposes
"""
def __init__(self, conn_string):
self._conn_string = conn_string
self._connect = False
self._release = False
self._channel = False
def connect(self):
self._connect = True
return True
def release(self):
self._connect = False
self._release = True
def channel(self):
self._channel = True
return FakeChannel()
class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
def setUp(self):
self.fake_config = {
'conn': {
'url': 'amqp://u:p@https://somewhere:9807',
},
'debug': True,
'batch_cache_write': 10,
'rabbitmq_prefetch_read': 100,
}
self.consumer = GHTorrentConsumer(self.fake_config,
_connection_class=FakeConnection)
- @istest
def test_init(self):
# given
# check init is ok
self.assertEqual(self.consumer.debug,
self.fake_config['debug'])
self.assertEqual(self.consumer.batch,
self.fake_config['batch_cache_write'])
self.assertEqual(self.consumer.prefetch_read,
self.fake_config['rabbitmq_prefetch_read'])
self.assertEqual(self.consumer.config, self.fake_config)
- @istest
def test_has_events(self):
self.assertTrue(self.consumer.has_events())
- @istest
def test_connection(self):
# when
self.consumer.open_connection()
# then
self.assertEqual(self.consumer.conn._conn_string,
self.fake_config['conn']['url'])
self.assertTrue(self.consumer.conn._connect)
self.assertFalse(self.consumer.conn._release)
# when
self.consumer.close_connection()
# then
self.assertFalse(self.consumer.conn._connect)
self.assertTrue(self.consumer.conn._release)
self.assertIsInstance(self.consumer.channel, FakeChannel)
- @istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
- def convert_event_ok(self, event_type, name):
+ def test_convert_event_ok(self, event_type, name):
input_event = self._make_event(event_type, name, 'git')
actual_event = self.consumer.convert_event(input_event)
self.assertTrue(isinstance(actual_event, SWHEvent))
event = actual_event.get()
expected_event = {
'type': event_type.lower().rstrip('Event'),
'url': 'https://github.com/%s' % name,
'last_seen': input_event['created_at'],
'cnt': 1,
'origin_type': 'git',
}
self.assertEqual(event, expected_event)
- @istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
sampled_from(INTERESTING_EVENT_KEYS))
- def convert_event_ko(self, event_type, name, missing_data_key):
+ def test_convert_event_ko(self, event_type, name, missing_data_key):
input_event = self._make_incomplete_event(
event_type, name, 'git', missing_data_key)
actual_converted_event = self.consumer.convert_event(input_event)
self.assertIsNone(actual_converted_event)
@patch('swh.scheduler.updater.ghtorrent.collect_replies')
- @istest
- def consume_events(self, mock_collect_replies):
+ def test_consume_events(self, mock_collect_replies):
# given
self.consumer.queue = 'fake-queue' # hack
self.consumer.open_connection()
fake_events = [
self._make_event('PushEvent', 'user/some-repo', 'git'),
self._make_event('PushEvent', 'user2/some-other-repo', 'git'),
]
mock_collect_replies.return_value = fake_events
# when
actual_events = []
for e in self.consumer.consume_events():
actual_events.append(e)
# then
self.assertEqual(fake_events, actual_events)
mock_collect_replies.assert_called_once_with(
self.consumer.conn,
self.consumer.channel,
'fake-queue',
no_ack=False,
limit=self.fake_config['rabbitmq_prefetch_read']
)
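
One pitfall worth flagging in test_convert_event_ok: the expected type is built with event_type.lower().rstrip('Event'). str.rstrip takes a character set, not a suffix, so this happens to work for 'pushevent' -> 'push' but over-strips names whose stem ends in one of those letters, e.g. 'createevent' -> 'crea'. The test presumably mirrors the same expression in convert_event, so it still passes either way, but a suffix-safe helper avoids the trap (str.removesuffix only arrived in Python 3.9):

def strip_suffix(s, suffix):
    # remove suffix only if it is actually a suffix
    return s[:-len(suffix)] if suffix and s.endswith(suffix) else s

assert strip_suffix('pushevent', 'event') == 'push'
assert strip_suffix('createevent', 'event') == 'create'
assert 'createevent'.rstrip('Event') == 'crea'  # the character-set trap
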
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
index 305cd35..2f226d9 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -1,163 +1,158 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
-
from nose.plugins.attrib import attr
-from nose.tools import istest
from swh.core.tests.db_testing import DbTestFixture
-from swh.scheduler.updater.events import SWHEvent
+from swh.scheduler.tests import DATA_DIR
+from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
from swh.scheduler.updater.writer import UpdaterWriter
-from swh.scheduler.updater.events import LISTENED_EVENTS
-from swh.scheduler.tests import DATA_DIR
from . import UpdaterTestUtil
@attr('db')
class CommonSchedulerTest(DbTestFixture):
TEST_SCHED_DB = 'softwareheritage-scheduler-test'
TEST_SCHED_DUMP = os.path.join(
DATA_DIR, 'dumps/swh-scheduler.sql')
TEST_SCHED_DUMP_TYPE = 'psql'
TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test'
TEST_SCHED_UPDATER_DUMP = os.path.join(
DATA_DIR, 'dumps/swh-scheduler-updater.sql')
TEST_SCHED_UPDATER_DUMP_TYPE = 'psql'
@classmethod
def setUpClass(cls):
cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP,
cls.TEST_SCHED_DUMP_TYPE)
cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP,
cls.TEST_SCHED_UPDATER_DUMP_TYPE)
super().setUpClass()
def tearDown(self):
self.reset_db_tables(self.TEST_SCHED_UPDATER_DB)
self.reset_db_tables(self.TEST_SCHED_DB,
excluded=['task_type', 'priority_ratio'])
super().tearDown()
class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest,
unittest.TestCase):
def setUp(self):
super().setUp()
config = {
'scheduler': {
'cls': 'local',
'args': {
'scheduling_db': 'dbname=softwareheritage-scheduler-test',
},
},
'scheduler_updater': {
'scheduling_updater_db':
'dbname=softwareheritage-scheduler-updater-test',
'cache_read_limit': 5,
},
'pause': 0.1,
'verbose': False,
}
self.writer = UpdaterWriter(**config)
self.scheduler_backend = self.writer.scheduler_backend
self.scheduler_updater_backend = self.writer.scheduler_updater_backend
def tearDown(self):
self.scheduler_backend.close_connection()
self.scheduler_updater_backend.close_connection()
super().tearDown()
- @istest
- def run_ko(self):
+ def test_run_ko(self):
"""Only git tasks are supported for now, other types are dismissed.
"""
ready_events = [
SWHEvent(
self._make_simple_event(event_type, 'origin-%s' % i,
'svn'))
for i, event_type in enumerate(LISTENED_EVENTS)
]
expected_length = len(ready_events)
self.scheduler_updater_backend.cache_put(ready_events)
data = list(self.scheduler_updater_backend.cache_read())
self.assertEqual(len(data), expected_length)
r = self.scheduler_backend.peek_ready_tasks(
'origin-update-git')
# first read on an empty scheduling db results with nothing in it
self.assertEqual(len(r), 0)
# Read from cache to scheduler db
self.writer.run()
r = self.scheduler_backend.peek_ready_tasks(
'origin-update-git')
# other reads after writes are still empty since it's not supported
self.assertEqual(len(r), 0)
- @istest
- def run_ok(self):
+ def test_run_ok(self):
"""Only git origin are supported for now
"""
ready_events = [
SWHEvent(
self._make_simple_event(event_type, 'origin-%s' % i, 'git'))
for i, event_type in enumerate(LISTENED_EVENTS)
]
expected_length = len(ready_events)
self.scheduler_updater_backend.cache_put(ready_events)
data = list(self.scheduler_updater_backend.cache_read())
self.assertEqual(len(data), expected_length)
r = self.scheduler_backend.peek_ready_tasks(
'origin-update-git')
# first read on an empty scheduling db results with nothing in it
self.assertEqual(len(r), 0)
# Read from cache to scheduler db
self.writer.run()
# now, we should have scheduling task ready
r = self.scheduler_backend.peek_ready_tasks(
'origin-update-git')
self.assertEquals(len(r), expected_length)
# Check the task has been scheduled
for t in r:
self.assertEquals(t['type'], 'origin-update-git')
self.assertEquals(t['priority'], 'normal')
self.assertEquals(t['policy'], 'oneshot')
self.assertEquals(t['status'], 'next_run_not_scheduled')
# writer has nothing to do now
self.writer.run()
# so no more data in cache
data = list(self.scheduler_updater_backend.cache_read())
self.assertEqual(len(data), 0)
# provided, no runner is ran, still the same amount of scheduling tasks
r = self.scheduler_backend.peek_ready_tasks(
'origin-update-git')
self.assertEquals(len(r), expected_length)
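
A last cleanup this diff leaves untouched: test_writer.py still uses assertEquals, the long-deprecated alias of assertEqual (it raises a DeprecationWarning and was removed outright in Python 3.12). The preferred spelling:

import unittest

class AliasDemo(unittest.TestCase):
    def test_preferred_spelling(self):
        self.assertEqual(2 + 2, 4)   # use assertEqual, not assertEquals
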
