Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/api/server.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | |||||
import logging | import logging | ||||
import os | |||||
from flask import request, Flask | |||||
from swh.core import config | from swh.core import config | ||||
from swh.core.api import ( | from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp | ||||
decode_request, | from swh.core.api import encode_data_server as encode_data | ||||
error_handler, | from swh.core.api import error_handler, negotiate | ||||
encode_data_server as encode_data, | |||||
) | |||||
from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter | from swh.scheduler import get_scheduler | ||||
from swh.scheduler import get_scheduler as get_scheduler_from | from swh.scheduler.interface import SchedulerInterface | ||||
app = Flask(__name__) | |||||
scheduler = None | scheduler = None | ||||
@app.errorhandler(Exception) | def get_global_scheduler(): | ||||
def my_error_handler(exception): | |||||
return error_handler(exception, encode_data) | |||||
def get_sched(): | |||||
global scheduler | global scheduler | ||||
if not scheduler: | if not scheduler: | ||||
scheduler = get_scheduler_from(**app.config["scheduler"]) | scheduler = get_scheduler(**app.config["scheduler"]) | ||||
return scheduler | return scheduler | ||||
def has_no_empty_params(rule): | class SchedulerServerApp(RPCServerApp): | ||||
return len(rule.defaults or ()) >= len(rule.arguments or ()) | pass | ||||
@app.route("/") | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def index(): | |||||
return "SWH Scheduler API server" | |||||
@app.route("/close_connection", methods=["GET", "POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def close_connection(): | |||||
return get_sched().close_connection() | |||||
@app.route("/set_status_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def set_status_tasks(): | |||||
return get_sched().set_status_tasks(**decode_request(request)) | |||||
@app.route("/create_task_type", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def create_task_type(): | |||||
return get_sched().create_task_type(**decode_request(request)) | |||||
@app.route("/get_task_type", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def get_task_type(): | |||||
return get_sched().get_task_type(**decode_request(request)) | |||||
@app.route("/get_task_types", methods=["GET", "POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def get_task_types(): | |||||
return get_sched().get_task_types(**decode_request(request)) | |||||
@app.route("/create_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def create_tasks(): | |||||
return get_sched().create_tasks(**decode_request(request)) | |||||
@app.route("/disable_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def disable_tasks(): | |||||
return get_sched().disable_tasks(**decode_request(request)) | |||||
@app.route("/get_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def get_tasks(): | |||||
return get_sched().get_tasks(**decode_request(request)) | |||||
@app.route("/get_task_runs", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def get_task_runs(): | |||||
return get_sched().get_task_runs(**decode_request(request)) | |||||
@app.route("/search_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def search_tasks(): | |||||
return get_sched().search_tasks(**decode_request(request)) | |||||
@app.route("/peek_ready_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def peek_ready_tasks(): | |||||
return get_sched().peek_ready_tasks(**decode_request(request)) | |||||
@app.route("/grab_ready_tasks", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def grab_ready_tasks(): | |||||
return get_sched().grab_ready_tasks(**decode_request(request)) | |||||
@app.route("/schedule_task_run", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def schedule_task_run(): | |||||
return get_sched().schedule_task_run(**decode_request(request)) | |||||
@app.route("/mass_schedule_task_runs", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def mass_schedule_task_runs(): | |||||
return get_sched().mass_schedule_task_runs(**decode_request(request)) | |||||
@app.route("/start_task_run", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def start_task_run(): | |||||
return get_sched().start_task_run(**decode_request(request)) | |||||
@app.route("/end_task_run", methods=["POST"]) | app = SchedulerServerApp( | ||||
@negotiate(MsgpackFormatter) | __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler | ||||
@negotiate(JSONFormatter) | ) | ||||
def end_task_run(): | |||||
return get_sched().end_task_run(**decode_request(request)) | |||||
@app.route("/filter_task_to_archive", methods=["POST"]) | |||||
@negotiate(MsgpackFormatter) | |||||
@negotiate(JSONFormatter) | |||||
def filter_task_to_archive(): | |||||
return get_sched().filter_task_to_archive(**decode_request(request)) | |||||
@app.route("/delete_archived_tasks", methods=["POST"]) | @app.errorhandler(Exception) | ||||
@negotiate(MsgpackFormatter) | def my_error_handler(exception): | ||||
@negotiate(JSONFormatter) | return error_handler(exception, encode_data) | ||||
def delete_archived_tasks(): | |||||
return get_sched().delete_archived_tasks(**decode_request(request)) | |||||
@app.route("/get_priority_ratios", methods=["GET", "POST"]) | def has_no_empty_params(rule): | ||||
@negotiate(MsgpackFormatter) | return len(rule.defaults or ()) >= len(rule.arguments or ()) | ||||
@negotiate(JSONFormatter) | |||||
def get_priority_ratios(): | |||||
return get_sched().get_priority_ratios(**decode_request(request)) | |||||
@app.route("/site-map") | @app.route("/site-map") | ||||
@negotiate(MsgpackFormatter) | @negotiate(MsgpackFormatter) | ||||
@negotiate(JSONFormatter) | @negotiate(JSONFormatter) | ||||
def site_map(): | def site_map(): | ||||
links = [] | links = [] | ||||
sched = get_sched() | |||||
for rule in app.url_map.iter_rules(): | for rule in app.url_map.iter_rules(): | ||||
if has_no_empty_params(rule) and hasattr(sched, rule.endpoint): | if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): | ||||
links.append( | links.append( | ||||
dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__) | dict( | ||||
rule=rule.rule, | |||||
description=getattr(SchedulerInterface, rule.endpoint).__doc__, | |||||
) | |||||
) | ) | ||||
# links is now a list of url, endpoint tuples | # links is now a list of url, endpoint tuples | ||||
return links | return links | ||||
def load_and_check_config(config_file, type="local"): | def load_and_check_config(config_file, type="local"): | ||||
"""Check the minimal configuration is set to run the api or raise an | """Check the minimal configuration is set to run the api or raise an | ||||
error explanation. | error explanation. | ||||
▲ Show 20 Lines • Show All 67 Lines • Show Last 20 Lines |