diff --git a/debian/control b/debian/control index 0c5167f..6eca8ff 100644 --- a/debian/control +++ b/debian/control @@ -1,23 +1,24 @@ Source: swh-scheduler Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-arrow, python3-celery, python3-click, python3-elasticsearch (>= 5.4.0), + python3-flask, python3-nose, python3-psycopg2, python3-setuptools, python3-swh.core (>= 0.0.34), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/ Package: python3-swh.scheduler Architecture: all Depends: python3-swh.core (>= 0.0.34), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Scheduler diff --git a/requirements.txt b/requirements.txt index f889ee0..df47e53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow celery Click +elasticsearch>5.4 +flask psycopg2 vcversioner -elasticsearch>5.4 diff --git a/swh/scheduler/api/__init__.py b/swh/scheduler/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py new file mode 100644 index 0000000..f044b9f --- /dev/null +++ b/swh/scheduler/api/client.py @@ -0,0 +1,96 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from swh.core.api import SWHRemoteAPI + + +class SchedulerAPIError(Exception): + """Specific internal scheduler api issue (mainly connection) + + """ + + def __str__(self): + args = self.args + return 'An unexpected error occurred in the api backend: %s' % args + + +class RemoteScheduler(SWHRemoteAPI): + """Proxy to a remote scheduler API + + """ + def __init__(self, url): + super().__init__(api_exception=SchedulerAPIError, url=url) + + def create_task_type(self, task_type): + return self.post('create_task_type', {'task_type': task_type}) + + def get_task_type(self, task_type_name): + return self.post('get_task_type', {'task_type_name': task_type_name}) + + def get_task_types(self): + return self.post('get_task_types', {}) + + def create_tasks(self, tasks): + return self.post('create_tasks', {'tasks': tasks}) + + def disable_tasks(self, task_ids): + return self.post('disable_tasks', {'task_ids': task_ids}) + + def get_tasks(self, task_ids): + return self.post('get_tasks', {'task_ids': task_ids}) + + def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None): + return self.post('peek_ready_tasks', { + 'task_type': task_type, + 'timestamp': timestamp, + 'num_tasks': num_tasks, + }) + + def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None): + return self.post('grab_ready_tasks', { + 'task_type': task_type, + 'timestamp': timestamp, + 'num_tasks': num_tasks, + }) + + def schedule_task_run(self, task_id, backend_id, metadata=None, + timestamp=None): + return self.post('schedule_task_run', { + 'task_id': task_id, + 'backend_id': backend_id, + 'metadata': metadata, + 'timestamp': timestamp, + }) + + def mass_schedule_task_runs(self, task_runs): + return self.post('mass_schedule_task_runs', {'task_runs': task_runs}) + + def start_task_run(self, backend_id, metadata=None, timestamp=None): + return self.post('start_task_run', { + 'backend_id': backend_id, + 'metadata': metadata, + 'timestamp': timestamp, + }) + + def end_task_run(self, backend_id, status, metadata=None, timestamp=None): + return self.post('end_task_run', { + 'backend_id': backend_id, + 'status': status, + 'metadata': metadata, + 'timestamp': timestamp, + }) + + def filter_task_to_archive(self, after_ts, before_ts, limit=10, + last_id=-1): + return self.post('filter_task_to_archive', { + 'after_ts': after_ts, + 'before_ts': before_ts, + 'limit': limit, + 'last_id': last_id, + }) + + def delete_archived_tasks(self, task_ids): + return self.post('delete_archived_tasks', {'task_ids': task_ids}) diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py new file mode 100644 index 0000000..687c378 --- /dev/null +++ b/swh/scheduler/api/server.py @@ -0,0 +1,144 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import click + +from flask import g, request + +from swh.core import config +from swh.scheduler import get_scheduler +from swh.core.api import (SWHServerAPIApp, decode_request, + error_handler, + encode_data_server as encode_data) + +DEFAULT_CONFIG_PATH = 'backend/scheduler' +DEFAULT_CONFIG = { + 'scheduler': ('dict', { + 'cls': 'local', + 'args': { + 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', + }, + }) +} + + +app = SWHServerAPIApp(__name__) + + +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) + + +@app.before_request +def before_request(): + g.scheduler = get_scheduler(**app.config['scheduler']) + + +@app.route('/') +def index(): + return 'SWH Scheduler API server' + + +@app.route('/create_task_type', methods=['POST']) +def create_task_type(): + return encode_data(g.scheduler.create_task_type(**decode_request(request))) + + +@app.route('/get_task_type', methods=['POST']) +def get_task_type(): + return encode_data(g.scheduler.get_task_type(**decode_request(request))) + + +@app.route('/get_task_types', methods=['POST']) +def get_task_types(): + return encode_data(g.scheduler.get_task_types(**decode_request(request))) + + +@app.route('/create_tasks', methods=['POST']) +def create_tasks(): + return encode_data(g.scheduler.create_tasks(**decode_request(request))) + + +@app.route('/disable_tasks', methods=['POST']) +def disable_tasks(): + return encode_data(g.scheduler.disable_tasks(**decode_request(request))) + + +@app.route('/get_tasks', methods=['POST']) +def get_tasks(): + return encode_data(g.scheduler.get_tasks(**decode_request(request))) + + +@app.route('/peek_ready_tasks', methods=['POST']) +def peek_ready_tasks(): + return encode_data(g.scheduler.peek_ready_tasks(**decode_request(request))) + + +@app.route('/grab_ready_tasks', methods=['POST']) +def grab_ready_tasks(): + return encode_data(g.scheduler.grab_ready_tasks(**decode_request(request))) + + +@app.route('/schedule_task_run', methods=['POST']) +def schedule_task_run(): + return encode_data(g.scheduler.schedule_task_run( + **decode_request(request))) + + +@app.route('/mass_schedule_task_runs', methods=['POST']) +def mass_schedule_task_runs(): + return encode_data( + g.scheduler.mass_schedule_task_runs(**decode_request(request))) + + +@app.route('/start_task_run', methods=['POST']) +def start_task_run(): + return encode_data(g.scheduler.start_task_run(**decode_request(request))) + + +@app.route('/end_task_run', methods=['POST']) +def end_task_run(): + return encode_data(g.scheduler.end_task_run(**decode_request(request))) + + +@app.route('/filter_task_to_archive', methods=['POST']) +def filter_task_to_archive(): + return encode_data( + g.scheduler.filter_task_to_archive(**decode_request(request))) + + +@app.route('/delete_archived_tasks', methods=['POST']) +def delete_archived_tasks(): + return encode_data( + g.scheduler.delete_archived_tasks(**decode_request(request))) + + +def run_from_webserver(environ, start_response, + config_path=DEFAULT_CONFIG_PATH): + """Run the WSGI app from the webserver, loading the configuration.""" + cfg = config.load_named_config(config_path, DEFAULT_CONFIG) + app.config.update(cfg) + handler = logging.StreamHandler() + app.logger.addHandler(handler) + return app(environ, start_response) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', + help="Host to run the scheduler server api") +@click.option('--port', default=5008, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=True, + help="Indicates if the server should run in debug mode") +def launch(config_path, host, port, debug): + app.config.update(config.read(config_path, DEFAULT_CONFIG)) + app.run(host, port=port, debug=bool(debug)) + + +if __name__ == '__main__': + launch()