diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 6bf086c..01d574c 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,346 +1,353 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import wraps import psycopg2 import psycopg2.extras from swh.core.config import SWHConfig psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped class SchedulerBackend(SWHConfig): """ Backend for the Software Heritage scheduling database. """ CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler'), } def __init__(self): self.config = self.parse_config_file(global_config=False) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', ] def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. A task type is a dictionary with the following keys: type (str): an identifier for the task type description (str): a human-readable description of what the task does backend_name (str): the name of the task in the job-scheduling backend default_interval (datetime.timedelta): the default interval between two task runs min_interval (datetime.timedelta): the minimum interval between two task runs max_interval (datetime.timedelta): the maximum interval between two task runs backoff_factor (float): the factor by which the interval changes at each run """ query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", self.task_type_keys, ) cursor.execute(query, [task_type[key] for key in self.task_type_keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret create_task_keys = ['type', 'arguments', 'next_run'] task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval', 'status'] @autocommit def create_tasks(self, tasks, cursor=None): """Create new tasks. A task is a dictionary with the following keys: type (str): the task type arguments (dict): the arguments for the task runner args (list of str): arguments kwargs (dict str -> str): keyword arguments next_run (datetime.datetime): the next scheduled run for the task This returns a list of created task ids. """ cursor.execute('select swh_scheduler_mktemp_task()') query, data = self._format_multiquery( """insert into tmp_task ({keys}) values ({placeholders})""", self.create_task_keys, tasks, ) cursor.execute(query, data) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = datetime.datetime.now(tz=datetime.timezone.utc) cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = datetime.datetime.now(tz=datetime.timezone.utc) cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry Returns: a fresh task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s)', (task_id, backend_id, metadata) ) return cursor.fetchone() @autocommit def start_task_run(self, backend_id, metadata=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry Returns: the updated task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s)', (backend_id, metadata) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, eventful, metadata=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend eventful (bool): whether the task run was eventful metadata (dict): metadata to add to the task_run entry Returns: the updated task_run entry """ if metadata is None: metadata = {} cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s)', (backend_id, eventful, metadata) ) return cursor.fetchone() if __name__ == '__main__': backend = SchedulerBackend() - backend.create_task_type({ - 'type': "origin-update-git", - 'description': 'Update an origin git repository', - 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', - 'default_interval': datetime.timedelta(days=8), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=32), - 'backoff_factor': 2, - }) + if not backend.get_task_type('origin-update-git'): + backend.create_task_type({ + 'type': "origin-update-git", + 'description': 'Update an origin git repository', + 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', + 'default_interval': datetime.timedelta(days=8), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=32), + 'backoff_factor': 2, + }) print(backend.get_task_type('origin-update-git')) - args = ['foo', 'bar', 'baz', 'quux', 'bla'] + args = ''' + https://github.com/hylang/hy + https://github.com/torvalds/linux + '''.strip().split() + args = [arg.strip() for arg in args] + tasks = [ { 'type': 'origin-update-git', 'next_run': datetime.datetime.now(tz=datetime.timezone.utc), 'arguments': { 'args': [arg], 'kwargs': {}, } } for arg in args ] print(backend.create_tasks(tasks)) print(backend.peek_ready_tasks()) - cur = backend.cursor() - ready_tasks = backend.grab_ready_tasks(cursor=cur) - print(ready_tasks) - for task in ready_tasks: - backend.schedule_task_run(task['id'], 'task-%s' % task['id'], - {'foo': 'bar'}, cursor=cur) + # cur = backend.cursor() + # ready_tasks = backend.grab_ready_tasks(cursor=cur) + # print(ready_tasks) + + # for task in ready_tasks: + # backend.schedule_task_run(task['id'], 'task-%s' % task['id'], + # {'foo': 'bar'}, cursor=cur) - backend.commit() + # backend.commit() - for task in ready_tasks: - backend.start_task_run('task-%s' % task['id'], - {'worker': 'the-worker'}) + # for task in ready_tasks: + # backend.start_task_run('task-%s' % task['id'], + # {'worker': 'the-worker'}) - eventful = True - for task in ready_tasks: - eventful = not eventful - backend.end_task_run('task-%s' % task['id'], eventful, - {'ended': 'ack'}) + # eventful = True + # for task in ready_tasks: + # eventful = not eventful + # backend.end_task_run('task-%s' % task['id'], eventful, + # {'ended': 'ack'}) diff --git a/swh/scheduler/celery/listener.py b/swh/scheduler/celery/listener.py index f5095cf..b693323 100644 --- a/swh/scheduler/celery/listener.py +++ b/swh/scheduler/celery/listener.py @@ -1,25 +1,45 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from .config import app as main_app +from ..backend import SchedulerBackend -def event_monitor(app): +def event_monitor(app, backend): state = app.events.State() - def announce_event(event): + def catchall_event(event, backend=backend): state.event(event) print(event) print(state) print(state.workers) + def task_started(event, backend=backend): + catchall_event(event, backend) + backend.start_task_run(event['uuid'], + metadata={'worker': event['hostname']}) + + def task_succeeded(event, backend=backend): + catchall_event(event, backend) + backend.end_task_run(event['uuid'], + eventful='True' in event['result'], + metadata={}) + + def task_failed(event, backend=backend): + catchall_event(event, backend) + # backend.fail_task_run(event['uuid']) + with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ - '*': announce_event, + 'task-started': task_started, + 'task-succeeded': task_succeeded, + 'task-failed': task_failed, + '*': catchall_event, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': - event_monitor(main_app) + main_backend = SchedulerBackend() + event_monitor(main_app, main_backend) diff --git a/swh/scheduler/celery/runner.py b/swh/scheduler/celery/runner.py new file mode 100644 index 0000000..bc14ea0 --- /dev/null +++ b/swh/scheduler/celery/runner.py @@ -0,0 +1,46 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from ..backend import SchedulerBackend +from .config import app as main_app + + +def run_ready_tasks(backend, app): + """Run all tasks that are ready""" + + backend_names = {} + + while True: + cursor = backend.cursor() + pending_tasks = backend.grab_ready_tasks(num_tasks=100, cursor=cursor) + if not pending_tasks: + break + + for task in pending_tasks: + backend_name = backend_names.get(task['type']) + if not backend_name: + task_type = backend.get_task_type(task['type'], cursor=cursor) + backend_names[task['type']] = task_type['backend_name'] + backend_name = task_type['backend_name'] + + args = task['arguments']['args'] + kwargs = task['arguments']['kwargs'] + + celery_task = app.tasks[backend_name].delay(*args, **kwargs) + backend.schedule_task_run(task['id'], celery_task.id, + cursor=cursor) + + backend.commit() + +if __name__ == '__main__': + for module in main_app.conf.CELERY_IMPORTS: + __import__(module) + + main_backend = SchedulerBackend() + try: + run_ready_tasks(main_backend, main_app) + except: + main_backend.rollback() + raise