Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/updater/writer.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import click | import click | ||||
import logging | import logging | ||||
import time | import time | ||||
from arrow import utcnow | from arrow import utcnow | ||||
from swh.core.config import SWHConfig | |||||
from swh.core import utils | from swh.core import utils | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
from swh.scheduler.updater.backend import SchedulerUpdaterBackend | from swh.scheduler.updater.backend import SchedulerUpdaterBackend | ||||
class UpdaterWriter(SWHConfig): | class UpdaterWriter: | ||||
"""Updater writer in charge of updating the scheduler db with latest | """Updater writer in charge of updating the scheduler db with latest | ||||
prioritized oneshot tasks | prioritized oneshot tasks | ||||
In effect, this: | In effect, this: | ||||
- reads the events from scheduler updater's db | - reads the events from scheduler updater's db | ||||
- converts those events into priority oneshot tasks | - converts those events into priority oneshot tasks | ||||
- dumps them into the scheduler db | - dumps them into the scheduler db | ||||
""" | """ | ||||
CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer' | |||||
DEFAULT_CONFIG = { | |||||
# access to the scheduler backend | |||||
'scheduler': ('dict', { | |||||
'cls': 'local', | |||||
'args': { | |||||
'scheduling_db': 'dbname=softwareheritage-scheduler-dev', | |||||
}, | |||||
}), | |||||
# access to the scheduler updater cache | |||||
'scheduler_updater': ('dict', { | |||||
'scheduling_updater_db': | |||||
'dbname=softwareheritage-scheduler-updater-dev', | |||||
'cache_read_limit': 1000, | |||||
}), | |||||
# waiting time between db reads | |||||
'pause': ('int', 10), | |||||
# verbose or not | |||||
'verbose': ('bool', False), | |||||
} | |||||
def __init__(self, **config): | def __init__(self, **config): | ||||
if config: | |||||
self.config = config | self.config = config | ||||
else: | if self.config['scheduler_updater']['cls'] != 'local': | ||||
self.config = self.parse_config_file() | raise ValueError( | ||||
'The scheduler_updater can only be a cls=local for now') | |||||
self.scheduler_updater_backend = SchedulerUpdaterBackend( | self.scheduler_updater_backend = SchedulerUpdaterBackend( | ||||
**self.config['scheduler_updater']) | **self.config['scheduler_updater']['args']) | ||||
self.scheduler_backend = get_scheduler(**self.config['scheduler']) | self.scheduler_backend = get_scheduler(**self.config['scheduler']) | ||||
self.pause = self.config['pause'] | self.pause = self.config.get('updater_writer', {}).get('pause', 10) | ||||
self.log = logging.getLogger( | self.log = logging.getLogger( | ||||
'swh.scheduler.updater.writer.UpdaterWriter') | 'swh.scheduler.updater.writer.UpdaterWriter') | ||||
self.log.setLevel( | |||||
logging.DEBUG if self.config['verbose'] else logging.INFO) | |||||
def convert_to_oneshot_task(self, event): | def convert_to_oneshot_task(self, event): | ||||
"""Given an event, convert it into oneshot task with priority | """Given an event, convert it into oneshot task with priority | ||||
Args: | Args: | ||||
event (dict): The event to convert to task | event (dict): The event to convert to task | ||||
""" | """ | ||||
Show All 32 Lines | def run(self): | ||||
n=100): | n=100): | ||||
self.scheduler_updater_backend.cache_remove(urls) | self.scheduler_updater_backend.cache_remove(urls) | ||||
time.sleep(self.pause) | time.sleep(self.pause) | ||||
@click.command() | @click.command() | ||||
@click.option('--verbose/--no-verbose', '-v', default=False, | @click.option('--verbose/--no-verbose', '-v', default=False, | ||||
help='Verbose mode') | help='Verbose mode') | ||||
def main(verbose): | @click.pass_context | ||||
log = logging.getLogger('swh.scheduler.updater.writer') | def main(ctx, verbose): | ||||
log.addHandler(logging.StreamHandler()) | click.echo("Deprecated! Use 'swh-scheduler updater' instead.", | ||||
_loglevel = logging.DEBUG if verbose else logging.INFO | err=True) | ||||
log.setLevel(_loglevel) | ctx.exit(1) | ||||
UpdaterWriter().run() | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |