diff --git a/PKG-INFO b/PKG-INFO index ffc6c79..204899c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.38 +Version: 0.0.39 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running tests manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] repository in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document, everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running Celery broker use an in-memory broker/result backend by default, but you can choose to use a real broker by setting the `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index ffc6c79..204899c 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.38 +Version: 0.0.39 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package).
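As a minimal, illustrative sketch (not part of the package documentation), a one-off task could be submitted through the Python API roughly as follows, assuming a local backend with a reachable scheduling database; the `load-git` task type and the repository URL are placeholders:

```
from arrow import utcnow
from swh.scheduler import get_scheduler

# Assumes a local scheduling database reachable at this DSN.
scheduler = get_scheduler('local', args={
    'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
})

# Submit a hypothetical one-off loading task, to run as soon as possible.
scheduler.create_tasks([{
    'type': 'load-git',  # placeholder task type
    'arguments': {'args': [],
                  'kwargs': {'url': 'https://example.org/repo.git'}},
    'next_run': utcnow(),
    'policy': 'oneshot',
}])
```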
# Tests ## Running tests manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] repository in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document, everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running Celery broker use an in-memory broker/result backend by default, but you can choose to use a real broker by setting the `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 910cb10..4a3dbbc 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,55 +1,54 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt bin/swh-worker-control swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-data.sql swh/scheduler/sql/updater/10-swh-init.sql swh/scheduler/sql/updater/30-swh-schema.sql swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py -swh/scheduler/tests/celery_testing.py -swh/scheduler/tests/scheduler_testing.py +swh/scheduler/tests/conftest.py +swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py -swh/scheduler/tests/test_fixtures.py +swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_scheduler.py -swh/scheduler/tests/test_task.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py swh/scheduler/updater/ghtorrent/cli.py swh/scheduler/updater/ghtorrent/fake.py \ No newline at end of file diff --git 
a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index dd2c8d5..c7d5f70 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,14 +1,15 @@ arrow celery>=4 Click elasticsearch>5.4 flask kombu psycopg2 vcversioner swh.core>=0.0.48 [testing] hypothesis -pytest +pytest<4 +pytest-postgresql celery>=4 diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py index 87ba5d7..aae1885 100644 --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,57 +1,58 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Percentage of tasks with priority to schedule PRIORITY_SLOT = 0.6 def compute_nb_tasks_from(num_tasks): """Compute and returns the tuple, number of tasks without priority, number of tasks with priority. Args: num_tasks (int): Returns: - tuple number of tasks without priority, number of tasks with - priority + tuple number of tasks without priority (int), number of tasks with + priority (int) """ if not num_tasks: return None, None - return (1 - PRIORITY_SLOT) * num_tasks, PRIORITY_SLOT * num_tasks + return (int((1 - PRIORITY_SLOT) * num_tasks), + int(PRIORITY_SLOT * num_tasks)) def get_scheduler(cls, args={}): """ Get a scheduler object of class `scheduler_class` with arguments `scheduler_args`. Args: scheduler (dict): dictionary with keys: cls (str): scheduler's class, either 'local' or 'remote' args (dict): dictionary with keys, default to empty. Returns: an instance of swh.scheduler, either local or remote: local: swh.scheduler.backend.SchedulerBackend remote: swh.scheduler.api.client.RemoteScheduler Raises: ValueError if passed an unknown storage class. 
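As an illustrative sketch of the two supported configurations (the DSN and URL below are placeholders), together with the `search_tasks` call added elsewhere in this diff:

```
from swh.scheduler import get_scheduler

# Local backend: talks directly to the scheduling database.
local = get_scheduler('local', args={
    'scheduling_db': 'dbname=softwareheritage-scheduler-dev'})

# Remote backend: proxies a scheduler API server.
remote = get_scheduler('remote', args={'url': 'http://localhost:5008/'})

# Both expose the same interface, e.g. searching for unscheduled tasks.
pending = local.search_tasks(status='next_run_not_scheduled', limit=10)
```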
""" if cls == 'remote': from .api.client import RemoteScheduler as SchedulerBackend elif cls == 'local': from .backend import SchedulerBackend else: raise ValueError('Unknown swh.scheduler class `%s`' % cls) return SchedulerBackend(**args) diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py index dbb9641..d452e97 100644 --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -1,108 +1,116 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import SWHRemoteAPI class SchedulerAPIError(Exception): """Specific internal scheduler api issue (mainly connection) """ def __str__(self): args = self.args return 'An unexpected error occurred in the api backend: %s' % args class RemoteScheduler(SWHRemoteAPI): """Proxy to a remote scheduler API """ def __init__(self, url, timeout=None): super().__init__( api_exception=SchedulerAPIError, url=url, timeout=timeout) def close_connection(self): return self.post('close_connection', {}) - def set_status_tasks(self, task_ids, status='disabled'): - return self.post('set_status_tasks', {'task_ids': task_ids, - 'status': status}) + def set_status_tasks(self, task_ids, status='disabled', next_run=None): + return self.post('set_status_tasks', dict( + task_ids=task_ids, status=status, next_run=next_run)) def create_task_type(self, task_type): return self.post('create_task_type', {'task_type': task_type}) def get_task_type(self, task_type_name): return self.post('get_task_type', {'task_type_name': task_type_name}) def get_task_types(self): return self.post('get_task_types', {}) def create_tasks(self, tasks): return self.post('create_tasks', {'tasks': tasks}) def disable_tasks(self, task_ids): return self.post('disable_tasks', {'task_ids': task_ids}) def get_tasks(self, task_ids): return self.post('get_tasks', {'task_ids': task_ids}) + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None): + return self.post('search_tasks', dict( + task_id=task_id, task_type=task_type, status=status, + priority=priority, policy=policy, before=before, after=after, + limit=limit)) + def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('peek_ready_tasks', { 'task_type': task_type, 'timestamp': timestamp, 'num_tasks': num_tasks, 'num_tasks_priority': num_tasks_priority, }) def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('grab_ready_tasks', { 'task_type': task_type, 'timestamp': timestamp, 'num_tasks': num_tasks, 'num_tasks_priority': num_tasks_priority, }) def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): return self.post('schedule_task_run', { 'task_id': task_id, 'backend_id': backend_id, 'metadata': metadata, 'timestamp': timestamp, }) def mass_schedule_task_runs(self, task_runs): return self.post('mass_schedule_task_runs', {'task_runs': task_runs}) def start_task_run(self, backend_id, metadata=None, timestamp=None): return self.post('start_task_run', { 'backend_id': backend_id, 'metadata': metadata, 'timestamp': timestamp, }) def end_task_run(self, backend_id, status, metadata=None, timestamp=None): return self.post('end_task_run', { 'backend_id': backend_id, 'status': status, 'metadata': metadata, 'timestamp': 
timestamp, }) def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1): return self.post('filter_task_to_archive', { 'after_ts': after_ts, 'before_ts': before_ts, 'limit': limit, 'last_id': last_id, }) def delete_archived_tasks(self, task_ids): return self.post('delete_archived_tasks', {'task_ids': task_ids}) diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 737abb2..1229cfd 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,157 +1,148 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -import click from flask import request from swh.core import config from swh.scheduler import get_scheduler as get_scheduler_from from swh.core.api import (SWHServerAPIApp, decode_request, error_handler, encode_data_server as encode_data) DEFAULT_CONFIG_PATH = 'backend/scheduler' DEFAULT_CONFIG = { 'scheduler': ('dict', { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', }, }) } app = SWHServerAPIApp(__name__) scheduler = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def get_sched(): global scheduler if not scheduler: scheduler = get_scheduler_from(**app.config['scheduler']) return scheduler @app.route('/') def index(): return 'SWH Scheduler API server' @app.route('/close_connection', methods=['POST']) def close_connection(): return encode_data(get_sched().close_connection()) @app.route('/set_status_tasks', methods=['POST']) def set_status_tasks(): return encode_data(get_sched().set_status_tasks(**decode_request(request))) @app.route('/create_task_type', methods=['POST']) def create_task_type(): return encode_data(get_sched().create_task_type(**decode_request(request))) @app.route('/get_task_type', methods=['POST']) def get_task_type(): return encode_data(get_sched().get_task_type(**decode_request(request))) @app.route('/get_task_types', methods=['POST']) def get_task_types(): return encode_data(get_sched().get_task_types(**decode_request(request))) @app.route('/create_tasks', methods=['POST']) def create_tasks(): return encode_data(get_sched().create_tasks(**decode_request(request))) @app.route('/disable_tasks', methods=['POST']) def disable_tasks(): return encode_data(get_sched().disable_tasks(**decode_request(request))) @app.route('/get_tasks', methods=['POST']) def get_tasks(): return encode_data(get_sched().get_tasks(**decode_request(request))) +@app.route('/search_tasks', methods=['POST']) +def search_tasks(): + return encode_data(get_sched().search_tasks(**decode_request(request))) + + @app.route('/peek_ready_tasks', methods=['POST']) def peek_ready_tasks(): return encode_data(get_sched().peek_ready_tasks(**decode_request(request))) @app.route('/grab_ready_tasks', methods=['POST']) def grab_ready_tasks(): return encode_data(get_sched().grab_ready_tasks(**decode_request(request))) @app.route('/schedule_task_run', methods=['POST']) def schedule_task_run(): return encode_data(get_sched().schedule_task_run( **decode_request(request))) @app.route('/mass_schedule_task_runs', methods=['POST']) def mass_schedule_task_runs(): return encode_data( get_sched().mass_schedule_task_runs(**decode_request(request))) @app.route('/start_task_run', methods=['POST']) def start_task_run(): return 
encode_data(get_sched().start_task_run(**decode_request(request))) @app.route('/end_task_run', methods=['POST']) def end_task_run(): return encode_data(get_sched().end_task_run(**decode_request(request))) @app.route('/filter_task_to_archive', methods=['POST']) def filter_task_to_archive(): return encode_data( get_sched().filter_task_to_archive(**decode_request(request))) @app.route('/delete_archived_tasks', methods=['POST']) def delete_archived_tasks(): return encode_data( get_sched().delete_archived_tasks(**decode_request(request))) def run_from_webserver(environ, start_response, config_path=DEFAULT_CONFIG_PATH): """Run the WSGI app from the webserver, loading the configuration.""" cfg = config.load_named_config(config_path, DEFAULT_CONFIG) app.config.update(cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) -@click.command() -@click.argument('config-path', required=1) -@click.option('--host', default='0.0.0.0', - help="Host to run the scheduler server api") -@click.option('--port', default=5008, type=click.INT, - help="Binding port of the server") -@click.option('--debug/--nodebug', default=True, - help="Indicates if the server should run in debug mode") -def launch(config_path, host, port, debug): - app.config.update(config.read(config_path, DEFAULT_CONFIG)) - app.run(host, port=port, debug=bool(debug)) - - if __name__ == '__main__': - launch() + print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 835c089..09171c2 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,523 +1,590 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime from functools import wraps import json import tempfile +import logging from arrow import Arrow, utcnow import psycopg2 import psycopg2.extras from psycopg2.extensions import AsIs from swh.core.config import SWHConfig +logger = logging.getLogger(__name__) + + def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except Exception: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped class DbBackend: """Mixin intended to be used within scheduling db backend classes cf. 
swh.scheduler.backend.SchedulerBackend, and swh.scheduler.updater.backend.SchedulerUpdaterBackend """ def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.db_conn_dsn, cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure """ cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() def close_connection(self): """Close db connection""" if self.db and not self.db.closed: self.db.close() def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) def copy_to(self, items, tblname, columns, default_columns={}, cursor=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, (datetime.datetime, Arrow)): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [] for k in columns: v = d.get(k) if not v: v = default_columns.get(k) v = escape(v) line.append(v) f.write(','.join(line)) f.write('\n') f.seek(0) cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) class SchedulerBackend(SWHConfig, DbBackend): """Backend for the Software Heritage scheduling database. 
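Methods on this backend are wrapped with the `@autocommit` decorator defined above, so they commit on their own unless an explicit cursor is passed; a caller that wants several updates in one transaction can do roughly what the listener in this diff does (a sketch; the DSN and backend ids are placeholders):

```
from swh.scheduler.backend import SchedulerBackend

backend = SchedulerBackend(scheduling_db='dbname=softwareheritage-scheduler-dev')

cursor = backend.cursor()
# Both updates share one transaction because an explicit cursor is given.
backend.start_task_run('celery-task-uuid', metadata={}, cursor=cursor)
backend.end_task_run('celery-task-uuid', status='eventful', cursor=cursor)
backend.commit()
```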
""" CONFIG_BASE_FILENAME = 'scheduler' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), } def __init__(self, **override_config): super().__init__() self.config = self.parse_config_file(global_config=False) self.config.update(override_config) self.db = None self.db_conn_dsn = self.config['scheduling_db'] self.reconnect() + logger.debug('SchedulerBackend config=%s' % self.config) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay', ] @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ + keys = [key for key in self.task_type_keys if key in task_type] query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", - self.task_type_keys, - ) - cursor.execute(query, [task_type[key] for key in self.task_type_keys]) + keys) + cursor.execute(query, [task_type[key] for key in keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret @autocommit def get_task_types(self, cursor=None): query = self._format_query( "select {keys} from task_type", self.task_type_keys, ) cursor.execute(query) ret = cursor.fetchall() return ret task_create_keys = [ 'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority' ] task_keys = task_create_keys + ['id', 'current_interval', 'status'] @autocommit def create_tasks(self, tasks, policy='recurring', cursor=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cursor.execute('select swh_scheduler_mktemp_task()') self.copy_to(tasks, 'tmp_task', self.task_create_keys, default_columns={ 'policy': policy, 'status': 'next_run_not_scheduled' }, cursor=cursor) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit - def set_status_tasks(self, task_ids, status='disabled', cursor=None): - """Set the tasks' status whose ids are listed.""" + def set_status_tasks(self, task_ids, + status='disabled', next_run=None, cursor=None): + """Set the tasks' status whose ids are listed. + + If given, also set the next_run date. 
+ """ if not task_ids: return - query = "UPDATE task SET status = %s WHERE id IN %s" - cursor.execute(query, (status, tuple(task_ids),)) - return None + query = ["UPDATE task SET status = %s"] + args = [status] + if next_run: + query.append(', next_run = %s') + args.append(next_run) + query.append(" WHERE id IN %s") + args.append(tuple(task_ids)) + + cursor.execute(''.join(query), args) @autocommit def disable_tasks(self, task_ids, cursor=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids) + @autocommit + def search_tasks(self, task_id=None, task_type=None, status=None, + priority=None, policy=None, before=None, after=None, + limit=None, cursor=None): + """Search tasks from selected criterions""" + where = [] + args = [] + + if task_id: + if isinstance(task_id, (str, int)): + where.append('id = %s') + else: + where.append('id in %s') + task_id = tuple(task_id) + args.append(task_id) + if task_type: + if isinstance(task_type, str): + where.append('type = %s') + else: + where.append('type in %s') + task_type = tuple(task_type) + args.append(task_type) + if status: + if isinstance(status, str): + where.append('status = %s') + else: + where.append('status in %s') + status = tuple(status) + args.append(status) + if priority: + if isinstance(priority, str): + where.append('priority = %s') + else: + priority = tuple(priority) + where.append('priority in %s') + args.append(priority) + if policy: + where.append('policy = %s') + args.append(policy) + if before: + where.append('next_run <= %s') + args.append(before) + if after: + where.append('next_run >= %s') + args.append(after) + + query = 'select * from task where ' + ' and '.join(where) + if limit: + query += ' limit %s :: bigint' + args.append(limit) + cursor.execute(query, args) + return cursor.fetchall() + @autocommit def get_tasks(self, task_ids, cursor=None): """Retrieve the info of tasks whose ids are listed.""" query = self._format_query('select {keys} from task where id in %s', self.task_keys) cursor.execute(query, (tuple(task_ids),)) return cursor.fetchall() @autocommit def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, cursor=None): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute( '''select * from swh_scheduler_peek_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) - + logger.debug('PEEK %s => %s' % (task_type, cursor.rowcount)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() - cursor.execute( '''select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) - + 
logger.debug('GRAB %s => %s' % (task_type, cursor.rowcount)) return cursor.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def mass_schedule_task_runs(self, task_runs, cursor=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cursor.execute('select swh_scheduler_mktemp_task_run()') self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, cursor=cursor) cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') @autocommit def start_task_run(self, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, status, metadata=None, timestamp=None, result=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) return cursor.fetchone() @autocommit def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1, cursor=None): """Returns the list of task/task_run prior to a given date to archive. 
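A sketch of how this generator pairs with `delete_archived_tasks` below (the time window is a placeholder and `backend` stands for a configured local backend):

```
import arrow
from swh.scheduler import get_scheduler

backend = get_scheduler('local')  # placeholder: a configured local backend

after = arrow.utcnow().shift(days=-60)
before = arrow.utcnow().shift(days=-30)

to_delete = []
for row in backend.filter_task_to_archive(after, before, limit=100):
    # Each row carries the identifiers needed for the later deletion.
    to_delete.append({'task_id': row['task_id'],
                      'task_run_id': row['task_run_id']})

backend.delete_archived_tasks(to_delete)
```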
""" last_task_run_id = None while True: row = None cursor.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit) ) for row in cursor: # nested type index does not accept bare values # transform it as a dict to comply with this row['arguments']['args'] = { i: v for i, v in enumerate(row['arguments']['args']) } kwargs = row['arguments']['kwargs'] row['arguments']['kwargs'] = json.dumps(kwargs) yield row if not row: break _id = row.get('task_id') _task_run_id = row.get('task_run_id') if last_id == _id and last_task_run_id == _task_run_id: break last_id = _id last_task_run_id = _task_run_id @autocommit def delete_archived_tasks(self, task_ids, cursor=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id['task_id']) _task_run_ids.append(task_id['task_run_id']) cursor.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids)) diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index 6bed5d0..489f4e9 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,257 +1,264 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import importlib import logging import os import urllib.parse from celery import Celery from celery.signals import setup_logging, celeryd_after_setup from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue from kombu.five import monotonic as _monotonic import requests from swh.scheduler.task import Task from swh.core.config import load_named_config from swh.core.logger import JournalHandler DEFAULT_CONFIG_NAME = 'worker' CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), + 'result_backend': ('str', 'rpc://'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. 
""" - if loglevel is None: loglevel = logging.DEBUG + if isinstance(loglevel, str): + loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) if loglevel == logging.DEBUG: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) - celery_logger = logging.getLogger('celery') - celery_logger.setLevel(logging.INFO) - + logging.getLogger('celery').setLevel(logging.INFO) + # Silence amqp heartbeat_tick messages + logger = logging.getLogger('amqp') + logger.addFilter(lambda record: not record.msg.startswith( + 'heartbeat_tick')) + logger.setLevel(logging.DEBUG) # Silence useless "Starting new HTTP connection" messages - urllib3_logger = logging.getLogger('urllib3') - urllib3_logger.setLevel(logging.WARNING) - - swh_logger = logging.getLogger('swh') - swh_logger.setLevel(loglevel) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task - celery_task_logger = logging.getLogger('celery.task') - celery_task_logger.setLevel(loglevel) + logging.getLogger('celery.task').setLevel(loglevel) + return loglevel @celeryd_after_setup.connect def setup_queues_and_tasks(sender, instance, **kwargs): """Signal called on worker start. This automatically registers swh.scheduler.task.Task subclasses as available celery tasks. This also subscribes the worker to the "implicit" per-task queues defined for these task classes. """ for module_name in itertools.chain( # celery worker -I flag instance.app.conf['include'], # set from the celery / swh worker instance configuration file instance.app.conf['imports'], ): module = importlib.import_module(module_name) for name in dir(module): obj = getattr(module, name) if ( isinstance(obj, type) and issubclass(obj, Task) and obj != Task # Don't register the abstract class itself ): class_name = '%s.%s' % (module_name, name) - instance.app.register_task_class(class_name, obj) + register_task_class(instance.app, class_name, obj) for task_name in instance.app.tasks: if task_name.startswith('swh.'): instance.app.amqp.queues.select_add(task_name) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} -class TaskRouter: +def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" - def route_for_task(self, task, *args, **kwargs): - if task.startswith('swh.'): - return {'queue': task} - - -class CustomCelery(Celery): - def get_queue_stats(self, queue_name): - """Get the statistics regarding a queue on the broker. - - Arguments: - queue_name: name of the queue to check - - Returns a dictionary raw from the RabbitMQ management API; - or `None` if the current configuration does not use RabbitMQ. 
- - Interesting keys: - - consumers (number of consumers for the queue) - - messages (number of messages in queue) - - messages_unacknowledged (number of messages currently being - processed) - - Documentation: https://www.rabbitmq.com/management.html#http-api - """ - - conn_info = self.connection().info() - if conn_info['transport'] == 'memory': - # We're running in a test environment, without RabbitMQ. - return None - url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( - hostname=conn_info['hostname'], - port=conn_info['port'] + 10000, - vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), - queue=urllib.parse.quote(queue_name, safe=''), - ) - credentials = (conn_info['userid'], conn_info['password']) - r = requests.get(url, auth=credentials) - if r.status_code == 404: - return {} - if r.status_code != 200: - raise ValueError('Got error %s when reading queue stats: %s' % ( - r.status_code, r.json())) - return r.json() - - def get_queue_length(self, queue_name): - """Shortcut to get a queue's length""" - stats = self.get_queue_stats(queue_name) - if stats: - return stats.get('messages') - - def register_task_class(self, name, cls): - """Register a class-based task under the given name""" - if name in self.tasks: - return - - task_instance = cls() - task_instance.name = name - self.register_task(task_instance) + if name is not None and name.startswith('swh.'): + return {'queue': name} + + +def get_queue_stats(app, queue_name): + """Get the statistics regarding a queue on the broker. + + Arguments: + queue_name: name of the queue to check + + Returns a dictionary raw from the RabbitMQ management API; + or `None` if the current configuration does not use RabbitMQ. + + Interesting keys: + - consumers (number of consumers for the queue) + - messages (number of messages in queue) + - messages_unacknowledged (number of messages currently being + processed) + + Documentation: https://www.rabbitmq.com/management.html#http-api + """ + + conn_info = app.connection().info() + if conn_info['transport'] == 'memory': + # We're running in a test environment, without RabbitMQ. 
+ return None + url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( + hostname=conn_info['hostname'], + port=conn_info['port'] + 10000, + vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), + queue=urllib.parse.quote(queue_name, safe=''), + ) + credentials = (conn_info['userid'], conn_info['password']) + r = requests.get(url, auth=credentials) + if r.status_code == 404: + return {} + if r.status_code != 200: + raise ValueError('Got error %s when reading queue stats: %s' % ( + r.status_code, r.json())) + return r.json() + + +def get_queue_length(app, queue_name): + """Shortcut to get a queue's length""" + stats = get_queue_stats(app, queue_name) + if stats: + return stats.get('messages') + + +def register_task_class(app, name, cls): + """Register a class-based task under the given name""" + if name in app.tasks: + return + + task_instance = cls() + task_instance.name = name + app.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) -# Instantiate the Celery app -app = CustomCelery() -app.conf.update( - # The broker - broker_url=CONFIG['task_broker'], +CELERY_DEFAULT_CONFIG = dict( # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules imports=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with task_serializer='msgpack', # Result serialization format result_serializer='msgpack', # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. task_acks_late=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=['msgpack', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # task_time_limit=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. # The task can catch this to e.g. clean up before the hard time limit # comes. 
task_soft_time_limit=CONFIG['task_soft_time_limit'], # Task routing - task_routes=TaskRouter(), + task_routes=route_for_task, # Task queues this worker will consume from task_queues=CELERY_QUEUES, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, -) + ) + +# Instantiate the Celery app +app = Celery(broker=CONFIG['task_broker'], + backend=CONFIG['result_backend'], + task_cls='swh.scheduler.task:SWHTask') +app.add_defaults(CELERY_DEFAULT_CONFIG) + +# XXX for BW compat +Celery.get_queue_length = get_queue_length diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index 20cd310..0a8b17b 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,203 +1,207 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import datetime import logging import socket from arrow import utcnow from kombu import Queue from celery.events import EventReceiver from swh.scheduler import get_scheduler -from .config import app as main_app +from .config import setup_log_handler, app as main_app class ReliableEventReceiver(EventReceiver): def __init__(self, channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix='celeryev', accept=None): super(ReliableEventReceiver, self).__init__( channel, handlers, routing_key, node_id, app, queue_prefix, accept) self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, durable=True) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self._receive], no_ack=False, accept=self.accept)] def _receive(self, bodies, message): logging.debug('## event-receiver: bodies: %s' % bodies) logging.debug('## event-receiver: message: %s' % message) if not isinstance(bodies, list): # celery<4 returned body as element bodies = [bodies] for body in bodies: type, body = self.event_from_message(body) self.process(type, body, message) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') logging.debug('## event-receiver: type: %s' % type) logging.debug('## event-receiver: event: %s' % event) logging.debug('## event-receiver: handler: %s' % handler) handler and handler(event, message) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): + logger = logging.getLogger('swh.scheduler.listener') actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): - if not actions['queue']: - return - if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \ - len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH: + logger.debug('Try perform pending actions') + if actions['queue'] and ( + len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or + utcnow() - actions['last_send'] > ACTION_SEND_DELAY): perform_actions(actions) def perform_actions(actions, backend=backend): + logger.info('Perform %s pending actions' % len(actions['queue'])) action_map = { 'start_task_run': 
backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] cursor = backend.cursor() for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) kwargs['cursor'] = cursor function(*args, **kwargs) backend.commit() for message in messages: message.ack() actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): message.ack() try_perform_actions() def task_started(event, message): - logging.debug('#### task_started: event: %s' % event) - logging.debug('#### task_started: message: %s' % message) + logger.debug('task_started: event: %s' % event) + logger.debug('task_started: message: %s' % message) queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def task_succeeded(event, message): - logging.debug('#### task_succeeded: event: %s' % event) - logging.debug('#### task_succeeded: message: %s' % message) + logger.debug('task_succeeded: event: %s' % event) + logger.debug('task_succeeded: message: %s' % message) result = event['result'] - logging.debug('#### task_succeeded: result: %s' % result) + logger.debug('task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': status = 'eventful' if result.get('eventful') else 'uneventful' except Exception: status = 'eventful' if result else 'uneventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, 'result': result, }, 'message': message, }) def task_failed(event, message): - logging.debug('#### task_failed: event: %s' % event) - logging.debug('#### task_failed: message: %s' % message) + logger.debug('task_failed: event: %s' % event) + logger.debug('task_failed: message: %s' % message) queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) recv = ReliableEventReceiver( main_app.connection(), app=main_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, node_id='listener-%s' % socket.gethostname(), ) recv.capture(limit=None, timeout=None, wakeup=True) @click.command() @click.option('--cls', '-c', default='local', help="Scheduler's class, default to 'local'") @click.option( '--database', '-d', help='Scheduling database DSN') @click.option('--url', '-u', help="(Optional) Scheduler's url access") -@click.option('--verbose', is_flag=True, default=False, - help='Default to be silent') -def main(cls, database, url, verbose): - if verbose: - logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) +@click.option('--log-level', '-l', default='INFO', + type=click.Choice(logging._nameToLevel.keys()), + help='Log level (default to INFO)') +def main(cls, database, url, log_level): + setup_log_handler(loglevel=log_level, colorize=False, + format='[%(levelname)s] %(name)s -- %(message)s') + # logging.basicConfig(level=level) scheduler = None override_config = {} if cls == 'local': if database: override_config = {'scheduling_db': database} scheduler = get_scheduler(cls, args=override_config) elif cls == 'remote': if url: override_config = {'url': url} scheduler = get_scheduler(cls, args=override_config) if not 
scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') event_monitor(main_app, backend=scheduler) if __name__ == '__main__': main() diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index ff9ae6c..b6586e4 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,111 +1,115 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow from swh.scheduler import get_scheduler, compute_nb_tasks_from from .config import app as main_app # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args: backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': arrow.utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks = [] while True: cursor = backend.cursor() task_types = {} pending_tasks = [] for task_type in backend.get_task_types(cursor=cursor): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] if max_queue_length: try: queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) pending_tasks.extend( backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority, cursor=cursor)) if not pending_tasks: return all_backend_tasks backend_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] backend_name = task_types[task['type']]['backend_name'] celery_result = app.send_task( backend_name, args=args, kwargs=kwargs, ) data = { 'task': task['id'], 'backend_id': celery_result.id, 'scheduled': arrow.utcnow(), } backend_tasks.append(data) backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) backend.commit() all_backend_tasks.extend(backend_tasks) -if __name__ == '__main__': +def main(): for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise + + +if __name__ == '__main__': + main() diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py index f806071..d622b52 100644 --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -1,363 +1,634 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import click import csv import itertools import json import locale import logging +import time -from swh.core import utils +from swh.core import utils, config from . 
import compute_nb_tasks_from from .backend_es import SWHElasticSearchClient +from . import get_scheduler locale.setlocale(locale.LC_ALL, '') ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] class DateTimeType(click.ParamType): name = 'time and date' def convert(self, value, param, ctx): if not isinstance(value, arrow.Arrow): value = arrow.get(value) return value DATETIME = DateTimeType() CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) def pretty_print_list(list, indent): """Pretty-print a list""" return ''.join('%s%s\n' % (' ' * indent, item) for item in list) def pretty_print_dict(dict, indent): """Pretty-print a list""" return ''.join('%s%s: %s\n' % (' ' * indent, click.style(key, bold=True), value) for key, value in dict.items()) -def pretty_print_task(task): - """Pretty-print a task""" +def pretty_print_task(task, full=False): + """Pretty-print a task + + If 'full' is True, also print the status and priority fields. + """ next_run = arrow.get(task['next_run']) lines = [ '%s %s\n' % (click.style('Task', bold=True), task['id']), click.style(' Next run: ', bold=True), "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()), '\n', click.style(' Interval: ', bold=True), str(task['current_interval']), '\n', - click.style(' Type: ', bold=True), task['type'], '\n', - click.style(' Policy: ', bold=True), task['policy'], '\n', + click.style(' Type: ', bold=True), task['type'] or '', '\n', + click.style(' Policy: ', bold=True), task['policy'] or '', '\n', + ] + if full: + lines += [ + click.style(' Status: ', bold=True), + task['status'] or '', '\n', + click.style(' Priority: ', bold=True), + task['priority'] or '', '\n', + ] + lines += [ click.style(' Args:\n', bold=True), pretty_print_list(task['arguments']['args'], indent=4), click.style(' Keyword args:\n', bold=True), pretty_print_dict(task['arguments']['kwargs'], indent=4), ] return ''.join(lines) -def list_task_types(ctx, param, value): - if not value or ctx.resilient_parsing: - return - click.echo("Known task types:") - for tasktype in ctx.obj.get_task_types(): - click.echo('{type}:\n {description}'.format(**tasktype)) - ctx.exit() - - @click.group(context_settings=CONTEXT_SETTINGS) @click.option('--cls', '-c', default='local', + type=click.Choice(['local', 'remote']), help="Scheduler's class, default to 'local'") @click.option('--database', '-d', - help='Scheduling database DSN') + help="Scheduling database DSN (if cls is 'local')") @click.option('--url', '-u', - help="(Optional) Scheduler's url access") + help="Scheduler's url access (if cls is 'remote')") +@click.option('--log-level', '-l', default='INFO', + type=click.Choice(logging._nameToLevel.keys()), + help="Log level (default to INFO)") @click.pass_context -def cli(ctx, cls, database, url): +def cli(ctx, cls, database, url, log_level): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ + from swh.scheduler.celery_backend.config import setup_log_handler + log_level = setup_log_handler( + loglevel=log_level, colorize=False, + format='[%(levelname)s] %(name)s -- %(message)s') + + ctx.ensure_object(dict) + + logger = logging.getLogger(__name__) scheduler = None override_config = {} - from . 
import get_scheduler - if cls == 'local': - if database: + try: + if cls == 'local' and database: override_config = {'scheduling_db': database} - scheduler = get_scheduler(cls, args=override_config) - elif cls == 'remote': - if url: + elif cls == 'remote' and url: override_config = {'url': url} + logger.debug('Instantiating scheduler %s with %s' % ( + cls, override_config)) scheduler = get_scheduler(cls, args=override_config) + except Exception: + # it's up to the subcommand to decide whether not having a proper + # scheduler instance is a problem. + pass - if not scheduler: - raise ValueError('Scheduler class (local/remote) must be instantiated') - - ctx.obj = scheduler + ctx.obj['scheduler'] = scheduler + ctx.obj['config'] = {'cls': cls, 'args': override_config} + ctx.obj['loglevel'] = log_level @cli.group('task') -@click.option('--list-types', '-l', is_flag=True, default=False, is_eager=True, - expose_value=False, callback=list_task_types) @click.pass_context def task(ctx): """Manipulate tasks.""" pass @task.command('schedule') @click.option('--columns', '-c', multiple=True, default=['type', 'args', 'kwargs', 'next_run'], type=click.Choice([ 'type', 'args', 'kwargs', 'policy', 'next_run']), help='columns present in the CSV file') @click.option('--delimiter', '-d', default=',') @click.argument('file', type=click.File(encoding='utf-8')) @click.pass_context def schedule_tasks(ctx, columns, delimiter, file): """Schedule tasks from a CSV input file. The following columns are expected, and can be set through the -c option: - type: the type of the task to be scheduled (mandatory) - args: the arguments passed to the task (JSON list, defaults to an empty list) - kwargs: the keyword arguments passed to the task (JSON object, defaults to an empty dict) - next_run: the date at which the task should run (datetime, defaults to now) The CSV can be read either from a named file, or from stdin (use - as filename). Use sample: cat scheduling-task.txt | \ python3 -m swh.scheduler.cli \ --database 'service=swh-scheduler-dev' \ task schedule \ --columns type --columns kwargs --columns policy \ --delimiter ';' - """ tasks = [] now = arrow.utcnow() + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') reader = csv.reader(file, delimiter=delimiter) for line in reader: task = dict(zip(columns, line)) args = json.loads(task.pop('args', '[]')) kwargs = json.loads(task.pop('kwargs', '{}')) task['arguments'] = { 'args': args, 'kwargs': kwargs, } task['next_run'] = DATETIME.convert(task.get('next_run', now), None, None) tasks.append(task) - created = ctx.obj.create_tasks(tasks) + created = scheduler.create_tasks(tasks) output = [ 'Created %d tasks\n' % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager('\n'.join(output)) @task.command('add') @click.argument('type', nargs=1, required=True) @click.argument('options', nargs=-1) @click.option('--policy', '-p', default='recurring', type=click.Choice(['recurring', 'oneshot'])) +@click.option('--priority', '-P', default=None, + type=click.Choice(['low', 'normal', 'high'])) @click.option('--next-run', '-n', default=None) @click.pass_context -def schedule_task(ctx, type, options, policy, next_run): +def schedule_task(ctx, type, options, policy, priority, next_run): """Schedule one task from arguments. 
- Use sample: + Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task add swh-lister-pypi swh-scheduler --database 'service=swh-scheduler' \ task add swh-lister-debian --policy=oneshot distribution=stretch + Note: if the priority is not given, the task won't have the priority set, + which is considered as the lowest priority level. """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + now = arrow.utcnow() args = [x for x in options if '=' not in x] kw = dict(x.split('=', 1) for x in options if '=' in x) task = {'type': type, 'policy': policy, + 'priority': priority, 'arguments': { 'args': args, 'kwargs': kw, }, 'next_run': DATETIME.convert(next_run or now, None, None), } - created = ctx.obj.create_tasks([task]) + created = scheduler.create_tasks([task]) output = [ 'Created %d tasks\n' % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo('\n'.join(output)) @task.command('list-pending') @click.argument('task-types', required=True, nargs=-1) @click.option('--limit', '-l', required=False, type=click.INT, help='The maximum number of tasks to fetch') @click.option('--before', '-b', required=False, type=DATETIME, help='List all jobs supposed to run before the given date') @click.pass_context def list_pending_tasks(ctx, task_types, limit, before): """List the tasks that are going to be run. You can override the number of tasks to fetch """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) output = [] for task_type in task_types: - pending = ctx.obj.peek_ready_tasks( + pending = scheduler.peek_ready_tasks( task_type, timestamp=before, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) output.append('Found %d %s tasks\n' % ( len(pending), task_type)) for task in pending: output.append(pretty_print_task(task)) click.echo('\n'.join(output)) +@task.command('list') +@click.option('--task-id', '-i', default=None, multiple=True, metavar='ID', + help='List only tasks whose id is ID.') +@click.option('--task-type', '-t', default=None, multiple=True, metavar='TYPE', + help='List only tasks of type TYPE') +@click.option('--limit', '-l', required=False, type=click.INT, + help='The maximum number of tasks to fetch.') +@click.option('--status', '-s', multiple=True, metavar='STATUS', + default=None, + help='List tasks whose status is STATUS.') +@click.option('--policy', '-p', default=None, + type=click.Choice(['recurring', 'oneshot']), + help='List tasks whose policy is POLICY.') +@click.option('--priority', '-P', default=None, multiple=True, + type=click.Choice(['all', 'low', 'normal', 'high']), + help='List tasks whose priority is PRIORITY.') +@click.option('--before', '-b', required=False, type=DATETIME, + metavar='DATETIME', + help='Limit to tasks supposed to run before the given date.') +@click.option('--after', '-a', required=False, type=DATETIME, + metavar='DATETIME', + help='Limit to tasks supposed to run after the given date.') +@click.pass_context +def list_tasks(ctx, task_id, task_type, limit, status, policy, priority, + before, after): + """List tasks. 
+ """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + + if not task_type: + task_type = [x['type'] for x in scheduler.get_task_types()] + + # if task_id is not given, default value for status is + # 'next_run_not_scheduled' + # if task_id is given, default status is 'all' + if task_id is None and status is None: + status = ['next_run_not_scheduled'] + if status and 'all' in status: + status = None + + if priority and 'all' in priority: + priority = None + + output = [] + tasks = scheduler.search_tasks( + task_id=task_id, + task_type=task_type, + status=status, priority=priority, policy=policy, + before=before, after=after, + limit=limit) + + output.append('Found %d tasks\n' % ( + len(tasks))) + for task in tasks: + output.append(pretty_print_task(task, full=True)) + + click.echo('\n'.join(output)) + + +@task.command('respawn') +@click.argument('task-ids', required=True, nargs=-1) +@click.option('--next-run', '-n', required=False, type=DATETIME, + metavar='DATETIME', default=None, + help='Re spawn the selected tasks at this date') +@click.pass_context +def respawn_tasks(ctx, task_ids, next_run): + """Respawn tasks. + + Respawn tasks given by their ids (see the 'task list' command to + find task ids) at the given date (immediately by default). + + Eg. + + swh-scheduler task respawn 1 3 12 + """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + if next_run is None: + next_run = arrow.utcnow() + output = [] + + scheduler.set_status_tasks( + task_ids, status='next_run_not_scheduled', next_run=next_run) + output.append('Respawn tasks %s\n' % ( + task_ids)) + + click.echo('\n'.join(output)) + + @task.command('archive') @click.option('--before', '-b', default=None, help='''Task whose ended date is anterior will be archived. Default to current month's first day.''') @click.option('--after', '-a', default=None, help='''Task whose ended date is after the specified date will be archived. Default to prior month's first day.''') @click.option('--batch-index', default=1000, type=click.INT, help='Batch size of tasks to read from db to archive') @click.option('--bulk-index', default=200, type=click.INT, help='Batch size of tasks to bulk index') @click.option('--batch-clean', default=1000, type=click.INT, help='Batch size of task to clean after archival') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--verbose', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--cleanup/--no-cleanup', is_flag=True, default=True, help='Clean up archived tasks (default)') @click.option('--start-from', type=click.INT, default=-1, help='(Optional) default task id to start from. Default is -1.') @click.pass_context def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, dry_run, verbose, cleanup, start_from): """Archive task/task_run whose (task_type is 'oneshot' and task_status is 'completed') or (task_type is 'recurring' and task_status is 'disabled'). With --dry-run flag set (default), only list those. 
""" + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) log = logging.getLogger('swh.scheduler.cli.archive') logging.getLogger('urllib3').setLevel(logging.WARN) logging.getLogger('elasticsearch').setLevel(logging.WARN) if dry_run: log.info('**DRY-RUN** (only reading db)') if not cleanup: log.info('**NO CLEANUP**') now = arrow.utcnow() # Default to archive tasks from a rolling month starting the week # prior to the current one if not before: before = now.shift(weeks=-1).format('YYYY-MM-DD') if not after: after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( not dry_run, not dry_run and cleanup, after, before)) def group_by_index_name(data, es_client=es_client): """Given a data record, determine the index's name through its ending date. This varies greatly depending on the task_run's status. """ date = data.get('started') if not date: date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) - def index_data(before, last_id, batch_index, backend=ctx.obj): - tasks_in = backend.filter_task_to_archive( + def index_data(before, last_id, batch_index): + tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): log.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue yield from es_client.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], chunk_size=bulk_index, log=log) gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: for task_ids in utils.grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue - ctx.obj.delete_archived_tasks(task_ids) + ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) -@cli.group('task-run') +@cli.command('runner') +@click.option('--period', '-p', default=0, + help=('Period (in s) at witch pending tasks are checked and ' + 'executed. Set to 0 (default) for a one shot.')) +@click.pass_context +def runner(ctx, period): + """Starts a swh-scheduler runner service. + + This process is responsible for checking for ready-to-run tasks and + schedule them.""" + from swh.scheduler.celery_backend.runner import run_ready_tasks + from swh.scheduler.celery_backend.config import app + + logger = logging.getLogger(__name__ + '.runner') + scheduler = ctx.obj['scheduler'] + logger.debug('Scheduler %s' % scheduler) + try: + while True: + logger.info('Run ready tasks') + try: + run_ready_tasks(scheduler, app) + except Exception: + scheduler.rollback() + logger.exception('Unexpected error in run_ready_tasks()') + if not period: + break + time.sleep(period) + except KeyboardInterrupt: + ctx.exit(0) + + +@cli.command('listener') +@click.pass_context +def listener(ctx): + """Starts a swh-scheduler listener service. 
+ + This service is responsible for listening to task lifecycle events and + handling their workflow status in the database.""" + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + + from swh.scheduler.celery_backend.listener import ( + event_monitor, main_app) + event_monitor(main_app, backend=scheduler) + + +@cli.command('api-server') +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', + help="Host to run the scheduler API server") +@click.option('--port', default=5008, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=None, + help=("Indicates if the server should run in debug mode. " + "Defaults to True if log-level is DEBUG, False otherwise.") + ) +@click.pass_context +def api_server(ctx, config_path, host, port, debug): + """Starts a swh-scheduler API HTTP server. + """ + if ctx.obj['config']['cls'] == 'remote': + click.echo("The API server can only be started with a 'local' " + "configuration", err=True) + ctx.exit(1) + + from swh.scheduler.api.server import app, DEFAULT_CONFIG + conf = config.read(config_path, DEFAULT_CONFIG) + if ctx.obj['config']['args']: + conf['scheduler']['args'].update(ctx.obj['config']['args']) + app.config.update(conf) + if debug is None: + debug = ctx.obj['loglevel'] <= logging.DEBUG + + app.run(host, port=port, debug=bool(debug)) + + +@cli.group('task-type') @click.pass_context -def task_run(ctx): - """Manipulate task runs.""" +def task_type(ctx): + """Manipulate task types.""" pass +@task_type.command('list') +@click.option('--verbose', '-v', is_flag=True, default=False) +@click.option('--task_type', '-t', multiple=True, default=None, + help='List task types of given type') +@click.option('--task_name', '-n', multiple=True, default=None, + help='List task types of given backend task name') +@click.pass_context +def list_task_types(ctx, verbose, task_type, task_name): + click.echo("Known task types:") + if verbose: + tmpl = click.style('{type}: ', bold=True) + '''{backend_name} + {description} + interval: {default_interval} [{min_interval}, {max_interval}] + backoff_factor: {backoff_factor} + max_queue_length: {max_queue_length} + num_retries: {num_retries} + retry_delay: {retry_delay} +''' + else: + tmpl = '{type}:\n {description}' + for tasktype in ctx.obj['scheduler'].get_task_types(): + if task_type and tasktype['type'] not in task_type: + continue + if task_name and tasktype['backend_name'] not in task_name: + continue + click.echo(tmpl.format(**tasktype)) + + +@task_type.command('add') +@click.argument('type', required=1) +@click.argument('task-name', required=1) +@click.argument('description', required=1) +@click.option('--default-interval', '-i', default='90 days', + help='Default interval ("90 days" by default)') +@click.option('--min-interval', default=None, + help='Minimum interval (default interval if not set)') +@click.option('--max-interval', default=None, + help='Maximum interval (default interval if not set)') +@click.option('--backoff-factor', '-f', type=float, default=1, + help='Backoff factor') +@click.pass_context +def add_task_type(ctx, type, task_name, description, + default_interval, min_interval, max_interval, + backoff_factor): + """Create a new task type + """ + scheduler = ctx.obj['scheduler'] + if not scheduler: + raise ValueError('Scheduler class (local/remote) must be instantiated') + task_type = dict( + type=type, + backend_name=task_name, + 
description=description, + default_interval=default_interval, + min_interval=min_interval, + max_interval=max_interval, + backoff_factor=backoff_factor, + max_queue_length=None, + num_retries=None, + retry_delay=None, + ) + scheduler.create_task_type(task_type) + click.echo('OK') + + if __name__ == '__main__': cli() diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 0bea878..3d16510 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,50 +1,62 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery.app.task from celery.utils.log import get_task_logger -class Task(celery.app.task.Task): +class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) + Current implementation is based on Celery. See + http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for + how to use tasks once instantiated + + """ + + _log = None + + def on_failure(self, exc, task_id, args, kwargs, einfo): + self.send_event('task-result-exception') + + def on_success(self, retval, task_id, args, kwargs): + self.send_event('task-result', result=retval) + + @property + def log(self): + if self._log is None: + self._log = get_task_logger(self.name) + return self._log + + +class Task(SWHTask): + """a schedulable task (abstract class) + + DEPRECATED! Please use SWHTask as base for decorated functions instead. + Sub-classes must implement the run_task() method. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" - try: - result = self.run_task(*args, **kwargs) - except Exception as e: - self.send_event('task-result-exception') - raise e from None - else: - self.send_event('task-result', result=result) - return result + return self.run_task(*args, **kwargs) def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') - - @property - def log(self): - if not hasattr(self, '__log'): - self.__log = get_task_logger('%s.%s' % - (__name__, self.__class__.__name__)) - return self.__log diff --git a/swh/scheduler/tests/celery_testing.py b/swh/scheduler/tests/celery_testing.py deleted file mode 100644 index acc8ad6..0000000 --- a/swh/scheduler/tests/celery_testing.py +++ /dev/null @@ -1,18 +0,0 @@ -import os - - -def setup_celery(): - os.environ.setdefault('CELERY_BROKER_URL', 'memory://') - os.environ.setdefault('CELERY_RESULT_BACKEND', 'cache+memory://') - - -class CeleryTestFixture: - """Mix this in a test subject class to setup Celery config for testing - purpose. - - Can be overriden by CELERY_BROKER_URL and CELERY_RESULT_BACKEND env vars. 
- """ - - def setUp(self): - setup_celery() - super().setUp() diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py new file mode 100644 index 0000000..0394053 --- /dev/null +++ b/swh/scheduler/tests/conftest.py @@ -0,0 +1,84 @@ +import os +import pytest +import glob +from datetime import timedelta + +from swh.core.utils import numfile_sortkey as sortkey +from swh.scheduler import get_scheduler +from swh.scheduler.tests import SQL_DIR + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + +# celery tasks for testing purpose; tasks themselves should be +# in swh/scheduler/tests/celery_tasks.py +TASK_NAMES = ['ping', 'multiping', 'add', 'error'] + + +@pytest.fixture(scope='session') +def celery_enable_logging(): + return True + + +@pytest.fixture(scope='session') +def celery_includes(): + return [ + 'swh.scheduler.tests.tasks', + ] + + +@pytest.fixture(scope='session') +def celery_parameters(): + return { + 'task_cls': 'swh.scheduler.task:SWHTask', + } + + +@pytest.fixture(scope='session') +def celery_config(): + return { + 'accept_content': ['application/x-msgpack', 'application/json'], + 'task_serializer': 'msgpack', + 'result_serializer': 'msgpack', + } + + +# override the celery_session_app fixture to monkeypatch the 'main' +# swh.scheduler.celery_backend.config.app Celery application +# with the test application. +@pytest.fixture(scope='session') +def swh_app(celery_session_app): + import swh.scheduler.celery_backend.config + swh.scheduler.celery_backend.config.app = celery_session_app + yield celery_session_app + + +@pytest.fixture +def swh_scheduler(request, postgresql_proc, postgresql): + scheduler_config = { + 'scheduling_db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user='postgres', + dbname='tests') + } + + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + + scheduler = get_scheduler('local', scheduler_config) + for taskname in TASK_NAMES: + scheduler.create_task_type({ + 'type': 'swh-test-{}'.format(taskname), + 'description': 'The {} testing task'.format(taskname), + 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), + 'default_interval': timedelta(days=1), + 'min_interval': timedelta(hours=6), + 'max_interval': timedelta(days=12), + }) + + return scheduler diff --git a/swh/scheduler/tests/scheduler_testing.py b/swh/scheduler/tests/scheduler_testing.py deleted file mode 100644 index 1fad1f6..0000000 --- a/swh/scheduler/tests/scheduler_testing.py +++ /dev/null @@ -1,80 +0,0 @@ -import glob -import os.path -import datetime - -from celery.result import AsyncResult -from celery.contrib.testing.worker import start_worker -import celery.contrib.testing.tasks # noqa -import pytest - -from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES -from swh.core.utils import numfile_sortkey as sortkey - -from swh.scheduler import get_scheduler -from swh.scheduler.celery_backend.runner import run_ready_tasks -from swh.scheduler.celery_backend.config import app -from swh.scheduler.tests.celery_testing import CeleryTestFixture - -from . 
import SQL_DIR - -DUMP_FILES = os.path.join(SQL_DIR, '*.sql') - - -@pytest.mark.db -class SchedulerTestFixture(CeleryTestFixture, DbTestFixture): - """Base class for test case classes, providing an SWH scheduler as - the `scheduler` attribute.""" - SCHEDULER_DB_NAME = 'softwareheritage-scheduler-test-fixture' - - def add_scheduler_task_type(self, task_type, backend_name, - task_class=None): - task_type = { - 'type': task_type, - 'description': 'Update a git repository', - 'backend_name': backend_name, - 'default_interval': datetime.timedelta(days=64), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=64), - 'backoff_factor': 2, - 'max_queue_length': None, - 'num_retries': 7, - 'retry_delay': datetime.timedelta(hours=2), - } - self.scheduler.create_task_type(task_type) - if task_class: - app.register_task_class(backend_name, task_class) - - def run_ready_tasks(self): - """Runs the scheduler and a Celery worker, then blocks until - all tasks are completed.""" - - # Make sure the worker is listening to all task-specific queues - for task in self.scheduler.get_task_types(): - app.amqp.queues.select_add(task['backend_name']) - - with start_worker(app): - backend_tasks = run_ready_tasks(self.scheduler, app) - for task in backend_tasks: - # Make sure the task completed - AsyncResult(id=task['backend_id']).get() - - @classmethod - def setUpClass(cls): - all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) - - all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) - for x in all_dump_files] - - cls.add_db(name=cls.SCHEDULER_DB_NAME, - dumps=all_dump_files) - super().setUpClass() - - def setUp(self): - super().setUp() - self.scheduler_config = { - 'scheduling_db': 'dbname=' + self.SCHEDULER_DB_NAME} - self.scheduler = get_scheduler('local', self.scheduler_config) - - def tearDown(self): - self.scheduler.close_connection() - super().tearDown() diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py new file mode 100644 index 0000000..5702680 --- /dev/null +++ b/swh/scheduler/tests/tasks.py @@ -0,0 +1,41 @@ +from celery import group + +from swh.scheduler.celery_backend.config import app + + +@app.task(name='swh.scheduler.tests.tasks.ping', + bind=True) +def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + +@app.task(name='swh.scheduler.tests.tasks.multiping', + bind=True) +def multiping(self, n=10): + self.log.debug(self.name) + + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id + + +@app.task(name='swh.scheduler.tests.tasks.error', + bind=True) +def not_implemented(self): + self.log.debug(self.name) + raise NotImplementedError('Nope') + + +@app.task(name='swh.scheduler.tests.tasks.add', + bind=True) +def add(self, x, y): + self.log.debug(self.name) + return x + y diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py new file mode 100644 index 0000000..accd1a0 --- /dev/null +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -0,0 +1,90 @@ +from time import sleep +from celery.result import GroupResult +from celery.result import AsyncResult + +import pytest + +from swh.scheduler.utils import create_task_dict +from swh.scheduler.celery_backend.runner import run_ready_tasks + + +def 
test_ping(swh_app, celery_session_worker): + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.ping') + assert res + res.wait() + assert res.successful() + assert res.result == 'OK' + + +def test_multiping(swh_app, celery_session_worker): + "Test that a task that spawns subtasks (group) works" + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.multiping', n=5) + assert res + + res.wait() + assert res.successful() + + # retrieve the GroupResult for this task and wait for all the subtasks + # to complete + promise_id = res.result + assert promise_id + promise = GroupResult.restore(promise_id, app=swh_app) + for i in range(5): + if promise.ready(): + break + sleep(1) + + results = [x.get() for x in promise.results] + for i in range(5): + assert ("OK (kw={'i': %s})" % i) in results + + +def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): + "Test that the scheduler fixture works properly" + task_type = swh_scheduler.get_task_type('swh-test-ping') + + assert task_type + assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.ping' + + swh_scheduler.create_tasks([create_task_dict( + 'swh-test-ping', 'oneshot')]) + + backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + assert backend_tasks + for task in backend_tasks: + # Make sure the task completed + AsyncResult(id=task['backend_id']).get() + + +def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): + task_type = swh_scheduler.get_task_type('swh-test-add') + assert task_type + assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.add' + + swh_scheduler.create_tasks([create_task_dict( + 'swh-test-add', 'oneshot', 12, 30)]) + + backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + assert len(backend_tasks) == 1 + task = backend_tasks[0] + value = AsyncResult(id=task['backend_id']).get() + assert value == 42 + + +def test_task_exception(swh_app, celery_session_worker, swh_scheduler): + task_type = swh_scheduler.get_task_type('swh-test-error') + assert task_type + assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.error' + + swh_scheduler.create_tasks([create_task_dict( + 'swh-test-error', 'oneshot')]) + + backend_tasks = run_ready_tasks(swh_scheduler, swh_app) + assert len(backend_tasks) == 1 + + task = backend_tasks[0] + result = AsyncResult(id=task['backend_id']) + with pytest.raises(NotImplementedError): + result.get() diff --git a/swh/scheduler/tests/test_fixtures.py b/swh/scheduler/tests/test_fixtures.py deleted file mode 100644 index e69ac9d..0000000 --- a/swh/scheduler/tests/test_fixtures.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture -from swh.scheduler.task import Task -from swh.scheduler.utils import create_task_dict - -task_has_run = False - - -class SomeTestTask(Task): - def run(self, *, foo): - global task_has_run - assert foo == 'bar' - task_has_run = True - - -class FixtureTest(SchedulerTestFixture, unittest.TestCase): - def setUp(self): - super().setUp() - self.add_scheduler_task_type( - 'some_test_task_type', - 'swh.scheduler.tests.test_fixtures.SomeTestTask', - SomeTestTask, - ) - - def test_task_run(self): - self.scheduler.create_tasks([create_task_dict( - 'some_test_task_type', - 'oneshot', - foo='bar', - )]) - 
self.assertEqual(task_has_run, False) - self.run_ready_tasks() - self.assertEqual(task_has_run, True) diff --git a/swh/scheduler/tests/test_task.py b/swh/scheduler/tests/test_task.py deleted file mode 100644 index 9abe842..0000000 --- a/swh/scheduler/tests/test_task.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -from celery import current_app as app - -from swh.scheduler import task -from .celery_testing import CeleryTestFixture - - -class Task(CeleryTestFixture, unittest.TestCase): - - def test_not_implemented_task(self): - class NotImplementedTask(task.Task): - name = 'NotImplementedTask' - - pass - - app.register_task(NotImplementedTask()) - - with self.assertRaises(NotImplementedError): - NotImplementedTask().run() - - def test_add_task(self): - class AddTask(task.Task): - name = 'AddTask' - - def run_task(self, x, y): - return x + y - - app.register_task(AddTask()) - - r = AddTask().apply([2, 3]) - self.assertTrue(r.successful()) - self.assertEqual(r.result, 5) diff --git a/version.txt b/version.txt index 2fc4a81..e0d62ae 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.38-0-gb6bc2f2 \ No newline at end of file +v0.0.39-0-gcaaa44b \ No newline at end of file
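
For illustration only, here is a minimal sketch (not part of the patch) of how a worker task can be declared under the new SWHTask-based API introduced above: a plain function decorated on the shared Celery application, rather than a subclass of the now-deprecated Task class. The module path, task name and return value (`swh.example.tasks.process_url`) are hypothetical, and the sketch assumes the application is configured with `swh.scheduler.task:SWHTask` as its task class, as the test fixtures above do via `celery_parameters`.

```python
from swh.scheduler.celery_backend.config import app


@app.task(name='swh.example.tasks.process_url', bind=True)
def process_url(self, url):
    # 'self' is an SWHTask instance: it exposes a lazily-created 'log'
    # property, and the 'task-result' / 'task-result-exception' events
    # consumed by the listener are emitted by on_success/on_failure,
    # so the function body only does the actual work and returns a
    # json-serializable value.
    self.log.info('processing %s', url)
    return {'status': 'eventful', 'url': url}
```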
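Likewise, a hedged sketch of how such a task might be registered with the scheduler backend and pushed to Celery through `run_ready_tasks()`, mirroring what `conftest.py` and `test_celery_tasks.py` above do. The database DSN, task type name and backend name are example values, and a Celery worker subscribed to the corresponding queue is assumed to be running.

```python
from datetime import timedelta

from celery.result import AsyncResult

from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_task_dict
from swh.scheduler.celery_backend.config import app
from swh.scheduler.celery_backend.runner import run_ready_tasks

# Example local backend; the DSN is a placeholder.
scheduler = get_scheduler(
    'local', {'scheduling_db': 'dbname=softwareheritage-scheduler-dev'})

# Register the task type once, pointing at the Celery task name above.
scheduler.create_task_type({
    'type': 'process-url',
    'description': 'Example one-shot task type',
    'backend_name': 'swh.example.tasks.process_url',
    'default_interval': timedelta(days=1),
    'min_interval': timedelta(hours=6),
    'max_interval': timedelta(days=12),
})

# Schedule one instance, let the runner hand it over to Celery, then
# block on the result, as run_ready_tasks()'s docstring suggests.
scheduler.create_tasks([create_task_dict('process-url', 'oneshot',
                                         url='https://example.org')])
for task in run_ready_tasks(scheduler, app):
    print(AsyncResult(id=task['backend_id']).get())
```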