diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
index 1229cfd..5a8ed2b 100644
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -1,148 +1,148 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from flask import request

 from swh.core import config
 from swh.scheduler import get_scheduler as get_scheduler_from
 from swh.core.api import (SWHServerAPIApp, decode_request,
                           error_handler,
                           encode_data_server as encode_data)


 DEFAULT_CONFIG_PATH = 'backend/scheduler'
 DEFAULT_CONFIG = {
     'scheduler': ('dict', {
         'cls': 'local',
         'args': {
-            'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
+            'db': 'dbname=softwareheritage-scheduler-dev',
         },
     })
 }


 app = SWHServerAPIApp(__name__)
 scheduler = None


 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)


 def get_sched():
     global scheduler
     if not scheduler:
         scheduler = get_scheduler_from(**app.config['scheduler'])
     return scheduler


 @app.route('/')
 def index():
     return 'SWH Scheduler API server'


 @app.route('/close_connection', methods=['POST'])
 def close_connection():
     return encode_data(get_sched().close_connection())


 @app.route('/set_status_tasks', methods=['POST'])
 def set_status_tasks():
     return encode_data(get_sched().set_status_tasks(**decode_request(request)))


 @app.route('/create_task_type', methods=['POST'])
 def create_task_type():
     return encode_data(get_sched().create_task_type(**decode_request(request)))


 @app.route('/get_task_type', methods=['POST'])
 def get_task_type():
     return encode_data(get_sched().get_task_type(**decode_request(request)))


 @app.route('/get_task_types', methods=['POST'])
 def get_task_types():
     return encode_data(get_sched().get_task_types(**decode_request(request)))


 @app.route('/create_tasks', methods=['POST'])
 def create_tasks():
     return encode_data(get_sched().create_tasks(**decode_request(request)))


 @app.route('/disable_tasks', methods=['POST'])
 def disable_tasks():
     return encode_data(get_sched().disable_tasks(**decode_request(request)))


 @app.route('/get_tasks', methods=['POST'])
 def get_tasks():
     return encode_data(get_sched().get_tasks(**decode_request(request)))


 @app.route('/search_tasks', methods=['POST'])
 def search_tasks():
     return encode_data(get_sched().search_tasks(**decode_request(request)))


 @app.route('/peek_ready_tasks', methods=['POST'])
 def peek_ready_tasks():
     return encode_data(get_sched().peek_ready_tasks(**decode_request(request)))


 @app.route('/grab_ready_tasks', methods=['POST'])
 def grab_ready_tasks():
     return encode_data(get_sched().grab_ready_tasks(**decode_request(request)))


 @app.route('/schedule_task_run', methods=['POST'])
 def schedule_task_run():
     return encode_data(get_sched().schedule_task_run(
         **decode_request(request)))


 @app.route('/mass_schedule_task_runs', methods=['POST'])
 def mass_schedule_task_runs():
     return encode_data(
         get_sched().mass_schedule_task_runs(**decode_request(request)))


 @app.route('/start_task_run', methods=['POST'])
 def start_task_run():
     return encode_data(get_sched().start_task_run(**decode_request(request)))


 @app.route('/end_task_run', methods=['POST'])
 def end_task_run():
     return encode_data(get_sched().end_task_run(**decode_request(request)))


 @app.route('/filter_task_to_archive', methods=['POST'])
 def filter_task_to_archive():
     return encode_data(
         get_sched().filter_task_to_archive(**decode_request(request)))


 @app.route('/delete_archived_tasks', methods=['POST'])
 def delete_archived_tasks():
     return encode_data(
         get_sched().delete_archived_tasks(**decode_request(request)))


 def run_from_webserver(environ, start_response,
                        config_path=DEFAULT_CONFIG_PATH):
     """Run the WSGI app from the webserver, loading the configuration."""
     cfg = config.load_named_config(config_path, DEFAULT_CONFIG)
     app.config.update(cfg)
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
     return app(environ, start_response)


 if __name__ == '__main__':
     print('Please use the "swh-scheduler api-server" command')
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index 09171c2..01a57ab 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,590 +1,516 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import binascii
 import datetime
