Add tests for the scheduler updater writer and fix its unsupported-origin
warning.

* swh/scheduler/tests/updater/test_writer.py (new file): exercises
  UpdaterWriter.run() against the test scheduler and scheduler-updater
  databases; 'svn' events must leave no 'origin-update-git' task, 'git'
  events must each produce one oneshot task and be removed from the cache.
* swh/scheduler/updater/writer.py: the unsupported-type warning reports
  event['origin_type'] (was event['type']) and uses Logger.warning with lazy
  arguments; the 'pause' configuration comment is reworded.

diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
new file mode 100644
index 0000000..b35a6b1
--- /dev/null
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -0,0 +1,162 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import unittest
+
+
+from nose.plugins.attrib import attr
+from nose.tools import istest
+
+from swh.core.tests.db_testing import DbTestFixture
+from swh.scheduler.updater.events import SWHEvent
+from swh.scheduler.updater.writer import UpdaterWriter
+from swh.scheduler.updater.events import LISTENED_EVENTS
+
+from swh.scheduler.tests.updater import UpdaterTestUtil
+
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata')
+
+
+@attr('db')
+class CommonSchedulerTest(DbTestFixture):
+    TEST_SCHED_DB = 'softwareheritage-scheduler-test'
+    TEST_SCHED_DUMP = os.path.join(TEST_DATA_DIR,
+                                   'dumps/swh-scheduler.dump')
+
+    TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test'
+    TEST_SCHED_UPDATER_DUMP = os.path.join(TEST_DATA_DIR,
+                                           'dumps/swh-scheduler-updater.dump')
+
+    @classmethod
+    def setUpClass(cls):
+        cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP)
+        cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP)
+        super().setUpClass()
+
+    def tearDown(self):
+        self.reset_db_tables(self.TEST_SCHED_UPDATER_DB)
+        self.reset_db_tables(self.TEST_SCHED_DB,
+                             excluded=['task_type', 'priority_ratio'])
+        super().tearDown()
+
+
+class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest,
+                        unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+        config = {
+            'scheduler': {
+                'cls': 'local',
+                'args': {
+                    'scheduling_db': 'dbname=softwareheritage-scheduler-test',
+                },
+            },
+            'scheduler_updater': {
+                'scheduling_updater_db':
+                'dbname=softwareheritage-scheduler-updater-test',
+                'cache_read_limit': 5,
+            },
+            'pause': 0.1,
+            'verbose': False,
+        }
+        self.writer = UpdaterWriter(**config)
+        self.scheduler_backend = self.writer.scheduler_backend
+        self.scheduler_updater_backend = self.writer.scheduler_updater_backend
+
+    def tearDown(self):
+        self.scheduler_backend.close_connection()
+        self.scheduler_updater_backend.close_connection()
+        super().tearDown()
+
+    @istest
+    def run_ko(self):
+        """Only git tasks are supported for now, other types are dismissed.
+
+        """
+        ready_events = [
+            SWHEvent(
+                self._make_simple_event(event_type, 'origin-%s' % i,
+                                        'svn'))
+            for i, event_type in enumerate(LISTENED_EVENTS)
+        ]
+
+        expected_length = len(ready_events)
+
+        self.scheduler_updater_backend.cache_put(ready_events)
+        data = list(self.scheduler_updater_backend.cache_read())
+        self.assertEqual(len(data), expected_length)
+
+        r = self.scheduler_backend.peek_ready_tasks(
+            'origin-update-git')
+
+        # first read on an empty scheduling db results with nothing in it
+        self.assertEqual(len(r), 0)
+
+        # Read from cache to scheduler db
+        self.writer.run()
+
+        r = self.scheduler_backend.peek_ready_tasks(
+            'origin-update-git')
+
+        # other reads after writes are still empty since it's not supported
+        self.assertEqual(len(r), 0)
+
+    @istest
+    def run_ok(self):
+        """Only git origin are supported for now
+
+        """
+        ready_events = [
+            SWHEvent(
+                self._make_simple_event(event_type, 'origin-%s' % i, 'git'))
+            for i, event_type in enumerate(LISTENED_EVENTS)
+        ]
+
+        expected_length = len(ready_events)
+
+        self.scheduler_updater_backend.cache_put(ready_events)
+
+        data = list(self.scheduler_updater_backend.cache_read())
+        self.assertEqual(len(data), expected_length)
+
+        r = self.scheduler_backend.peek_ready_tasks(
+            'origin-update-git')
+
+        # first read on an empty scheduling db results with nothing in it
+        self.assertEqual(len(r), 0)
+
+        # Read from cache to scheduler db
+        self.writer.run()
+
+        # now, we should have scheduling task ready
+        r = self.scheduler_backend.peek_ready_tasks(
+            'origin-update-git')
+
+        self.assertEqual(len(r), expected_length)
+
+        # Check the task has been scheduled
+        for t in r:
+            self.assertEqual(t['type'], 'origin-update-git')
+            self.assertEqual(t['priority'], 'normal')
+            self.assertEqual(t['policy'], 'oneshot')
+            self.assertEqual(t['status'], 'next_run_not_scheduled')
+
+        # writer has nothing to do now
+        self.writer.run()
+
+        # so no more data in cache
+        data = list(self.scheduler_updater_backend.cache_read())
+
+        self.assertEqual(len(data), 0)
+
+        # provided, no runner is ran, still the same amount of scheduling tasks
+        r = self.scheduler_backend.peek_ready_tasks(
+            'origin-update-git')
+
+        self.assertEqual(len(r), expected_length)
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
index 0f4e50a..c744360 100644
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -1,122 +1,122 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import logging
 import time
 
 from arrow import utcnow
 
 from swh.core.config import SWHConfig
 from swh.core import utils
 from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_oneshot_task_dict
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 class UpdaterWriter(SWHConfig):
     """Updater writer in charge of updating the scheduler db with latest
     prioritized oneshot tasks
 
     In effect, this:
     - reads the events from scheduler updater's db
     - converts those events into priority oneshot tasks
     - dumps them into the scheduler db
 
     """
     CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
     DEFAULT_CONFIG = {
         # access to the scheduler backend
         'scheduler': ('dict', {
             'cls': 'local',
             'args': {
                 'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
             },
         }),
         # access to the scheduler updater cache
         'scheduler_updater': ('dict', {
             'scheduling_updater_db':
             'dbname=softwareheritage-scheduler-updater-dev',
             'cache_read_limit': 1000,
         }),
-        # waiting time between db read when no more data exists
+        # waiting time between db reads
         'pause': ('int', 10),
         # verbose or not
         'verbose': ('bool', False),
     }
 
     def __init__(self, **config):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file()
 
         self.scheduler_updater_backend = SchedulerUpdaterBackend(
             **self.config['scheduler_updater'])
         self.scheduler_backend = get_scheduler(**self.config['scheduler'])
         self.pause = self.config['pause']
         self.log = logging.getLogger(
             'swh.scheduler.updater.writer.UpdaterWriter')
         self.log.setLevel(
             logging.DEBUG if self.config['verbose'] else logging.INFO)
 
     def convert_to_oneshot_task(self, event):
         """Given an event, convert it into oneshot task with priority
 
         Args:
             event (dict): The event to convert to task
 
         """
         if event['origin_type'] == 'git':
             return create_oneshot_task_dict(
                 'origin-update-git',
                 event['url'],
                 priority='normal')
