diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py index aae1885..8b96133 100644 --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,58 +1,68 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Percentage of tasks with priority to schedule PRIORITY_SLOT = 0.6 +DEFAULT_CONFIG_PATH = 'backend/scheduler' +DEFAULT_CONFIG = { + 'scheduler': ('dict', { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-dev', + }, + }) +} + def compute_nb_tasks_from(num_tasks): """Compute and returns the tuple, number of tasks without priority, number of tasks with priority. Args: num_tasks (int): Returns: tuple number of tasks without priority (int), number of tasks with priority (int) """ if not num_tasks: return None, None return (int((1 - PRIORITY_SLOT) * num_tasks), int(PRIORITY_SLOT * num_tasks)) def get_scheduler(cls, args={}): """ Get a scheduler object of class `scheduler_class` with arguments `scheduler_args`. Args: scheduler (dict): dictionary with keys: cls (str): scheduler's class, either 'local' or 'remote' args (dict): dictionary with keys, default to empty. Returns: an instance of swh.scheduler, either local or remote: local: swh.scheduler.backend.SchedulerBackend remote: swh.scheduler.api.client.RemoteScheduler Raises: ValueError if passed an unknown storage class. """ if cls == 'remote': from .api.client import RemoteScheduler as SchedulerBackend elif cls == 'local': from .backend import SchedulerBackend else: raise ValueError('Unknown swh.scheduler class `%s`' % cls) return SchedulerBackend(**args) diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 9831e72..b8cba45 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,183 +1,173 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from flask import request, Flask from swh.core import config from swh.core.api import (decode_request, error_handler, encode_data_server as encode_data) from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter from swh.scheduler import get_scheduler as get_scheduler_from - - -DEFAULT_CONFIG_PATH = 'backend/scheduler' -DEFAULT_CONFIG = { - 'scheduler': ('dict', { - 'cls': 'local', - 'args': { - 'db': 'dbname=softwareheritage-scheduler-dev', - }, - }) -} +from swh.scheduler import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH app = Flask(__name__) scheduler = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def get_sched(): global scheduler if not scheduler: scheduler = get_scheduler_from(**app.config['scheduler']) return scheduler @app.route('/') @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def index(): return 'SWH Scheduler API server' @app.route('/close_connection', methods=['GET', 'POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def close_connection(): return get_sched().close_connection() @app.route('/set_status_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def set_status_tasks(): return get_sched().set_status_tasks(**decode_request(request)) @app.route('/create_task_type', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def create_task_type(): return get_sched().create_task_type(**decode_request(request)) @app.route('/get_task_type', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_task_type(): return get_sched().get_task_type(**decode_request(request)) @app.route('/get_task_types', methods=['GET', 'POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_task_types(): return get_sched().get_task_types(**decode_request(request)) @app.route('/create_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def create_tasks(): return get_sched().create_tasks(**decode_request(request)) @app.route('/disable_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def disable_tasks(): return get_sched().disable_tasks(**decode_request(request)) @app.route('/get_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_tasks(): return get_sched().get_tasks(**decode_request(request)) @app.route('/search_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def search_tasks(): return get_sched().search_tasks(**decode_request(request)) @app.route('/peek_ready_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def peek_ready_tasks(): return get_sched().peek_ready_tasks(**decode_request(request)) @app.route('/grab_ready_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def grab_ready_tasks(): return get_sched().grab_ready_tasks(**decode_request(request)) @app.route('/schedule_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def schedule_task_run(): return get_sched().schedule_task_run(**decode_request(request)) @app.route('/mass_schedule_task_runs', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def mass_schedule_task_runs(): return get_sched().mass_schedule_task_runs(**decode_request(request)) @app.route('/start_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def start_task_run(): return get_sched().start_task_run(**decode_request(request)) @app.route('/end_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def end_task_run(): return get_sched().end_task_run(**decode_request(request)) @app.route('/filter_task_to_archive', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def filter_task_to_archive(): return get_sched().filter_task_to_archive(**decode_request(request)) @app.route('/delete_archived_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def delete_archived_tasks(): return get_sched().delete_archived_tasks(**decode_request(request)) def run_from_webserver(environ, start_response, config_path=DEFAULT_CONFIG_PATH): """Run the WSGI app from the webserver, loading the configuration.""" cfg = config.load_named_config(config_path, DEFAULT_CONFIG) app.config.update(cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) if __name__ == '__main__': print('Please use the "swh-scheduler api-server" command')