diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql
index e9bb2be..29febc2 100644
--- a/sql/swh-scheduler-schema.sql
+++ b/sql/swh-scheduler-schema.sql
@@ -1,232 +1,258 @@
 create table dbversion
 (
   version     int primary key,
   release     timestamptz not null,
   description text not null
 );
 
 comment on table dbversion is 'Schema update tracking';
 
 insert into dbversion (version, release, description)
-       values (1, now(), 'Work In Progress');
+       values (2, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
   description text not null,
   backend_name text not null,
   default_interval interval not null,
   min_interval interval not null,
   max_interval interval not null,
   backoff_factor float not null
 );
 
 comment on table task_type is 'Types of schedulable tasks';
 comment on column task_type.type is 'Short identifier for the task type';
 comment on column task_type.description is 'Human-readable task description';
 comment on column task_type.backend_name is 'Name of the task in the job-running backend';
 comment on column task_type.default_interval is 'Default interval for newly scheduled tasks';
 comment on column task_type.min_interval is 'Minimum interval between two runs of a task';
 comment on column task_type.max_interval is 'Maximum interval between two runs of a task';
 comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs';
 
 create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled',
                                  'disabled');
 comment on type task_status is 'Status of a given task';
 
 create table task (
   id bigserial primary key,
   type text not null references task_type(type),
   arguments jsonb not null,
   next_run timestamptz not null,
   current_interval interval not null,
   status task_status not null
 );
 
 comment on table task is 'Schedule of recurring tasks';
 comment on column task.arguments is 'Arguments passed to the underlying job scheduler. '
                                     'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
 comment on column task.next_run is 'The next run of this task should be run on or after that time';
 comment on column task.current_interval is 'The interval between two runs of this task, '
                                            'taking into account the backoff factor';
 
 create index on task(type);
 create index on task(next_run);
 create index task_args on task using btree ((arguments -> 'args'));
 create index task_kwargs on task using gin ((arguments -> 'kwargs'));
 
 create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful',
                                      'failed', 'lost');
 comment on type task_run_status is 'Status of a given task run';
 
 create table task_run (
   id bigserial primary key,
   task bigint not null references task(id),
   backend_id text,
   scheduled timestamptz,
   started timestamptz,
   ended timestamptz,
   metadata jsonb,
   status task_run_status not null default 'scheduled'
 );
 
 comment on table task_run is 'History of task runs sent to the job-running backend';
 comment on column task_run.backend_id is 'id of the task run in the job-running backend';
 comment on column task_run.metadata is 'Useful metadata for the given task run. '
                                        'For instance, the worker that took on the job, '
                                        'or the logs for the run.';
 
 create index on task_run(task);
 create index on task_run(backend_id);
 
 create or replace function swh_scheduler_mktemp_task ()
   returns void
   language sql
 as $$
   create temporary table tmp_task (
     like task excluding indexes
   ) on commit drop;
   alter table tmp_task
     drop column id,
     drop column current_interval,
     drop column status;
 $$;
 
 comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
 
 create or replace function swh_scheduler_create_tasks_from_temp ()
   returns setof task
   language plpgsql
 as $$
 begin
   return query
     insert into task (type, arguments, next_run, status, current_interval)
       select type, arguments, next_run, 'next_run_not_scheduled',
              (select default_interval from task_type tt where tt.type = type)
         from tmp_task
     returning task.*;
 end;
 $$;
 
 comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
 
 create or replace function swh_scheduler_peek_ready_tasks (ts timestamptz default now(),
                                                            num_tasks bigint default NULL)
   returns setof task
   language sql
   stable
 as $$
 select * from task
   where next_run <= ts
         and status = 'next_run_not_scheduled'
   order by next_run
   limit num_tasks;
 $$;
 
 create or replace function swh_scheduler_grab_ready_tasks (ts timestamptz default now(),
                                                            num_tasks bigint default NULL)
   returns setof task
   language sql
 as $$
   update task
     set status='next_run_scheduled'
     from (
       select id from task
         where next_run <= ts
               and status='next_run_not_scheduled'
         order by next_run
         limit num_tasks
         for update skip locked
     ) next_tasks
     where task.id = next_tasks.id
   returning task.*;
 $$;
 
 create or replace function swh_scheduler_schedule_task_run (task_id bigint,
                                                             backend_id text,
                                                             metadata jsonb default '{}'::jsonb,
                                                             ts timestamptz default now())
   returns task_run
   language sql
 as $$
   insert into task_run (task, backend_id, metadata, scheduled, status)
     values (task_id, backend_id, metadata, ts, 'scheduled')
   returning *;
 $$;
 
+create or replace function swh_scheduler_mktemp_task_run ()
+  returns void
+  language sql
+as $$
+  create temporary table tmp_task_run (
+    like task_run excluding indexes
+  ) on commit drop;
+  alter table tmp_task_run
+    drop column id,
+    drop column status;
+$$;
+
+comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
+
+create or replace function swh_scheduler_schedule_task_run_from_temp ()
+  returns void
+  language plpgsql
+as $$
+begin
+  insert into task_run (task, backend_id, metadata, scheduled, status)
+    select task, backend_id, metadata, scheduled, 'scheduled'
+      from tmp_task_run;
+  return;
+end;
+$$;
+
 create or replace function swh_scheduler_start_task_run (backend_id text,
                                                          metadata jsonb default '{}'::jsonb,
                                                          ts timestamptz default now())
   returns task_run
   language sql
 as $$
   update task_run
     set started = ts,
         status = 'started',
-        metadata = task_run.metadata || swh_scheduler_start_task_run.metadata
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
     where task_run.backend_id = swh_scheduler_start_task_run.backend_id
   returning *;
 $$;
 
 create or replace function swh_scheduler_end_task_run (backend_id text,
                                                        status task_run_status,
                                                        metadata jsonb default '{}'::jsonb,
                                                        ts timestamptz default now())
   returns task_run
   language sql
 as $$
   update task_run
     set ended = ts,
         status = swh_scheduler_end_task_run.status,
-        metadata = task_run.metadata || swh_scheduler_end_task_run.metadata
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
     where task_run.backend_id = swh_scheduler_end_task_run.backend_id
   returning *;
 $$;
 
 create or replace function swh_scheduler_compute_new_task_interval (task_type text,
                                                                     current_interval interval,
                                                                     end_status task_run_status)
   returns interval
   language plpgsql
   stable
 as $$
 declare
   task_type_row task_type%rowtype;
   adjustment_factor float;
 begin
   select *
     from task_type
     where type = swh_scheduler_compute_new_task_interval.task_type
   into task_type_row;
 
   case end_status
   when 'eventful' then
     adjustment_factor := 1/task_type_row.backoff_factor;
   when 'uneventful' then
     adjustment_factor := task_type_row.backoff_factor;
   else
     -- failed or lost task: no backoff.
     adjustment_factor := 1;
   end case;
 
   return greatest(task_type_row.min_interval,
                   least(task_type_row.max_interval,
                         adjustment_factor * current_interval));
 end;
 $$;
 
 create or replace function swh_scheduler_update_task_interval ()
   returns trigger
   language plpgsql
 as $$
 begin
   update task
     set status = 'next_run_not_scheduled',
         current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.status),
         next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.status)
     where id = new.task;
   return null;
 end;
 $$;
 
 create trigger update_interval_on_task_end
   after update of status on task_run
   for each row
   when (new.status IN ('eventful', 'uneventful', 'failed', 'lost'))
   execute procedure swh_scheduler_update_task_interval ();
diff --git a/sql/updates/02.sql b/sql/updates/02.sql
new file mode 100644
index 0000000..04fc040
--- /dev/null
+++ b/sql/updates/02.sql
@@ -0,0 +1,66 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 01
+-- to_version: 02
+-- description: Allow mass-creation of task runs
+
+begin;
+
+insert into dbversion (version, release, description)
+       values (2, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_mktemp_task_run ()
+  returns void
+  language sql
+as $$
+  create temporary table tmp_task_run (
+    like task_run excluding indexes
+  ) on commit drop;
+  alter table tmp_task_run
+    drop column id,
+    drop column status;
+$$;
+
+comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
+
+create or replace function swh_scheduler_schedule_task_run_from_temp ()
+  returns void
+  language plpgsql
+as $$
+begin
+  insert into task_run (task, backend_id, metadata, scheduled, status)
+    select task, backend_id, metadata, scheduled, 'scheduled'
+      from tmp_task_run;
+  return;
+end;
+$$;
+
+create or replace function swh_scheduler_start_task_run (backend_id text,
+                                                         metadata jsonb default '{}'::jsonb,
+                                                         ts timestamptz default now())
+  returns task_run
+  language sql
+as $$
+  update task_run
+    set started = ts,
+        status = 'started',
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
+    where task_run.backend_id = swh_scheduler_start_task_run.backend_id
+  returning *;
+$$;
+
+create or replace function swh_scheduler_end_task_run (backend_id text,
+                                                       status task_run_status,
+                                                       metadata jsonb default '{}'::jsonb,
+                                                       ts timestamptz default now())
+  returns task_run
+  language sql
+as $$
+  update task_run
+    set ended = ts,
+        status = swh_scheduler_end_task_run.status,
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
+    where task_run.backend_id = swh_scheduler_end_task_run.backend_id
+  returning *;
+$$;
+
+commit;
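
Both the schema and the 02.sql upgrade only ship the server-side plumbing; the intended call sequence (temporary table, COPY, flush) is driven from the Python backend in the next file. For illustration, a minimal sketch of that sequence over a bare psycopg2 connection; the connection string, task id and backend id below are made-up examples, not part of the patch:

    import io

    import psycopg2

    db = psycopg2.connect('dbname=swh-scheduler')
    cur = db.cursor()

    # 1. Stage: create the session-local table, dropped at commit time.
    cur.execute('select swh_scheduler_mktemp_task_run()')

    # 2. Bulk-load: tmp_task_run has no id/status columns, so only the
    #    remaining four are copied. The row is illustrative; 'task' must
    #    reference an existing task id.
    rows = io.StringIO('1,some-backend-id,2015-01-01T00:00:00+00:00,{}\n')
    cur.copy_expert(
        'COPY tmp_task_run (task, backend_id, scheduled, metadata)'
        ' FROM STDIN CSV', rows)

    # 3. Flush: a single INSERT ... SELECT moves everything into task_run.
    cur.execute('select swh_scheduler_schedule_task_run_from_temp()')
    db.commit()
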
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index 75bb42a..690cc2a 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,358 +1,378 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import binascii
 import datetime
 from functools import wraps
 import json
 import tempfile
 
 from arrow import Arrow, utcnow
 import psycopg2
 import psycopg2.extras
 from psycopg2.extensions import AsIs
 
 from swh.core.config import SWHConfig
 
 
 def adapt_arrow(arrow):
     return AsIs("'%s'::timestamptz" % arrow.isoformat())
 
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
 
 
 def autocommit(fn):
     @wraps(fn)
     def wrapped(self, *args, **kwargs):
         autocommit = False
         if 'cursor' not in kwargs or not kwargs['cursor']:
             autocommit = True
             kwargs['cursor'] = self.cursor()
 
         try:
             ret = fn(self, *args, **kwargs)
         except:
             if autocommit:
                 self.rollback()
             raise
 
         if autocommit:
             self.commit()
 
         return ret
 
     return wrapped
 
 
 class SchedulerBackend(SWHConfig):
     """Backend for the Software Heritage scheduling database."""
 
     CONFIG_BASE_FILENAME = 'scheduler.ini'
     DEFAULT_CONFIG = {
         'scheduling_db': ('str', 'dbname=swh-scheduler'),
     }
 
     def __init__(self, **override_config):
         self.config = self.parse_config_file(global_config=False)
         self.config.update(override_config)
 
         self.db = None
 
         self.reconnect()
 
     def reconnect(self):
         if not self.db or self.db.closed:
             self.db = psycopg2.connect(
                 dsn=self.config['scheduling_db'],
                 cursor_factory=psycopg2.extras.RealDictCursor,
             )
 
     def cursor(self):
         """Return a fresh cursor on the database, with auto-reconnection in
         case of failure"""
         cur = None
 
         # Get a fresh cursor and reconnect at most three times
         tries = 0
         while True:
             tries += 1
             try:
                 cur = self.db.cursor()
                 cur.execute('select 1')
                 break
             except psycopg2.OperationalError:
                 if tries < 3:
                     self.reconnect()
                 else:
                     raise
 
         return cur
 
     def commit(self):
         """Commit a transaction"""
         self.db.commit()
 
     def rollback(self):
         """Rollback a transaction"""
         self.db.rollback()
 
     def copy_to(self, items, tblname, columns, cursor=None, item_cb=None):
         def escape(data):
             if data is None:
                 return ''
             if isinstance(data, bytes):
                 return '\\x%s' % binascii.hexlify(data).decode('ascii')
             elif isinstance(data, str):
                 return '"%s"' % data.replace('"', '""')
             elif isinstance(data, (datetime.datetime, Arrow)):
                 # We escape twice to make sure the string generated by
                 # isoformat gets escaped
                 return escape(data.isoformat())
             elif isinstance(data, dict):
                 return escape(json.dumps(data))
             elif isinstance(data, list):
                 return escape("{%s}" % ','.join(escape(d) for d in data))
             elif isinstance(data, psycopg2.extras.Range):
                 # We escape twice here too, so that we make sure
                 # everything gets passed to copy properly
                 return escape(
                     '%s%s,%s%s' % (
                         '[' if data.lower_inc else '(',
                         '-infinity' if data.lower_inf else escape(data.lower),
                         'infinity' if data.upper_inf else escape(data.upper),
                         ']' if data.upper_inc else ')',
                     )
                 )
             else:
                 # We don't escape here to make sure we pass literals properly
                 return str(data)
         with tempfile.TemporaryFile('w+') as f:
             for d in items:
                 if item_cb is not None:
                     item_cb(d)
                 line = [escape(d.get(k)) for k in columns]
                 f.write(','.join(line))
                 f.write('\n')
             f.seek(0)
             cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
                 tblname, ', '.join(columns)), f)
 
     task_type_keys = [
         'type', 'description', 'backend_name', 'default_interval',
         'min_interval', 'max_interval', 'backoff_factor',
     ]
     def _format_query(self, query, keys):
         """Format a query with the given keys"""
 
         query_keys = ', '.join(keys)
         placeholders = ', '.join(['%s'] * len(keys))
 
         return query.format(keys=query_keys, placeholders=placeholders)
 
     def _format_multiquery(self, query, keys, values):
         """Format a query with placeholders generated for multiple values"""
         query_keys = ', '.join(keys)
         placeholders = '), ('.join(
             [', '.join(['%s'] * len(keys))] * len(values)
         )
         ret_values = sum([[value[key] for key in keys]
                           for value in values], [])
 
         return (
             query.format(keys=query_keys, placeholders=placeholders),
             ret_values,
         )
 
     @autocommit
     def create_task_type(self, task_type, cursor=None):
         """Create a new task type ready for scheduling.
 
         A task type is a dictionary with the following keys:
             type (str): an identifier for the task type
             description (str): a human-readable description of what the task
                 does
             backend_name (str): the name of the task in the job-scheduling
                 backend
             default_interval (datetime.timedelta): the default interval
                 between two task runs
             min_interval (datetime.timedelta): the minimum interval between
                 two task runs
             max_interval (datetime.timedelta): the maximum interval between
                 two task runs
             backoff_factor (float): the factor by which the interval changes
                 at each run
         """
         query = self._format_query(
             """insert into task_type ({keys}) values ({placeholders})""",
             self.task_type_keys,
         )
         cursor.execute(query, [task_type[key] for key in
                                self.task_type_keys])
 
     @autocommit
     def get_task_type(self, task_type_name, cursor=None):
         """Retrieve the task type with id task_type_name"""
         query = self._format_query(
             "select {keys} from task_type where type=%s",
             self.task_type_keys,
         )
         cursor.execute(query, (task_type_name,))
 
         ret = cursor.fetchone()
 
         return ret
 
     task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval',
                  'status']
     task_create_keys = ['type', 'arguments', 'next_run']
 
     @autocommit
     def create_tasks(self, tasks, cursor=None):
         """Create new tasks.
 
         A task is a dictionary with the following keys:
             type (str): the task type
             arguments (dict): the arguments for the task runner
                 args (list of str): arguments
                 kwargs (dict str -> str): keyword arguments
             next_run (datetime.datetime): the next scheduled run for the task
 
         This returns a list of created task ids.
""" cursor.execute('select swh_scheduler_mktemp_task()') self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() + task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] + @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() + @autocommit + def mass_schedule_task_runs(self, task_runs, cursor=None): + """Schedule a bunch of task runs. + + Args: + task_runs: a list of dicts with keys: + task (int): the identifier for the task being scheduled + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + scheduled (datetime.datetime): the instant the event occurred + Returns: + None + """ + cursor.execute('select swh_scheduler_mktemp_task_run()') + self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, + cursor) + cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') + @autocommit def start_task_run(self, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, status, metadata=None, timestamp=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. 
diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
index 8aa61d1..63fde9a 100644
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -1,141 +1,142 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from celery import Celery
 from celery.signals import setup_logging
 from celery.utils.log import ColorFormatter
 from kombu import Exchange, Queue
 
 from swh.core.config import load_named_config
 from swh.core.logger import PostgresHandler
 
 CONFIG_NAME = 'worker.ini'
 DEFAULT_CONFIG = {
     'task_broker': ('str', 'amqp://guest@localhost//'),
     'task_modules': ('list[str]', []),
     'task_queues': ('list[str]', []),
     'task_soft_time_limit': ('int', 0),
 }
 
 
 @setup_logging.connect
 def setup_log_handler(loglevel=None, logfile=None, format=None,
                       colorize=None, **kwargs):
     """Setup logging according to Software Heritage preferences.
 
     We use the command-line loglevel for tasks only, as we never
     really care about the debug messages from celery.
     """
     if loglevel is None:
         loglevel = logging.DEBUG
 
     formatter = logging.Formatter(format)
 
     if colorize:
         color_formatter = ColorFormatter(format)
     else:
         color_formatter = formatter
 
     root_logger = logging.getLogger('')
     root_logger.setLevel(logging.INFO)
 
     console = logging.StreamHandler()
     console.setLevel(logging.DEBUG)
     console.setFormatter(color_formatter)
 
     pg = PostgresHandler(CONFIG['log_db'])
     pg.setFormatter(logging.Formatter(format))
     pg.setLevel(logging.DEBUG)
     pg.setFormatter(formatter)
 
     root_logger.addHandler(console)
     root_logger.addHandler(pg)
 
     celery_logger = logging.getLogger('celery')
     celery_logger.setLevel(logging.INFO)
 
     # Silence useless "Starting new HTTP connection" messages
     urllib3_logger = logging.getLogger('urllib3')
     urllib3_logger.setLevel(logging.WARNING)
 
     swh_logger = logging.getLogger('swh')
     swh_logger.setLevel(loglevel)
 
     # get_task_logger makes the swh tasks loggers children of celery.task
     celery_task_logger = logging.getLogger('celery.task')
     celery_task_logger.setLevel(loglevel)
 
 
 class TaskRouter:
     """Route tasks according to the task_queue attribute in the task class"""
     def route_for_task(self, task, args=None, kwargs=None):
         task_class = app.tasks[task]
         if hasattr(task_class, 'task_queue'):
             return {'queue': task_class.task_queue}
         return None
 
 # Load the Celery config
 CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG)
 
 # Celery Queues
 CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')]
 
 for queue in CONFIG['task_queues']:
     CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue))
 
 # Instantiate the Celery app
 app = Celery()
 app.conf.update(
     # The broker
     BROKER_URL=CONFIG['task_broker'],
     # Timezone configuration: all in UTC
     CELERY_ENABLE_UTC=True,
     CELERY_TIMEZONE='UTC',
     # Imported modules
     CELERY_IMPORTS=CONFIG['task_modules'],
     # Time (in seconds, or a timedelta object) for when after stored task
     # tombstones will be deleted. None means to never expire results.
     CELERY_TASK_RESULT_EXPIRES=None,
     # Late ack means the task messages will be acknowledged after the task
     # has been executed, not just before, which is the default behavior.
     CELERY_ACKS_LATE=True,
     # A string identifying the default serialization method to use.
     # Can be pickle (default), json, yaml, msgpack or any custom
     # serialization methods that have been registered with
     # kombu.serialization.registry
     CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'],
     # If True the task will report its status as “started”
     # when the task is executed by a worker.
     CELERY_TRACK_STARTED=True,
     # Default compression used for task messages. Can be gzip, bzip2
     # (if available), or any custom compression schemes registered
     # in the Kombu compression registry.
     # CELERY_MESSAGE_COMPRESSION='bzip2',
     # Disable all rate limits, even if tasks has explicit rate limits set.
     # (Disabling rate limits altogether is recommended if you don’t have any
     # tasks using them.)
     CELERY_DISABLE_RATE_LIMITS=True,
     # Task hard time limit in seconds. The worker processing the task will be
     # killed and replaced with a new one when this is exceeded.
     # CELERYD_TASK_TIME_LIMIT=3600,
     # Task soft time limit in seconds.
     # The SoftTimeLimitExceeded exception will be raised when this is
     # exceeded. The task can catch this to e.g. clean up before the hard
     # time limit comes.
     CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'],
     # Task routing
     CELERY_ROUTES=TaskRouter(),
     # Task queues this worker will consume from
     CELERY_QUEUES=CELERY_QUEUES,
     # Allow pool restarts from remote
     CELERYD_POOL_RESTARTS=True,
     # Do not prefetch tasks
     CELERYD_PREFETCH_MULTIPLIER=1,
     # Send events
     CELERY_SEND_EVENTS=True,
-    CELERY_SEND_TASK_SENT_EVENT=True,
+    # Do not send useless task_sent events
+    CELERY_SEND_TASK_SENT_EVENT=False,
 )
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index 14a0944..b518c83 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,46 +1,61 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import arrow
+from celery import group
+
 from ..backend import SchedulerBackend
 from .config import app as main_app
 
 
 def run_ready_tasks(backend, app):
     """Run all tasks that are ready"""
     backend_names = {}
     while True:
         cursor = backend.cursor()
-        pending_tasks = backend.grab_ready_tasks(num_tasks=100, cursor=cursor)
+        pending_tasks = backend.grab_ready_tasks(num_tasks=10000,
+                                                 cursor=cursor)
         if not pending_tasks:
             break
 
+        celery_tasks = []
         for task in pending_tasks:
             backend_name = backend_names.get(task['type'])
             if not backend_name:
                 task_type = backend.get_task_type(task['type'], cursor=cursor)
                 backend_names[task['type']] = task_type['backend_name']
                 backend_name = task_type['backend_name']
             args = task['arguments']['args']
             kwargs = task['arguments']['kwargs']
-            celery_task = app.tasks[backend_name].delay(*args, **kwargs)
-            backend.schedule_task_run(task['id'], celery_task.id,
-                                      cursor=cursor)
+            celery_task = app.tasks[backend_name].s(*args, **kwargs)
+
+            celery_tasks.append(celery_task)
+
+        group_result = group(celery_tasks).delay()
+
+        backend_tasks = [{
+            'task': task['id'],
+            'backend_id': group_result.results[i].id,
+            'scheduled': arrow.utcnow(),
+        } for i, task in enumerate(pending_tasks)]
+
+        backend.mass_schedule_task_runs(backend_tasks, cursor=cursor)
+
         backend.commit()
 
 
 if __name__ == '__main__':
     for module in main_app.conf.CELERY_IMPORTS:
         __import__(module)
 
     main_backend = SchedulerBackend()
     try:
         run_ready_tasks(main_backend, main_app)
     except:
         main_backend.rollback()
         raise
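
The runner now sends one group(...).delay() per batch instead of one .delay() plus one schedule_task_run round trip per task, then records all runs at once through mass_schedule_task_runs. A standalone sketch of that dispatch pattern, with a hypothetical add task standing in for the real SWH tasks:

    from celery import Celery, group

    # Hypothetical app and task, standing in for the configured workers.
    app = Celery('sketch', broker='amqp://guest@localhost//')


    @app.task
    def add(x, y):
        return x + y


    # Build signatures without sending them, then dispatch the batch in
    # one call. group() preserves order, so group_result.results[i] is
    # the AsyncResult of the i-th signature, which is how the runner
    # pairs each grabbed task with its backend_id.
    signatures = [add.s(i, i) for i in range(3)]
    group_result = group(signatures).delay()
    backend_ids = [result.id for result in group_result.results]
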