diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql index f463598..e9bb2be 100644 --- a/sql/swh-scheduler-schema.sql +++ b/sql/swh-scheduler-schema.sql @@ -1,222 +1,232 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval not null, min_interval interval not null, max_interval interval not null, backoff_factor float not null ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'disabled'); comment on type task_status is 'Status of a given task'; create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval not null, status task_status not null ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; create index on task(type); create index on task(next_run); create index task_args on task using btree ((arguments -> 'args')); create index task_kwargs on task using gin ((arguments -> 'kwargs')); +create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'lost'); +comment on type task_run_status is 'Status of a given task run'; + create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, - eventful boolean + status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; -comment on column task_run.eventful is 'Whether the task run has seen an update (and the ' - 'task running interval should be reduced)'; create index on task_run(task); create index on task_run(backend_id); create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task drop column id, drop column current_interval, drop column status; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin return query insert into task (type, arguments, next_run, status, current_interval) select type, arguments, next_run, 'next_run_not_scheduled', (select default_interval from task_type tt where tt.type = type) from tmp_task returning task.*; end; $$; comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_ready_tasks (ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and status = 'next_run_not_scheduled' order by next_run limit num_tasks; $$; create or replace function swh_scheduler_grab_ready_tasks (ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from task where next_run <= ts and status='next_run_not_scheduled' order by next_run limit num_tasks for update skip locked ) next_tasks where task.id = next_tasks.id returning task.*; $$; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, - metadata jsonb default '{}'::jsonb) + metadata jsonb default '{}'::jsonb, + ts timestamptz default now()) returns task_run language sql as $$ - insert into task_run (task, backend_id, metadata, scheduled) - values (task_id, backend_id, metadata, now()) + insert into task_run (task, backend_id, metadata, scheduled, status) + values (task_id, backend_id, metadata, ts, 'scheduled') returning *; $$; create or replace function swh_scheduler_start_task_run (backend_id text, - metadata jsonb default '{}'::jsonb) + metadata jsonb default '{}'::jsonb, + ts timestamptz default now()) returns task_run language sql as $$ update task_run - set started = now (), + set started = ts, + status = 'started', metadata = task_run.metadata || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, - eventful boolean, - metadata jsonb default '{}'::jsonb) + status task_run_status, + metadata jsonb default '{}'::jsonb, + ts timestamptz default now()) returns task_run language sql as $$ update task_run - set ended = now (), - eventful = swh_scheduler_end_task_run.eventful, + set ended = ts, + status = swh_scheduler_end_task_run.status, metadata = task_run.metadata || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create or replace function swh_scheduler_compute_new_task_interval (task_type text, current_interval interval, - eventful boolean) + end_status task_run_status) returns interval language plpgsql stable as $$ declare task_type_row task_type%rowtype; adjustment_factor float; begin select * from task_type where type = swh_scheduler_compute_new_task_interval.task_type into task_type_row; - if eventful then + case end_status + when 'eventful' then adjustment_factor := 1/task_type_row.backoff_factor; - else + when 'uneventful' then adjustment_factor := task_type_row.backoff_factor; - end if; + else + -- failed or lost task: no backoff. + adjustment_factor := 1; + end case; return greatest(task_type_row.min_interval, least(task_type_row.max_interval, adjustment_factor * current_interval)); end; $$; create or replace function swh_scheduler_update_task_interval () returns trigger language plpgsql as $$ begin update task set status = 'next_run_not_scheduled', - current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful), - next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.eventful) + current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.status), + next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.status) where id = new.task; return null; end; $$; create trigger update_interval_on_task_end - after update of ended, eventful on task_run + after update of status on task_run for each row + when (new.status IN ('eventful', 'uneventful', 'failed', 'lost')) execute procedure swh_scheduler_update_task_interval (); diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 01d574c..fae5558 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,353 +1,353 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import wraps import psycopg2 import psycopg2.extras from swh.core.config import SWHConfig psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped class SchedulerBackend(SWHConfig): """ Backend for the Software Heritage scheduling database. """ CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler'), } def __init__(self): self.config = self.parse_config_file(global_config=False) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', ] def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. A task type is a dictionary with the following keys: type (str): an identifier for the task type description (str): a human-readable description of what the task does backend_name (str): the name of the task in the job-scheduling backend default_interval (datetime.timedelta): the default interval between two task runs min_interval (datetime.timedelta): the minimum interval between two task runs max_interval (datetime.timedelta): the maximum interval between two task runs backoff_factor (float): the factor by which the interval changes at each run """ query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", self.task_type_keys, ) cursor.execute(query, [task_type[key] for key in self.task_type_keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret create_task_keys = ['type', 'arguments', 'next_run'] task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval', 'status'] @autocommit def create_tasks(self, tasks, cursor=None): """Create new tasks. A task is a dictionary with the following keys: type (str): the task type arguments (dict): the arguments for the task runner args (list of str): arguments kwargs (dict str -> str): keyword arguments next_run (datetime.datetime): the next scheduled run for the task This returns a list of created task ids. """ cursor.execute('select swh_scheduler_mktemp_task()') query, data = self._format_multiquery( """insert into tmp_task ({keys}) values ({placeholders})""", self.create_task_keys, tasks, ) cursor.execute(query, data) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = datetime.datetime.now(tz=datetime.timezone.utc) cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = datetime.datetime.now(tz=datetime.timezone.utc) cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry Returns: a fresh task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s)', (task_id, backend_id, metadata) ) return cursor.fetchone() @autocommit def start_task_run(self, backend_id, metadata=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry Returns: the updated task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s)', (backend_id, metadata) ) return cursor.fetchone() @autocommit - def end_task_run(self, backend_id, eventful, metadata=None, cursor=None): + def end_task_run(self, backend_id, status, metadata=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend - eventful (bool): whether the task run was eventful + status ('eventful', 'uneventful', 'failed'): how the task ended metadata (dict): metadata to add to the task_run entry Returns: the updated task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s)', - (backend_id, eventful, metadata) + (backend_id, status, metadata) ) return cursor.fetchone() if __name__ == '__main__': backend = SchedulerBackend() if not backend.get_task_type('origin-update-git'): backend.create_task_type({ 'type': "origin-update-git", 'description': 'Update an origin git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=8), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=32), 'backoff_factor': 2, }) print(backend.get_task_type('origin-update-git')) args = ''' https://github.com/hylang/hy https://github.com/torvalds/linux '''.strip().split() args = [arg.strip() for arg in args] tasks = [ { 'type': 'origin-update-git', 'next_run': datetime.datetime.now(tz=datetime.timezone.utc), 'arguments': { 'args': [arg], 'kwargs': {}, } } for arg in args ] print(backend.create_tasks(tasks)) print(backend.peek_ready_tasks()) # cur = backend.cursor() # ready_tasks = backend.grab_ready_tasks(cursor=cur) # print(ready_tasks) # for task in ready_tasks: # backend.schedule_task_run(task['id'], 'task-%s' % task['id'], # {'foo': 'bar'}, cursor=cur) # backend.commit() # for task in ready_tasks: # backend.start_task_run('task-%s' % task['id'], # {'worker': 'the-worker'}) # eventful = True # for task in ready_tasks: # eventful = not eventful # backend.end_task_run('task-%s' % task['id'], eventful, # {'ended': 'ack'}) diff --git a/swh/scheduler/celery/listener.py b/swh/scheduler/celery/listener.py index b693323..c916033 100644 --- a/swh/scheduler/celery/listener.py +++ b/swh/scheduler/celery/listener.py @@ -1,45 +1,50 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from .config import app as main_app from ..backend import SchedulerBackend def event_monitor(app, backend): state = app.events.State() def catchall_event(event, backend=backend): state.event(event) print(event) print(state) print(state.workers) def task_started(event, backend=backend): catchall_event(event, backend) backend.start_task_run(event['uuid'], metadata={'worker': event['hostname']}) def task_succeeded(event, backend=backend): catchall_event(event, backend) + status = 'uneventful' + if 'True' in event['result']: + status = 'eventful' backend.end_task_run(event['uuid'], - eventful='True' in event['result'], + status=status, metadata={}) def task_failed(event, backend=backend): catchall_event(event, backend) - # backend.fail_task_run(event['uuid']) + backend.end_task_run(event['uuid'], + status='failed', + metadata={}) with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ 'task-started': task_started, 'task-succeeded': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': main_backend = SchedulerBackend() event_monitor(main_app, main_backend)