-        self.log.warn('Type %s is not supported for now, only git' % (
-            event['type'], ))
+        self.log.warning('Type %s is not supported for now, only git',
+                         event['origin_type'])
         return None
 
     def write_event_to_scheduler(self, events):
         """Write events to the scheduler and yield ids when done"""
         # convert events to oneshot tasks
         oneshot_tasks = filter(lambda e: e is not None,
                                map(self.convert_to_oneshot_task, events))
         # write event to scheduler
         self.scheduler_backend.create_tasks(oneshot_tasks)
         for e in events:
             yield e['url']
 
     def run(self):
         """First retrieve events from cache (including origin_type, cnt),
         then convert them into oneshot tasks with priority, then
         write them to the scheduler db, at last remove them from
         cache.
 
         """
         while True:
             timestamp = utcnow()
             events = list(self.scheduler_updater_backend.cache_read(timestamp))
             if not events:
                 break
             for urls in utils.grouper(self.write_event_to_scheduler(events),
                                       n=100):
                 self.scheduler_updater_backend.cache_remove(urls)
             time.sleep(self.pause)
 
 
 @click.command()
 @click.option('--verbose/--no-verbose', '-v', default=False,
               help='Verbose mode')
 def main(verbose):
     log = logging.getLogger('swh.scheduler.updater.writer')
     log.addHandler(logging.StreamHandler())
     _loglevel = logging.DEBUG if verbose else logging.INFO
     log.setLevel(_loglevel)
 
     UpdaterWriter().run()
 
 
 if __name__ == '__main__':
     main()