diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py index 4cfe0df..abfa8d6 100644 --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -1,142 +1,17 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient +from ..interface import SchedulerInterface + class RemoteScheduler(RPCClient): """Proxy to a remote scheduler API """ - def close_connection(self): - return self.post("close_connection", {}) - - def set_status_tasks(self, task_ids, status="disabled", next_run=None): - return self.post( - "set_status_tasks", - dict(task_ids=task_ids, status=status, next_run=next_run), - ) - - def create_task_type(self, task_type): - return self.post("create_task_type", {"task_type": task_type}) - - def get_task_type(self, task_type_name): - return self.post("get_task_type", {"task_type_name": task_type_name}) - - def get_task_types(self): - return self.post("get_task_types", {}) - - def create_tasks(self, tasks): - return self.post("create_tasks", {"tasks": tasks}) - - def disable_tasks(self, task_ids): - return self.post("disable_tasks", {"task_ids": task_ids}) - - def get_tasks(self, task_ids): - return self.post("get_tasks", {"task_ids": task_ids}) - - def get_task_runs(self, task_ids, limit=None): - return self.post("get_task_runs", {"task_ids": task_ids, "limit": limit}) - - def search_tasks( - self, - task_id=None, - task_type=None, - status=None, - priority=None, - policy=None, - before=None, - after=None, - limit=None, - ): - return self.post( - "search_tasks", - dict( - task_id=task_id, - task_type=task_type, - status=status, - priority=priority, - policy=policy, - before=before, - after=after, - limit=limit, - ), - ) - - def peek_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, 
num_tasks_priority=None - ): - return self.post( - "peek_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def grab_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None - ): - return self.post( - "grab_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): - return self.post( - "schedule_task_run", - { - "task_id": task_id, - "backend_id": backend_id, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def mass_schedule_task_runs(self, task_runs): - return self.post("mass_schedule_task_runs", {"task_runs": task_runs}) - - def start_task_run(self, backend_id, metadata=None, timestamp=None): - return self.post( - "start_task_run", - {"backend_id": backend_id, "metadata": metadata, "timestamp": timestamp,}, - ) - - def end_task_run(self, backend_id, status, metadata=None, timestamp=None): - return self.post( - "end_task_run", - { - "backend_id": backend_id, - "status": status, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None): - return self.post( - "filter_task_to_archive", - { - "after_ts": after_ts, - "before_ts": before_ts, - "limit": limit, - "page_token": page_token, - }, - ) - - def delete_archived_tasks(self, task_ids): - return self.post("delete_archived_tasks", {"task_ids": task_ids}) - - def get_priority_ratios(self): - return self.get("get_priority_ratios") + backend_class = SchedulerInterface diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 02ad19f..9dfda2a 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,266 +1,132 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See 
the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os import logging - -from flask import request, Flask +import os from swh.core import config -from swh.core.api import ( - decode_request, - error_handler, - encode_data_server as encode_data, -) +from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler, negotiate -from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter -from swh.scheduler import get_scheduler as get_scheduler_from +from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface -app = Flask(__name__) scheduler = None -@app.errorhandler(Exception) -def my_error_handler(exception): - return error_handler(exception, encode_data) - - -def get_sched(): +def get_global_scheduler(): global scheduler if not scheduler: - scheduler = get_scheduler_from(**app.config["scheduler"]) + scheduler = get_scheduler(**app.config["scheduler"]) return scheduler -def has_no_empty_params(rule): - return len(rule.defaults or ()) >= len(rule.arguments or ()) - - -@app.route("/") -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def index(): - return "SWH Scheduler API server" - - -@app.route("/close_connection", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def close_connection(): - return get_sched().close_connection() - - -@app.route("/set_status_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def set_status_tasks(): - return get_sched().set_status_tasks(**decode_request(request)) - - -@app.route("/create_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_task_type(): - return get_sched().create_task_type(**decode_request(request)) - - 
-@app.route("/get_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_type(): - return get_sched().get_task_type(**decode_request(request)) - - -@app.route("/get_task_types", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_types(): - return get_sched().get_task_types(**decode_request(request)) +class SchedulerServerApp(RPCServerApp): + pass -@app.route("/create_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_tasks(): - return get_sched().create_tasks(**decode_request(request)) - - -@app.route("/disable_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def disable_tasks(): - return get_sched().disable_tasks(**decode_request(request)) - - -@app.route("/get_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_tasks(): - return get_sched().get_tasks(**decode_request(request)) - - -@app.route("/get_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_runs(): - return get_sched().get_task_runs(**decode_request(request)) - - -@app.route("/search_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def search_tasks(): - return get_sched().search_tasks(**decode_request(request)) - - -@app.route("/peek_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def peek_ready_tasks(): - return get_sched().peek_ready_tasks(**decode_request(request)) - - -@app.route("/grab_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def grab_ready_tasks(): - return get_sched().grab_ready_tasks(**decode_request(request)) - - -@app.route("/schedule_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def schedule_task_run(): - return get_sched().schedule_task_run(**decode_request(request)) - - 
-@app.route("/mass_schedule_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def mass_schedule_task_runs(): - return get_sched().mass_schedule_task_runs(**decode_request(request)) - - -@app.route("/start_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def start_task_run(): - return get_sched().start_task_run(**decode_request(request)) - - -@app.route("/end_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def end_task_run(): - return get_sched().end_task_run(**decode_request(request)) - - -@app.route("/filter_task_to_archive", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def filter_task_to_archive(): - return get_sched().filter_task_to_archive(**decode_request(request)) +app = SchedulerServerApp( + __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler +) -@app.route("/delete_archived_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def delete_archived_tasks(): - return get_sched().delete_archived_tasks(**decode_request(request)) +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) -@app.route("/get_priority_ratios", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_priority_ratios(): - return get_sched().get_priority_ratios(**decode_request(request)) +def has_no_empty_params(rule): + return len(rule.defaults or ()) >= len(rule.arguments or ()) @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def site_map(): links = [] - sched = get_sched() for rule in app.url_map.iter_rules(): - if has_no_empty_params(rule) and hasattr(sched, rule.endpoint): + if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): links.append( - dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__) + dict( + rule=rule.rule, + 
description=getattr(SchedulerInterface, rule.endpoint).__doc__, + ) ) # links is now a list of url, endpoint tuples return links def load_and_check_config(config_file, type="local"): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) vcfg = cfg.get("scheduler") if not vcfg: raise KeyError("Missing '%scheduler' configuration") if type == "local": cls = vcfg.get("cls") if cls != "local": raise ValueError( "The scheduler backend can only be started with a 'local' " "configuration" ) args = vcfg.get("args") if not args: raise KeyError("Invalid configuration; missing 'args' config entry") db = args.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg api_cfg = None def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. 
""" global api_cfg if not api_cfg: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py new file mode 100644 index 0000000..28db8a6 --- /dev/null +++ b/swh/scheduler/interface.py @@ -0,0 +1,254 @@ +# Copyright (C) 2015-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from swh.core.api import remote_api_endpoint + + +class SchedulerInterface: + @remote_api_endpoint("task_type/create") + def create_task_type(self, task_type): + """Create a new task type ready for scheduling. + + Args: + task_type (dict): a dictionary with the following keys: + + - type (str): an identifier for the task type + - description (str): a human-readable description of what the + task does + - backend_name (str): the name of the task in the + job-scheduling backend + - default_interval (datetime.timedelta): the default interval + between two task runs + - min_interval (datetime.timedelta): the minimum interval + between two task runs + - max_interval (datetime.timedelta): the maximum interval + between two task runs + - backoff_factor (float): the factor by which the interval + changes at each run + - max_queue_length (int): the maximum length of the task queue + for this task type + + """ + ... + + @remote_api_endpoint("task_type/get") + def get_task_type(self, task_type_name): + """Retrieve the task type with id task_type_name""" + ... + + @remote_api_endpoint("task_type/get_all") + def get_task_types(self): + """Retrieve all registered task types""" + ... 
+ + @remote_api_endpoint("task/create") + def create_tasks(self, tasks, policy="recurring"): + """Create new tasks. + + Args: + tasks (list): each task is a dictionary with the following keys: + + - type (str): the task type + - arguments (dict): the arguments for the task runner, keys: + + - args (list of str): arguments + - kwargs (dict str -> str): keyword arguments + + - next_run (datetime.datetime): the next scheduled run for the + task + + Returns: + a list of created tasks. + + """ + ... + + @remote_api_endpoint("task/set_status") + def set_status_tasks(self, task_ids, status="disabled", next_run=None): + """Set the status of the tasks whose ids are listed. + + If given, also set the next_run date. + """ + ... + + @remote_api_endpoint("task/disable") + def disable_tasks(self, task_ids): + """Disable the tasks whose ids are listed.""" + ... + + @remote_api_endpoint("task/search") + def search_tasks( + self, + task_id=None, + task_type=None, + status=None, + priority=None, + policy=None, + before=None, + after=None, + limit=None, + ): + """Search tasks from selected criteria""" + ... + + @remote_api_endpoint("task/get") + def get_tasks(self, task_ids): + """Retrieve the info of tasks whose ids are listed.""" + ... + + @remote_api_endpoint("task/peek_ready") + def peek_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks + + Args: + task_type (str): filtering task per their type + timestamp (datetime.datetime): peek tasks that need to be executed + before that timestamp + num_tasks (int): only peek at num_tasks tasks (with no priority) + num_tasks_priority (int): only peek at num_tasks_priority + tasks (with priority) + + Returns: + a list of tasks + + """ + ... 
+ + @remote_api_endpoint("task/grab_ready") + def grab_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks, and mark them as scheduled + + Args: + task_type (str): filtering task per their type + timestamp (datetime.datetime): grab tasks that need to be executed + before that timestamp + num_tasks (int): only grab num_tasks tasks (with no priority) + num_tasks_priority (int): only grab oneshot num_tasks tasks (with + priorities) + + Returns: + a list of tasks + + """ + ... + + @remote_api_endpoint("task_run/schedule_one") + def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): + """Mark a given task as scheduled, adding a task_run entry in the database. + + Args: + task_id (int): the identifier for the task being scheduled + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + a fresh task_run entry + + """ + ... + + @remote_api_endpoint("task_run/schedule") + def mass_schedule_task_runs(self, task_runs): + """Schedule a bunch of task runs. + + Args: + task_runs (list): a list of dicts with keys: + + - task (int): the identifier for the task being scheduled + - backend_id (str): the identifier of the job in the backend + - metadata (dict): metadata to add to the task_run entry + - scheduled (datetime.datetime): the instant the event occurred + + Returns: + None + """ + ... + + @remote_api_endpoint("task_run/start") + def start_task_run(self, backend_id, metadata=None, timestamp=None): + """Mark a given task as started, updating the corresponding task_run + entry in the database. + + Args: + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... 
+ + @remote_api_endpoint("task_run/end") + def end_task_run( + self, backend_id, status, metadata=None, timestamp=None, result=None, + ): + """Mark a given task as ended, updating the corresponding task_run entry in the + database. + + Args: + backend_id (str): the identifier of the job in the backend + status (str): how the task ended; one of: 'eventful', 'uneventful', + 'failed' + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... + + @remote_api_endpoint("task/filter_for_archive") + def filter_task_to_archive( + self, + after_ts: str, + before_ts: str, + limit: int = 10, + page_token: Optional[str] = None, + ) -> Dict[str, Any]: + """Compute the tasks to archive within the datetime interval + [after_ts, before_ts[. The method returns a paginated result. + + Returns: + dict with the following keys: + - **next_page_token**: opaque token to be used as + `page_token` to retrieve the next page of results. If absent, + there are no more pages to gather. + - **tasks**: list of task dictionaries with the following keys: + + **id** (str): origin task id + **started** (Optional[datetime]): started date + **scheduled** (datetime): scheduled date + **arguments** (json dict): task's arguments + ... + + """ + ... + + @remote_api_endpoint("task/delete_archived") + def delete_archived_tasks(self, task_ids): + """Delete archived tasks as much as possible. Only the task_ids whose + complete associated task_run have been cleaned up will be deleted. + + """ + ... + + @remote_api_endpoint("task_run/get") + def get_task_runs(self, task_ids, limit=None): + """Search task runs for the given task ids""" + ... + + @remote_api_endpoint("priority_ratios/get") + def get_priority_ratios(self): + ... 
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index 73b3373..d2b696d 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,64 +1,65 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from flask import url_for import swh.scheduler.api.server as server from swh.scheduler.api.client import RemoteScheduler from swh.scheduler.tests.test_scheduler import TestScheduler # noqa # tests are executed using imported class (TestScheduler) using overloaded # swh_scheduler fixture below # the Flask app used as server in these tests @pytest.fixture def app(swh_db_scheduler): + assert hasattr(server, "scheduler") server.scheduler = swh_db_scheduler yield server.app # the RPCClient class used as client used in these tests @pytest.fixture def swh_rpc_client_class(): return RemoteScheduler @pytest.fixture def swh_scheduler(swh_rpc_client, app): yield swh_rpc_client def test_site_map(flask_app_client): sitemap = flask_app_client.get(url_for("site_map")) assert sitemap.headers["Content-Type"] == "application/json" rules = set(x["rule"] for x in sitemap.json) # we expect at least these rules expected_rules = set( "/" + rule for rule in ( - "set_status_tasks", - "create_task_type", - "get_task_type", - "get_task_types", - "create_tasks", - "disable_tasks", - "get_tasks", - "search_tasks", - "get_task_runs", - "peek_ready_tasks", - "grab_ready_tasks", - "schedule_task_run", - "mass_schedule_task_runs", - "start_task_run", - "end_task_run", - "filter_task_to_archive", - "delete_archived_tasks", - "get_priority_ratios", + "priority_ratios/get", + "task/create", + "task/delete_archived", + "task/disable", + "task/filter_for_archive", + "task/get", + "task/grab_ready", + "task/peek_ready", + 
"task/search", + "task/set_status", + "task_run/end", + "task_run/get", + "task_run/schedule", + "task_run/schedule_one", + "task_run/start", + "task_type/create", + "task_type/get", + "task_type/get_all", ) ) assert rules == expected_rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index f292059..53e065e 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,584 +1,617 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import random import uuid from collections import defaultdict +import inspect from typing import Any, Dict from arrow import utcnow +from swh.scheduler.interface import SchedulerInterface + from .common import tasks_from_template, TEMPLATES, TASK_TYPES def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} class TestScheduler: + def test_interface(self, swh_scheduler): + """Checks all methods of SchedulerInterface are implemented by this + backend, and that they have the same signature.""" + # Create an instance of the protocol (which cannot be instantiated + # directly, so this creates a subclass, then instantiates it) + interface = type("_", (SchedulerInterface,), {})() + + assert "create_task_type" in dir(interface) + + missing_methods = [] + + for meth_name in dir(interface): + if meth_name.startswith("_"): + continue + interface_meth = getattr(interface, meth_name) + try: + concrete_meth = getattr(swh_scheduler, meth_name) + except AttributeError: + if not getattr(interface_meth, "deprecated_endpoint", False): + # The backend is missing a (non-deprecated) endpoint + missing_methods.append(meth_name) + continue + + expected_signature = 
inspect.signature(interface_meth) + actual_signature = inspect.signature(concrete_meth) + + assert expected_signature == actual_signature, meth_name + + assert missing_methods == [] + def test_get_priority_ratios(self, swh_scheduler): assert swh_scheduler.get_priority_ratios() == { "high": 0.5, "normal": 0.3, "low": 0.2, } def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) tt2 = TASK_TYPES["hg"] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt["type"]) assert tt2 == swh_scheduler.get_task_type(tt2["type"]) def test_create_task_type_idempotence(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100) tasks_2 = tasks_from_template( TEMPLATES["hg"], utcnow(), 100, num_tasks_priority, priorities=priority_ratio, ) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t["id"] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t["id"] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = 
TASK_TYPES[orig_task["type"].split("-")[-1]] assert task["id"] not in ids assert task["status"] == "next_run_not_scheduled" assert task["current_interval"] == task_type["default_interval"] assert task["policy"] == orig_task.get("policy", "recurring") priority = task.get("priority") if priority: actual_priorities[priority] += 1 assert task["retries_left"] == (task_type["num_retries"] or 0) ids.add(task["id"]) del task["id"] del task["status"] del task["current_interval"] del task["retries_left"] if "policy" not in orig_task: del task["policy"] if "priority" not in orig_task: del task["priority"] assert task == orig_task assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] tasks = tasks_from_template(TEMPLATES["git"], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks) // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit - 1]["next_run"] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts ) for ready_task in ready_tasks_timestamped: assert ready_task["next_run"] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit // 3 ) assert len(ready_tasks_both) <= limit // 
3 for ready_task in ready_tasks_both: assert ready_task["next_run"] <= max_ts assert ready_task in ready_tasks[: limit // 3] def _priority_ratio(self, swh_scheduler): return swh_scheduler.get_priority_ratios() def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio, ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get("priority") if priority: count_tasks_per_priority[priority] += 1 assert dict(count_tasks_per_priority) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2) num_tasks_priority = random.randrange(5, num_tasks_priority // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority ) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get("priority") count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] assert actual_prio == expected_count or actual_prio == expected_count + 1 assert count_tasks_per_priority[None] == num_tasks def test_grab_ready_tasks(self, swh_scheduler): priority_ratio = 
self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio, ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10 ) grabbed_tasks = swh_scheduler.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10 ) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed assert peeked["priority"] == grabbed["priority"] def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x["id"]) tasks[:length] = [] ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x["id"]) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(lst): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in lst] self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks) def assert_filtered_task_ok( self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime ) -> None: """Ensure filtered tasks have the right expected properties (within the range, recurring disabled, 
etc..) """ started = task["started"] date = started if started is not None else task["scheduled"] assert after <= date and date <= before if task["task_policy"] == "oneshot": assert task["task_status"] in ["completed", "disabled"] if task["task_policy"] == "recurring": assert task["task_status"] in ["disabled"] def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") _tasks.append(t) # Randomly update task's status per policy status_per_policy = {"recurring": 0, "oneshot": 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] "recurring": [ (1, "disabled"), (0, "completed"), (0, "next_run_not_scheduled"), ], "oneshot": [ (0, "next_run_not_scheduled"), (1, "disabled"), (1, "completed"), ], } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task["policy"] _task_ids[policy].append(task["id"]) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task["id"]) swh_scheduler.disable_tasks(tasks_to_update["recurring"]) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( 
_task_ids["oneshot"], status="next_run_not_scheduled" ) # complete the tasks to update swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed") total_tasks_filtered = ( status_per_policy["recurring"] + status_per_policy["oneshot"] ) # no pagination scenario # retrieve tasks to archive after = _time.shift(days=-1) after_ts = after.format("YYYY-MM-DD") before = utcnow().shift(days=1) before_ts = before.format("YYYY-MM-DD") tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks ) tasks_to_archive = tasks_result["tasks"] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get("next_page_token") is None actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in tasks_to_archive: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario nb_tasks = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) == nb_tasks next_page_token = tasks_result["next_page_token"] assert next_page_token is not None all_tasks = tasks_to_archive2 while next_page_token is not None: # Retrieve paginated results tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks, page_token=next_page_token, ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) <= nb_tasks all_tasks.extend(tasks_to_archive2) next_page_token = tasks_result.get("next_page_token") actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in all_tasks: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): 
self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") c = random.randint(0, 100) if c <= percent: _tasks.append({"task_id": t["task"], "task_run_id": t["id"]}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task["id"] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): """No task exist in the scheduler's db, get_task_runs() should always return an empty list. """ assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): """No task has been executed yet, get_task_runs() should always return an empty list. 
""" self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): """Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. 
limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt)