# --- swh/scheduler/tests/test_utils.py (new file) ---
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import unittest

from datetime import timezone
from nose.tools import istest
from unittest.mock import patch

from swh.scheduler import utils


class UtilsTest(unittest.TestCase):
    # Both tests replace `swh.scheduler.utils.datetime` with a mock so that
    # `next_run` is a known sentinel value instead of the current time.

    @istest
    @patch('swh.scheduler.utils.datetime')
    def create_oneshot_task_dict_simple(self, mock_datetime):
        # Only the task type is given: no positional/keyword arguments and
        # no priority are expected in the resulting task.
        mock_datetime.now.return_value = 'some-date'

        task = utils.create_oneshot_task_dict('some-task-type')

        empty_arguments = {
            'args': [],
            'kwargs': {},
        }
        expected = {
            'policy': 'oneshot',
            'type': 'some-task-type',
            'next_run': 'some-date',
            'arguments': empty_arguments,
            'priority': None,
        }

        self.assertEqual(task, expected)
        mock_datetime.now.assert_called_once_with(tz=timezone.utc)

    @istest
    @patch('swh.scheduler.utils.datetime')
    def create_oneshot_task_dict_other_call(self, mock_datetime):
        # Positional arguments, a priority and an extra keyword argument are
        # given: `priority` must be lifted out of the task's kwargs.
        mock_datetime.now.return_value = 'some-other-date'

        task = utils.create_oneshot_task_dict(
            'some-task-type', 'arg0', 'arg1',
            priority='high', other_stuff='normal'
        )

        forwarded_arguments = {
            'args': ('arg0', 'arg1'),
            'kwargs': {'other_stuff': 'normal'},
        }
        expected = {
            'policy': 'oneshot',
            'type': 'some-task-type',
            'next_run': 'some-other-date',
            'arguments': forwarded_arguments,
            'priority': 'high',
        }

        self.assertEqual(task, expected)
        mock_datetime.now.assert_called_once_with(tz=timezone.utc)


# --- swh/scheduler/updater/writer.py ---
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import click
import logging
import time

from arrow import utcnow

from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
from swh.scheduler.updater.backend import SchedulerUpdaterBackend


class UpdaterWriter(SWHConfig):
    """Updater writer in charge of updating the scheduler db with latest
    prioritized oneshot tasks

    In effect, this:
    - reads the events from scheduler updater's db
    - converts those events into priority oneshot tasks
    - dumps them into the scheduler db

    """
    CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
    DEFAULT_CONFIG = {
        # access to the scheduler backend
        'scheduler': ('dict', {
            'cls': 'local',
            'args': {
                'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
            },
        }),
        # access to the scheduler updater cache
        'scheduler_updater': ('dict', {
            'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-dev',  # noqa
            'cache_read_limit': 1000,
        }),
        # waiting time between db read when no more data exists
        'pause': ('int', 10),
        # verbose or not
        'verbose': ('bool', False),
    }

    def __init__(self, **config):
        """Set up both backends and the logger.

        Args:
            **config: Configuration to use as is. When empty, the
                configuration is read through `parse_config_file()`.

        """
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file()

        self.scheduler_updater_backend = SchedulerUpdaterBackend(
            **self.config['scheduler_updater'])
        self.scheduler_backend = get_scheduler(**self.config['scheduler'])
        self.pause = self.config['pause']
        self.log = logging.getLogger(
            'swh.scheduler.updater.writer.UpdaterWriter')
        self.log.setLevel(
            logging.DEBUG if self.config['verbose'] else logging.INFO)

    def convert_to_oneshot_task(self, event):
        """Given an event, convert it into oneshot task with priority

        Args:
            event (dict): The event to convert to task. Its 'origin_type'
                and 'url' keys are read.

        Returns:
            The oneshot task dict ('origin-update-git' with 'normal'
            priority) for a git event, None for any other origin type.

        """
        if event['origin_type'] == 'git':
            return create_oneshot_task_dict(
                'origin-update-git',
                event['url'],
                priority='normal')
        # Report the same key the dispatch above is based on. Reading
        # event['type'] here would raise KeyError for events that do not
        # carry a 'type' key, instead of logging and skipping the event.
        self.log.warning('Type %s is not supported for now, only git',
                         event['origin_type'])
        return None

    def write_event_to_scheduler(self, events):
        """Write events to the scheduler and yield ids when done

        Events which cannot be converted into a task are skipped. The url
        of an event is yielded only when the scheduler backend returned a
        truthy result for its task creation.

        """
        for event in events:
            # convert event to oneshot task
            oneshot_task = self.convert_to_oneshot_task(event)
            if not oneshot_task:
                continue

            # write event to scheduler
            # FIXME: deal with this in batch
            r = self.scheduler_backend.create_tasks([oneshot_task])
            if r:
                yield event['url']

    def run(self):
        """First retrieve events from cache (including origin_type, cnt),
        then convert them into oneshot tasks with priority, then
        write them to the scheduler db, at last remove them from
        cache.

        This loops forever; when the cache has no event to read, it sleeps
        for `self.pause` seconds before reading again.

        """
        while True:
            timestamp = utcnow()
            events = self.scheduler_updater_backend.cache_read(timestamp)
            if not events:
                time.sleep(self.pause)
                continue

            # Remove written events from the cache by groups of 100 urls.
            for urls in utils.grouper(self.write_event_to_scheduler(events),
                                      n=100):
                self.scheduler_updater_backend.cache_remove(urls)


@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
              help='Verbose mode')
def main(verbose):
    """Configure the module logger then run the writer loop."""
    log = logging.getLogger('swh.scheduler.updater.writer')
    log.addHandler(logging.StreamHandler())
    _loglevel = logging.DEBUG if verbose else logging.INFO
    log.setLevel(_loglevel)

    UpdaterWriter().run()


if __name__ == '__main__':
    main()


# --- swh/scheduler/utils.py ---
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime, timezone


def get_task(task_name):
    """Retrieve task object in our application instance by its fully
    qualified python name.

    Args:
        task_name (str): task's name (e.g
                         swh.loader.git.tasks.LoadDiskGitRepository)

    Returns:
        Instance of task

    """
    # Imported here rather than at module level, as in the original code.
    from swh.scheduler.celery_backend.config import app
    for module in app.conf.CELERY_IMPORTS:
        __import__(module)
    return app.tasks[task_name]


def create_oneshot_task_dict(type, *args, **kwargs):
    """Create a oneshot task scheduled for as soon as possible.

    Args:
        type (str): Type of oneshot task as per swh-scheduler's db
                    table task_type's column (Ex: origin-update-git,
                    swh-deposit-archive-checks)
        *args: Positional arguments of the task.
        **kwargs: Keyword arguments of the task. The optional 'priority'
                  keyword is not forwarded to the task's kwargs: it is
                  used as the task's priority (None when not provided).

    Returns:
        Expected dictionary for the one-shot task scheduling api
        (swh.scheduler.backend.create_tasks)

    """
    priority = kwargs.pop('priority', None)
    return {
        'policy': 'oneshot',
        'type': type,
        'next_run': datetime.now(tz=timezone.utc),
        'arguments': {
            # Kept as is: a tuple when positional arguments are given, an
            # empty list otherwise (the unit tests rely on both shapes).
            'args': args if args else [],
            'kwargs': kwargs if kwargs else {},
        },
        'priority': priority,
    }