diff --git a/sql/swh-scheduler-data.sql b/sql/swh-scheduler-data.sql index 50bd8db..c39b6bc 100644 --- a/sql/swh-scheduler-data.sql +++ b/sql/swh-scheduler-data.sql @@ -1,77 +1,91 @@ insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-loader-mount-dump-and-load-svn-repository', 'Loading svn repositories from svn dump', 'swh.loader.svn.tasks.MountAndLoadSvnRepositoryTsk', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-loading', 'Loading deposit archive into swh through swh-loader-tar', 'swh.deposit.loader.tasks.LoadDepositArchiveTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-checks', 'Pre-checking deposit step before loading into swh archive', 'swh.deposit.loader.tasks.ChecksDepositTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-vault-cooking', 'Cook a Vault bundle', 'swh.vault.cooking_tasks.SWHCookingTask', '1 day', '1 day', '1 day', 1, 10000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-load-hg', 'Loading mercurial repository swh-loader-mercurial', 'swh.loader.mercurial.tasks.LoadMercurialTsk', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-load-archive-hg', 'Loading archive mercurial repository swh-loader-mercurial', 
'swh.loader.mercurial.tasks.LoadArchiveMercurialTsk', '1 day', '1 day', '1 day', 1, 1000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'origin-update-git', + 'Update an origin of type git', + 'swh.loader.git.tasks.UpdateGitRepository', + '64 days', + '12:00:00', + '64 days', 2, 100000); diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index 7dfdbf4..6eb67a7 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,68 +1,68 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow from nose.plugins.attrib import attr from nose.tools import istest from hypothesis import given from hypothesis.strategies import sets, from_regex from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') @attr('db') class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler-updater.dump') def setUp(self): super().setUp() config = { 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, - 'time_window': '1 minute', + 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for 
(table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): self.backend.close_connection() self._empty_tables() super().tearDown() @istest @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) def cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, 'type': 'create', 'origin_type': 'git', }) self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) self.assertNotEqual(r, []) diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py index 9f82734..e4b6958 100644 --- a/swh/scheduler/updater/backend.py +++ b/swh/scheduler/updater/backend.py @@ -1,67 +1,80 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow from swh.core.config import SWHConfig from swh.scheduler.backend import DbBackend, autocommit class SchedulerUpdaterBackend(SWHConfig, DbBackend): CONFIG_BASE_FILENAME = 'scheduler-updater' DEFAULT_CONFIG = { 'scheduling_updater_db': ( 'str', 'dbname=softwareheritage-scheduler-updater-dev'), - 'time_window': ('str', '1 hour'), + 'cache_read_limit': ('int', 1000), } def __init__(self, **override_config): super().__init__() - self.config = self.parse_config_file(global_config=False) - self.config.update(override_config) + if override_config: + self.config = override_config + else: + self.config = self.parse_config_file(global_config=False) self.db = None self.db_conn_dsn = self.config['scheduling_updater_db'] - self.time_window = self.config['time_window'] + self.limit = self.config['cache_read_limit'] self.reconnect() cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type'] @autocommit def cache_put(self, 
events, timestamp=None, cursor=None): + """Write new events in the backend. + + """ if timestamp is None: timestamp = utcnow() def prepare_events(events): for e in events: event = e.get() seen = event['last_seen'] if seen is None: event['last_seen'] = timestamp yield event cursor.execute('select swh_mktemp_cache()') self.copy_to(prepare_events(events), 'tmp_cache', self.cache_put_keys, cursor) cursor.execute('select swh_cache_put()') - # @autocommit - # def cache_get(self, event, cursor=None): - # pass - - # @autocommit - # def cache_remove(self, event, cursor=None): - # pass - - cache_read_keys = ['id', 'url'] + cache_read_keys = ['id', 'url', 'rate', 'origin_type'] @autocommit - def cache_read(self, timestamp, limit=100, cursor=None): + def cache_read(self, timestamp, limit=None, cursor=None): + """Read events from the cache prior to timestamp. + + """ + if not limit: + limit = self.limit q = self._format_query("""select {keys} from cache - where %s - interval %s <= last_seen and last_seen <= %s + where last_seen <= %s limit %s """, self.cache_read_keys) - cursor.execute(q, (timestamp, self.time_window, timestamp, limit)) - return cursor.fetchall() + cursor.execute(q, (timestamp, limit)) + for r in cursor.fetchall(): + r['id'] = r['id'].tobytes() + yield r + + @autocommit + def cache_remove(self, entries, cursor=None): + """Clean events from the cache + + """ + q = 'delete from cache where url in %s' + entries = tuple(entries) + cursor.execute(q, (entries, )) diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py new file mode 100644 index 0000000..e2f856d --- /dev/null +++ b/swh/scheduler/updater/writer.py @@ -0,0 +1,139 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging +import time + +from arrow
import utcnow + +from swh.core.config import SWHConfig +from swh.core import utils +from swh.scheduler import get_scheduler +from swh.scheduler.updater.backend import SchedulerUpdaterBackend + + +class UpdaterWriter(SWHConfig): + """Updater writer in charge of updating the scheduler db with latest + prioritized oneshot tasks + + In effect, this: + - reads the events from scheduler updater's db + - converts those events into priority oneshot tasks + - dumps them into the scheduler db + + """ + CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer' + DEFAULT_CONFIG = { + 'scheduler': ('dict', { + 'cls': 'local', + 'args': { + 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', + }, + }), + 'scheduler_updater': ('dict', { + 'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-dev', # noqa + 'cache_read_limit': 1000, + }), + 'pause': ('int', 10), + } + + def __init__(self, **config): + if config: + self.config = config + else: + self.config = self.parse_config_file() + + self.scheduler_updater_backend = SchedulerUpdaterBackend( + **self.config['scheduler_updater']) + self.scheduler_backend = get_scheduler(**self.config['scheduler']) + self.pause = self.config['pause'] + self.log = logging.getLogger( + 'swh.scheduler.updater.writer.UpdaterWriter') + self.log.setLevel(logging.DEBUG) + + def _compute_priority(self, rate): + """Given a ratio, compute the task priority. 
+ + """ + if rate < 5: + return 'low' + elif rate < 50: + return 'normal' + else: + return 'high' + + def convert_to_oneshot_task(self, event): + """Given an event, convert it into oneshot task with priority + + Args: + event (dict): The event to convert to task + + """ + if event['origin_type'] == 'git': + return { + 'type': 'origin-update-git', + 'arguments': { + 'args': [event['url']], + 'kwargs': {}, + }, + 'next_run': utcnow(), + 'policy': 'oneshot', + 'retries_left': 2, + 'priority': self._compute_priority(event['rate']), + } + else: + self.log.warning('Type %s is not supported for now, only git' % ( + event['origin_type'], )) + return None + + def write_event_to_scheduler(self, events): + """Write events to the scheduler and yield ids when done""" + for event in events: + # convert event to oneshot task + oneshot_task = self.convert_to_oneshot_task(event) + if not oneshot_task: + continue + + # write event to scheduler + # FIXME: deal with this in batch + r = self.scheduler_backend.create_tasks([oneshot_task]) + if r: + yield event['url'] + + def run(self): + """First retrieve events from cache (including origin_type, rate), + then convert them into oneshot tasks with priority, then + write them to the scheduler db, at last remove them from + cache. + + """ + while True: + timestamp = utcnow() + events = list(self.scheduler_updater_backend.cache_read(timestamp)) + if not events: + time.sleep(self.pause) + continue + + for ids in utils.grouper(self.write_event_to_scheduler(events), + n=100): + _ids = list(ids) + self.scheduler_updater_backend.cache_remove(_ids) + + +@click.command() +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +def main(verbose): + log = logging.getLogger('swh.scheduler.updater.writer') + log.addHandler(logging.StreamHandler()) + _loglevel = logging.DEBUG if verbose else logging.INFO + log.setLevel(_loglevel) + + UpdaterWriter().run() + + +if __name__ == '__main__': + main()