-from functools import wraps
 import json
 import tempfile
 import logging

 from arrow import Arrow, utcnow
-import psycopg2
+import psycopg2.pool
 import psycopg2.extras
 from psycopg2.extensions import AsIs

-from swh.core.config import SWHConfig
+from swh.core.db import BaseDb
+from swh.core.db.common import db_transaction, db_transaction_generator


 logger = logging.getLogger(__name__)


 def adapt_arrow(arrow):
     return AsIs("'%s'::timestamptz" % arrow.isoformat())


 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)


-def autocommit(fn):
-    @wraps(fn)
-    def wrapped(self, *args, **kwargs):
-        autocommit = False
-        if 'cursor' not in kwargs or not kwargs['cursor']:
-            autocommit = True
-            kwargs['cursor'] = self.cursor()
-
-        try:
-            ret = fn(self, *args, **kwargs)
-        except Exception:
-            if autocommit:
-                self.rollback()
-            raise
-
-        if autocommit:
-            self.commit()
-
-        return ret
-
-    return wrapped
-
-
-class DbBackend:
-    """Mixin intended to be used within scheduling db backend classes
+class DbBackend(BaseDb):
+    """Base class intended to be used for scheduling db backend classes

     cf. swh.scheduler.backend.SchedulerBackend, and
         swh.scheduler.updater.backend.SchedulerUpdaterBackend

     """
-    def reconnect(self):
-        if not self.db or self.db.closed:
-            self.db = psycopg2.connect(
-                dsn=self.db_conn_dsn,
-                cursor_factory=psycopg2.extras.RealDictCursor,
-            )
-
-    def cursor(self):
-        """Return a fresh cursor on the database, with auto-reconnection in
-        case of failure
-
-        """
-        cur = None
-
-        # Get a fresh cursor and reconnect at most three times
-        tries = 0
-        while True:
-            tries += 1
-            try:
-                cur = self.db.cursor()
-                cur.execute('select 1')
-                break
-            except psycopg2.OperationalError:
-                if tries < 3:
-                    self.reconnect()
-                else:
-                    raise
-
-        return cur
-
-    def commit(self):
-        """Commit a transaction"""
-        self.db.commit()
-
-    def rollback(self):
-        """Rollback a transaction"""
-        self.db.rollback()
-
-    def close_connection(self):
-        """Close db connection"""
-        if self.db and not self.db.closed:
-            self.db.close()
+    cursor = BaseDb._cursor

     def _format_query(self, query, keys):
         """Format a query with the given keys"""
         query_keys = ', '.join(keys)
         placeholders = ', '.join(['%s'] * len(keys))

         return query.format(keys=query_keys, placeholders=placeholders)

-    def _format_multiquery(self, query, keys, values):
-        """Format a query with placeholders generated for multiple values"""
-        query_keys = ', '.join(keys)
-        placeholders = '), ('.join(
-            [', '.join(['%s'] * len(keys))] * len(values)
-        )
-        ret_values = sum([[value[key] for key in keys]
-                          for value in values], [])
-
-        return (
-            query.format(keys=query_keys, placeholders=placeholders),
-            ret_values,
-        )
-
     def copy_to(self, items, tblname, columns, default_columns={},
                 cursor=None, item_cb=None):
         def escape(data):
             if data is None:
                 return ''
             if isinstance(data, bytes):
                 return '\\x%s' % binascii.hexlify(data).decode('ascii')
             elif isinstance(data, str):
                 return '"%s"' % data.replace('"', '""')
             elif isinstance(data, (datetime.datetime, Arrow)):
                 # We escape twice to make sure the string generated by
                 # isoformat gets escaped
                 return escape(data.isoformat())
             elif isinstance(data, dict):
                 return escape(json.dumps(data))
             elif isinstance(data, list):
                 return escape("{%s}" % ','.join(escape(d) for d in data))
             elif isinstance(data, psycopg2.extras.Range):
                 # We escape twice here too, so that we make sure
                 # everything gets passed to copy properly
                 return escape(
                     '%s%s,%s%s' % (
                         '[' if data.lower_inc else '(',
                         '-infinity' if data.lower_inf else escape(data.lower),
                         'infinity' if data.upper_inf else escape(data.upper),
                         ']' if data.upper_inc else ')',
                     )
                 )
             else:
                 # We don't escape here to make sure we pass literals properly
                 return str(data)

         with tempfile.TemporaryFile('w+') as f:
             for d in items:
                 if item_cb is not None:
                     item_cb(d)
                 line = []
                 for k in columns:
                     v = d.get(k)
                     if not v:
                         v = default_columns.get(k)
                     v = escape(v)
                     line.append(v)
                 f.write(','.join(line))
                 f.write('\n')
             f.seek(0)
             cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
                 tblname, ', '.join(columns)), f)


-class SchedulerBackend(SWHConfig, DbBackend):
+class SchedulerBackend:
     """Backend for the Software Heritage scheduling database.
