Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9338645
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
index 8c371b7..95e6315 100644
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -1,138 +1,144 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import logging
import time
from arrow import utcnow
from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
class UpdaterWriter(SWHConfig):
"""Updater writer in charge of updating the scheduler db with latest
prioritized oneshot tasks
In effect, this:
- reads the events from scheduler updater's db
- converts those events into priority oneshot tasks
- dumps them into the scheduler db
"""
CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
DEFAULT_CONFIG = {
+ # access to the scheduler backend
'scheduler': ('dict', {
'cls': 'local',
'args': {
'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
},
}),
+ # access to the scheduler updater cache
'scheduler_updater': ('dict', {
'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-dev', # noqa
'cache_read_limit': 1000,
}),
+ # waiting time between db read when no more data exists
'pause': ('int', 10),
+ # verbose or not
+ 'verbose': ('bool', False),
}
def __init__(self, **config):
if config:
self.config = config
else:
self.config = self.parse_config_file()
self.scheduler_updater_backend = SchedulerUpdaterBackend(
**self.config['scheduler_updater'])
self.scheduler_backend = get_scheduler(**self.config['scheduler'])
self.pause = self.config['pause']
self.log = logging.getLogger(
'swh.scheduler.updater.writer.UpdaterWriter')
- self.log.setLevel(logging.DEBUG)
def _compute_priority(self, cnt):
"""Given a ratio, compute the task priority.
"""
if cnt < 5:
return 'low'
elif cnt < 50:
return 'normal'
else:
return 'high'
+ self.log.setLevel(
+ logging.DEBUG if self.config['verbose'] else logging.INFO)
def convert_to_oneshot_task(self, event):
"""Given an event, convert it into oneshot task with priority
Args:
event (dict): The event to convert to task
"""
if event['origin_type'] == 'git':
return {
'type': 'origin-update-git',
'arguments': {
'args': [event['url']],
'kwargs': {},
},
'next_run': utcnow(),
'policy': 'oneshot',
'retries_left': 2,
'priority': self._compute_priority(event['cnt']),
}
else:
self.log.warn('Type %s is not supported for now, only git' % (
event['type'], ))
return None
def write_event_to_scheduler(self, events):
"""Write events to the scheduler and yield ids when done"""
for event in events:
# convert event to oneshot task
oneshot_task = self.convert_to_oneshot_task(event)
if not oneshot_task:
continue
# write event to scheduler
# FIXME: deal with this in batch
r = self.scheduler_backend.create_tasks([oneshot_task])
if r:
yield event['url']
def run(self):
"""First retrieve events from cache (including origin_type, cnt),
then convert them into oneshot tasks with priority, then
write them to the scheduler db, at last remove them from
cache.
"""
while True:
timestamp = utcnow()
events = self.scheduler_updater_backend.cache_read(timestamp)
if not events:
time.sleep(self.pause)
continue
for urls in utils.grouper(self.write_event_to_scheduler(events),
n=100):
self.scheduler_updater_backend.cache_remove(urls)
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
help='Verbose mode')
def main(verbose):
log = logging.getLogger('swh.scheduler.updater.writer')
log.addHandler(logging.StreamHandler())
_loglevel = logging.DEBUG if verbose else logging.INFO
log.setLevel(_loglevel)
UpdaterWriter().run()
if __name__ == '__main__':
main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 9:00 AM (6 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3258709
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment