diff --git a/requirements.txt b/requirements.txt index 7e5d4c6..b600361 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,19 +1,20 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow +attrs celery >= 4 # Not a direct dependency: missing from celery 4.4.4 future >= 0.18.0 Click elasticsearch > 5.4 flask pika >= 1.1.0 psycopg2 pyyaml vcversioner setuptools # test dependencies # hypothesis diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py index abfa8d6..15e8982 100644 --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -1,17 +1,21 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient +from .serializers import ENCODERS, DECODERS from ..interface import SchedulerInterface class RemoteScheduler(RPCClient): """Proxy to a remote scheduler API """ backend_class = SchedulerInterface + + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS diff --git a/swh/scheduler/api/serializers.py b/swh/scheduler/api/serializers.py new file mode 100644 index 0000000..930c700 --- /dev/null +++ b/swh/scheduler/api/serializers.py @@ -0,0 +1,28 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Decoder and encoders for swh.scheduler.model objects.""" + +from typing import Callable, Dict, List, Tuple + +import attr + +import swh.scheduler.model as model + + +def _encode_model_object(obj): + d = attr.asdict(obj) + d["__type__"] = type(obj).__name__ + return d + + +ENCODERS: List[Tuple[type, str, Callable]] = [ + (model.BaseSchedulerModel, "scheduler_model", _encode_model_object), +] + + +DECODERS: Dict[str, Callable] = { + "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d) +} diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 9dfda2a..57e28e9 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,132 +1,134 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from swh.core import config from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler, negotiate from swh.scheduler import get_scheduler from swh.scheduler.interface import SchedulerInterface +from .serializers import ENCODERS, DECODERS scheduler = None def get_global_scheduler(): global scheduler if not scheduler: scheduler = get_scheduler(**app.config["scheduler"]) return scheduler class SchedulerServerApp(RPCServerApp): - pass + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS app = SchedulerServerApp( __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler ) @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def has_no_empty_params(rule): return len(rule.defaults or ()) >= len(rule.arguments or ()) @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def site_map(): links = [] for rule in app.url_map.iter_rules(): if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): links.append( dict( rule=rule.rule, description=getattr(SchedulerInterface, rule.endpoint).__doc__, ) ) # links is now a list of url, endpoint tuples return links def load_and_check_config(config_file, type="local"): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) vcfg = cfg.get("scheduler") if not vcfg: raise KeyError("Missing '%scheduler' configuration") if type == "local": cls = vcfg.get("cls") if cls != "local": raise ValueError( "The scheduler backend can only be started with a 'local' " "configuration" ) args = vcfg.get("args") if not args: raise KeyError("Invalid configuration; missing 'args' config entry") db = args.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg api_cfg = None def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py new file mode 100644 index 0000000..2260440 --- /dev/null +++ b/swh/scheduler/model.py @@ -0,0 +1,57 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import List, Optional, Tuple + +import attr +import attr.converters + + +@attr.s +class BaseSchedulerModel: + _select_cols: Optional[Tuple[str, ...]] = None + _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None + + @classmethod + def select_columns(cls) -> Tuple[str, ...]: + """Get all the database columns needed for a `select` on this object type""" + if cls._select_cols is None: + columns: List[str] = [] + for field in attr.fields(cls): + columns.append(field.name) + cls._select_cols = tuple(sorted(columns)) + + return cls._select_cols + + @classmethod + def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: + """Get the database columns and metavars needed for an `insert` or `update` on + this object type. + + This supports the following attributes as booleans in the field's metadata: + - primary_key: handled by the database; never inserted or updated + - auto_now_add: handled by the database; set to now() on insertion, never + updated + - auto_now: handled by the client; set to now() on every insertion and update + """ + if cls._insert_cols_and_metavars is None: + zipped_cols_and_metavars: List[Tuple[str, str]] = [] + + for field in attr.fields(cls): + if any( + field.metadata.get(flag) for flag in ("auto_now_add", "primary_key") + ): + continue + elif field.metadata.get("auto_now"): + zipped_cols_and_metavars.append((field.name, "now()")) + else: + zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) + + zipped_cols_and_metavars.sort() + + cols, metavars = zip(*zipped_cols_and_metavars) + cls._insert_cols_and_metavars = cols, metavars + + return cls._insert_cols_and_metavars diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py new file mode 100644 index 0000000..26a0129 --- /dev/null +++ b/swh/scheduler/tests/test_model.py @@ -0,0 +1,77 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +import attr + +from swh.scheduler import model + + +def test_select_columns(): + @attr.s + class TestModel(model.BaseSchedulerModel): + id = attr.ib(type=str) + test1 = attr.ib(type=str) + a_first_attr = attr.ib(type=str) + + @property + def test2(self): + """This property should not show up in the extracted columns""" + return self.test1 + + assert TestModel.select_columns() == ("a_first_attr", "id", "test1") + + +def test_insert_columns(): + @attr.s + class TestModel(model.BaseSchedulerModel): + id = attr.ib(type=str) + test1 = attr.ib(type=str) + + @property + def test2(self): + """This property should not show up in the extracted columns""" + return self.test1 + + assert TestModel.insert_columns_and_metavars() == ( + ("id", "test1"), + ("%(id)s", "%(test1)s"), + ) + + +def test_insert_columns_auto_now_add(): + @attr.s + class TestModel(model.BaseSchedulerModel): + id = attr.ib(type=str) + test1 = attr.ib(type=str) + added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True}) + + assert TestModel.insert_columns_and_metavars() == ( + ("id", "test1"), + ("%(id)s", "%(test1)s"), + ) + + +def test_insert_columns_auto_now(): + @attr.s + class TestModel(model.BaseSchedulerModel): + id = attr.ib(type=str) + test1 = attr.ib(type=str) + updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True}) + + assert TestModel.insert_columns_and_metavars() == ( + ("id", "test1", "updated"), + ("%(id)s", "%(test1)s", "now()"), + ) + + +def test_insert_columns_primary_key(): + @attr.s + class TestModel(model.BaseSchedulerModel): + id = attr.ib(type=str, metadata={"primary_key": True}) + test1 = attr.ib(type=str) + + assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",))