Page MenuHomeSoftware Heritage

D3257.diff
No OneTemporary

D3257.diff

diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -6,137 +6,12 @@
from swh.core.api import RPCClient
+from ..interface import SchedulerInterface
+
class RemoteScheduler(RPCClient):
"""Proxy to a remote scheduler API
"""
- def close_connection(self):
- return self.post("close_connection", {})
-
- def set_status_tasks(self, task_ids, status="disabled", next_run=None):
- return self.post(
- "set_status_tasks",
- dict(task_ids=task_ids, status=status, next_run=next_run),
- )
-
- def create_task_type(self, task_type):
- return self.post("create_task_type", {"task_type": task_type})
-
- def get_task_type(self, task_type_name):
- return self.post("get_task_type", {"task_type_name": task_type_name})
-
- def get_task_types(self):
- return self.post("get_task_types", {})
-
- def create_tasks(self, tasks):
- return self.post("create_tasks", {"tasks": tasks})
-
- def disable_tasks(self, task_ids):
- return self.post("disable_tasks", {"task_ids": task_ids})
-
- def get_tasks(self, task_ids):
- return self.post("get_tasks", {"task_ids": task_ids})
-
- def get_task_runs(self, task_ids, limit=None):
- return self.post("get_task_runs", {"task_ids": task_ids, "limit": limit})
-
- def search_tasks(
- self,
- task_id=None,
- task_type=None,
- status=None,
- priority=None,
- policy=None,
- before=None,
- after=None,
- limit=None,
- ):
- return self.post(
- "search_tasks",
- dict(
- task_id=task_id,
- task_type=task_type,
- status=status,
- priority=priority,
- policy=policy,
- before=before,
- after=after,
- limit=limit,
- ),
- )
-
- def peek_ready_tasks(
- self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None
- ):
- return self.post(
- "peek_ready_tasks",
- {
- "task_type": task_type,
- "timestamp": timestamp,
- "num_tasks": num_tasks,
- "num_tasks_priority": num_tasks_priority,
- },
- )
-
- def grab_ready_tasks(
- self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None
- ):
- return self.post(
- "grab_ready_tasks",
- {
- "task_type": task_type,
- "timestamp": timestamp,
- "num_tasks": num_tasks,
- "num_tasks_priority": num_tasks_priority,
- },
- )
-
- def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
- return self.post(
- "schedule_task_run",
- {
- "task_id": task_id,
- "backend_id": backend_id,
- "metadata": metadata,
- "timestamp": timestamp,
- },
- )
-
- def mass_schedule_task_runs(self, task_runs):
- return self.post("mass_schedule_task_runs", {"task_runs": task_runs})
-
- def start_task_run(self, backend_id, metadata=None, timestamp=None):
- return self.post(
- "start_task_run",
- {"backend_id": backend_id, "metadata": metadata, "timestamp": timestamp,},
- )
-
- def end_task_run(self, backend_id, status, metadata=None, timestamp=None):
- return self.post(
- "end_task_run",
- {
- "backend_id": backend_id,
- "status": status,
- "metadata": metadata,
- "timestamp": timestamp,
- },
- )
-
- def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None):
- return self.post(
- "filter_task_to_archive",
- {
- "after_ts": after_ts,
- "before_ts": before_ts,
- "limit": limit,
- "page_token": page_token,
- },
- )
-
- def delete_archived_tasks(self, task_ids):
- return self.post("delete_archived_tasks", {"task_ids": task_ids})
-
- def get_priority_ratios(self):
- return self.get("get_priority_ratios")
+ backend_class = SchedulerInterface
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -3,180 +3,44 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
import logging
-
-from flask import request, Flask
+import os
from swh.core import config
-from swh.core.api import (
- decode_request,
- error_handler,
- encode_data_server as encode_data,
-)
+from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp
+from swh.core.api import encode_data_server as encode_data
+from swh.core.api import error_handler, negotiate
-from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter
-from swh.scheduler import get_scheduler as get_scheduler_from
+from swh.scheduler import get_scheduler
+from swh.scheduler.interface import SchedulerInterface
-app = Flask(__name__)
scheduler = None
-@app.errorhandler(Exception)
-def my_error_handler(exception):
- return error_handler(exception, encode_data)
-
-
-def get_sched():
+def get_global_scheduler():
global scheduler
if not scheduler:
- scheduler = get_scheduler_from(**app.config["scheduler"])
+ scheduler = get_scheduler(**app.config["scheduler"])
return scheduler
-def has_no_empty_params(rule):
- return len(rule.defaults or ()) >= len(rule.arguments or ())
-
-
-@app.route("/")
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def index():
- return "SWH Scheduler API server"
-
-
-@app.route("/close_connection", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def close_connection():
- return get_sched().close_connection()
-
-
-@app.route("/set_status_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def set_status_tasks():
- return get_sched().set_status_tasks(**decode_request(request))
-
-
-@app.route("/create_task_type", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def create_task_type():
- return get_sched().create_task_type(**decode_request(request))
-
-
-@app.route("/get_task_type", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_type():
- return get_sched().get_task_type(**decode_request(request))
-
-
-@app.route("/get_task_types", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_types():
- return get_sched().get_task_types(**decode_request(request))
+class SchedulerServerApp(RPCServerApp):
+ pass
-@app.route("/create_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def create_tasks():
- return get_sched().create_tasks(**decode_request(request))
-
-
-@app.route("/disable_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def disable_tasks():
- return get_sched().disable_tasks(**decode_request(request))
-
-
-@app.route("/get_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_tasks():
- return get_sched().get_tasks(**decode_request(request))
-
-
-@app.route("/get_task_runs", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_runs():
- return get_sched().get_task_runs(**decode_request(request))
-
-
-@app.route("/search_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def search_tasks():
- return get_sched().search_tasks(**decode_request(request))
-
-
-@app.route("/peek_ready_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def peek_ready_tasks():
- return get_sched().peek_ready_tasks(**decode_request(request))
-
-
-@app.route("/grab_ready_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def grab_ready_tasks():
- return get_sched().grab_ready_tasks(**decode_request(request))
-
-
-@app.route("/schedule_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def schedule_task_run():
- return get_sched().schedule_task_run(**decode_request(request))
-
-
-@app.route("/mass_schedule_task_runs", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def mass_schedule_task_runs():
- return get_sched().mass_schedule_task_runs(**decode_request(request))
-
-
-@app.route("/start_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def start_task_run():
- return get_sched().start_task_run(**decode_request(request))
-
-
-@app.route("/end_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def end_task_run():
- return get_sched().end_task_run(**decode_request(request))
-
-
-@app.route("/filter_task_to_archive", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def filter_task_to_archive():
- return get_sched().filter_task_to_archive(**decode_request(request))
+app = SchedulerServerApp(
+ __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler
+)
-@app.route("/delete_archived_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def delete_archived_tasks():
- return get_sched().delete_archived_tasks(**decode_request(request))
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
-@app.route("/get_priority_ratios", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_priority_ratios():
- return get_sched().get_priority_ratios(**decode_request(request))
+def has_no_empty_params(rule):
+ return len(rule.defaults or ()) >= len(rule.arguments or ())
@app.route("/site-map")
@@ -184,11 +48,13 @@
@negotiate(JSONFormatter)
def site_map():
links = []
- sched = get_sched()
for rule in app.url_map.iter_rules():
- if has_no_empty_params(rule) and hasattr(sched, rule.endpoint):
+ if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint):
links.append(
- dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__)
+ dict(
+ rule=rule.rule,
+ description=getattr(SchedulerInterface, rule.endpoint).__doc__,
+ )
)
# links is now a list of url, endpoint tuples
return links
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/interface.py
@@ -0,0 +1,254 @@
+# Copyright (C) 2015-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Optional
+
+from swh.core.api import remote_api_endpoint
+
+
+class SchedulerInterface:
+ @remote_api_endpoint("task_type/create")
+ def create_task_type(self, task_type):
+ """Create a new task type ready for scheduling.
+
+ Args:
+ task_type (dict): a dictionary with the following keys:
+
+ - type (str): an identifier for the task type
+ - description (str): a human-readable description of what the
+ task does
+ - backend_name (str): the name of the task in the
+ job-scheduling backend
+ - default_interval (datetime.timedelta): the default interval
+ between two task runs
+ - min_interval (datetime.timedelta): the minimum interval
+ between two task runs
+ - max_interval (datetime.timedelta): the maximum interval
+ between two task runs
+ - backoff_factor (float): the factor by which the interval
+ changes at each run
+ - max_queue_length (int): the maximum length of the task queue
+ for this task type
+
+ """
+ ...
+
+ @remote_api_endpoint("task_type/get")
+ def get_task_type(self, task_type_name):
+ """Retrieve the task type with id task_type_name"""
+ ...
+
+ @remote_api_endpoint("task_type/get_all")
+ def get_task_types(self):
+ """Retrieve all registered task types"""
+ ...
+
+ @remote_api_endpoint("task/create")
+ def create_tasks(self, tasks, policy="recurring"):
+ """Create new tasks.
+
+ Args:
+ tasks (list): each task is a dictionary with the following keys:
+
+ - type (str): the task type
+ - arguments (dict): the arguments for the task runner, keys:
+
+ - args (list of str): arguments
+ - kwargs (dict str -> str): keyword arguments
+
+ - next_run (datetime.datetime): the next scheduled run for the
+ task
+
+ Returns:
+ a list of created tasks.
+
+ """
+ ...
+
+ @remote_api_endpoint("task/set_status")
+ def set_status_tasks(self, task_ids, status="disabled", next_run=None):
+ """Set the status of the tasks whose ids are listed.
+
+ If given, also set the next_run date.
+ """
+ ...
+
+ @remote_api_endpoint("task/disable")
+ def disable_tasks(self, task_ids):
+ """Disable the tasks whose ids are listed."""
+ ...
+
+ @remote_api_endpoint("task/search")
+ def search_tasks(
+ self,
+ task_id=None,
+ task_type=None,
+ status=None,
+ priority=None,
+ policy=None,
+ before=None,
+ after=None,
+ limit=None,
+ ):
+ """Search tasks matching the selected criteria"""
+ ...
+
+ @remote_api_endpoint("task/get")
+ def get_tasks(self, task_ids):
+ """Retrieve the info of tasks whose ids are listed."""
+ ...
+
+ @remote_api_endpoint("task/peek_ready")
+ def peek_ready_tasks(
+ self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None,
+ ):
+ """Fetch the list of ready tasks
+
+ Args:
+ task_type (str): filtering task per their type
+ timestamp (datetime.datetime): peek tasks that need to be executed
+ before that timestamp
+ num_tasks (int): only peek at num_tasks tasks (with no priority)
+ num_tasks_priority (int): only peek at num_tasks_priority
+ tasks (with priority)
+
+ Returns:
+ a list of tasks
+
+ """
+ ...
+
+ @remote_api_endpoint("task/grab_ready")
+ def grab_ready_tasks(
+ self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None,
+ ):
+ """Fetch the list of ready tasks, and mark them as scheduled
+
+ Args:
+ task_type (str): filtering task per their type
+ timestamp (datetime.datetime): grab tasks that need to be executed
+ before that timestamp
+ num_tasks (int): only grab num_tasks tasks (with no priority)
+ num_tasks_priority (int): only grab num_tasks_priority oneshot tasks
+ (with priorities)
+
+ Returns:
+ a list of tasks
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/schedule_one")
+ def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
+ """Mark a given task as scheduled, adding a task_run entry in the database.
+
+ Args:
+ task_id (int): the identifier for the task being scheduled
+ backend_id (str): the identifier of the job in the backend
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ a fresh task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/schedule")
+ def mass_schedule_task_runs(self, task_runs):
+ """Schedule a bunch of task runs.
+
+ Args:
+ task_runs (list): a list of dicts with keys:
+
+ - task (int): the identifier for the task being scheduled
+ - backend_id (str): the identifier of the job in the backend
+ - metadata (dict): metadata to add to the task_run entry
+ - scheduled (datetime.datetime): the instant the event occurred
+
+ Returns:
+ None
+ """
+ ...
+
+ @remote_api_endpoint("task_run/start")
+ def start_task_run(self, backend_id, metadata=None, timestamp=None):
+ """Mark a given task as started, updating the corresponding task_run
+ entry in the database.
+
+ Args:
+ backend_id (str): the identifier of the job in the backend
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ the updated task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/end")
+ def end_task_run(
+ self, backend_id, status, metadata=None, timestamp=None, result=None,
+ ):
+ """Mark a given task as ended, updating the corresponding task_run entry in the
+ database.
+
+ Args:
+ backend_id (str): the identifier of the job in the backend
+ status (str): how the task ended; one of: 'eventful', 'uneventful',
+ 'failed'
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ the updated task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task/filter_for_archive")
+ def filter_task_to_archive(
+ self,
+ after_ts: str,
+ before_ts: str,
+ limit: int = 10,
+ page_token: Optional[str] = None,
+ ) -> Dict[str, Any]:
+ """Compute the tasks to archive within the datetime interval
+ [after_ts, before_ts[. The method returns a paginated result.
+
+ Returns:
+ dict with the following keys:
+ - **next_page_token**: opaque token to be used as
+ `page_token` to retrieve the next page of results. If absent,
+ there are no more pages to gather.
+ - **tasks**: list of task dictionaries with the following keys:
+
+ **id** (str): origin task id
+ **started** (Optional[datetime]): started date
+ **scheduled** (datetime): scheduled date
+ **arguments** (json dict): task's arguments
+ ...
+
+ """
+ ...
+
+ @remote_api_endpoint("task/delete_archived")
+ def delete_archived_tasks(self, task_ids):
+ """Delete as many archived tasks as possible. Only the tasks whose
+ associated task_run entries have all been cleaned up will be deleted.
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/get")
+ def get_task_runs(self, task_ids, limit=None):
+ """Search task runs for the given task ids"""
+ ...
+
+ @remote_api_endpoint("priority_ratios/get")
+ def get_priority_ratios(self):
+ ...
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -17,6 +17,7 @@
# the Flask app used as server in these tests
@pytest.fixture
def app(swh_db_scheduler):
+ assert hasattr(server, "scheduler")
server.scheduler = swh_db_scheduler
yield server.app
@@ -41,24 +42,24 @@
expected_rules = set(
"/" + rule
for rule in (
- "set_status_tasks",
- "create_task_type",
- "get_task_type",
- "get_task_types",
- "create_tasks",
- "disable_tasks",
- "get_tasks",
- "search_tasks",
- "get_task_runs",
- "peek_ready_tasks",
- "grab_ready_tasks",
- "schedule_task_run",
- "mass_schedule_task_runs",
- "start_task_run",
- "end_task_run",
- "filter_task_to_archive",
- "delete_archived_tasks",
- "get_priority_ratios",
+ "priority_ratios/get",
+ "task/create",
+ "task/delete_archived",
+ "task/disable",
+ "task/filter_for_archive",
+ "task/get",
+ "task/grab_ready",
+ "task/peek_ready",
+ "task/search",
+ "task/set_status",
+ "task_run/end",
+ "task_run/get",
+ "task_run/schedule",
+ "task_run/schedule_one",
+ "task_run/start",
+ "task_type/create",
+ "task_type/get",
+ "task_type/get_all",
)
)
assert rules == expected_rules
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -9,10 +9,13 @@
import uuid
from collections import defaultdict
+import inspect
from typing import Any, Dict
from arrow import utcnow
+from swh.scheduler.interface import SchedulerInterface
+
from .common import tasks_from_template, TEMPLATES, TASK_TYPES
@@ -23,6 +26,36 @@
class TestScheduler:
+ def test_interface(self, swh_scheduler):
+ """Checks all methods of SchedulerInterface are implemented by this
+ backend, and that they have the same signature."""
+ # Create an instance of the protocol (which cannot be instantiated
+ # directly, so this creates a subclass, then instantiates it)
+ interface = type("_", (SchedulerInterface,), {})()
+
+ assert "create_task_type" in dir(interface)
+
+ missing_methods = []
+
+ for meth_name in dir(interface):
+ if meth_name.startswith("_"):
+ continue
+ interface_meth = getattr(interface, meth_name)
+ try:
+ concrete_meth = getattr(swh_scheduler, meth_name)
+ except AttributeError:
+ if not getattr(interface_meth, "deprecated_endpoint", False):
+ # The backend is missing a (non-deprecated) endpoint
+ missing_methods.append(meth_name)
+ continue
+
+ expected_signature = inspect.signature(interface_meth)
+ actual_signature = inspect.signature(concrete_meth)
+
+ assert expected_signature == actual_signature, meth_name
+
+ assert missing_methods == []
+
def test_get_priority_ratios(self, swh_scheduler):
assert swh_scheduler.get_priority_ratios() == {
"high": 0.5,

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 4:33 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217335

Event Timeline