Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9337565
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
View Options
diff --git a/sql/swh-scheduler-data.sql b/sql/swh-scheduler-data.sql
index 50bd8db..c39b6bc 100644
--- a/sql/swh-scheduler-data.sql
+++ b/sql/swh-scheduler-data.sql
@@ -1,77 +1,91 @@
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'swh-loader-mount-dump-and-load-svn-repository',
'Loading svn repositories from svn dump',
'swh.loader.svn.tasks.MountAndLoadSvnRepositoryTsk',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
num_retries,
max_queue_length)
values (
'swh-deposit-archive-loading',
'Loading deposit archive into swh through swh-loader-tar',
'swh.deposit.loader.tasks.LoadDepositArchiveTsk',
'1 day', '1 day', '1 day', 1, 3, 1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
num_retries, max_queue_length)
values (
'swh-deposit-archive-checks',
'Pre-checking deposit step before loading into swh archive',
'swh.deposit.loader.tasks.ChecksDepositTsk',
'1 day', '1 day', '1 day', 1, 3, 1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'swh-vault-cooking',
'Cook a Vault bundle',
'swh.vault.cooking_tasks.SWHCookingTask',
'1 day', '1 day', '1 day', 1,
10000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-load-hg',
'Loading mercurial repository swh-loader-mercurial',
'swh.loader.mercurial.tasks.LoadMercurialTsk',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-load-archive-hg',
'Loading archive mercurial repository swh-loader-mercurial',
'swh.loader.mercurial.tasks.LoadArchiveMercurialTsk',
'1 day', '1 day', '1 day', 1,
1000);
+
+-- Register the 'origin-update-git' task type. Values are given in the
+-- same order as the column list.
+insert into task_type (
+    type, description, backend_name,
+    default_interval, min_interval, max_interval,
+    backoff_factor, max_queue_length
+) values (
+    'origin-update-git',
+    'Update an origin of type git',
+    'swh.loader.git.tasks.UpdateGitRepository',
+    '64 days', '12:00:00', '64 days',
+    2, 100000
+);
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
index 7dfdbf4..6eb67a7 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -1,68 +1,68 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
from arrow import utcnow
from nose.plugins.attrib import attr
from nose.tools import istest
from hypothesis import given
from hypothesis.strategies import sets, from_regex
from swh.core.tests.db_testing import SingleDbTestFixture
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata')
@attr('db')
class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase):
+ # Tests for SchedulerUpdaterBackend; the backend under test is pointed
+ # at the TEST_DB_NAME database in setUp().
TEST_DB_NAME = 'softwareheritage-scheduler-updater-test'
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR,
'dumps/swh-scheduler-updater.dump')
def setUp(self):
super().setUp()
config = {
'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
- 'time_window': '1 minute',
+ 'cache_read_limit': 1000,
}
self.backend = SchedulerUpdaterBackend(**config)
def _empty_tables(self):
+ # Truncate every table of the 'public' schema, then commit.
self.cursor.execute(
"""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public', ))
tables = set(table for (table,) in self.cursor.fetchall())
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.conn.commit()
def tearDown(self):
self.backend.close_connection()
self._empty_tables()
super().tearDown()
@istest
@given(sets(
from_regex(
r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
min_size=10, max_size=15))
def cache_read(self, urls):
+ # Put 10 to 15 generated 'create' events for git origins in the cache,
+ # then read the cache back as of the current time.
def gen_events(urls):
for url in urls:
yield SWHEvent({
'url': url,
'type': 'create',
'origin_type': 'git',
})
self.backend.cache_put(gen_events(urls))
r = self.backend.cache_read(timestamp=utcnow())
+ # NOTE(review): backend.cache_read() is changed by this same patch to
+ # yield its rows, so r is a generator object here; a generator never
+ # compares equal to [], which makes the assertion below pass even when
+ # nothing is read. Materialize it first (r = list(...)) to keep the
+ # check meaningful.
self.assertNotEqual(r, [])
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index 9f82734..e4b6958 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,67 +1,80 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from arrow import utcnow
from swh.core.config import SWHConfig
from swh.scheduler.backend import DbBackend, autocommit
class SchedulerUpdaterBackend(SWHConfig, DbBackend):
CONFIG_BASE_FILENAME = 'scheduler-updater'
DEFAULT_CONFIG = {
'scheduling_updater_db': (
'str', 'dbname=softwareheritage-scheduler-updater-dev'),
- 'time_window': ('str', '1 hour'),
+ 'cache_read_limit': ('int', 1000),
}
def __init__(self, **override_config):
+ # Set up the backend from override_config when given, otherwise from
+ # the parsed configuration file, then connect through reconnect().
+ #
+ # NOTE(review): with this change a non-empty override_config replaces
+ # the parsed configuration entirely (the removed lines layered it on
+ # top with update()), so it must itself provide every key read below:
+ # 'scheduling_updater_db' and 'cache_read_limit'.
super().__init__()
- self.config = self.parse_config_file(global_config=False)
- self.config.update(override_config)
+ if override_config:
+ self.config = override_config
+ else:
+ self.config = self.parse_config_file(global_config=False)
self.db = None
self.db_conn_dsn = self.config['scheduling_updater_db']
- self.time_window = self.config['time_window']
+ self.limit = self.config['cache_read_limit']
self.reconnect()
cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type']
@autocommit
def cache_put(self, events, timestamp=None, cursor=None):
+ """Write new events in the backend.
+
+ Each event is unwrapped with get(); an event whose 'last_seen' is
+ None is stamped with ``timestamp`` (the current UTC time when no
+ timestamp is given). The events are then copied into 'tmp_cache'
+ using the cache_put_keys columns, between calls to the SQL
+ functions swh_mktemp_cache() and swh_cache_put().
+
+ """
if timestamp is None:
timestamp = utcnow()
def prepare_events(events):
for e in events:
event = e.get()
seen = event['last_seen']
if seen is None:
event['last_seen'] = timestamp
yield event
cursor.execute('select swh_mktemp_cache()')
self.copy_to(prepare_events(events),
'tmp_cache', self.cache_put_keys, cursor)
cursor.execute('select swh_cache_put()')
- # @autocommit
- # def cache_get(self, event, cursor=None):
- # pass
-
- # @autocommit
- # def cache_remove(self, event, cursor=None):
- # pass
-
- cache_read_keys = ['id', 'url']
+ cache_read_keys = ['id', 'url', 'rate', 'origin_type']
@autocommit
- def cache_read(self, timestamp, limit=100, cursor=None):
+ def cache_read(self, timestamp, limit=None, cursor=None):
+ """Read events from the cache prior to timestamp.
+
+ Yields the cache rows whose last_seen is not later than
+ ``timestamp``, at most ``limit`` of them (self.limit when ``limit``
+ is falsy, which includes 0), with their 'id' converted through
+ tobytes().
+
+ """
+ # NOTE(review): because of the ``yield`` below this is a generator
+ # function: nothing in this body, cursor.execute() included, runs
+ # before the result is iterated, and the returned generator object is
+ # always truthy. Presumably the @autocommit wrapper has already
+ # returned by then -- confirm against its implementation.
+ if not limit:
+ limit = self.limit
q = self._format_query("""select {keys}
from cache
- where %s - interval %s <= last_seen and last_seen <= %s
+ where last_seen <= %s
limit %s
""", self.cache_read_keys)
- cursor.execute(q, (timestamp, self.time_window, timestamp, limit))
- return cursor.fetchall()
+ cursor.execute(q, (timestamp, limit))
+ for r in cursor.fetchall():
+ r['id'] = r['id'].tobytes()
+ yield r
+
+    @autocommit
+    def cache_remove(self, entries, cursor=None):
+        """Clean events from the cache.
+
+        Args:
+            entries (iterable of str): urls of the cache rows to delete;
+                an empty iterable is a no-op.
+            cursor: database cursor the statement is executed on
+                (presumably supplied by @autocommit when omitted).
+
+        """
+        urls = tuple(entries)
+        if not urls:
+            # "in ()" is not valid SQL, and there is nothing to delete.
+            return
+        # Only the placeholders are interpolated into the statement text;
+        # the urls are passed as bound parameters, so a quote inside a url
+        # can neither break the query nor inject SQL.
+        placeholders = ', '.join(['%s'] * len(urls))
+        q = 'delete from cache where url in (%s)' % placeholders
+        cursor.execute(q, urls)
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
new file mode 100644
index 0000000..e2f856d
--- /dev/null
+++ b/swh/scheduler/updater/writer.py
@@ -0,0 +1,139 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+import time
+
+from arrow import utcnow
+
+from swh.core.config import SWHConfig
+from swh.core import utils
+from swh.scheduler import get_scheduler
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
+
+
+class UpdaterWriter(SWHConfig):
+    """Feed the scheduler db with prioritized oneshot tasks.
+
+    In effect, this:
+    - reads the events from scheduler updater's db
+    - converts those events into priority oneshot tasks
+    - dumps them into the scheduler db
+    - removes the events it wrote from the scheduler updater's db
+
+    """
+    CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
+    DEFAULT_CONFIG = {
+        'scheduler': ('dict', {
+            'cls': 'local',
+            'args': {
+                'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
+            },
+        }),
+        'scheduler_updater': ('dict', {
+            'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-dev',  # noqa
+            'cache_read_limit': 1000,
+        }),
+        # Seconds to sleep when the cache holds no event to process.
+        'pause': ('int', 10),
+    }
+
+    def __init__(self, **config):
+        """Instantiate both backends from the configuration.
+
+        Args:
+            **config: full configuration ('scheduler', 'scheduler_updater'
+                and 'pause' keys); when empty, the configuration file is
+                parsed instead.
+
+        """
+        if config:
+            self.config = config
+        else:
+            self.config = self.parse_config_file()
+
+        self.scheduler_updater_backend = SchedulerUpdaterBackend(
+            **self.config['scheduler_updater'])
+        self.scheduler_backend = get_scheduler(**self.config['scheduler'])
+        self.pause = self.config['pause']
+        # No level is forced here: this logger is a child of
+        # 'swh.scheduler.updater.writer', so it follows the level main()
+        # derives from --verbose/--no-verbose.
+        self.log = logging.getLogger(
+            'swh.scheduler.updater.writer.UpdaterWriter')
+
+    def _compute_priority(self, rate):
+        """Given an event rate, compute the task priority.
+
+        Args:
+            rate (int): the event's 'rate' value
+
+        Returns:
+            'low' below 5, 'normal' below 50, 'high' otherwise.
+
+        """
+        if rate < 5:
+            return 'low'
+        elif rate < 50:
+            return 'normal'
+        else:
+            return 'high'
+
+    def convert_to_oneshot_task(self, event):
+        """Given an event, convert it into oneshot task with priority
+
+        Args:
+            event (dict): The event to convert to task; its 'origin_type',
+                'url' and 'rate' entries are read.
+
+        Returns:
+            The task as a dict, or None when the origin type is not
+            supported (only 'git' is).
+
+        """
+        if event['origin_type'] != 'git':
+            # Events read back from the cache carry 'origin_type' but no
+            # 'type' entry, so report the key that was actually tested.
+            self.log.warning(
+                'Type %s is not supported for now, only git',
+                event['origin_type'])
+            return None
+
+        return {
+            'type': 'origin-update-git',
+            'arguments': {
+                'args': [event['url']],
+                'kwargs': {},
+            },
+            'next_run': utcnow(),
+            'policy': 'oneshot',
+            'retries_left': 2,
+            'priority': self._compute_priority(event['rate']),
+        }
+
+    def write_event_to_scheduler(self, events):
+        """Write events to the scheduler and yield their urls when done.
+
+        Events that cannot be converted, or for which create_tasks()
+        returns nothing, are skipped and not yielded.
+
+        """
+        for event in events:
+            # convert event to oneshot task
+            oneshot_task = self.convert_to_oneshot_task(event)
+            if not oneshot_task:
+                continue
+
+            # write event to scheduler
+            # FIXME: deal with this in batch
+            r = self.scheduler_backend.create_tasks([oneshot_task])
+            if r:
+                yield event['url']
+
+    def run(self):
+        """First retrieve events from cache (including origin_type, rate),
+           then convert them into oneshot tasks with priority, then
+           write them to the scheduler db, at last remove them from
+           cache.
+
+        Loops forever, sleeping ``pause`` seconds whenever the cache
+        returns no event.
+
+        """
+        while True:
+            timestamp = utcnow()
+            # cache_read() yields its rows; a generator object is always
+            # truthy, so materialize it for the emptiness test below to
+            # be able to trigger the pause.
+            events = list(
+                self.scheduler_updater_backend.cache_read(timestamp))
+            if not events:
+                time.sleep(self.pause)
+                continue
+
+            # NOTE(review): events skipped by write_event_to_scheduler()
+            # are not removed here, so they are read again on later passes.
+            for ids in utils.grouper(self.write_event_to_scheduler(events),
+                                     n=100):
+                _ids = list(ids)
+                if _ids:
+                    self.scheduler_updater_backend.cache_remove(_ids)
+
+
+@click.command()
+@click.option('--verbose/--no-verbose', '-v', default=False,
+              help='Verbose mode')
+def main(verbose):
+    # Command-line entry point: send the writer's logs to a stream handler
+    # (DEBUG when verbose, INFO otherwise), then run the writer loop.
+    # Kept as a comment on purpose: a docstring would show up in --help.
+    writer_logger = logging.getLogger('swh.scheduler.updater.writer')
+    if verbose:
+        level = logging.DEBUG
+    else:
+        level = logging.INFO
+    writer_logger.addHandler(logging.StreamHandler())
+    writer_logger.setLevel(level)
+
+    writer = UpdaterWriter()
+    writer.run()
+
+
+if __name__ == '__main__':
+    main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 8:10 AM (10 w, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3318705
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment