diff --git a/PKG-INFO b/PKG-INFO
index 955abca..67cd4dc 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,65 +1,65 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.0.42
+Version: 0.0.43
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
-Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Description: swh-scheduler
         =============
         
         Job scheduler for the Software Heritage project.
         
         Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
         listing a forge, loading new stuff from a Git repository) and one-off
         activities (e.g., loading a specific version of a source package).
        
         # Tests
        
         ## Running tests manually
        
         ### Test data
        
         To be able to run (unit) tests, you need to have the
         [swh-storage-testdata](https://forge.softwareheritage.org/source/swh-storage-testdata.git)
         repository in the parent directory. If you have set up your environment
         following the
         [Getting started](https://docs.softwareheritage.org/devel/getting-started.html#getting-started)
         document, everything should be set up just fine. Otherwise:
        
         ```
         ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata
         ```
        
         ### Required services
        
         Unit tests that require a running Celery broker use an in-memory
         broker/result backend by default, but you can choose to use a real broker
         by setting the `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment
         variables. For example:
        
         ```
         $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests
         .....................................
         ----------------------------------------------------------------------
         Ran 37 tests in 15.578s
        
         OK
         ```
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index 955abca..67cd4dc 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,65 +1,65 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.0.42
+Version: 0.0.43
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
-Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Description: swh-scheduler
         =============
         
         Job scheduler for the Software Heritage project.
         
         Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
         listing a forge, loading new stuff from a Git repository) and one-off
         activities (e.g., loading a specific version of a source package).
        
         # Tests
        
         ## Running tests manually
        
         ### Test data
        
         To be able to run (unit) tests, you need to have the
         [swh-storage-testdata](https://forge.softwareheritage.org/source/swh-storage-testdata.git)
         repository in the parent directory. If you have set up your environment
         following the
         [Getting started](https://docs.softwareheritage.org/devel/getting-started.html#getting-started)
         document, everything should be set up just fine. Otherwise:
        
         ```
         ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata
         ```
        
         ### Required services
        
         Unit tests that require a running Celery broker use an in-memory
         broker/result backend by default, but you can choose to use a real broker
         by setting the `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment
         variables. For example:
        
         ```
         $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests
         .....................................
         ----------------------------------------------------------------------
         Ran 37 tests in 15.578s
        
         OK
         ```
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
index dca4a7f..98e9107 100644
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -1,208 +1,207 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import logging
 import time
-import socket
 import sys
 
 import click
 from arrow import utcnow
 from kombu import Queue
 
 import celery
 from celery.events import EventReceiver
 
 
 class ReliableEventReceiver(EventReceiver):
     def __init__(self, channel, handlers=None, routing_key='#',
                  node_id=None, app=None, queue_prefix='celeryev',
                  accept=None):
         super(ReliableEventReceiver, self).__init__(
             channel, handlers, routing_key, node_id, app, queue_prefix,
             accept)
 
         self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
                            exchange=self.exchange,
                            routing_key=self.routing_key,
                            auto_delete=False,
                            durable=True)
 
     def get_consumers(self, consumer, channel):
         return [consumer(queues=[self.queue],
                          callbacks=[self._receive], no_ack=False,
                          accept=self.accept)]
 
     def _receive(self, bodies, message):
         logging.debug('## event-receiver: bodies: %s' % bodies)
         logging.debug('## event-receiver: message: %s' % message)
         if not isinstance(bodies, list):  # celery<4 returned body as element
             bodies = [bodies]
         for body in bodies:
             type, body = self.event_from_message(body)
             self.process(type, body, message)
 
     def process(self, type, event, message):
         """Process the received event by dispatching it to the appropriate
         handler."""
         handler = self.handlers.get(type) or self.handlers.get('*')
         logging.debug('## event-receiver: type: %s' % type)
         logging.debug('## event-receiver: event: %s' % event)
         logging.debug('## event-receiver: handler: %s' % handler)
         handler and handler(event, message)
 
 
 ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0)
 ACTION_QUEUE_MAX_LENGTH = 1000
 
 
 def event_monitor(app, backend):
     logger = logging.getLogger('swh.scheduler.listener')
     actions = {
         'last_send': utcnow() - 2*ACTION_SEND_DELAY,
         'queue': [],
     }
 
     def try_perform_actions(actions=actions):
         logger.debug('Try perform pending actions')
         if actions['queue'] and (
                 len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or
                 utcnow() - actions['last_send'] > ACTION_SEND_DELAY):
             perform_actions(actions)
 
     def perform_actions(actions, backend=backend):
         logger.info('Perform %s pending actions' % len(actions['queue']))
         action_map = {
             'start_task_run': backend.start_task_run,
             'end_task_run': backend.end_task_run,
         }
 
         messages = []
         db = backend.get_db()
         cursor = db.cursor(None)
         for action in actions['queue']:
             messages.append(action['message'])
             function = action_map[action['action']]
             args = action.get('args', ())
             kwargs = action.get('kwargs', {})
             kwargs['cur'] = cursor
             function(*args, **kwargs)
 
         db.conn.commit()
         for message in messages:
             if not message.acknowledged:
                 message.ack()
             else:
                 logger.info('message already acknowledged: %s', message)
         actions['queue'] = []
         actions['last_send'] = utcnow()
 
     def queue_action(action, actions=actions):
         actions['queue'].append(action)
         try_perform_actions()
 
     def catchall_event(event, message):
         logger.info('event: %s, message:%s', event, message)
         if not message.acknowledged:
             message.ack()
         else:
             logger.info('message already acknowledged: %s', message)
         try_perform_actions()
 
     def task_started(event, message):
         logger.debug('task_started: event: %s' % event)
         logger.debug('task_started: message: %s' % message)
 
         queue_action({
             'action': 'start_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'metadata': {
                     'worker': event['hostname'],
                 },
             },
             'message': message,
         })
 
     def task_succeeded(event, message):
         logger.debug('task_succeeded: event: %s' % event)
         logger.debug('task_succeeded: message: %s' % message)
         result = event['result']
 
         logger.debug('task_succeeded: result: %s' % result)
         try:
             status = result.get('status')
             if status == 'success':
                 status = 'eventful' if result.get('eventful') else 'uneventful'
         except Exception:
             status = 'eventful' if result else 'uneventful'
 
         queue_action({
             'action': 'end_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'status': status,
                 'result': result,
             },
             'message': message,
         })
 
     def task_failed(event, message):
         logger.debug('task_failed: event: %s' % event)
         logger.debug('task_failed: message: %s' % message)
 
         queue_action({
             'action': 'end_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'status': 'failed',
             },
             'message': message,
         })
 
     recv = ReliableEventReceiver(
         celery.current_app.connection(),
         app=celery.current_app,
         handlers={
             'task-started': task_started,
             'task-result': task_succeeded,
             'task-failed': task_failed,
             '*': catchall_event,
         },
-        node_id='listener-%s' % socket.gethostname(),
+        node_id='listener',
     )
 
     errors = 0
     while True:
         try:
             recv.capture(limit=None, timeout=None, wakeup=True)
             errors = 0
         except KeyboardInterrupt:
             logger.exception('Keyboard interrupt, exiting')
             break
         except Exception:
             logger.exception('Unexpected exception')
             if errors < 5:
                 time.sleep(errors)
                 errors += 1
             else:
                 logger.error('Too many consecutive errors, exiting')
                 sys.exit(1)
 
 
 @click.command()
 @click.pass_context
 def main(ctx):
     click.echo("Deprecated! Use 'swh-scheduler listener' instead.",
                err=True)
     ctx.exit(1)
 
 
 if __name__ == '__main__':
     main()
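A note on the pattern in `event_monitor()` above: it buffers `start_task_run`/`end_task_run` updates, applies them in one database transaction, and only acks the broker messages after the commit, so a crash redelivers events instead of losing them. Below is a minimal, self-contained sketch of that commit-then-ack batching; `Message`, `apply_action` and `commit` are hypothetical stand-ins for the kombu message and scheduler backend used by the real code, not the real API.

```python
# Sketch of commit-then-ack batching, assuming hypothetical stand-ins
# for the broker message and the database hooks.
import datetime

SEND_DELAY = datetime.timedelta(seconds=1.0)
QUEUE_MAX_LENGTH = 1000


class Message:
    """Stand-in for a kombu message that tracks acknowledgement."""
    def __init__(self):
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


class ActionBatcher:
    def __init__(self, apply_action, commit):
        self.apply_action = apply_action  # writes one action to the DB
        self.commit = commit              # commits the DB transaction
        self.queue = []
        self.last_send = datetime.datetime.utcnow() - 2 * SEND_DELAY

    def add(self, action, message):
        self.queue.append((action, message))
        now = datetime.datetime.utcnow()
        # Flush once the batch is big enough or old enough.
        if (len(self.queue) > QUEUE_MAX_LENGTH
                or now - self.last_send > SEND_DELAY):
            self.flush()

    def flush(self):
        # 1. Apply every buffered action inside one transaction.
        for action, _ in self.queue:
            self.apply_action(action)
        self.commit()
        # 2. Only after the commit, ack the broker messages: a crash
        #    before this point redelivers the events rather than
        #    acknowledging work that was never recorded.
        for _, message in self.queue:
            if not message.acknowledged:
                message.ack()
        self.queue = []
        self.last_send = datetime.datetime.utcnow()
```

The ordering is the whole point of the design: acking before the commit could acknowledge an event whose database write is then lost.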
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index d4bdafc..9e4dc5f 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,115 +1,121 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import arrow
 import logging
 
+from kombu.utils.uuid import uuid
+
 from swh.scheduler import get_scheduler, compute_nb_tasks_from
 
 logger = logging.getLogger(__name__)
 
 # Max batch size for tasks
 MAX_NUM_TASKS = 10000
 
 
 def run_ready_tasks(backend, app):
     """Run tasks that are ready
 
     Args:
         backend (Scheduler): backend to read tasks to schedule
         app (App): Celery application to send tasks to
 
     Returns:
         A list of dictionaries::
 
           {
             'task': the scheduler's task id,
             'backend_id': Celery's task id,
             'scheduled': arrow.utcnow()
           }
 
        The result can be used to block-wait for the tasks' results::
 
          backend_tasks = run_ready_tasks(self.scheduler, app)
          for task in backend_tasks:
              AsyncResult(id=task['backend_id']).get()
 
     """
     all_backend_tasks = []
     while True:
         task_types = {}
         pending_tasks = []
         for task_type in backend.get_task_types():
             task_type_name = task_type['type']
             task_types[task_type_name] = task_type
             max_queue_length = task_type['max_queue_length']
             backend_name = task_type['backend_name']
             if max_queue_length:
                 try:
                     queue_length = app.get_queue_length(backend_name)
                 except ValueError:
                     queue_length = None
 
                 if queue_length is None:
                     # Running without RabbitMQ (probably a test env).
                     num_tasks = MAX_NUM_TASKS
                 else:
                     num_tasks = min(max_queue_length - queue_length,
                                     MAX_NUM_TASKS)
             else:
                 num_tasks = MAX_NUM_TASKS
             if num_tasks > 0:
                 num_tasks, num_tasks_priority = compute_nb_tasks_from(
                     num_tasks)
 
                 grabbed_tasks = backend.grab_ready_tasks(
                     task_type_name,
                     num_tasks=num_tasks,
                     num_tasks_priority=num_tasks_priority)
                 if grabbed_tasks:
                     pending_tasks.extend(grabbed_tasks)
                     logger.info('Grabbed %s tasks %s',
                                 len(grabbed_tasks), task_type_name)
 
         if not pending_tasks:
             return all_backend_tasks
 
         backend_tasks = []
+        celery_tasks = []
         for task in pending_tasks:
             args = task['arguments']['args']
             kwargs = task['arguments']['kwargs']
 
             backend_name = task_types[task['type']]['backend_name']
-            celery_result = app.send_task(
-                backend_name, args=args, kwargs=kwargs,
-            )
+            backend_id = uuid()
+            celery_tasks.append((backend_name, backend_id, args, kwargs))
 
             data = {
                 'task': task['id'],
-                'backend_id': celery_result.id,
+                'backend_id': backend_id,
                 'scheduled': arrow.utcnow(),
             }
 
             backend_tasks.append(data)
         logger.debug('Sent %s celery tasks', len(backend_tasks))
 
         backend.mass_schedule_task_runs(backend_tasks)
 
+        for backend_name, backend_id, args, kwargs in celery_tasks:
+            app.send_task(
+                backend_name, task_id=backend_id, args=args, kwargs=kwargs,
+            )
+
         all_backend_tasks.extend(backend_tasks)
 
 
 def main():
     from .config import app as main_app
     for module in main_app.conf.CELERY_IMPORTS:
         __import__(module)
 
     main_backend = get_scheduler('local')
     try:
         run_ready_tasks(main_backend, main_app)
     except Exception:
         main_backend.rollback()
         raise
 
 
 if __name__ == '__main__':
     main()
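The runner.py hunks above reorder scheduling to close a race: the Celery task id is now generated up front with kombu's `uuid()`, the task runs are recorded in the scheduler database first, and only then are the tasks handed to the broker, so the listener can never receive an event for a run the database does not know about. A condensed sketch of that "record before send" pattern; `backend`, `app` and the task dict layout are hypothetical stand-ins for the scheduler backend and Celery app:

```python
# Sketch of the record-then-send pattern from runner.py, with
# hypothetical stand-ins for the backend, the app and the task dicts.
from kombu.utils.uuid import uuid


def schedule(backend, app, pending_tasks):
    runs, sends = [], []
    for task in pending_tasks:
        task_id = uuid()  # id chosen by us, not assigned by Celery
        sends.append((task['backend_name'], task_id,
                      task['args'], task['kwargs']))
        runs.append({'task': task['id'], 'backend_id': task_id})
    # 1. Persist the runs first; if we crash here, no orphan Celery
    #    task exists, only unsent runs that can be reconciled.
    backend.mass_schedule_task_runs(runs)
    # 2. Then send; any event the listener receives from now on refers
    #    to a run the database already knows about.
    for name, task_id, args, kwargs in sends:
        app.send_task(name, task_id=task_id, args=args, kwargs=kwargs)
```

The previous ordering (send first, then record the returned id) left a window in which a fast worker could emit `task-started` before the run row existed.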
diff --git a/swh/scheduler/sql/40-swh-data.sql b/swh/scheduler/sql/40-swh-data.sql
index a0322c6..e44ff5c 100644
--- a/swh/scheduler/sql/40-swh-data.sql
+++ b/swh/scheduler/sql/40-swh-data.sql
@@ -1,286 +1,312 @@
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'swh-loader-mount-dump-and-load-svn-repository',
   'Loading svn repositories from svn dump',
   'swh.loader.svn.tasks.MountAndLoadSvnRepository',
   '1 day', '1 day', '1 day', 1,
   1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'origin-update-svn',
   'Create dump of a remote svn repository, mount it and load it',
   'swh.loader.svn.tasks.DumpMountAndLoadSvnRepository',
   '1 day', '1 day', '1 day', 1,
   1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   num_retries, max_queue_length)
 values (
   'swh-deposit-archive-loading',
   'Loading deposit archive into swh through swh-loader-tar',
   'swh.deposit.loader.tasks.LoadDepositArchiveTsk',
   '1 day', '1 day', '1 day', 1,
   3, 1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   num_retries, max_queue_length)
 values (
   'swh-deposit-archive-checks',
   'Pre-checking deposit step before loading into swh archive',
   'swh.deposit.loader.tasks.ChecksDepositTsk',
   '1 day', '1 day', '1 day', 1,
   3, 1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'swh-vault-cooking',
   'Cook a Vault bundle',
   'swh.vault.cooking_tasks.SWHCookingTask',
   '1 day', '1 day', '1 day', 1,
   10000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'origin-update-hg',
   'Loading mercurial repository swh-loader-mercurial',
   'swh.loader.mercurial.tasks.LoadMercurial',
   '1 day', '1 day', '1 day', 1,
   1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'origin-load-archive-hg',
   'Loading archive mercurial repository swh-loader-mercurial',
   'swh.loader.mercurial.tasks.LoadArchiveMercurial',
   '1 day', '1 day', '1 day', 1,
   1000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'origin-update-git',
   'Update an origin of type git',
   'swh.loader.git.tasks.UpdateGitRepository',
   '64 days', '12:00:00', '64 days', 2,
   5000);
 
+insert into task_type(
+  type,
+  description,
+  backend_name,
+  default_interval, min_interval, max_interval, backoff_factor)
+values (
+  'swh-lister-bitbucket-incremental',
+  'Incrementally list BitBucket',
+  'swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
+  '1 day',
+  '1 day',
+  '1 day', 1);
+
+insert into task_type(
+  type,
+  description,
+  backend_name,
+  default_interval, min_interval, max_interval, backoff_factor)
+values (
+  'swh-lister-bitbucket-full',
+  'Full update of Bitbucket repos list',
+  'swh.lister.bitbucket.tasks.FullBitBucketRelister',
+  '90 days',
+  '90 days',
+  '90 days', 1);
+
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-github-incremental',
   'Incrementally list GitHub',
   'swh.lister.github.tasks.IncrementalGitHubLister',
   '1 day',
   '1 day',
   '1 day', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-github-full',
   'Full update of GitHub repos list',
   'swh.lister.github.tasks.FullGitHubRelister',
   '90 days',
   '90 days',
   '90 days', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-debian',
   'List a Debian distribution',
   'swh.lister.debian.tasks.DebianListerTask',
   '1 day',
   '1 day',
   '1 day', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-gitlab-incremental',
   'Incrementally list a Gitlab instance',
   'swh.lister.gitlab.tasks.IncrementalGitLabLister',
   '1 day',
   '1 day',
   '1 day', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-gitlab-full',
   'Full update of a Gitlab instance''s repos list',
   'swh.lister.gitlab.tasks.FullGitLabRelister',
   '90 days',
   '90 days',
   '90 days', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor)
 values (
   'swh-lister-pypi',
   'Full pypi lister',
   'swh.lister.pypi.tasks.PyPIListerTask',
   '1 days',
   '1 days',
   '1 days', 1);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'origin-update-pypi',
   'Load Pypi origin',
   'swh.loader.pypi.tasks.LoadPyPI',
   '64 days', '12:00:00', '64 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_mimetype',
   'Mimetype indexer task',
   'swh.indexer.tasks.ContentMimetype',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_range_mimetype',
   'Mimetype Range indexer task',
   'swh.indexer.tasks.ContentRangeMimetype',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_fossology_license',
   'Fossology license indexer task',
   'swh.indexer.tasks.ContentFossologyLicense',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_range_fossology_license',
   'Fossology license range indexer task',
   'swh.indexer.tasks.ContentRangeFossologyLicense',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_origin_head',
   'Origin Head indexer task',
   'swh.indexer.tasks.OriginHead',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_revision_metadata',
   'Revision Metadata indexer task',
   'swh.indexer.tasks.RevisionMetadata',
   '1 day', '12:00:00', '1 days', 2,
   5000);
 
 insert into task_type(
   type,
   description,
   backend_name,
   default_interval, min_interval, max_interval, backoff_factor,
   max_queue_length)
 values (
   'indexer_origin_metadata',
   'Origin Metadata indexer task',
   'swh.indexer.tasks.OriginMetadata',
   '1 day', '12:00:00', '1 days', 2,
   20000);
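For context, the rows above only register task *types*; actual tasks reference them by name. Below is a hypothetical sketch of scheduling a one-off run of the newly added 'swh-lister-bitbucket-full' type. The `get_scheduler('local')` call mirrors runner.py's `main()`, but the `create_tasks()` call and its argument layout are assumptions about the scheduler backend of this era, not taken from this diff:

```python
# Hypothetical sketch: schedule a one-off run of the new
# 'swh-lister-bitbucket-full' task type registered above.
# The create_tasks() argument layout is an assumption.
import datetime

from swh.scheduler import get_scheduler

scheduler = get_scheduler('local')
scheduler.create_tasks([{
    'type': 'swh-lister-bitbucket-full',
    'arguments': {'args': [], 'kwargs': {}},
    'next_run': datetime.datetime.now(datetime.timezone.utc),
    'policy': 'oneshot',  # run once instead of the recurring 90-day default
}])
```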
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
index ca65742..d2c9cf0 100644
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,96 +1,97 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from celery import current_app
 import celery.app.task
 from celery.utils.log import get_task_logger
 
 from swh.core.statsd import Statsd
 
 
 class SWHTask(celery.app.task.Task):
     """a schedulable task (abstract class)
 
     Current implementation is based on Celery. See
     http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
     how to use tasks once instantiated
 
     """
 
     _statsd = None
     _log = None
 
     @property
     def statsd(self):
         if self._statsd:
             return self._statsd
         worker_name = current_app.conf.get('worker_name')
         if worker_name:
             self._statsd = Statsd(constant_tags={
                 'task': self.name,
                 'worker': worker_name,
             })
             return self._statsd
         else:
             return Statsd(constant_tags={
                 'task': self.name,
                 'worker': 'unknown worker',
             })
 
     def __call__(self, *args, **kwargs):
         self.statsd.increment('swh_task_called_count')
         with self.statsd.timed('swh_task_duration_seconds'):
             return super().__call__(*args, **kwargs)
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         self.statsd.increment('swh_task_failure_count')
-        self.send_event('task-result-exception')
 
     def on_success(self, retval, task_id, args, kwargs):
         self.statsd.increment('swh_task_success_count')
+        # this is a swh specific event. Used to attach the retval to the
+        # task_run
         self.send_event('task-result', result=retval)
 
     @property
     def log(self):
         if self._log is None:
             self._log = get_task_logger(self.name)
         return self._log
 
     def run(self, *args, **kwargs):
         self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs)
         ret = super().run(*args, **kwargs)
         self.log.debug('%s: OK => %s', self.name, ret)
         return ret
 
 
 class Task(SWHTask):
     """a schedulable task (abstract class)
 
     DEPRECATED! Please use SWHTask as base for decorated functions instead.
 
     Sub-classes must implement the run_task() method.
 
     Current implementation is based on Celery. See
     http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
     how to use tasks once instantiated
 
     """
 
     abstract = True
 
     def run(self, *args, **kwargs):
         """This method is called by the celery worker when a task is
         received.
 
         Should not be overridden as we need our special events to be sent for
         the recurrent scheduler. Override run_task instead."""
         return self.run_task(*args, **kwargs)
 
     def run_task(self, *args, **kwargs):
         """Perform the task.
 
         Must return a json-serializable value as it is passed back to the task
         scheduler using a celery event.
 
         """
         raise NotImplementedError('tasks must implement the run_task() method')
diff --git a/version.txt b/version.txt
index d860e55..ed1bf66 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.42-0-g3488c26
\ No newline at end of file
+v0.0.43-0-gf0a8c43
\ No newline at end of file
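Tasks declared on the `SWHTask` base above automatically report the `swh_task_*` statsd metrics and, on success, emit the custom 'task-result' event that the listener turns into an `end_task_run` with the return value attached. A hypothetical declaration, using Celery's standard `base=`/`bind=` options; the task name and body are illustrative, not part of this diff:

```python
# Hypothetical example of a task built on the SWHTask base above.
from celery import current_app as app

from swh.scheduler.task import SWHTask


@app.task(name='example.ping', base=SWHTask, bind=True)
def ping(self, **kwargs):
    # self.log and the statsd counters/timers come from SWHTask; the
    # return value travels back to the scheduler via the 'task-result'
    # event emitted in on_success().
    self.log.debug('ping(%r)', kwargs)
    return 'OK'
```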