diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
index 20ced18..1d932a5 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -1,158 +1,158 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import unittest

 from glob import glob

 import pytest

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.core.tests.db_testing import DbTestFixture
 from swh.scheduler.tests import SQL_DIR
 from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
 from swh.scheduler.updater.writer import UpdaterWriter

 from . import UpdaterTestUtil


 @pytest.mark.db
 class CommonSchedulerTest(DbTestFixture):
     TEST_SCHED_DB = 'softwareheritage-scheduler-test'
     TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql')

     TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test'
     TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql')

     @classmethod
     def setUpClass(cls):
         cls.add_db(cls.TEST_SCHED_DB,
                    [(sqlfn, 'psql') for sqlfn in
                     sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)])
         cls.add_db(cls.TEST_SCHED_UPDATER_DB,
                    [(sqlfn, 'psql') for sqlfn in
                     sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)])
         super().setUpClass()

     def tearDown(self):
         self.reset_db_tables(self.TEST_SCHED_UPDATER_DB)
         self.reset_db_tables(self.TEST_SCHED_DB,
                              excluded=['task_type', 'priority_ratio'])
         super().tearDown()


 class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest,
                         unittest.TestCase):
     def setUp(self):
         super().setUp()

         config = {
             'scheduler': {
                 'cls': 'local',
                 'args': {
                     'scheduling_db':
                         'dbname=softwareheritage-scheduler-test',
                 },
             },
             'scheduler_updater': {
                 'scheduling_updater_db':
                     'dbname=softwareheritage-scheduler-updater-test',
                 'cache_read_limit': 5,
             },
             'pause': 0.1,
             'verbose': False,
         }
         self.writer = UpdaterWriter(**config)
         self.scheduler_backend = self.writer.scheduler_backend
         self.scheduler_updater_backend = self.writer.scheduler_updater_backend

     def tearDown(self):
         self.scheduler_backend.close_connection()
         self.scheduler_updater_backend.close_connection()
         super().tearDown()

     def test_run_ko(self):
         """Only git tasks are supported for now, other types are dismissed.

         """
         ready_events = [
             SWHEvent(
                 self._make_simple_event(event_type, 'origin-%s' % i, 'svn'))
             for i, event_type in enumerate(LISTENED_EVENTS)
         ]
         expected_length = len(ready_events)

         self.scheduler_updater_backend.cache_put(ready_events)
         data = list(self.scheduler_updater_backend.cache_read())
         self.assertEqual(len(data), expected_length)

         r = self.scheduler_backend.peek_ready_tasks(
             'origin-update-git')

         # first read on an empty scheduling db results with nothing in it
         self.assertEqual(len(r), 0)

         # Read from cache to scheduler db
         self.writer.run()

         r = self.scheduler_backend.peek_ready_tasks(
             'origin-update-git')

         # other reads after writes are still empty since it's not supported
         self.assertEqual(len(r), 0)

     def test_run_ok(self):
         """Only git origin are supported for now

         """
         ready_events = [
             SWHEvent(
                 self._make_simple_event(event_type, 'origin-%s' % i, 'git'))
             for i, event_type in enumerate(LISTENED_EVENTS)
         ]
         expected_length = len(ready_events)

         self.scheduler_updater_backend.cache_put(ready_events)
         data = list(self.scheduler_updater_backend.cache_read())
         self.assertEqual(len(data), expected_length)

         r = self.scheduler_backend.peek_ready_tasks(
             'origin-update-git')

         # first read on an empty scheduling db results with nothing in it
         self.assertEqual(len(r), 0)

         # Read from cache to scheduler db
         self.writer.run()

         # now, we should have scheduling task ready
         r = self.scheduler_backend.peek_ready_tasks(
             'origin-update-git')

-        self.assertEquals(len(r), expected_length)
+        self.assertEqual(len(r), expected_length)

         # Check the task has been scheduled
         for t in r:
-            self.assertEquals(t['type'], 'origin-update-git')
-            self.assertEquals(t['priority'], 'normal')
-            self.assertEquals(t['policy'], 'oneshot')
-            self.assertEquals(t['status'], 'next_run_not_scheduled')
+            self.assertEqual(t['type'], 'origin-update-git')
+            self.assertEqual(t['priority'], 'normal')
+            self.assertEqual(t['policy'], 'oneshot')
+            self.assertEqual(t['status'], 'next_run_not_scheduled')

         # writer has nothing to do now
         self.writer.run()

         # so no more data in cache
         data = list(self.scheduler_updater_backend.cache_read())
         self.assertEqual(len(data), 0)

         # provided, no runner is ran, still the same amount of scheduling
         # tasks
         r = self.scheduler_backend.peek_ready_tasks(
             'origin-update-git')
-        self.assertEquals(len(r), expected_length)
+        self.assertEqual(len(r), expected_length)
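
Note: `_make_simple_event` is provided by the `UpdaterTestUtil` mixin and is not part of this diff. Judging from its call sites above and from the fields the writer later reads back (`url`, `origin_type`), it plausibly builds a payload along the following lines; this is an inferred sketch for illustration, not the actual helper:

    # Hypothetical reconstruction of the UpdaterTestUtil helper, inferred
    # from its call sites; the real implementation may add more fields,
    # e.g. a creation timestamp.
    def _make_simple_event(self, event_type, name, origin_type):
        return {
            'type': event_type,          # one of LISTENED_EVENTS
            'url': name,                 # e.g. 'origin-0'
            'origin_type': origin_type,  # 'git' is handled, 'svn' dismissed
        }
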
""" ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') - self.assertEquals(len(r), expected_length) + self.assertEqual(len(r), expected_length) # Check the task has been scheduled for t in r: - self.assertEquals(t['type'], 'origin-update-git') - self.assertEquals(t['priority'], 'normal') - self.assertEquals(t['policy'], 'oneshot') - self.assertEquals(t['status'], 'next_run_not_scheduled') + self.assertEqual(t['type'], 'origin-update-git') + self.assertEqual(t['priority'], 'normal') + self.assertEqual(t['policy'], 'oneshot') + self.assertEqual(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') - self.assertEquals(len(r), expected_length) + self.assertEqual(len(r), expected_length) diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py index e0a0760..e0465f2 100644 --- a/swh/scheduler/updater/consumer.py +++ b/swh/scheduler/updater/consumer.py @@ -1,140 +1,140 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from abc import ABCMeta, abstractmethod from swh.scheduler.updater.backend import SchedulerUpdaterBackend class UpdaterConsumer(metaclass=ABCMeta): """Event consumer """ def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend, log_class='swh.scheduler.updater.consumer.UpdaterConsumer'): super().__init__() self._reset_cache() self.backend = backend_class() self.batch = batch logging.basicConfig(level=logging.DEBUG) self.log = logging.getLogger(log_class) def _reset_cache(self): """Reset internal cache. """ self.count = 0 self.seen_events = set() self.events = [] def is_interesting(self, event): """Determine if an event is interesting or not. 
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
index cb10825..829f31c 100644
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -1,122 +1,122 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import logging
 import time

 from arrow import utcnow

 from swh.core.config import SWHConfig
 from swh.core import utils
 from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_oneshot_task_dict
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend


 class UpdaterWriter(SWHConfig):
     """Updater writer in charge of updating the scheduler db with latest
        prioritized oneshot tasks

     In effect, this:
     - reads the events from scheduler updater's db
     - converts those events into priority oneshot tasks
     - dumps them into the scheduler db

     """
     CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
     DEFAULT_CONFIG = {
         # access to the scheduler backend
         'scheduler': ('dict', {
             'cls': 'local',
             'args': {
                 'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
             },
         }),
         # access to the scheduler updater cache
         'scheduler_updater': ('dict', {
             'scheduling_updater_db':
                 'dbname=softwareheritage-scheduler-updater-dev',
             'cache_read_limit': 1000,
         }),
         # waiting time between db reads
         'pause': ('int', 10),
         # verbose or not
         'verbose': ('bool', False),
     }

     def __init__(self, **config):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file()

         self.scheduler_updater_backend = SchedulerUpdaterBackend(
             **self.config['scheduler_updater'])
         self.scheduler_backend = get_scheduler(**self.config['scheduler'])
         self.pause = self.config['pause']
         self.log = logging.getLogger(
             'swh.scheduler.updater.writer.UpdaterWriter')
         self.log.setLevel(
             logging.DEBUG if self.config['verbose'] else logging.INFO)

     def convert_to_oneshot_task(self, event):
         """Given an event, convert it into oneshot task with priority

         Args:
             event (dict): The event to convert to task

         """
         if event['origin_type'] == 'git':
             return create_oneshot_task_dict(
                 'origin-update-git', event['url'], priority='normal')

-        self.log.warn('Type %s is not supported for now, only git' % (
+        self.log.warning('Type %s is not supported for now, only git' % (
             event['origin_type'], ))
         return None

     def write_event_to_scheduler(self, events):
         """Write events to the scheduler and yield ids when done"""
         # convert events to oneshot tasks
         oneshot_tasks = filter(lambda e: e is not None,
                                map(self.convert_to_oneshot_task, events))
         # write event to scheduler
         self.scheduler_backend.create_tasks(list(oneshot_tasks))
         for e in events:
             yield e['url']

     def run(self):
         """First retrieve events from cache (including origin_type, cnt),
            then convert them into oneshot tasks with priority, then
            write them to the scheduler db, at last remove them from cache.

         """
         while True:
             timestamp = utcnow()
             events = list(
                 self.scheduler_updater_backend.cache_read(timestamp))
             if not events:
                 break
             for urls in utils.grouper(self.write_event_to_scheduler(events),
                                       n=100):
                 self.scheduler_updater_backend.cache_remove(urls)
             time.sleep(self.pause)


 @click.command()
 @click.option('--verbose/--no-verbose', '-v', default=False,
               help='Verbose mode')
 def main(verbose):
     log = logging.getLogger('swh.scheduler.updater.writer')
     log.addHandler(logging.StreamHandler())
     _loglevel = logging.DEBUG if verbose else logging.INFO
     log.setLevel(_loglevel)

     UpdaterWriter().run()


 if __name__ == '__main__':
     main()
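
For reference, the writer can also be driven programmatically with the same configuration shape the tests use; the sketch below simply mirrors `DEFAULT_CONFIG` (the `-dev` database names are the declared defaults). `run()` drains the updater cache batch by batch, sleeping `pause` seconds between rounds, and returns once the cache is empty:

    from swh.scheduler.updater.writer import UpdaterWriter

    writer = UpdaterWriter(**{
        'scheduler': {
            'cls': 'local',
            'args': {
                'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
            },
        },
        'scheduler_updater': {
            'scheduling_updater_db':
                'dbname=softwareheritage-scheduler-updater-dev',
            'cache_read_limit': 1000,
        },
        'pause': 10,
        'verbose': False,
    })
    # converts cached git events into 'origin-update-git' oneshot tasks,
    # then removes the corresponding urls from the cache
    writer.run()
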