""" - CONFIG_BASE_FILENAME = 'scheduler' - DEFAULT_CONFIG = { - 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), - } - - def __init__(self, **override_config): - super().__init__() - self.config = self.parse_config_file(global_config=False) - self.config.update(override_config) - self.db = None - self.db_conn_dsn = self.config['scheduling_db'] - self.reconnect() - logger.debug('SchedulerBackend config=%s' % self.config) + + def __init__(self, db, min_pool_conns=1, max_pool_conns=10): + """ + Args: + db_conn: either a libpq connection string, or a psycopg2 connection + + """ + if isinstance(db, psycopg2.extensions.connection): + self._pool = None + self._db = DbBackend(db) + else: + self._pool = psycopg2.pool.ThreadedConnectionPool( + min_pool_conns, max_pool_conns, db, + cursor_factory=psycopg2.extras.RealDictCursor, + ) + self._db = None + + def get_db(self): + if self._db: + return self._db + return DbBackend.from_pool(self._pool) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay', ] - @autocommit - def create_task_type(self, task_type, cursor=None): + @db_transaction() + def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] - query = self._format_query( + query = db._format_query( """insert into task_type ({keys}) values ({placeholders})""", keys) - cursor.execute(query, [task_type[key] for key in keys]) + cur.execute(query, [task_type[key] for key in keys]) - @autocommit - def get_task_type(self, task_type_name, cursor=None): + @db_transaction() + def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" - query = self._format_query( + query = db._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) - cursor.execute(query, (task_type_name,)) - - ret = cursor.fetchone() + cur.execute(query, (task_type_name,)) + return cur.fetchone() - return ret - - @autocommit - def get_task_types(self, cursor=None): - query = self._format_query( + @db_transaction() + def get_task_types(self, db=None, cur=None): + """Retrieve all registered task types""" + query = db._format_query( "select {keys} from task_type", self.task_type_keys, ) - cursor.execute(query) - ret = cursor.fetchall() - return ret + cur.execute(query) + return cur.fetchall() task_create_keys = [ 'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority' ] task_keys = task_create_keys + ['id', 'current_interval', 'status'] - @autocommit - def create_tasks(self, tasks, policy='recurring', cursor=None): + @db_transaction() + def create_tasks(self, tasks, policy='recurring', db=None, cur=None): """Create new tasks. 

         Args:
             tasks (list): each task is a dictionary with the following keys:

                 - type (str): the task type
                 - arguments (dict): the arguments for the task runner, keys:

                       - args (list of str): arguments
                       - kwargs (dict str -> str): keyword arguments
                 - next_run (datetime.datetime): the next scheduled run for
                   the task

         Returns:
             a list of created tasks.

         """
-        cursor.execute('select swh_scheduler_mktemp_task()')
-        self.copy_to(tasks, 'tmp_task', self.task_create_keys,
-                     default_columns={
-                         'policy': policy,
-                         'status': 'next_run_not_scheduled'
-                     },
-                     cursor=cursor)
-        query = self._format_query(
+        cur.execute('select swh_scheduler_mktemp_task()')
+        db.copy_to(tasks, 'tmp_task', self.task_create_keys,
+                   default_columns={
+                       'policy': policy,
+                       'status': 'next_run_not_scheduled'
+                   },
+                   cursor=cur)
+        query = db._format_query(
             'select {keys} from swh_scheduler_create_tasks_from_temp()',
             self.task_keys,
         )
-        cursor.execute(query)
-        return cursor.fetchall()
+        cur.execute(query)
+        return cur.fetchall()

-    @autocommit
-    def set_status_tasks(self, task_ids,
-                         status='disabled', next_run=None, cursor=None):
+    @db_transaction()
+    def set_status_tasks(self, task_ids, status='disabled', next_run=None,
+                         db=None, cur=None):
         """Set the tasks' status whose ids are listed.

         If given, also set the next_run date.
         """
         if not task_ids:
             return
         query = ["UPDATE task SET status = %s"]
         args = [status]
         if next_run:
             query.append(', next_run = %s')
             args.append(next_run)
         query.append(" WHERE id IN %s")
         args.append(tuple(task_ids))
-        cursor.execute(''.join(query), args)
+        cur.execute(''.join(query), args)

-    @autocommit
-    def disable_tasks(self, task_ids, cursor=None):
+    @db_transaction()
+    def disable_tasks(self, task_ids, db=None, cur=None):
         """Disable the tasks whose ids are listed."""
-        return self.set_status_tasks(task_ids)
+        return self.set_status_tasks(task_ids, db=db, cur=cur)

-    @autocommit
+    @db_transaction()
     def search_tasks(self, task_id=None, task_type=None, status=None,
                      priority=None, policy=None, before=None, after=None,
-                     limit=None, cursor=None):
+                     limit=None, db=None, cur=None):
         """Search tasks from selected criteria"""
         where = []
         args = []

         if task_id:
             if isinstance(task_id, (str, int)):
                 where.append('id = %s')
             else:
                 where.append('id in %s')
                 task_id = tuple(task_id)
             args.append(task_id)
         if task_type:
             if isinstance(task_type, str):
                 where.append('type = %s')
             else:
                 where.append('type in %s')
                 task_type = tuple(task_type)
             args.append(task_type)
         if status:
             if isinstance(status, str):
                 where.append('status = %s')
             else:
                 where.append('status in %s')
                 status = tuple(status)
             args.append(status)
         if priority:
             if isinstance(priority, str):
                 where.append('priority = %s')
             else:
                 priority = tuple(priority)
                 where.append('priority in %s')
             args.append(priority)
         if policy:
             where.append('policy = %s')
             args.append(policy)
         if before:
             where.append('next_run <= %s')
             args.append(before)
         if after:
             where.append('next_run >= %s')
             args.append(after)

         query = 'select * from task where ' + ' and '.join(where)
         if limit:
             query += ' limit %s :: bigint'
             args.append(limit)
-        cursor.execute(query, args)
-        return cursor.fetchall()
+        cur.execute(query, args)
+        return cur.fetchall()

-    @autocommit
-    def get_tasks(self, task_ids, cursor=None):
+    @db_transaction()
+    def get_tasks(self, task_ids, db=None, cur=None):
         """Retrieve the info of tasks whose ids are listed."""
-        query = self._format_query('select {keys} from task where id in %s',
-                                   self.task_keys)
-        cursor.execute(query, (tuple(task_ids),))
-        return cursor.fetchall()
+        query = db._format_query('select {keys} from task where id in %s',
+                                 self.task_keys)
+        cur.execute(query, (tuple(task_ids),))
+        return cur.fetchall()

-    @autocommit
+    @db_transaction()
     def peek_ready_tasks(self, task_type, timestamp=None,
                          num_tasks=None, num_tasks_priority=None,
-                         cursor=None):
+                         db=None, cur=None):
         """Fetch the list of ready tasks

         Args:
             task_type (str): filtering task per their type
             timestamp (datetime.datetime): peek tasks that need to be
                 executed before that timestamp
             num_tasks (int): only peek at num_tasks tasks (with no priority)
             num_tasks_priority (int): only peek at num_tasks_priority tasks
                 (with priority)

         Returns:
             a list of tasks

         """
         if timestamp is None:
             timestamp = utcnow()

-        cursor.execute(
+        cur.execute(
             '''select * from swh_scheduler_peek_ready_tasks(
                 %s, %s, %s :: bigint, %s :: bigint)''',
             (task_type, timestamp, num_tasks, num_tasks_priority)
         )
-        logger.debug('PEEK %s => %s' % (task_type, cursor.rowcount))
-        return cursor.fetchall()
+        logger.debug('PEEK %s => %s' % (task_type, cur.rowcount))
+        return cur.fetchall()

-    @autocommit
+    @db_transaction()
     def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
-                         num_tasks_priority=None, cursor=None):
+                         num_tasks_priority=None, db=None, cur=None):
         """Fetch the list of ready tasks, and mark them as scheduled

         Args:
             task_type (str): filtering task per their type
             timestamp (datetime.datetime): grab tasks that need to be
                 executed before that timestamp
             num_tasks (int): only grab num_tasks tasks (with no priority)
             num_tasks_priority (int): only grab oneshot num_tasks tasks (with
                 priorities)

         Returns:
             a list of tasks

         """
         if timestamp is None:
             timestamp = utcnow()

-        cursor.execute(
+        cur.execute(
             '''select * from swh_scheduler_grab_ready_tasks(
                 %s, %s, %s :: bigint, %s :: bigint)''',
             (task_type, timestamp, num_tasks, num_tasks_priority)
         )
-        logger.debug('GRAB %s => %s' % (task_type, cursor.rowcount))
-        return cursor.fetchall()
+        logger.debug('GRAB %s => %s' % (task_type, cur.rowcount))
+        return cur.fetchall()

     task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']

-    @autocommit
+    @db_transaction()
     def schedule_task_run(self, task_id, backend_id, metadata=None,
-                          timestamp=None, cursor=None):
+                          timestamp=None, db=None, cur=None):
         """Mark a given task as scheduled, adding a task_run entry in the
         database.

         Args:
             task_id (int): the identifier for the task being scheduled
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred

         Returns:
             a fresh task_run entry

         """
         if metadata is None:
             metadata = {}

         if timestamp is None:
             timestamp = utcnow()

-        cursor.execute(
+        cur.execute(
             'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)',
             (task_id, backend_id, metadata, timestamp)
         )

-        return cursor.fetchone()
+        return cur.fetchone()

-    @autocommit
-    def mass_schedule_task_runs(self, task_runs, cursor=None):
+    @db_transaction()
+    def mass_schedule_task_runs(self, task_runs, db=None, cur=None):
         """Schedule a bunch of task runs.

         Args:
             task_runs (list): a list of dicts with keys:

                 - task (int): the identifier for the task being scheduled
                 - backend_id (str): the identifier of the job in the backend
                 - metadata (dict): metadata to add to the task_run entry
                 - scheduled (datetime.datetime): the instant the event
                   occurred

         Returns:
             None

         """
-        cursor.execute('select swh_scheduler_mktemp_task_run()')
-        self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys,
-                     cursor=cursor)
-        cursor.execute('select swh_scheduler_schedule_task_run_from_temp()')
+        cur.execute('select swh_scheduler_mktemp_task_run()')
+        db.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys,
+                   cursor=cur)
+        cur.execute('select swh_scheduler_schedule_task_run_from_temp()')

-    @autocommit
+    @db_transaction()
     def start_task_run(self, backend_id, metadata=None, timestamp=None,
-                       cursor=None):
+                       db=None, cur=None):
         """Mark a given task as started, updating the corresponding task_run
         entry in the database.

         Args:
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred

         Returns:
             the updated task_run entry

         """
         if metadata is None:
             metadata = {}

         if timestamp is None:
             timestamp = utcnow()

-        cursor.execute(
+        cur.execute(
             'select * from swh_scheduler_start_task_run(%s, %s, %s)',
             (backend_id, metadata, timestamp)
         )

-        return cursor.fetchone()
+        return cur.fetchone()

-    @autocommit
+    @db_transaction()
     def end_task_run(self, backend_id, status, metadata=None, timestamp=None,
-                     result=None, cursor=None):
+                     result=None, db=None, cur=None):
         """Mark a given task as ended, updating the corresponding task_run
         entry in the database.

         Args:
             backend_id (str): the identifier of the job in the backend
             status (str): how the task ended; one of: 'eventful',
                 'uneventful', 'failed'
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred

         Returns:
             the updated task_run entry

         """
         if metadata is None:
             metadata = {}

         if timestamp is None:
             timestamp = utcnow()

-        cursor.execute(
+        cur.execute(
             'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)',
             (backend_id, status, metadata, timestamp)
         )
+        return cur.fetchone()

-        return cursor.fetchone()
-
-    @autocommit
+    @db_transaction_generator()
     def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1,
-                               cursor=None):
+                               db=None, cur=None):
         """Return the list of task/task_run entries prior to a given date,
         to be archived.

         """
         last_task_run_id = None
         while True:
             row = None
-            cursor.execute(
+            cur.execute(
                 "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
                 (after_ts, before_ts, last_id, limit)
             )
-            for row in cursor:
+            for row in cur:
                 # nested type index does not accept bare values
                 # transform it as a dict to comply with this
                 row['arguments']['args'] = {
                     i: v for i, v in enumerate(row['arguments']['args'])
                 }
                 kwargs = row['arguments']['kwargs']
                 row['arguments']['kwargs'] = json.dumps(kwargs)
                 yield row

             if not row:
                 break

             _id = row.get('task_id')
             _task_run_id = row.get('task_run_id')
             if last_id == _id and last_task_run_id == _task_run_id:
                 break
             last_id = _id
             last_task_run_id = _task_run_id

-    @autocommit
-    def delete_archived_tasks(self, task_ids, cursor=None):
+    @db_transaction()
+    def delete_archived_tasks(self, task_ids, db=None, cur=None):
         """Delete archived tasks as much as possible. Only tasks whose
         complete associated task_run entries have been cleaned up will be
         deleted.
