diff --git a/PKG-INFO b/PKG-INFO index a5e59ad..be8fa1c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.10 +Version: 0.0.11 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index a5e59ad..be8fa1c 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.10 +Version: 0.0.11 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/core/logger.py b/swh/core/logger.py index 940d868..69000ad 100644 --- a/swh/core/logger.py +++ b/swh/core/logger.py @@ -1,87 +1,98 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import psycopg2 import socket +from celery import current_task from psycopg2.extras import Json EXTRA_LOGDATA_PREFIX = 'swh_' def db_level_of_py_level(lvl): """convert a log level of the logging module to a log level suitable for the logging Postgres DB """ return logging.getLevelName(lvl).lower() class PostgresHandler(logging.Handler): """log handler that store messages in a Postgres DB See swh-core/sql/log-schema.sql for the DB schema. All logging methods can be used as usual. Additionally, arbitrary metadata can be passed to logging methods, requesting that they will be stored in the DB as a single JSONB value. To do so, pass a dictionary to the 'extra' kwarg of any logging method; all keys in that dictionary that start with EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to form the JSONB dictionary. The prefix will be stripped and not included in the DB. Note: the logger name will be used to fill the 'module' DB column. Sample usage: logging.basicConfig(level=logging.INFO) h = PostgresHandler('dbname=softwareheritage-log') logging.getLogger().addHandler(h) logger.info('not so important notice', extra={'swh_type': 'swh_logging_test', 'swh_meditation': 'guru'}) logger.warn('something weird just happened, did you see that?') """ def __init__(self, connstring): """ Create a Postgres log handler. Args: config: configuration dictionary, with a key "log_db" containing a libpq connection string to the log DB """ super().__init__() self.connstring = connstring self.fqdn = socket.getfqdn() # cache FQDN value def _connect(self): return psycopg2.connect(self.connstring) def emit(self, record): log_data = record.__dict__ msg = self.format(record) extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v for k, v in log_data.items() if k.startswith(EXTRA_LOGDATA_PREFIX)} + + # Retrieve Celery task info + if current_task and current_task.request: + extra_data['task'] = { + 'id': current_task.request.id, + 'name': current_task.name, + 'kwargs': current_task.request.kwargs, + 'args': current_task.request.args, + } + log_entry = (db_level_of_py_level(log_data['levelno']), msg, Json(extra_data), log_data['name'], self.fqdn, os.getpid()) db = self._connect() with db.cursor() as cur: cur.execute('INSERT INTO log ' '(level, message, data, src_module, src_host, src_pid)' 'VALUES (%s, %s, %s, %s, %s, %s)', log_entry) db.commit() db.close() diff --git a/swh/core/worker.py b/swh/core/worker.py index 733c259..3f0dcdc 100644 --- a/swh/core/worker.py +++ b/swh/core/worker.py @@ -1,137 +1,139 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from celery import Celery from celery.signals import setup_logging from celery.utils.log import ColorFormatter from kombu import Exchange, Queue from swh.core.config import load_named_config from swh.core.logger import PostgresHandler CONFIG_NAME = 'worker.ini' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG formatter = logging.Formatter(format) if colorize: color_formatter = ColorFormatter(format) else: color_formatter = formatter root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) pg = PostgresHandler(CONFIG['log_db']) pg.setFormatter(logging.Formatter(format)) pg.setLevel(logging.DEBUG) pg.setFormatter(formatter) root_logger.addHandler(console) root_logger.addHandler(pg) celery_logger = logging.getLogger('celery') celery_logger.setLevel(logging.INFO) # Silence useless "Starting new HTTP connection" messages urllib3_logger = logging.getLogger('urllib3') - urllib3_logger.setLevel(logging.CRITICAL) + urllib3_logger.setLevel(logging.WARNING) swh_logger = logging.getLogger('swh') swh_logger.setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task celery_task_logger = logging.getLogger('celery.task') celery_task_logger.setLevel(loglevel) # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] for queue in CONFIG['task_queues']: CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) # Instantiate the Celery app app = Celery() app.conf.update( # The broker BROKER_URL=CONFIG['task_broker'], # Timezone configuration: all in UTC CELERY_ENABLE_UTC=True, CELERY_TIMEZONE='UTC', # Imported modules CELERY_IMPORTS=CONFIG['task_modules'], # Time (in seconds, or a timedelta object) for when after stored task - # tombstones will be deleted. - CELERY_TASK_RESULT_EXPIRES=3600, + # tombstones will be deleted. None means to never expire results. + CELERY_TASK_RESULT_EXPIRES=None, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. CELERY_TRACK_STARTED=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # CELERY_MESSAGE_COMPRESSION='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) CELERY_DISABLE_RATE_LIMITS=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # CELERYD_TASK_TIME_LIMIT=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. # The task can catch this to e.g. clean up before the hard time limit # comes. CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'], # Task routing CELERY_ROUTES={ 'swh.loader.git.tasks.LoadGitRepository': { 'queue': 'swh_loader_git', }, 'swh.loader.git.tasks.LoadGitHubRepository': { 'queue': 'swh_loader_git', }, 'swh.cloner.git.worker.tasks.execute_with_measure': { 'queue': 'swh_cloner_git', }, }, # Task queues this worker will consume from CELERY_QUEUES=CELERY_QUEUES, # Allow pool restarts from remote CELERYD_POOL_RESTARTS=True, + # Do not prefetch tasks + CELERYD_PREFETCH_MULTIPLIER=1, ) diff --git a/version.txt b/version.txt index ad9ea70..a2c51d5 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.10-0-g9d565f0 \ No newline at end of file +v0.0.11-0-g5c25456 \ No newline at end of file