diff --git a/PKG-INFO b/PKG-INFO
index 33168db..91f0084 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.31
+Version: 0.0.32
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/debian/changelog b/debian/changelog
index 63e6b27..e40c02a 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,216 +1,218 @@
-swh-core (0.0.31-1~swh1~bpo9+1) stretch-swh; urgency=medium
+swh-core (0.0.32-1~swh1) unstable-swh; urgency=medium

-  * Rebuild for stretch-backports.
+  * Release swh-core v0.0.32
+  * Add asynchronous streaming methods for internal APIs
+  * Remove task arguments from systemd-journal loggers

- -- Nicolas Dandrimont  Fri, 07 Apr 2017 15:11:26 +0200
+ -- Nicolas Dandrimont  Tue, 09 May 2017 14:04:22 +0200

 swh-core (0.0.31-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.31
   * Add explicit dependency on python3-systemd

  -- Nicolas Dandrimont  Fri, 07 Apr 2017 15:11:26 +0200

 swh-core (0.0.30-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.30
   * drop swh.core.hashutil (moved to swh.model.hashutil)
   * add a systemd logger

  -- Nicolas Dandrimont  Fri, 07 Apr 2017 11:49:15 +0200

 swh-core (0.0.29-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.29
   * Catch proper exception in the base API client

  -- Nicolas Dandrimont  Thu, 02 Feb 2017 00:19:25 +0100

 swh-core (0.0.28-1~swh1) unstable-swh; urgency=medium

   * v0.0.28
   * Refactoring some common code into swh.core

  -- Antoine R. Dumont (@ardumont)  Thu, 26 Jan 2017 14:54:22 +0100

 swh-core (0.0.27-1~swh1) unstable-swh; urgency=medium

   * v0.0.27
   * Fix issue with default boolean value

  -- Antoine R. Dumont (@ardumont)  Thu, 20 Oct 2016 16:15:20 +0200

 swh-core (0.0.26-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.26
   * Raise an exception when a configuration file exists and is unreadable

  -- Nicolas Dandrimont  Wed, 12 Oct 2016 10:16:09 +0200

 swh-core (0.0.25-1~swh1) unstable-swh; urgency=medium

   * v0.0.25
   * Add new function utils.cwd

  -- Antoine R. Dumont (@ardumont)  Thu, 29 Sep 2016 21:29:37 +0200

 swh-core (0.0.24-1~swh1) unstable-swh; urgency=medium

   * v0.0.24
   * Deal with edge case in logger regarding json

  -- Antoine R. Dumont (@ardumont)  Thu, 22 Sep 2016 12:21:09 +0200

 swh-core (0.0.23-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.23
   * Properly fix the PyYAML dependency

  -- Nicolas Dandrimont  Tue, 23 Aug 2016 16:20:29 +0200

 swh-core (0.0.22-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.22
   * Proper loading of yaml and ini files in all paths

  -- Nicolas Dandrimont  Fri, 19 Aug 2016 15:45:55 +0200

 swh-core (0.0.21-1~swh1) unstable-swh; urgency=medium

   * v0.0.21
   * Update test tools

  -- Antoine R. Dumont (@ardumont)  Tue, 19 Jul 2016 14:47:01 +0200

 swh-core (0.0.20-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.20
   * Add some generic bytes <-> escaped unicode methods

  -- Nicolas Dandrimont  Tue, 14 Jun 2016 16:54:41 +0200

 swh-core (0.0.19-1~swh1) unstable-swh; urgency=medium

   * v0.0.19
   * Resurrect swh.core.utils

  -- Antoine R. Dumont (@ardumont)  Fri, 15 Apr 2016 12:40:43 +0200

 swh-core (0.0.18-1~swh1) unstable-swh; urgency=medium

   * v0.0.18
   * Add swh.core.utils
   * serializers: support UUIDs all around

  -- Antoine R. Dumont (@ardumont)  Sat, 26 Mar 2016 11:16:33 +0100

 swh-core (0.0.17-1~swh1) unstable-swh; urgency=medium

   * Release swh.core v0.0.17
   * Allow serialization of UUIDs

  -- Nicolas Dandrimont  Fri, 04 Mar 2016 11:40:56 +0100

 swh-core (0.0.16-1~swh1) unstable-swh; urgency=medium

   * Release swh.core version 0.0.16
   * add bytehex_to_hash and hash_to_bytehex in hashutil
   * move scheduling utilities to swh.scheduler

  -- Nicolas Dandrimont  Fri, 19 Feb 2016 18:12:10 +0100

 swh-core (0.0.15-1~swh1) unstable-swh; urgency=medium

   * Release v0.0.15
   * Add hashutil.hash_git_object

  -- Nicolas Dandrimont  Wed, 16 Dec 2015 16:31:26 +0100

 swh-core (0.0.14-1~swh1) unstable-swh; urgency=medium

   * v0.0.14
   * Add simple README
   * Update license
   * swh.core.hashutil.hashfile can now deal with filepath as bytes

  -- Antoine R. Dumont (@ardumont)  Fri, 23 Oct 2015 11:13:14 +0200

 swh-core (0.0.13-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.13

  -- Nicolas Dandrimont  Fri, 09 Oct 2015 17:32:49 +0200

 swh-core (0.0.12-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.12

  -- Nicolas Dandrimont  Tue, 06 Oct 2015 17:34:34 +0200

 swh-core (0.0.11-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.11

  -- Nicolas Dandrimont  Sat, 03 Oct 2015 15:57:03 +0200

 swh-core (0.0.10-1~swh1) unstable-swh; urgency=medium

   * Prepare deploying swh.core v0.0.10

  -- Nicolas Dandrimont  Sat, 03 Oct 2015 12:28:52 +0200

 swh-core (0.0.9-1~swh1) unstable-swh; urgency=medium

   * Prepare deploying swh.core v0.0.9

  -- Nicolas Dandrimont  Sat, 03 Oct 2015 11:36:55 +0200

 swh-core (0.0.8-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.8

  -- Nicolas Dandrimont  Thu, 01 Oct 2015 12:31:44 +0200

 swh-core (0.0.7-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.7

  -- Nicolas Dandrimont  Thu, 01 Oct 2015 11:29:04 +0200

 swh-core (0.0.6-1~swh1) unstable-swh; urgency=medium

   * Prepare deployment of swh.core v0.0.6

  -- Nicolas Dandrimont  Tue, 29 Sep 2015 16:48:44 +0200

 swh-core (0.0.5-1~swh1) unstable-swh; urgency=medium

   * Prepare v0.0.5 deployment

  -- Nicolas Dandrimont  Tue, 29 Sep 2015 16:08:32 +0200

 swh-core (0.0.4-1~swh1) unstable-swh; urgency=medium

   * Tagging swh.core 0.0.4

  -- Nicolas Dandrimont  Fri, 25 Sep 2015 15:41:26 +0200

 swh-core (0.0.3-1~swh1) unstable-swh; urgency=medium

   * Tag swh.core v0.0.3

  -- Nicolas Dandrimont  Fri, 25 Sep 2015 11:07:10 +0200

 swh-core (0.0.2-1~swh1) unstable-swh; urgency=medium

   * Deploy v0.0.2

  -- Nicolas Dandrimont  Wed, 23 Sep 2015 12:08:50 +0200

 swh-core (0.0.1-1~swh1) unstable-swh; urgency=medium

   * Initial release
   * Tag v0.0.1 for deployment

  -- Nicolas Dandrimont  Tue, 22 Sep 2015 14:52:26 +0200
diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO
index 33168db..91f0084 100644
--- a/swh.core.egg-info/PKG-INFO
+++ b/swh.core.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.core
-Version: 0.0.31
+Version: 0.0.32
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt
index 8b969ec..9dde4d0 100644
--- a/swh.core.egg-info/SOURCES.txt
+++ b/swh.core.egg-info/SOURCES.txt
@@ -1,24 +1,25 @@
 MANIFEST.in
 Makefile
 requirements-swh.txt
 requirements.txt
 setup.py
 version.txt
 bin/swh-hashdir
 bin/swh-hashfile
 swh.core.egg-info/PKG-INFO
 swh.core.egg-info/SOURCES.txt
 swh.core.egg-info/dependency_links.txt
 swh.core.egg-info/requires.txt
 swh.core.egg-info/top_level.txt
 swh/core/__init__.py
 swh/core/api.py
+swh/core/api_async.py
 swh/core/config.py
 swh/core/logger.py
 swh/core/serializers.py
 swh/core/utils.py
 swh/core/tests/db_testing.py
 swh/core/tests/test_config.py
 swh/core/tests/test_logger.py
 swh/core/tests/test_serializers.py
 swh/core/tests/test_utils.py
\ No newline at end of file
diff --git a/swh/core/api.py b/swh/core/api.py
index 7491045..aeaadbf 100644
--- a/swh/core/api.py
+++ b/swh/core/api.py
@@ -1,108 +1,134 @@
 # Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import collections
 import json
 import logging
 import pickle
 import requests

 from flask import Flask, Request, Response

 from .serializers import (decode_response,
                           encode_data_client as encode_data,
                           msgpack_dumps, msgpack_loads, SWHJSONDecoder)


+class RemoteException(Exception):
+    pass
+
+
 class SWHRemoteAPI:
     """Proxy to an internal SWH API

     """

     def __init__(self, api_exception, url):
         super().__init__()
         self.api_exception = api_exception
         base_url = url if url.endswith('/') else url + '/'
         self.url = base_url
         self.session = requests.Session()

     def _url(self, endpoint):
         return '%s%s' % (self.url, endpoint)

-    def post(self, endpoint, data):
+    def raw_post(self, endpoint, data, **opts):
         try:
-            response = self.session.post(
+            return self.session.post(
                 self._url(endpoint),
-                data=encode_data(data),
-                headers={'content-type': 'application/x-msgpack'},
+                data=data,
+                **opts
             )
         except requests.exceptions.ConnectionError as e:
             raise self.api_exception(e)

-        # XXX: this breaks language-independence and should be
-        # replaced by proper unserialization
-        if response.status_code == 400:
-            raise pickle.loads(decode_response(response))
-
-        return decode_response(response)
-
-    def get(self, endpoint, data=None):
+    def raw_get(self, endpoint, params=None, **opts):
         try:
-            response = self.session.get(
+            return self.session.get(
                 self._url(endpoint),
-                params=data,
+                params=params,
+                **opts
             )
         except requests.exceptions.ConnectionError as e:
             raise self.api_exception(e)

+    def post(self, endpoint, data, params=None):
+        data = encode_data(data)
+        response = self.raw_post(
+            endpoint, data, params=params,
+            headers={'content-type': 'application/x-msgpack'})
+        return self._decode_response(response)
+
+    def get(self, endpoint, params=None):
+        response = self.raw_get(endpoint, params=params)
+        return self._decode_response(response)
+
+    def post_stream(self, endpoint, data, params=None):
+        if not isinstance(data, collections.Iterable):
+            raise ValueError("`data` must be Iterable")
+        response = self.raw_post(endpoint, data, params=params)
+        return self._decode_response(response)
+
+    def get_stream(self, endpoint, params=None, chunk_size=4096):
+        response = self.raw_get(endpoint, params=params, stream=True)
+        return response.iter_content(chunk_size)
+
+    def _decode_response(self, response):
         if response.status_code == 404:
             return None
+        if response.status_code == 500:
+            data = decode_response(response)
+            if 'exception_pickled' in data:
+                raise pickle.loads(data['exception_pickled'])
+            else:
+                raise RemoteException(data['exception'])

         # XXX: this breaks language-independence and should be
         # replaced by proper unserialization
         if response.status_code == 400:
             raise pickle.loads(decode_response(response))
-        else:
-            return decode_response(response)
+        return decode_response(response)


 class BytesRequest(Request):
     """Request with proper escaping of arbitrary byte sequences."""
     encoding = 'utf-8'
     encoding_errors = 'surrogateescape'


 def encode_data_server(data):
     return Response(
         msgpack_dumps(data),
         mimetype='application/x-msgpack',
     )


 def decode_request(request):
     content_type = request.mimetype
     data = request.get_data()

     if content_type == 'application/x-msgpack':
         r = msgpack_loads(data)
     elif content_type == 'application/json':
         r = json.loads(data, cls=SWHJSONDecoder)
     else:
         raise ValueError('Wrong content type `%s` for API request'
                          % content_type)

     return r


 def error_handler(exception, encoder):
     # XXX: this breaks language-independence and should be
     # replaced by proper serialization of errors
     logging.exception(exception)
     response = encoder(pickle.dumps(exception))
     response.status_code = 400
     return response


 class SWHServerAPIApp(Flask):
     request_class = BytesRequest
diff --git a/swh/core/api_async.py b/swh/core/api_async.py
new file mode 100644
index 0000000..eed981e
--- /dev/null
+++ b/swh/core/api_async.py
@@ -0,0 +1,56 @@
+import aiohttp.web
+import asyncio
+import json
+import logging
+import multidict
+import pickle
+import sys
+import traceback
+
+from .serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
+def encode_data_server(data, **kwargs):
+    return aiohttp.web.Response(
+        body=msgpack_dumps(data),
+        headers=multidict.MultiDict({'Content-Type': 'application/x-msgpack'}),
+        **kwargs
+    )
+
+
+@asyncio.coroutine
+def decode_request(request):
+    content_type = request.headers.get('Content-Type')
+    data = yield from request.read()
+
+    if content_type == 'application/x-msgpack':
+        r = msgpack_loads(data)
+    elif content_type == 'application/json':
+        r = json.loads(data, cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+    return r
+
+
+@asyncio.coroutine
+def error_middleware(app, handler):
+    @asyncio.coroutine
+    def middleware_handler(request):
+        try:
+            return (yield from handler(request))
+        except Exception as e:
+            if isinstance(e, aiohttp.web.HTTPException):
+                raise
+            logging.exception(e)
+            exception = traceback.format_exception(*sys.exc_info())
+            res = {'exception': exception,
+                   'exception_pickled': pickle.dumps(e)}
+            return encode_data_server(res, status=500)
+    return middleware_handler
+
+
+class SWHRemoteAPI(aiohttp.web.Application):
+    def __init__(self, *args, middlewares=(), **kwargs):
+        middlewares = (error_middleware,) + middlewares
+        super().__init__(*args, middlewares=middlewares, **kwargs)
diff --git a/swh/core/logger.py b/swh/core/logger.py
index ffe7b1c..30667f2 100644
--- a/swh/core/logger.py
+++ b/swh/core/logger.py
@@ -1,188 +1,192 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import logging
 import os
 import socket

 import psycopg2
 from psycopg2.extras import Json
 from systemd.journal import JournalHandler as _JournalHandler, send

 try:
     from celery import current_task
 except ImportError:
     current_task = None


 EXTRA_LOGDATA_PREFIX = 'swh_'


 def db_level_of_py_level(lvl):
     """convert a log level of the logging module to a log level suitable for
     the logging Postgres DB

     """
     return logging.getLevelName(lvl).lower()


-def get_extra_data(record):
+def get_extra_data(record, task_args=True):
     """Get the extra data to insert to the database from the logging record"""
     log_data = record.__dict__

     extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v
                   for k, v in log_data.items()
                   if k.startswith(EXTRA_LOGDATA_PREFIX)}

     args = log_data.get('args')
     if args:
         extra_data['logging_args'] = args

     # Retrieve Celery task info
     if current_task and current_task.request:
         extra_data['task'] = {
             'id': current_task.request.id,
             'name': current_task.name,
-            'kwargs': current_task.request.kwargs,
-            'args': current_task.request.args,
         }
+        if task_args:
+            extra_data['task'].update({
+                'kwargs': current_task.request.kwargs,
+                'args': current_task.request.args,
+            })

     return extra_data


 def flatten(data, separator='_'):
     """Flatten the data dictionary into a flat structure"""
     def inner_flatten(data, prefix):
         if isinstance(data, dict):
             for key, value in data.items():
                 yield from inner_flatten(value, prefix + [key])
         elif isinstance(data, (list, tuple)):
             for key, value in enumerate(data):
                 yield from inner_flatten(value, prefix + [str(key)])
         else:
             yield prefix, data

     for path, value in inner_flatten(data, []):
         yield separator.join(path), value


 def stringify(value):
     """Convert value to string"""
     if isinstance(value, datetime.datetime):
         return value.isoformat()

     return str(value)


 class PostgresHandler(logging.Handler):
     """log handler that store messages in a Postgres DB

     See swh-core/sql/log-schema.sql for the DB schema.

     All logging methods can be used as usual. Additionally, arbitrary
     metadata can be passed to logging methods, requesting that they will be
     stored in the DB as a single JSONB value. To do so, pass a dictionary to
     the 'extra' kwarg of any logging method; all keys in that dictionary that
     start with EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to
     form the JSONB dictionary. The prefix will be stripped and not included in
     the DB.

     Note: the logger name will be used to fill the 'module' DB column.

     Sample usage:

         logging.basicConfig(level=logging.INFO)
         h = PostgresHandler('dbname=softwareheritage-log')
         logging.getLogger().addHandler(h)

         logger.info('not so important notice',
                     extra={'swh_type': 'swh_logging_test',
                            'swh_meditation': 'guru'})
         logger.warn('something weird just happened, did you see that?')

     """

     def __init__(self, connstring):
         """
         Create a Postgres log handler.

         Args:
             config: configuration dictionary, with a key "log_db" containing a
                 libpq connection string to the log DB
         """
         super().__init__()

         self.connstring = connstring

         self.fqdn = socket.getfqdn()  # cache FQDN value

     def _connect(self):
         return psycopg2.connect(self.connstring)

     def emit(self, record):
         msg = self.format(record)
         extra_data = get_extra_data(record)

         if 'task' in extra_data:
             task_args = {
                 'args': extra_data['task']['args'],
                 'kwargs': extra_data['task']['kwargs'],
             }

             try:
                 json_args = Json(task_args).getquoted()
             except TypeError:
                 task_args = {
                     'args': [''],
                     'kwargs': {},
                 }
             else:
                 json_args_length = len(json_args)
                 if json_args_length >= 1000:
                     task_args = {
                         'args': [''],
                         'kwargs': {},
                     }

             extra_data['task'].update(task_args)

         log_entry = (db_level_of_py_level(record.levelno), msg,
                      Json(extra_data), record.name, self.fqdn, os.getpid())

         db = self._connect()
         with db.cursor() as cur:
             cur.execute('INSERT INTO log '
                         '(level, message, data, src_module, src_host, src_pid)'
                         'VALUES (%s, %s, %s, %s, %s, %s)',
                         log_entry)
             db.commit()
         db.close()


 class JournalHandler(_JournalHandler):
     def emit(self, record):
         """Write `record` as a journal event.

         MESSAGE is taken from the message provided by the user, and PRIORITY,
         LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended
         automatically. In addition, record.MESSAGE_ID will be used if present.
         """
         try:
+            extra_data = flatten(get_extra_data(record, task_args=False))
             extra_data = {
                 (EXTRA_LOGDATA_PREFIX + key).upper(): stringify(value)
-                for key, value in flatten(get_extra_data(record))
+                for key, value in extra_data
             }
             msg = self.format(record)
             pri = self.mapPriority(record.levelno)
             send(msg,
                  PRIORITY=format(pri),
                  LOGGER=record.name,
                  THREAD_NAME=record.threadName,
                  CODE_FILE=record.pathname,
                  CODE_LINE=record.lineno,
                  CODE_FUNC=record.funcName,
                  **extra_data)
         except Exception:
             self.handleError(record)
diff --git a/version.txt b/version.txt
index fb3bd40..e3c3de5 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.31-0-ga4c33d9
\ No newline at end of file
+v0.0.32-0-g806f583
\ No newline at end of file
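The changelog entry "Add asynchronous streaming methods for internal APIs" corresponds to the new raw_get/raw_post, get_stream and post_stream helpers added to SWHRemoteAPI in swh/core/api.py above. A minimal client-side sketch follows; the StorageAPIError exception class, the StorageClient subclass, the server URL and the content/... endpoint names are hypothetical illustrations and not part of this release — only SWHRemoteAPI, get_stream and post_stream come from the diff.

    from swh.core.api import SWHRemoteAPI


    class StorageAPIError(Exception):
        """Hypothetical exception type handed to the client for connection errors."""


    class StorageClient(SWHRemoteAPI):
        """Hypothetical client built on the streaming helpers introduced above."""

        def __init__(self, url):
            super().__init__(api_exception=StorageAPIError, url=url)

        def download(self, obj_id, dest_path):
            # get_stream() issues a GET with stream=True and returns an
            # iterator of raw chunks (requests' iter_content).
            with open(dest_path, 'wb') as f:
                for chunk in self.get_stream('content/%s/raw' % obj_id,
                                             chunk_size=4096):
                    f.write(chunk)

        def upload(self, src_path):
            # post_stream() accepts any iterable of bytes chunks and lets
            # requests stream them as the request body, then msgpack-decodes
            # the server response via _decode_response().
            def chunks():
                with open(src_path, 'rb') as f:
                    while True:
                        data = f.read(4096)
                        if not data:
                            break
                        yield data
            return self.post_stream('content/add', chunks())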
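The server-side counterpart lives in the new swh/core/api_async.py. Below is a sketch of a small aiohttp application built on it, using the old-style @asyncio.coroutine handlers that aiohttp supported at the time; the /revision/get route, the handler body and the port are hypothetical. The error_middleware installed by this module's SWHRemoteAPI is what produces the pickled-exception 500 responses that the synchronous client's _decode_response above knows how to re-raise.

    import asyncio

    import aiohttp.web

    from swh.core.api_async import (SWHRemoteAPI, decode_request,
                                    encode_data_server)


    @asyncio.coroutine
    def revision_get(request):
        # decode_request() parses msgpack or JSON request bodies.
        args = yield from decode_request(request)
        result = {'id': args['id'], 'found': False}  # placeholder lookup
        return encode_data_server(result)


    def make_app():
        # SWHRemoteAPI prepends error_middleware, so uncaught exceptions in
        # handlers come back as 500 responses carrying a pickled exception.
        app = SWHRemoteAPI()
        app.router.add_route('POST', '/revision/get', revision_get)
        return app


    if __name__ == '__main__':
        aiohttp.web.run_app(make_app(), host='0.0.0.0', port=5007)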
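Finally, "Remove task arguments from systemd-journal loggers" means JournalHandler.emit now calls get_extra_data(record, task_args=False), so only the Celery task id and name (not its args/kwargs) are forwarded to journald. A short sketch of attaching the handler, assuming a hypothetical logger name and extra field:

    import logging

    from swh.core.logger import JournalHandler

    logger = logging.getLogger('swh.loader.example')  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.addHandler(JournalHandler())

    # 'swh_'-prefixed extras are flattened and sent as uppercase journal
    # fields (here SWH_ORIGIN_URL); Celery task arguments are no longer
    # included in the journal entry.
    logger.info('origin loaded',
                extra={'swh_origin_url': 'https://example.org/repo.git'})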