diff --git a/debian/control b/debian/control index ca99d6e..4ffada3 100644 --- a/debian/control +++ b/debian/control @@ -1,19 +1,21 @@ Source: swh-scheduler Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, + python3-arrow, python3-celery, python3-nose, + python3-psycopg2, python3-setuptools, python3-swh.core, python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/ Package: python3-swh.scheduler Architecture: all Depends: ${misc:Depends}, ${python3:Depends} Description: Software Heritage Scheduler diff --git a/requirements.txt b/requirements.txt index beb36f5..df38688 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,11 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html -vcversioner + +arrow celery +psycopg2 +vcversioner swh.core diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index daf654b..7a61142 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,371 +1,318 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime from functools import wraps +from arrow import Arrow, utcnow import psycopg2 import psycopg2.extras +from psycopg2.extensions import AsIs from swh.core.config import SWHConfig +def adapt_arrow(arrow): + return AsIs("'%s'::timestamptz" % arrow.isoformat()) + + psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) +psycopg2.extensions.register_adapter(Arrow, adapt_arrow) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped -def utcnow(): - return datetime.datetime.now(tz=datetime.timezone.utc) - - class SchedulerBackend(SWHConfig): """ Backend for the Software Heritage scheduling database. """ CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler'), } def __init__(self): self.config = self.parse_config_file(global_config=False) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', ] def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. A task type is a dictionary with the following keys: type (str): an identifier for the task type description (str): a human-readable description of what the task does backend_name (str): the name of the task in the job-scheduling backend default_interval (datetime.timedelta): the default interval between two task runs min_interval (datetime.timedelta): the minimum interval between two task runs max_interval (datetime.timedelta): the maximum interval between two task runs backoff_factor (float): the factor by which the interval changes at each run """ query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", self.task_type_keys, ) cursor.execute(query, [task_type[key] for key in self.task_type_keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret create_task_keys = ['type', 'arguments', 'next_run'] task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval', 'status'] @autocommit def create_tasks(self, tasks, cursor=None): """Create new tasks. A task is a dictionary with the following keys: type (str): the task type arguments (dict): the arguments for the task runner args (list of str): arguments kwargs (dict str -> str): keyword arguments next_run (datetime.datetime): the next scheduled run for the task This returns a list of created task ids. """ cursor.execute('select swh_scheduler_mktemp_task()') query, data = self._format_multiquery( """insert into tmp_task ({keys}) values ({placeholders})""", self.create_task_keys, tasks, ) cursor.execute(query, data) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', (timestamp, num_tasks)) return cursor.fetchall() @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def start_task_run(self, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, status, metadata=None, timestamp=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status ('eventful', 'uneventful', 'failed'): how the task ended metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) return cursor.fetchone() - - -if __name__ == '__main__': - backend = SchedulerBackend() - if not backend.get_task_type('origin-update-git'): - backend.create_task_type({ - 'type': "origin-update-git", - 'description': 'Update an origin git repository', - 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', - 'default_interval': datetime.timedelta(days=8), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=32), - 'backoff_factor': 2, - }) - - print(backend.get_task_type('origin-update-git')) - args = ''' - https://github.com/hylang/hy - https://github.com/torvalds/linux - '''.strip().split() - args = [arg.strip() for arg in args] - - tasks = [ - { - 'type': 'origin-update-git', - 'next_run': datetime.datetime.now(tz=datetime.timezone.utc), - 'arguments': { - 'args': [arg], - 'kwargs': {}, - } - } - for arg in args - ] - print(backend.create_tasks(tasks)) - print(backend.peek_ready_tasks()) - - # cur = backend.cursor() - # ready_tasks = backend.grab_ready_tasks(cursor=cur) - # print(ready_tasks) - - # for task in ready_tasks: - # backend.schedule_task_run(task['id'], 'task-%s' % task['id'], - # {'foo': 'bar'}, cursor=cur) - - # backend.commit() - - # for task in ready_tasks: - # backend.start_task_run('task-%s' % task['id'], - # {'worker': 'the-worker'}) - - # eventful = True - # for task in ready_tasks: - # eventful = not eventful - # backend.end_task_run('task-%s' % task['id'], eventful, - # {'ended': 'ack'}) diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index 3f5aeec..9c94d8f 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,180 +1,177 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import socket +from arrow import utcnow from kombu import Queue from kombu.mixins import ConsumerMixin from celery.events import get_exchange from .config import app as main_app from ..backend import SchedulerBackend -def utcnow(): - return datetime.datetime.now(tz=datetime.timezone.utc) - - # This is a simplified version of celery.events.Receiver, with a persistent # queue and acked messages, with most of the options stripped down # # The original celery.events.Receiver code is available under the following # license: # # Copyright (c) 2015-2016 Ask Solem & contributors. All rights reserved. # Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved. # Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. # All rights reserved. # # Celery is licensed under The BSD License (3 Clause, also known as # the new BSD license), whose full-text is available in the top-level # LICENSE.Celery file. class ReliableEventsReceiver(ConsumerMixin): def __init__(self, app, handlers, queue_id): self.app = app self.connection = self.app.connection().connection.client self.handlers = handlers self.queue_id = queue_id self.exchange = get_exchange(self.connection) self.queue = Queue(queue_id, exchange=self.exchange, routing_key='#', auto_delete=False, durable=True) self.accept = set([self.app.conf.CELERY_EVENT_SERIALIZER, 'json']) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') handler and handler(event, message) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self.receive], no_ack=False, accept=self.accept)] def on_consume_ready(self, connection, channel, consumers, **kwargs): # When starting to consume, wakeup the workers self.app.control.broadcast('heartbeat', connection=self.connection, channel=channel) def capture(self, limit=None, timeout=None, wakeup=True): """Open up a consumer capturing events. This has to run in the main process, and it will never stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`. """ for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup): pass def receive(self, body, message): body['local_received'] = utcnow() self.process(body['type'], body, message=message) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): if not actions['queue']: return if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \ len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH: perform_actions(actions) def perform_actions(actions, backend=backend): action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] cursor = backend.cursor() for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) kwargs['cursor'] = cursor function(*args, **kwargs) backend.commit() for message in messages: message.ack() actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): message.ack() try_perform_actions() def task_started(event, message): queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def task_succeeded(event, message): status = 'uneventful' if 'True' in event['result']: status = 'eventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, }, 'message': message, }) def task_failed(event, message): queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) recv = ReliableEventsReceiver( main_app, handlers={ 'task-started': task_started, 'task-succeeded': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, queue_id='celeryev.listener-%s' % socket.gethostname(), ) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': main_backend = SchedulerBackend() event_monitor(main_app, main_backend)