diff --git a/PKG-INFO b/PKG-INFO index 33168db..91f0084 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index 33168db..91f0084 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt index 8b969ec..9dde4d0 100644 --- a/swh.core.egg-info/SOURCES.txt +++ b/swh.core.egg-info/SOURCES.txt @@ -1,24 +1,25 @@ MANIFEST.in Makefile requirements-swh.txt requirements.txt setup.py version.txt bin/swh-hashdir bin/swh-hashfile swh.core.egg-info/PKG-INFO swh.core.egg-info/SOURCES.txt swh.core.egg-info/dependency_links.txt swh.core.egg-info/requires.txt swh.core.egg-info/top_level.txt swh/core/__init__.py swh/core/api.py +swh/core/api_async.py swh/core/config.py swh/core/logger.py swh/core/serializers.py swh/core/utils.py swh/core/tests/db_testing.py swh/core/tests/test_config.py swh/core/tests/test_logger.py swh/core/tests/test_serializers.py swh/core/tests/test_utils.py \ No newline at end of file diff --git a/swh/core/api.py b/swh/core/api.py index 7491045..aeaadbf 100644 --- a/swh/core/api.py +++ b/swh/core/api.py @@ -1,108 +1,134 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import collections import json import logging import pickle import requests from flask import Flask, Request, Response from .serializers import (decode_response, encode_data_client as encode_data, msgpack_dumps, msgpack_loads, SWHJSONDecoder) +class RemoteException(Exception): + pass + + class SWHRemoteAPI: """Proxy to an internal SWH API """ def __init__(self, api_exception, url): super().__init__() self.api_exception = api_exception base_url = url if url.endswith('/') else url + '/' self.url = base_url self.session = requests.Session() def _url(self, endpoint): return '%s%s' % (self.url, endpoint) - def post(self, endpoint, data): + def raw_post(self, endpoint, data, **opts): try: - response = self.session.post( + return self.session.post( self._url(endpoint), - data=encode_data(data), - headers={'content-type': 'application/x-msgpack'}, + data=data, + **opts ) except requests.exceptions.ConnectionError as e: raise self.api_exception(e) - # XXX: this breaks language-independence and should be - # replaced by proper unserialization - if response.status_code == 400: - raise pickle.loads(decode_response(response)) - - return decode_response(response) - - def get(self, endpoint, data=None): + def raw_get(self, endpoint, params=None, **opts): try: - response = self.session.get( + return self.session.get( self._url(endpoint), - params=data, + params=params, + **opts ) except requests.exceptions.ConnectionError as e: raise self.api_exception(e) + def post(self, endpoint, data, params=None): + data = encode_data(data) + response = self.raw_post( + endpoint, data, params=params, + headers={'content-type': 'application/x-msgpack'}) + return self._decode_response(response) + + def get(self, endpoint, params=None): + response = self.raw_get(endpoint, params=params) + return self._decode_response(response) + + def post_stream(self, endpoint, data, params=None): + if not isinstance(data, collections.Iterable): + raise ValueError("`data` must be Iterable") + response = self.raw_post(endpoint, data, params=params) + return self._decode_response(response) + + def get_stream(self, endpoint, params=None, chunk_size=4096): + response = self.raw_get(endpoint, params=params, stream=True) + return response.iter_content(chunk_size) + + def _decode_response(self, response): if response.status_code == 404: return None + if response.status_code == 500: + data = decode_response(response) + if 'exception_pickled' in data: + raise pickle.loads(data['exception_pickled']) + else: + raise RemoteException(data['exception']) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) - else: return decode_response(response) class BytesRequest(Request): """Request with proper escaping of arbitrary byte sequences.""" encoding = 'utf-8' encoding_errors = 'surrogateescape' def encode_data_server(data): return Response( msgpack_dumps(data), mimetype='application/x-msgpack', ) def decode_request(request): content_type = request.mimetype data = request.get_data() if content_type == 'application/x-msgpack': r = msgpack_loads(data) elif content_type == 'application/json': r = json.loads(data, cls=SWHJSONDecoder) else: raise ValueError('Wrong content type `%s` for API request' % content_type) return r def error_handler(exception, encoder): # XXX: this breaks language-independence and should be # replaced by proper serialization of errors logging.exception(exception) response = encoder(pickle.dumps(exception)) response.status_code = 400 return response class SWHServerAPIApp(Flask): request_class = BytesRequest diff --git a/swh/core/api_async.py b/swh/core/api_async.py new file mode 100644 index 0000000..eed981e --- /dev/null +++ b/swh/core/api_async.py @@ -0,0 +1,56 @@ +import aiohttp.web +import asyncio +import json +import logging +import multidict +import pickle +import sys +import traceback + +from .serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder + + +def encode_data_server(data, **kwargs): + return aiohttp.web.Response( + body=msgpack_dumps(data), + headers=multidict.MultiDict({'Content-Type': 'application/x-msgpack'}), + **kwargs + ) + + +@asyncio.coroutine +def decode_request(request): + content_type = request.headers.get('Content-Type') + data = yield from request.read() + + if content_type == 'application/x-msgpack': + r = msgpack_loads(data) + elif content_type == 'application/json': + r = json.loads(data, cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API request' + % content_type) + return r + + +@asyncio.coroutine +def error_middleware(app, handler): + @asyncio.coroutine + def middleware_handler(request): + try: + return (yield from handler(request)) + except Exception as e: + if isinstance(e, aiohttp.web.HTTPException): + raise + logging.exception(e) + exception = traceback.format_exception(*sys.exc_info()) + res = {'exception': exception, + 'exception_pickled': pickle.dumps(e)} + return encode_data_server(res, status=500) + return middleware_handler + + +class SWHRemoteAPI(aiohttp.web.Application): + def __init__(self, *args, middlewares=(), **kwargs): + middlewares = (error_middleware,) + middlewares + super().__init__(*args, middlewares=middlewares, **kwargs) diff --git a/swh/core/logger.py b/swh/core/logger.py index ffe7b1c..30667f2 100644 --- a/swh/core/logger.py +++ b/swh/core/logger.py @@ -1,188 +1,192 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import os import socket import psycopg2 from psycopg2.extras import Json from systemd.journal import JournalHandler as _JournalHandler, send try: from celery import current_task except ImportError: current_task = None EXTRA_LOGDATA_PREFIX = 'swh_' def db_level_of_py_level(lvl): """convert a log level of the logging module to a log level suitable for the logging Postgres DB """ return logging.getLevelName(lvl).lower() -def get_extra_data(record): +def get_extra_data(record, task_args=True): """Get the extra data to insert to the database from the logging record""" log_data = record.__dict__ extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v for k, v in log_data.items() if k.startswith(EXTRA_LOGDATA_PREFIX)} args = log_data.get('args') if args: extra_data['logging_args'] = args # Retrieve Celery task info if current_task and current_task.request: extra_data['task'] = { 'id': current_task.request.id, 'name': current_task.name, - 'kwargs': current_task.request.kwargs, - 'args': current_task.request.args, } + if task_args: + extra_data['task'].update({ + 'kwargs': current_task.request.kwargs, + 'args': current_task.request.args, + }) return extra_data def flatten(data, separator='_'): """Flatten the data dictionary into a flat structure""" def inner_flatten(data, prefix): if isinstance(data, dict): for key, value in data.items(): yield from inner_flatten(value, prefix + [key]) elif isinstance(data, (list, tuple)): for key, value in enumerate(data): yield from inner_flatten(value, prefix + [str(key)]) else: yield prefix, data for path, value in inner_flatten(data, []): yield separator.join(path), value def stringify(value): """Convert value to string""" if isinstance(value, datetime.datetime): return value.isoformat() return str(value) class PostgresHandler(logging.Handler): """log handler that store messages in a Postgres DB See swh-core/sql/log-schema.sql for the DB schema. All logging methods can be used as usual. Additionally, arbitrary metadata can be passed to logging methods, requesting that they will be stored in the DB as a single JSONB value. To do so, pass a dictionary to the 'extra' kwarg of any logging method; all keys in that dictionary that start with EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to form the JSONB dictionary. The prefix will be stripped and not included in the DB. Note: the logger name will be used to fill the 'module' DB column. Sample usage: logging.basicConfig(level=logging.INFO) h = PostgresHandler('dbname=softwareheritage-log') logging.getLogger().addHandler(h) logger.info('not so important notice', extra={'swh_type': 'swh_logging_test', 'swh_meditation': 'guru'}) logger.warn('something weird just happened, did you see that?') """ def __init__(self, connstring): """ Create a Postgres log handler. Args: config: configuration dictionary, with a key "log_db" containing a libpq connection string to the log DB """ super().__init__() self.connstring = connstring self.fqdn = socket.getfqdn() # cache FQDN value def _connect(self): return psycopg2.connect(self.connstring) def emit(self, record): msg = self.format(record) extra_data = get_extra_data(record) if 'task' in extra_data: task_args = { 'args': extra_data['task']['args'], 'kwargs': extra_data['task']['kwargs'], } try: json_args = Json(task_args).getquoted() except TypeError: task_args = { 'args': [''], 'kwargs': {}, } else: json_args_length = len(json_args) if json_args_length >= 1000: task_args = { 'args': [''], 'kwargs': {}, } extra_data['task'].update(task_args) log_entry = (db_level_of_py_level(record.levelno), msg, Json(extra_data), record.name, self.fqdn, os.getpid()) db = self._connect() with db.cursor() as cur: cur.execute('INSERT INTO log ' '(level, message, data, src_module, src_host, src_pid)' 'VALUES (%s, %s, %s, %s, %s, %s)', log_entry) db.commit() db.close() class JournalHandler(_JournalHandler): def emit(self, record): """Write `record` as a journal event. MESSAGE is taken from the message provided by the user, and PRIORITY, LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended automatically. In addition, record.MESSAGE_ID will be used if present. """ try: + extra_data = flatten(get_extra_data(record, task_args=False)) extra_data = { (EXTRA_LOGDATA_PREFIX + key).upper(): stringify(value) - for key, value in flatten(get_extra_data(record)) + for key, value in extra_data } msg = self.format(record) pri = self.mapPriority(record.levelno) send(msg, PRIORITY=format(pri), LOGGER=record.name, THREAD_NAME=record.threadName, CODE_FILE=record.pathname, CODE_LINE=record.lineno, CODE_FUNC=record.funcName, **extra_data) except Exception: self.handleError(record) diff --git a/version.txt b/version.txt index fb3bd40..e3c3de5 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.31-0-ga4c33d9 \ No newline at end of file +v0.0.32-0-g806f583 \ No newline at end of file