diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -6,137 +6,12 @@ from swh.core.api import RPCClient +from ..interface import SchedulerInterface + class RemoteScheduler(RPCClient): """Proxy to a remote scheduler API """ - def close_connection(self): - return self.post("close_connection", {}) - - def set_status_tasks(self, task_ids, status="disabled", next_run=None): - return self.post( - "set_status_tasks", - dict(task_ids=task_ids, status=status, next_run=next_run), - ) - - def create_task_type(self, task_type): - return self.post("create_task_type", {"task_type": task_type}) - - def get_task_type(self, task_type_name): - return self.post("get_task_type", {"task_type_name": task_type_name}) - - def get_task_types(self): - return self.post("get_task_types", {}) - - def create_tasks(self, tasks): - return self.post("create_tasks", {"tasks": tasks}) - - def disable_tasks(self, task_ids): - return self.post("disable_tasks", {"task_ids": task_ids}) - - def get_tasks(self, task_ids): - return self.post("get_tasks", {"task_ids": task_ids}) - - def get_task_runs(self, task_ids, limit=None): - return self.post("get_task_runs", {"task_ids": task_ids, "limit": limit}) - - def search_tasks( - self, - task_id=None, - task_type=None, - status=None, - priority=None, - policy=None, - before=None, - after=None, - limit=None, - ): - return self.post( - "search_tasks", - dict( - task_id=task_id, - task_type=task_type, - status=status, - priority=priority, - policy=policy, - before=before, - after=after, - limit=limit, - ), - ) - - def peek_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None - ): - return self.post( - "peek_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def grab_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, 
num_tasks_priority=None - ): - return self.post( - "grab_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): - return self.post( - "schedule_task_run", - { - "task_id": task_id, - "backend_id": backend_id, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def mass_schedule_task_runs(self, task_runs): - return self.post("mass_schedule_task_runs", {"task_runs": task_runs}) - - def start_task_run(self, backend_id, metadata=None, timestamp=None): - return self.post( - "start_task_run", - {"backend_id": backend_id, "metadata": metadata, "timestamp": timestamp,}, - ) - - def end_task_run(self, backend_id, status, metadata=None, timestamp=None): - return self.post( - "end_task_run", - { - "backend_id": backend_id, - "status": status, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None): - return self.post( - "filter_task_to_archive", - { - "after_ts": after_ts, - "before_ts": before_ts, - "limit": limit, - "page_token": page_token, - }, - ) - - def delete_archived_tasks(self, task_ids): - return self.post("delete_archived_tasks", {"task_ids": task_ids}) - - def get_priority_ratios(self): - return self.get("get_priority_ratios") + backend_class = SchedulerInterface diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -3,180 +3,44 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os import logging - -from flask import request, Flask +import os from swh.core import config -from swh.core.api import ( - decode_request, - error_handler, - encode_data_server as encode_data, -) +from swh.core.api import JSONFormatter, 
MsgpackFormatter, RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler, negotiate -from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter -from swh.scheduler import get_scheduler as get_scheduler_from +from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface -app = Flask(__name__) scheduler = None -@app.errorhandler(Exception) -def my_error_handler(exception): - return error_handler(exception, encode_data) - - -def get_sched(): +def get_global_scheduler(): global scheduler if not scheduler: - scheduler = get_scheduler_from(**app.config["scheduler"]) + scheduler = get_scheduler(**app.config["scheduler"]) return scheduler -def has_no_empty_params(rule): - return len(rule.defaults or ()) >= len(rule.arguments or ()) - - -@app.route("/") -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def index(): - return "SWH Scheduler API server" - - -@app.route("/close_connection", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def close_connection(): - return get_sched().close_connection() - - -@app.route("/set_status_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def set_status_tasks(): - return get_sched().set_status_tasks(**decode_request(request)) - - -@app.route("/create_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_task_type(): - return get_sched().create_task_type(**decode_request(request)) - - -@app.route("/get_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_type(): - return get_sched().get_task_type(**decode_request(request)) - - -@app.route("/get_task_types", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_types(): - return get_sched().get_task_types(**decode_request(request)) +class SchedulerServerApp(RPCServerApp): + pass 
-@app.route("/create_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_tasks(): - return get_sched().create_tasks(**decode_request(request)) - - -@app.route("/disable_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def disable_tasks(): - return get_sched().disable_tasks(**decode_request(request)) - - -@app.route("/get_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_tasks(): - return get_sched().get_tasks(**decode_request(request)) - - -@app.route("/get_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_runs(): - return get_sched().get_task_runs(**decode_request(request)) - - -@app.route("/search_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def search_tasks(): - return get_sched().search_tasks(**decode_request(request)) - - -@app.route("/peek_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def peek_ready_tasks(): - return get_sched().peek_ready_tasks(**decode_request(request)) - - -@app.route("/grab_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def grab_ready_tasks(): - return get_sched().grab_ready_tasks(**decode_request(request)) - - -@app.route("/schedule_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def schedule_task_run(): - return get_sched().schedule_task_run(**decode_request(request)) - - -@app.route("/mass_schedule_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def mass_schedule_task_runs(): - return get_sched().mass_schedule_task_runs(**decode_request(request)) - - -@app.route("/start_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def start_task_run(): - return get_sched().start_task_run(**decode_request(request)) - - 
-@app.route("/end_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def end_task_run(): - return get_sched().end_task_run(**decode_request(request)) - - -@app.route("/filter_task_to_archive", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def filter_task_to_archive(): - return get_sched().filter_task_to_archive(**decode_request(request)) +app = SchedulerServerApp( + __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler +) -@app.route("/delete_archived_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def delete_archived_tasks(): - return get_sched().delete_archived_tasks(**decode_request(request)) +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) -@app.route("/get_priority_ratios", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_priority_ratios(): - return get_sched().get_priority_ratios(**decode_request(request)) +def has_no_empty_params(rule): + return len(rule.defaults or ()) >= len(rule.arguments or ()) @app.route("/site-map") @@ -184,11 +48,13 @@ @negotiate(JSONFormatter) def site_map(): links = [] - sched = get_sched() for rule in app.url_map.iter_rules(): - if has_no_empty_params(rule) and hasattr(sched, rule.endpoint): + if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): links.append( - dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__) + dict( + rule=rule.rule, + description=getattr(SchedulerInterface, rule.endpoint).__doc__, + ) ) # links is now a list of url, endpoint tuples return links diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py new file mode 100644 --- /dev/null +++ b/swh/scheduler/interface.py @@ -0,0 +1,254 @@ +# Copyright (C) 2015-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: 
GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from swh.core.api import remote_api_endpoint + + +class SchedulerInterface: + @remote_api_endpoint("task_type/create") + def create_task_type(self, task_type): + """Create a new task type ready for scheduling. + + Args: + task_type (dict): a dictionary with the following keys: + + - type (str): an identifier for the task type + - description (str): a human-readable description of what the + task does + - backend_name (str): the name of the task in the + job-scheduling backend + - default_interval (datetime.timedelta): the default interval + between two task runs + - min_interval (datetime.timedelta): the minimum interval + between two task runs + - max_interval (datetime.timedelta): the maximum interval + between two task runs + - backoff_factor (float): the factor by which the interval + changes at each run + - max_queue_length (int): the maximum length of the task queue + for this task type + + """ + ... + + @remote_api_endpoint("task_type/get") + def get_task_type(self, task_type_name): + """Retrieve the task type with id task_type_name""" + ... + + @remote_api_endpoint("task_type/get_all") + def get_task_types(self): + """Retrieve all registered task types""" + ... + + @remote_api_endpoint("task/create") + def create_tasks(self, tasks, policy="recurring"): + """Create new tasks. + + Args: + tasks (list): each task is a dictionary with the following keys: + + - type (str): the task type + - arguments (dict): the arguments for the task runner, keys: + + - args (list of str): arguments + - kwargs (dict str -> str): keyword arguments + + - next_run (datetime.datetime): the next scheduled run for the + task + + Returns: + a list of created tasks. + + """ + ... 
+ + @remote_api_endpoint("task/set_status") + def set_status_tasks(self, task_ids, status="disabled", next_run=None): + """Set the tasks' status whose ids are listed. + + If given, also set the next_run date. + """ + ... + + @remote_api_endpoint("task/disable") + def disable_tasks(self, task_ids): + """Disable the tasks whose ids are listed.""" + ... + + @remote_api_endpoint("task/search") + def search_tasks( + self, + task_id=None, + task_type=None, + status=None, + priority=None, + policy=None, + before=None, + after=None, + limit=None, + ): + """Search tasks from selected criteria""" + ... + + @remote_api_endpoint("task/get") + def get_tasks(self, task_ids): + """Retrieve the info of tasks whose ids are listed.""" + ... + + @remote_api_endpoint("task/peek_ready") + def peek_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks + + Args: + task_type (str): filtering task per their type + timestamp (datetime.datetime): peek tasks that need to be executed + before that timestamp + num_tasks (int): only peek at num_tasks tasks (with no priority) + num_tasks_priority (int): only peek at num_tasks_priority + tasks (with priority) + + Returns: + a list of tasks + + """ + ... + + @remote_api_endpoint("task/grab_ready") + def grab_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks, and mark them as scheduled + + Args: + task_type (str): filtering task per their type + timestamp (datetime.datetime): grab tasks that need to be executed + before that timestamp + num_tasks (int): only grab num_tasks tasks (with no priority) + num_tasks_priority (int): only grab oneshot num_tasks tasks (with + priorities) + + Returns: + a list of tasks + + """ + ... 
+ + @remote_api_endpoint("task_run/schedule_one") + def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): + """Mark a given task as scheduled, adding a task_run entry in the database. + + Args: + task_id (int): the identifier for the task being scheduled + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + a fresh task_run entry + + """ + ... + + @remote_api_endpoint("task_run/schedule") + def mass_schedule_task_runs(self, task_runs): + """Schedule a bunch of task runs. + + Args: + task_runs (list): a list of dicts with keys: + + - task (int): the identifier for the task being scheduled + - backend_id (str): the identifier of the job in the backend + - metadata (dict): metadata to add to the task_run entry + - scheduled (datetime.datetime): the instant the event occurred + + Returns: + None + """ + ... + + @remote_api_endpoint("task_run/start") + def start_task_run(self, backend_id, metadata=None, timestamp=None): + """Mark a given task as started, updating the corresponding task_run + entry in the database. + + Args: + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... + + @remote_api_endpoint("task_run/end") + def end_task_run( + self, backend_id, status, metadata=None, timestamp=None, result=None, + ): + """Mark a given task as ended, updating the corresponding task_run entry in the + database. 
+ + Args: + backend_id (str): the identifier of the job in the backend + status (str): how the task ended; one of: 'eventful', 'uneventful', + 'failed' + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... + + @remote_api_endpoint("task/filter_for_archive") + def filter_task_to_archive( + self, + after_ts: str, + before_ts: str, + limit: int = 10, + page_token: Optional[str] = None, + ) -> Dict[str, Any]: + """Compute the tasks to archive within the datetime interval + [after_ts, before_ts[. The method returns a paginated result. + + Returns: + dict with the following keys: + - **next_page_token**: opaque token to be used as + `page_token` to retrieve the next page of results. If absent, + there are no more pages to gather. + - **tasks**: list of task dictionaries with the following keys: + + **id** (str): origin task id + **started** (Optional[datetime]): started date + **scheduled** (datetime): scheduled date + **arguments** (json dict): task's arguments + ... + + """ + ... + + @remote_api_endpoint("task/delete_archived") + def delete_archived_tasks(self, task_ids): + """Delete archived tasks as much as possible. Only the task_ids whose + complete associated task_run have been cleaned up will be deleted. + + """ + ... + + @remote_api_endpoint("task_run/get") + def get_task_runs(self, task_ids, limit=None): + """Search task run for a task id""" + ... + + @remote_api_endpoint("priority_ratios/get") + def get_priority_ratios(self): + ... 
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -17,6 +17,7 @@ # the Flask app used as server in these tests @pytest.fixture def app(swh_db_scheduler): + assert hasattr(server, "scheduler") server.scheduler = swh_db_scheduler yield server.app @@ -41,24 +42,24 @@ expected_rules = set( "/" + rule for rule in ( - "set_status_tasks", - "create_task_type", - "get_task_type", - "get_task_types", - "create_tasks", - "disable_tasks", - "get_tasks", - "search_tasks", - "get_task_runs", - "peek_ready_tasks", - "grab_ready_tasks", - "schedule_task_run", - "mass_schedule_task_runs", - "start_task_run", - "end_task_run", - "filter_task_to_archive", - "delete_archived_tasks", - "get_priority_ratios", + "priority_ratios/get", + "task/create", + "task/delete_archived", + "task/disable", + "task/filter_for_archive", + "task/get", + "task/grab_ready", + "task/peek_ready", + "task/search", + "task/set_status", + "task_run/end", + "task_run/get", + "task_run/schedule", + "task_run/schedule_one", + "task_run/start", + "task_type/create", + "task_type/get", + "task_type/get_all", ) ) assert rules == expected_rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -9,10 +9,13 @@ import uuid from collections import defaultdict +import inspect from typing import Any, Dict from arrow import utcnow +from swh.scheduler.interface import SchedulerInterface + from .common import tasks_from_template, TEMPLATES, TASK_TYPES @@ -23,6 +26,36 @@ class TestScheduler: + def test_interface(self, swh_scheduler): + """Checks all methods of SchedulerInterface are implemented by this + backend, and that they have the same signature.""" + # Create an instance of the protocol (which cannot be instantiated + # directly, so this creates a 
subclass, then instantiates it) + interface = type("_", (SchedulerInterface,), {})() + + assert "create_task_type" in dir(interface) + + missing_methods = [] + + for meth_name in dir(interface): + if meth_name.startswith("_"): + continue + interface_meth = getattr(interface, meth_name) + try: + concrete_meth = getattr(swh_scheduler, meth_name) + except AttributeError: + if not getattr(interface_meth, "deprecated_endpoint", False): + # The backend is missing a (non-deprecated) endpoint + missing_methods.append(meth_name) + continue + + expected_signature = inspect.signature(interface_meth) + actual_signature = inspect.signature(concrete_meth) + + assert expected_signature == actual_signature, meth_name + + assert missing_methods == [] + def test_get_priority_ratios(self, swh_scheduler): assert swh_scheduler.get_priority_ratios() == { "high": 0.5,