Page MenuHomeSoftware Heritage

writer.py
No OneTemporary

writer.py

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import logging
import time
from arrow import utcnow
from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
class UpdaterWriter(SWHConfig):
    """Updater writer in charge of updating the scheduler db with latest
    prioritized oneshot tasks.

    In effect, this:

    - reads the events from scheduler updater's db
    - converts those events into priority oneshot tasks
    - dumps them into the scheduler db

    """
    CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
    DEFAULT_CONFIG = {
        # Keyword arguments forwarded to get_scheduler()
        'scheduler': ('dict', {
            'cls': 'local',
            'args': {
                'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
            },
        }),
        # Keyword arguments forwarded to SchedulerUpdaterBackend()
        'scheduler_updater': ('dict', {
            'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-dev',  # noqa
            'cache_read_limit': 1000,
        }),
        # Passed to time.sleep() (i.e. seconds) when no event was read
        'pause': ('int', 10),
    }

    def __init__(self, **config):
        """Set up both backends and the logger.

        Args:
            **config: configuration to use as-is. When no keyword is
                given, the configuration is obtained from
                ``self.parse_config_file()`` instead.

        """
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file()

        self.scheduler_updater_backend = SchedulerUpdaterBackend(
            **self.config['scheduler_updater'])
        self.scheduler_backend = get_scheduler(**self.config['scheduler'])
        self.pause = self.config['pause']
        self.log = logging.getLogger(
            'swh.scheduler.updater.writer.UpdaterWriter')
        # NOTE(review): this pins the logger to DEBUG, so a level set on the
        # parent 'swh.scheduler.updater.writer' logger does not apply to it.
        self.log.setLevel(logging.DEBUG)

    def _compute_priority(self, rate):
        """Given a ratio, compute the task priority.

        Args:
            rate: the event's rate value

        Returns:
            str: 'low' when rate < 5, 'normal' when 5 <= rate < 50,
            'high' otherwise

        """
        if rate < 5:
            return 'low'
        elif rate < 50:
            return 'normal'
        else:
            return 'high'

    def convert_to_oneshot_task(self, event):
        """Given an event, convert it into oneshot task with priority

        Args:
            event (dict): The event to convert to task. The keys
                'origin_type', 'url' and 'rate' are read.

        Returns:
            dict: the oneshot task description when the event's origin
            type is 'git'; None (after logging a warning) for any other
            origin type.

        """
        if event['origin_type'] == 'git':
            return {
                'type': 'origin-update-git',
                'arguments': {
                    'args': [event['url']],
                    'kwargs': {},
                },
                'next_run': utcnow(),
                'policy': 'oneshot',
                'retries_left': 2,
                'priority': self._compute_priority(event['rate']),
            }

        # Report the field that was actually tested above ('origin_type');
        # use Logger.warning (Logger.warn is a deprecated alias) and let
        # the logging module do the formatting.
        self.log.warning('Type %s is not supported for now, only git',
                         event['origin_type'])
        return None

    def write_event_to_scheduler(self, events):
        """Write events to the scheduler and yield ids when done

        Args:
            events: iterable of event dicts, each in the form accepted by
                convert_to_oneshot_task

        Yields:
            the 'url' of each event whose task creation returned a truthy
            result; events which cannot be converted are skipped.

        """
        for event in events:
            # convert event to oneshot task
            oneshot_task = self.convert_to_oneshot_task(event)
            if not oneshot_task:
                continue

            # write event to scheduler
            # FIXME: deal with this in batch
            r = self.scheduler_backend.create_tasks([oneshot_task])
            if r:
                yield event['url']

    def run(self):
        """First retrieve events from cache (including origin_type, rate),
        then convert them into oneshot tasks with priority, then
        write them to the scheduler db, at last remove them from
        cache.

        This loops forever; it sleeps for ``self.pause`` whenever the
        cache read returns nothing.

        """
        while True:
            timestamp = utcnow()
            events = self.scheduler_updater_backend.cache_read(timestamp)
            if not events:
                time.sleep(self.pause)
                continue

            # Remove written events from the cache by groups (n=100);
            # presumably utils.grouper yields chunks of at most n items
            # -- TODO confirm against swh.core.utils.
            for ids in utils.grouper(self.write_event_to_scheduler(events),
                                     n=100):
                _ids = list(ids)
                self.scheduler_updater_backend.cache_remove(_ids)
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
              help='Verbose mode')
def main(verbose):
    """Command-line entry point: configure logging, then run the writer.

    Args:
        verbose (bool): log at DEBUG level when set, at INFO level otherwise

    """
    writer_logger = logging.getLogger('swh.scheduler.updater.writer')
    writer_logger.addHandler(logging.StreamHandler())

    if verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    writer_logger.setLevel(level)

    # No keyword given: the writer loads its configuration itself.
    writer = UpdaterWriter()
    writer.run()
# Only start the command-line interface when this module is executed as a
# script, not when it is imported.
if __name__ == '__main__':
    main()

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 12:23 PM (2 d, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408766

Event Timeline