""" _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id['task_id']) _task_run_ids.append(task_id['task_run_id']) - cursor.execute( + cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids)) diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index be37bce..d4bdafc 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,118 +1,115 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import logging from swh.scheduler import get_scheduler, compute_nb_tasks_from logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args: backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': arrow.utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks = [] while True: - cursor = backend.cursor() task_types = {} pending_tasks = [] - for task_type in backend.get_task_types(cursor=cursor): + for task_type in backend.get_task_types(): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] if max_queue_length: try: queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, - num_tasks_priority=num_tasks_priority, - cursor=cursor) + num_tasks_priority=num_tasks_priority) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) logger.info('Grabbed %s tasks %s', len(grabbed_tasks), task_type_name) if not pending_tasks: return all_backend_tasks backend_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] backend_name = task_types[task['type']]['backend_name'] celery_result = app.send_task( backend_name, args=args, kwargs=kwargs, ) data = { 'task': task['id'], 'backend_id': celery_result.id, 'scheduled': arrow.utcnow(), } backend_tasks.append(data) logger.debug('Sent %s celery tasks', len(backend_tasks)) - backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) - backend.commit() + backend.mass_schedule_task_runs(backend_tasks) all_backend_tasks.extend(backend_tasks) def main(): from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == '__main__': main()