diff --git a/requirements.txt b/requirements.txt
index 5da06d5..66f2da5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,4 @@
-celery
 msgpack-python
 psycopg2
 python-dateutil
 vcversioner
diff --git a/swh/core/scheduling.py b/swh/core/scheduling.py
deleted file mode 100644
index a7aedde..0000000
--- a/swh/core/scheduling.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import celery
-from celery.utils.log import get_task_logger
-
-
-class Task(celery.Task):
-    """a schedulable task (abstract class)
-
-    Sub-classes must implement the run() method. Sub-classes that
-    want their tasks to get routed to a non-default task queue must
-    override the task_queue attribute.
-
-    Current implementation is based on Celery. See
-    http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
-    how to use tasks once instantiated
-
-    """
-
-    abstract = True
-    task_queue = 'celery'
-
-    def run(self, *args, **kwargs):
-        raise NotImplementedError('tasks must implement the run() method')
-
-    @property
-    def log(self):
-        if not hasattr(self, '__log'):
-            self.__log = get_task_logger('%s.%s' %
-                                         (__name__, self.__class__.__name__))
-        return self.__log
diff --git a/swh/core/worker.py b/swh/core/worker.py
deleted file mode 100644
index d5de51d..0000000
--- a/swh/core/worker.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-from celery import Celery
-from celery.signals import setup_logging
-from celery.utils.log import ColorFormatter
-from kombu import Exchange, Queue
-
-from swh.core.config import load_named_config
-from swh.core.logger import PostgresHandler
-
-CONFIG_NAME = 'worker.ini'
-DEFAULT_CONFIG = {
-    'task_broker': ('str', 'amqp://guest@localhost//'),
-    'task_modules': ('list[str]', []),
-    'task_queues': ('list[str]', []),
-    'task_soft_time_limit': ('int', 0),
-}
-
-
-@setup_logging.connect
-def setup_log_handler(loglevel=None, logfile=None, format=None,
-                      colorize=None, **kwargs):
-    """Setup logging according to Software Heritage preferences.
-
-    We use the command-line loglevel for tasks only, as we never
-    really care about the debug messages from celery.
-    """
-
-    if loglevel is None:
-        loglevel = logging.DEBUG
-
-    formatter = logging.Formatter(format)
-    if colorize:
-        color_formatter = ColorFormatter(format)
-    else:
-        color_formatter = formatter
-
-    root_logger = logging.getLogger('')
-    root_logger.setLevel(logging.INFO)
-
-    console = logging.StreamHandler()
-    console.setLevel(logging.DEBUG)
-    console.setFormatter(color_formatter)
-
-    pg = PostgresHandler(CONFIG['log_db'])
-    pg.setFormatter(logging.Formatter(format))
-    pg.setLevel(logging.DEBUG)
-    pg.setFormatter(formatter)
-
-    root_logger.addHandler(console)
-    root_logger.addHandler(pg)
-
-    celery_logger = logging.getLogger('celery')
-    celery_logger.setLevel(logging.INFO)
-
-    # Silence useless "Starting new HTTP connection" messages
-    urllib3_logger = logging.getLogger('urllib3')
-    urllib3_logger.setLevel(logging.WARNING)
-
-    swh_logger = logging.getLogger('swh')
-    swh_logger.setLevel(loglevel)
-
-    # get_task_logger makes the swh tasks loggers children of celery.task
-    celery_task_logger = logging.getLogger('celery.task')
-    celery_task_logger.setLevel(loglevel)
-
-
-class TaskRouter:
-    """Route tasks according to the task_queue attribute in the task class"""
-    def route_for_task(self, task, args=None, kwargs=None):
-        task_class = app.tasks[task]
-        if hasattr(task_class, 'task_queue'):
-            return {'queue': task_class.task_queue}
-        return None
-
-
-# Load the Celery config
-CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG)
-
-# Celery Queues
-CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')]
-
-for queue in CONFIG['task_queues']:
-    CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue))
-
-# Instantiate the Celery app
-app = Celery()
-app.conf.update(
-    # The broker
-    BROKER_URL=CONFIG['task_broker'],
-    # Timezone configuration: all in UTC
-    CELERY_ENABLE_UTC=True,
-    CELERY_TIMEZONE='UTC',
-    # Imported modules
-    CELERY_IMPORTS=CONFIG['task_modules'],
-    # Time (in seconds, or a timedelta object) for when after stored task
-    # tombstones will be deleted. None means to never expire results.
-    CELERY_TASK_RESULT_EXPIRES=None,
-    # Late ack means the task messages will be acknowledged after the task has
-    # been executed, not just before, which is the default behavior.
-    CELERY_ACKS_LATE=True,
-    # A string identifying the default serialization method to use.
-    # Can be pickle (default), json, yaml, msgpack or any custom serialization
-    # methods that have been registered with kombu.serialization.registry
-    CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'],
-    # If True the task will report its status as “started”
-    # when the task is executed by a worker.
-    CELERY_TRACK_STARTED=True,
-    # Default compression used for task messages. Can be gzip, bzip2
-    # (if available), or any custom compression schemes registered
-    # in the Kombu compression registry.
-    # CELERY_MESSAGE_COMPRESSION='bzip2',
-    # Disable all rate limits, even if tasks has explicit rate limits set.
-    # (Disabling rate limits altogether is recommended if you don’t have any
-    # tasks using them.)
-    CELERY_DISABLE_RATE_LIMITS=True,
-    # Task hard time limit in seconds. The worker processing the task will be
-    # killed and replaced with a new one when this is exceeded.
-    # CELERYD_TASK_TIME_LIMIT=3600,
-    # Task soft time limit in seconds.
-    # The SoftTimeLimitExceeded exception will be raised when this is exceeded.
-    # The task can catch this to e.g. clean up before the hard time limit
-    # comes.
-    CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'],
-    # Task routing
-    CELERY_ROUTES=TaskRouter(),
-    # Task queues this worker will consume from
-    CELERY_QUEUES=CELERY_QUEUES,
-    # Allow pool restarts from remote
-    CELERYD_POOL_RESTARTS=True,
-    # Do not prefetch tasks
-    CELERYD_PREFETCH_MULTIPLIER=1,
-)
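For context on what this commit removes: per the docstring of the deleted
swh.core.scheduling.Task, a schedulable task subclassed Task, implemented
run(), and optionally overrode the task_queue attribute to get routed to a
non-default queue. A minimal sketch of such a consumer of the removed API
(the LoadGitRepository name, the queue name and the run() body are
hypothetical illustrations, not code from this repository):

    # Hypothetical consumer of the *removed* swh.core.scheduling API,
    # shown for context only; this import no longer works after this commit.
    from swh.core.scheduling import Task


    class LoadGitRepository(Task):
        """Schedulable task routed to a dedicated queue."""

        # Overrides the 'celery' default declared on the base class.
        task_queue = 'swh_loader_git'

        def run(self, origin_url):
            # The base class raises NotImplementedError unless run() is
            # overridden; self.log is the task logger provided by the
            # base class's log property.
            self.log.info('loading %s', origin_url)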
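The worker half of the removed code resolved the queue at dispatch time:
TaskRouter.route_for_task() looked the task name up in app.tasks and returned
{'queue': task_class.task_queue} when that attribute was set, or None to fall
back to the default 'celery' queue. A worker only consumed from a non-default
queue if it was listed under task_queues in worker.ini, since CELERY_QUEUES
was built from that setting. A usage sketch against the removed TaskRouter
(the task name is the hypothetical one from the previous example):

    # Assuming the hypothetical task above is registered with the app:
    router = TaskRouter()
    router.route_for_task('swh.loader.git.tasks.LoadGitRepository')
    # -> {'queue': 'swh_loader_git'}

    # A registered task class without a task_queue attribute yields None,
    # so Celery falls back to the default 'celery' queue.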