diff --git a/PKG-INFO b/PKG-INFO index d445425..c46f142 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,28 +1,30 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.59 +Version: 0.0.60 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown +Provides-Extra: db Provides-Extra: testing +Provides-Extra: http diff --git a/requirements.txt b/requirements.txt index e25273a..cd36a30 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,13 @@ +PyYAML +systemd-python + +# these deps below are now handled in dedicated 'extras' and should be removed +# from this main requirement file ASAP arrow aiohttp msgpack > 0.5 psycopg2 python-dateutil -vcversioner -PyYAML requests Flask -systemd-python decorator diff --git a/setup.py b/setup.py index 25fc474..dd9de8a 100755 --- a/setup.py +++ b/setup.py @@ -1,69 +1,78 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, 'README.md'), encoding='utf-8') as f: long_description = f.read() -def parse_requirements(name=None): - if name: - reqf = 'requirements-%s.txt' % name - else: - reqf = 'requirements.txt' - +def parse_requirements(*names): requirements = [] - if not os.path.exists(reqf): - return requirements + for name in names: + if name: + reqf = 'requirements-%s.txt' % name + else: + reqf = 'requirements.txt' + + if not os.path.exists(reqf): + return requirements - with open(reqf) as f: - for line in f.readlines(): - line = line.strip() - if not line or line.startswith('#'): - continue - requirements.append(line) + with open(reqf) as f: + for line in f.readlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + requirements.append(line) return requirements setup( name='swh.core', description='Software Heritage core utilities', long_description=long_description, long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DCORE/', packages=find_packages(), scripts=[], - install_requires=parse_requirements() + parse_requirements('swh'), + install_requires=parse_requirements(None, 'swh'), setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + extras_require={ + 'testing': parse_requirements('test', 'db', 'http'), + 'db': parse_requirements('db'), + 'http': parse_requirements('http'), + }, vcversioner={}, include_package_data=True, - entry_points={ - 'console_scripts': ['swh-db-init=swh.core.cli:db_init'], - }, + entry_points=''' + [console_scripts] + swh=swh.core.cli:main + swh-db-init=swh.core.cli.db:db_init + [swh.cli.subcommands] + db-init=swh.core.cli.db:db_init + ''', classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', 'Funding': 'https://www.softwareheritage.org/donate', 'Source': 'https://forge.softwareheritage.org/source/swh-core', }, ) diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index d445425..c46f142 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,28 +1,30 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.59 +Version: 0.0.60 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown +Provides-Extra: db Provides-Extra: testing +Provides-Extra: http diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt index 08bd88a..8590f71 100644 --- a/swh.core.egg-info/SOURCES.txt +++ b/swh.core.egg-info/SOURCES.txt @@ -1,41 +1,41 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt swh/__init__.py swh.core.egg-info/PKG-INFO swh.core.egg-info/SOURCES.txt swh.core.egg-info/dependency_links.txt swh.core.egg-info/entry_points.txt swh.core.egg-info/requires.txt swh.core.egg-info/top_level.txt swh/core/__init__.py swh/core/api_async.py -swh/core/cli.py swh/core/config.py swh/core/logger.py swh/core/statsd.py swh/core/tarball.py swh/core/utils.py swh/core/api/__init__.py swh/core/api/asynchronous.py swh/core/api/negotiation.py swh/core/api/serializers.py +swh/core/cli/__init__.py +swh/core/cli/db.py swh/core/db/__init__.py swh/core/db/common.py swh/core/db/db_utils.py swh/core/sql/log-schema.sql swh/core/tests/__init__.py swh/core/tests/conftest.py swh/core/tests/db_testing.py swh/core/tests/server_testing.py swh/core/tests/test_api.py swh/core/tests/test_config.py swh/core/tests/test_db.py -swh/core/tests/test_logger.py swh/core/tests/test_serializers.py swh/core/tests/test_statsd.py swh/core/tests/test_utils.py \ No newline at end of file diff --git a/swh.core.egg-info/entry_points.txt b/swh.core.egg-info/entry_points.txt index 6b673cb..06d08b2 100644 --- a/swh.core.egg-info/entry_points.txt +++ b/swh.core.egg-info/entry_points.txt @@ -1,3 +1,7 @@ -[console_scripts] -swh-db-init = swh.core.cli:db_init + [console_scripts] + swh=swh.core.cli:main + swh-db-init=swh.core.cli.db:db_init + [swh.cli.subcommands] + db-init=swh.core.cli.db:db_init + \ No newline at end of file diff --git a/swh.core.egg-info/requires.txt b/swh.core.egg-info/requires.txt index fe35cfa..1614931 100644 --- a/swh.core.egg-info/requires.txt +++ b/swh.core.egg-info/requires.txt @@ -1,17 +1,36 @@ +PyYAML +systemd-python arrow aiohttp msgpack>0.5 psycopg2 python-dateutil -vcversioner -PyYAML requests Flask -systemd-python decorator +[db] +psycopg2 + +[http] +aiohttp +arrow +decorator +Flask +msgpack>0.5 +python-dateutil +requests + [testing] pytest<4 pytest-postgresql requests-mock hypothesis>=3.11.0 +psycopg2 +aiohttp +arrow +decorator +Flask +msgpack>0.5 +python-dateutil +requests diff --git a/swh/core/api/asynchronous.py b/swh/core/api/asynchronous.py index 967c994..02085b1 100644 --- a/swh/core/api/asynchronous.py +++ b/swh/core/api/asynchronous.py @@ -1,53 +1,54 @@ -import aiohttp.web import json import logging -import multidict import pickle import sys import traceback +import aiohttp.web +import multidict + from .serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder def encode_data_server(data, **kwargs): return aiohttp.web.Response( body=msgpack_dumps(data), headers=multidict.MultiDict({'Content-Type': 'application/x-msgpack'}), **kwargs ) async def decode_request(request): content_type = request.headers.get('Content-Type') data = await request.read() if not data: return {} if content_type == 'application/x-msgpack': r = msgpack_loads(data) elif content_type == 'application/json': r = json.loads(data, cls=SWHJSONDecoder) else: raise ValueError('Wrong content type `%s` for API request' % content_type) return r async def error_middleware(app, handler): async def middleware_handler(request): try: return (await handler(request)) except Exception as e: if isinstance(e, aiohttp.web.HTTPException): raise logging.exception(e) exception = traceback.format_exception(*sys.exc_info()) res = {'exception': exception, 'exception_pickled': pickle.dumps(e)} return encode_data_server(res, status=500) return middleware_handler class SWHRemoteAPI(aiohttp.web.Application): def __init__(self, *args, middlewares=(), **kwargs): middlewares = (error_middleware,) + middlewares super().__init__(*args, middlewares=middlewares, **kwargs) diff --git a/swh/core/api/negotiation.py b/swh/core/api/negotiation.py index 3fce1b6..91d658d 100644 --- a/swh/core/api/negotiation.py +++ b/swh/core/api/negotiation.py @@ -1,152 +1,152 @@ # This code is a partial and adapted copy of # https://github.com/nickstenning/negotiate # # Copyright 2012-2013 Nick Stenning # 2019 The Software Heritage developers # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # from collections import defaultdict -from decorator import decorator - from inspect import getcallargs +from decorator import decorator + class FormatterNotFound(Exception): pass class Formatter: format = None mimetypes = [] def __init__(self, request_mimetype=None): if request_mimetype is None or request_mimetype not in self.mimetypes: try: self.response_mimetype = self.mimetypes[0] except IndexError: raise NotImplementedError( "%s.mimetypes should be a non-empty list" % self.__class__.__name__) else: self.response_mimetype = request_mimetype def configure(self): pass def render(self, obj): raise NotImplementedError( "render() should be implemented by Formatter subclasses") def __call__(self, obj): return self._make_response( self.render(obj), content_type=self.response_mimetype) def _make_response(self, body, content_type): raise NotImplementedError( "_make_response() should be implemented by " "framework-specific subclasses of Formatter" ) class Negotiator: def __init__(self, func): self.func = func self._formatters = [] self._formatters_by_format = defaultdict(list) self._formatters_by_mimetype = defaultdict(list) def __call__(self, *args, **kwargs): result = self.func(*args, **kwargs) format = getcallargs(self.func, *args, **kwargs).get('format') mimetype = self.best_mimetype() try: formatter = self.get_formatter(format, mimetype) except FormatterNotFound as e: return self._abort(404, str(e)) return formatter(result) def register_formatter(self, formatter, *args, **kwargs): self._formatters.append(formatter) self._formatters_by_format[formatter.format].append( (formatter, args, kwargs)) for mimetype in formatter.mimetypes: self._formatters_by_mimetype[mimetype].append( (formatter, args, kwargs)) def get_formatter(self, format=None, mimetype=None): if format is None and mimetype is None: raise TypeError( "get_formatter expects one of the 'format' or 'mimetype' " "kwargs to be set") if format is not None: try: # the first added will be the most specific formatter_cls, args, kwargs = ( self._formatters_by_format[format][0]) except IndexError: raise FormatterNotFound( "Formatter for format '%s' not found!" % format) elif mimetype is not None: try: # the first added will be the most specific formatter_cls, args, kwargs = ( self._formatters_by_mimetype[mimetype][0]) except IndexError: raise FormatterNotFound( "Formatter for mimetype '%s' not found!" % mimetype) formatter = formatter_cls(request_mimetype=mimetype) formatter.configure(*args, **kwargs) return formatter @property def accept_mimetypes(self): return [m for f in self._formatters for m in f.mimetypes] def best_mimetype(self): raise NotImplementedError( "best_mimetype() should be implemented in " "framework-specific subclasses of Negotiator" ) def _abort(self, status_code, err=None): raise NotImplementedError( "_abort() should be implemented in framework-specific " "subclasses of Negotiator" ) def negotiate(negotiator_cls, formatter_cls, *args, **kwargs): def _negotiate(f, *args, **kwargs): return f.negotiator(*args, **kwargs) def decorate(f): if not hasattr(f, 'negotiator'): f.negotiator = negotiator_cls(f) f.negotiator.register_formatter(formatter_cls, *args, **kwargs) return decorator(_negotiate, f) return decorate diff --git a/swh/core/api/serializers.py b/swh/core/api/serializers.py index cc634f6..1b33c59 100644 --- a/swh/core/api/serializers.py +++ b/swh/core/api/serializers.py @@ -1,191 +1,191 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import arrow import base64 import datetime from json import JSONDecoder, JSONEncoder import types from uuid import UUID +import arrow import dateutil.parser import msgpack def encode_data_client(data): try: return msgpack_dumps(data) except OverflowError as e: raise ValueError('Limits were reached. Please, check your input.\n' + str(e)) def decode_response(response): content_type = response.headers['content-type'] if content_type.startswith('application/x-msgpack'): r = msgpack_loads(response.content) elif content_type.startswith('application/json'): r = response.json(cls=SWHJSONDecoder) else: raise ValueError('Wrong content type `%s` for API response' % content_type) return r class SWHJSONEncoder(JSONEncoder): """JSON encoder for data structures generated by Software Heritage. This JSON encoder extends the default Python JSON encoder and adds awareness for the following specific types: - bytes (get encoded as a Base85 string); - datetime.datetime (get encoded as an ISO8601 string). Non-standard types get encoded as a a dictionary with two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. SWHJSONEncoder also encodes arbitrary iterables as a list (allowing serialization of generators). Caveats: Limitations in the JSONEncoder extension mechanism prevent us from "escaping" dictionaries that only contain the swhtype and d keys, and therefore arbitrary data structures can't be round-tripped through SWHJSONEncoder and SWHJSONDecoder. """ def default(self, o): if isinstance(o, bytes): return { 'swhtype': 'bytes', 'd': base64.b85encode(o).decode('ascii'), } elif isinstance(o, datetime.datetime): return { 'swhtype': 'datetime', 'd': o.isoformat(), } elif isinstance(o, UUID): return { 'swhtype': 'uuid', 'd': str(o), } elif isinstance(o, datetime.timedelta): return { 'swhtype': 'timedelta', 'd': { 'days': o.days, 'seconds': o.seconds, 'microseconds': o.microseconds, }, } elif isinstance(o, arrow.Arrow): return { 'swhtype': 'arrow', 'd': o.isoformat(), } try: return super().default(o) except TypeError as e: try: iterable = iter(o) except TypeError: raise e from None else: return list(iterable) class SWHJSONDecoder(JSONDecoder): """JSON decoder for data structures encoded with SWHJSONEncoder. This JSON decoder extends the default Python JSON decoder, allowing the decoding of: - bytes (encoded as a Base85 string); - datetime.datetime (encoded as an ISO8601 string). Non-standard types must be encoded as a a dictionary with exactly two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. To limit the impact our encoding, if the swhtype key doesn't contain a known value, the dictionary is decoded as-is. """ def decode_data(self, o): if isinstance(o, dict): if set(o.keys()) == {'d', 'swhtype'}: datatype = o['swhtype'] if datatype == 'bytes': return base64.b85decode(o['d']) elif datatype == 'datetime': return dateutil.parser.parse(o['d']) elif datatype == 'uuid': return UUID(o['d']) elif datatype == 'timedelta': return datetime.timedelta(**o['d']) elif datatype == 'arrow': return arrow.get(o['d']) return {key: self.decode_data(value) for key, value in o.items()} if isinstance(o, list): return [self.decode_data(value) for value in o] else: return o def raw_decode(self, s, idx=0): data, index = super().raw_decode(s, idx) return self.decode_data(data), index def msgpack_dumps(data): """Write data as a msgpack stream""" def encode_types(obj): if isinstance(obj, datetime.datetime): return {b'__datetime__': True, b's': obj.isoformat()} if isinstance(obj, types.GeneratorType): return list(obj) if isinstance(obj, UUID): return {b'__uuid__': True, b's': str(obj)} if isinstance(obj, datetime.timedelta): return { b'__timedelta__': True, b's': { 'days': obj.days, 'seconds': obj.seconds, 'microseconds': obj.microseconds, }, } if isinstance(obj, arrow.Arrow): return {b'__arrow__': True, b's': obj.isoformat()} return obj return msgpack.packb(data, use_bin_type=True, default=encode_types) def msgpack_loads(data): """Read data as a msgpack stream""" def decode_types(obj): if b'__datetime__' in obj and obj[b'__datetime__']: return dateutil.parser.parse(obj[b's']) if b'__uuid__' in obj and obj[b'__uuid__']: return UUID(obj[b's']) if b'__timedelta__' in obj and obj[b'__timedelta__']: return datetime.timedelta(**obj[b's']) if b'__arrow__' in obj and obj[b'__arrow__']: return arrow.get(obj[b's']) return obj try: return msgpack.unpackb(data, raw=False, object_hook=decode_types) except TypeError: # msgpack < 0.5.2 return msgpack.unpackb(data, encoding='utf-8', object_hook=decode_types) diff --git a/swh/core/cli/__init__.py b/swh/core/cli/__init__.py new file mode 100644 index 0000000..de82173 --- /dev/null +++ b/swh/core/cli/__init__.py @@ -0,0 +1,40 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging +import pkg_resources + +logger = logging.getLogger(__name__) + + +CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) + + +@click.group(context_settings=CONTEXT_SETTINGS) +@click.option('--log-level', '-l', default='INFO', + type=click.Choice(logging._nameToLevel.keys()), + help="Log level (default to INFO)") +@click.pass_context +def swh(ctx, log_level): + """Software Heritage Tool + """ + logger.setLevel(log_level) + ctx.ensure_object(dict) + ctx.obj['log_level'] = logging._nameToLevel[log_level] + + +def main(): + logging.basicConfig() + # load plugins that define cli sub commands + for entry_point in pkg_resources.iter_entry_points('swh.cli.subcommands'): + cmd = entry_point.load() + swh.add_command(cmd, name=entry_point.name) + + return swh(auto_envvar_prefix='SWH') + + +if __name__ == '__main__': + main() diff --git a/swh/core/cli.py b/swh/core/cli/db.py similarity index 80% rename from swh/core/cli.py rename to swh/core/cli/db.py index 0a63ceb..27a1992 100755 --- a/swh/core/cli.py +++ b/swh/core/cli/db.py @@ -1,80 +1,86 @@ #!/usr/bin/env python3 # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging import warnings warnings.filterwarnings("ignore") # noqa prevent psycopg from telling us sh*t -from os import path -import glob - import click -from importlib import import_module -from swh.core.utils import numfile_sortkey as sortkey -from swh.core.tests.db_testing import ( - pg_createdb, pg_restore, DB_DUMP_TYPES, - swh_db_version -) + +logger = logging.getLogger(__name__) @click.command() @click.argument('module', nargs=-1, required=True) @click.option('--db-name', '-d', help='Database name.', default='softwareheritage-dev', show_default=True) def db_init(module, db_name=None): """Initialise a database for the Software Heritage . By default, attempts to create the database first. Example: - swh-db-init storage -d swh-test + swh db-init -d swh-test storage If you want to specify non-default postgresql connection parameters, please provide them using standard environment variables. See psql(1) man page (section ENVIRONMENTS) for details. Example: - PGPORT=5434 swh-db-init indexer -d swh-indexer + PGPORT=5434 swh db-init indexer """ + # put import statements here so we can keep startup time of the main swh + # command as short as possible + from os import path + import glob + from importlib import import_module + from swh.core.utils import numfile_sortkey as sortkey + from swh.core.tests.db_testing import ( + pg_createdb, pg_restore, DB_DUMP_TYPES, + swh_db_version + ) + + logger.debug('db_init %s dn_name=%s', module, db_name) dump_files = [] for modname in module: if not modname.startswith('swh.'): modname = 'swh.{}'.format(modname) try: m = import_module(modname) except ImportError: raise click.BadParameter( 'Unable to load module {}'.format(modname)) sqldir = path.join(path.dirname(m.__file__), 'sql') if not path.isdir(sqldir): raise click.BadParameter( 'Module {} does not provide a db schema ' '(no sql/ dir)'.format(modname)) dump_files.extend(sorted(glob.glob(path.join(sqldir, '*.sql')), key=sortkey)) # Create the db (or fail silently if already existing) pg_createdb(db_name, check=False) # Try to retrieve the db version if any db_version = swh_db_version(db_name) if not db_version: # Initialize the db dump_files = [(x, DB_DUMP_TYPES[path.splitext(x)[1]]) for x in dump_files] for dump, dtype in dump_files: click.secho('Loading {}'.format(dump), fg='yellow') pg_restore(db_name, dump, dtype) db_version = swh_db_version(db_name) # TODO: Ideally migrate the version from db_version to the latest # db version click.secho('DONE database is {} version {}'.format(db_name, db_version), fg='green', bold=True) diff --git a/swh/core/logger.py b/swh/core/logger.py index 74b9dd5..0a2511f 100644 --- a/swh/core/logger.py +++ b/swh/core/logger.py @@ -1,192 +1,104 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging -import os -import socket -import psycopg2 -from psycopg2.extras import Json from systemd.journal import JournalHandler as _JournalHandler, send try: from celery import current_task except ImportError: current_task = None EXTRA_LOGDATA_PREFIX = 'swh_' def db_level_of_py_level(lvl): """convert a log level of the logging module to a log level suitable for the logging Postgres DB """ return logging.getLevelName(lvl).lower() def get_extra_data(record, task_args=True): """Get the extra data to insert to the database from the logging record""" log_data = record.__dict__ extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v for k, v in log_data.items() if k.startswith(EXTRA_LOGDATA_PREFIX)} args = log_data.get('args') if args: extra_data['logging_args'] = args # Retrieve Celery task info if current_task and current_task.request: extra_data['task'] = { 'id': current_task.request.id, 'name': current_task.name, } if task_args: extra_data['task'].update({ 'kwargs': current_task.request.kwargs, 'args': current_task.request.args, }) return extra_data def flatten(data, separator='_'): """Flatten the data dictionary into a flat structure""" def inner_flatten(data, prefix): if isinstance(data, dict): for key, value in data.items(): yield from inner_flatten(value, prefix + [key]) elif isinstance(data, (list, tuple)): for key, value in enumerate(data): yield from inner_flatten(value, prefix + [str(key)]) else: yield prefix, data for path, value in inner_flatten(data, []): yield separator.join(path), value def stringify(value): """Convert value to string""" if isinstance(value, datetime.datetime): return value.isoformat() return str(value) -class PostgresHandler(logging.Handler): - """log handler that store messages in a Postgres DB - - See swh-core/swh/core/sql/log-schema.sql for the DB schema. - - All logging methods can be used as usual. Additionally, arbitrary metadata - can be passed to logging methods, requesting that they will be stored in - the DB as a single JSONB value. To do so, pass a dictionary to the 'extra' - kwarg of any logging method; all keys in that dictionary that start with - EXTRA_LOGDATA_PREFIX (currently: ``swh_``) will be extracted to form the - JSONB dictionary. The prefix will be stripped and not included in the DB. - - Note: the logger name will be used to fill the 'module' DB column. - - Sample usage:: - - logging.basicConfig(level=logging.INFO) - h = PostgresHandler('dbname=softwareheritage-log') - logging.getLogger().addHandler(h) - - logger.info('not so important notice', - extra={'swh_type': 'swh_logging_test', - 'swh_meditation': 'guru'}) - logger.warn('something weird just happened, did you see that?') - - """ - - def __init__(self, connstring): - """ - Create a Postgres log handler. - - Args: - config: configuration dictionary, with a key "log_db" containing a - libpq connection string to the log DB - """ - super().__init__() - - self.connstring = connstring - - self.fqdn = socket.getfqdn() # cache FQDN value - - def _connect(self): - return psycopg2.connect(self.connstring) - - def emit(self, record): - msg = self.format(record) - extra_data = get_extra_data(record) - - if 'task' in extra_data: - task_args = { - 'args': extra_data['task']['args'], - 'kwargs': extra_data['task']['kwargs'], - } - - try: - json_args = Json(task_args).getquoted() - except TypeError: - task_args = { - 'args': [''], - 'kwargs': {}, - } - else: - json_args_length = len(json_args) - if json_args_length >= 1000: - task_args = { - 'args': [''], - 'kwargs': {}, - } - - extra_data['task'].update(task_args) - - log_entry = (db_level_of_py_level(record.levelno), msg, - Json(extra_data), record.name, self.fqdn, - os.getpid()) - db = self._connect() - with db.cursor() as cur: - cur.execute('INSERT INTO log ' - '(level, message, data, src_module, src_host, src_pid)' - 'VALUES (%s, %s, %s, %s, %s, %s)', - log_entry) - db.commit() - db.close() - - class JournalHandler(_JournalHandler): def emit(self, record): """Write `record` as a journal event. MESSAGE is taken from the message provided by the user, and PRIORITY, LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended automatically. In addition, record.MESSAGE_ID will be used if present. """ try: extra_data = flatten(get_extra_data(record, task_args=False)) extra_data = { (EXTRA_LOGDATA_PREFIX + key).upper(): stringify(value) for key, value in extra_data } msg = self.format(record) pri = self.mapPriority(record.levelno) send(msg, PRIORITY=format(pri), LOGGER=record.name, THREAD_NAME=record.threadName, CODE_FILE=record.pathname, CODE_LINE=record.lineno, CODE_FUNC=record.funcName, **extra_data) except Exception: self.handleError(record) diff --git a/swh/core/tests/db_testing.py b/swh/core/tests/db_testing.py index 8ca1625..c2122e8 100644 --- a/swh/core/tests/db_testing.py +++ b/swh/core/tests/db_testing.py @@ -1,314 +1,315 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import glob -import psycopg2 import subprocess +import psycopg2 + from swh.core.utils import numfile_sortkey as sortkey DB_DUMP_TYPES = {'.sql': 'psql', '.dump': 'pg_dump'} def swh_db_version(dbname_or_service): """Retrieve the swh version if any. In case of the db not initialized, this returns None. Otherwise, this returns the db's version. Args: dbname_or_service (str): The db's name or service Returns: Optional[Int]: Either the db's version or None """ query = 'select version from dbversion order by dbversion desc limit 1' cmd = [ 'psql', '--tuples-only', '--no-psqlrc', '--quiet', '-v', 'ON_ERROR_STOP=1', "--command=%s" % query, dbname_or_service ] try: r = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, universal_newlines=True) result = int(r.stdout.strip()) except Exception: # db not initialized result = None return result def pg_restore(dbname, dumpfile, dumptype='pg_dump'): """ Args: dbname: name of the DB to restore into dumpfile: path fo the dump file dumptype: one of 'pg_dump' (for binary dumps), 'psql' (for SQL dumps) """ assert dumptype in ['pg_dump', 'psql'] if dumptype == 'pg_dump': subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges', '--dbname', dbname, dumpfile]) elif dumptype == 'psql': subprocess.check_call(['psql', '--quiet', '--no-psqlrc', '-v', 'ON_ERROR_STOP=1', '-f', dumpfile, dbname]) def pg_dump(dbname, dumpfile): subprocess.check_call(['pg_dump', '--no-owner', '--no-privileges', '-Fc', '-f', dumpfile, dbname]) def pg_dropdb(dbname): subprocess.check_call(['dropdb', dbname]) def pg_createdb(dbname, check=True): """Create a db. If check is True and the db already exists, this will raise an exception (original behavior). If check is False and the db already exists, this will fail silently. If the db does not exist, the db will be created. """ subprocess.run(['createdb', dbname], check=check) def db_create(dbname, dumps=None): """create the test DB and load the test data dumps into it dumps is an iterable of couples (dump_file, dump_type). context: setUpClass """ try: pg_createdb(dbname) except subprocess.CalledProcessError: # try recovering once, in case pg_dropdb(dbname) # the db already existed pg_createdb(dbname) for dump, dtype in dumps: pg_restore(dbname, dump, dtype) return dbname def db_destroy(dbname): """destroy the test DB context: tearDownClass """ pg_dropdb(dbname) def db_connect(dbname): """connect to the test DB and open a cursor context: setUp """ conn = psycopg2.connect('dbname=' + dbname) return { 'conn': conn, 'cursor': conn.cursor() } def db_close(conn): """rollback current transaction and disconnect from the test DB context: tearDown """ if not conn.closed: conn.rollback() conn.close() class DbTestConn: def __init__(self, dbname): self.dbname = dbname def __enter__(self): self.db_setup = db_connect(self.dbname) self.conn = self.db_setup['conn'] self.cursor = self.db_setup['cursor'] return self def __exit__(self, *_): db_close(self.conn) class DbTestContext: def __init__(self, name='softwareheritage-test', dumps=None): self.dbname = name self.dumps = dumps def __enter__(self): db_create(dbname=self.dbname, dumps=self.dumps) return self def __exit__(self, *_): db_destroy(self.dbname) class DbTestFixture: """Mix this in a test subject class to get DB testing support. Use the class method add_db() to add a new database to be tested. Using this will create a DbTestConn entry in the `test_db` dictionary for all the tests, indexed by the name of the database. Example: class TestDb(DbTestFixture, unittest.TestCase): @classmethod def setUpClass(cls): cls.add_db('db_name', DUMP) super().setUpClass() def setUp(self): db = self.test_db['db_name'] print('conn: {}, cursor: {}'.format(db.conn, db.cursor)) To ensure test isolation, each test method of the test case class will execute in its own connection, cursor, and transaction. Note that if you want to define setup/teardown methods, you need to explicitly call super() to ensure that the fixture setup/teardown methods are invoked. Here is an example where all setup/teardown methods are defined in a test case: class TestDb(DbTestFixture, unittest.TestCase): @classmethod def setUpClass(cls): # your add_db() calls here super().setUpClass() # your class setup code here def setUp(self): super().setUp() # your instance setup code here def tearDown(self): # your instance teardown code here super().tearDown() @classmethod def tearDownClass(cls): # your class teardown code here super().tearDownClass() """ _DB_DUMP_LIST = {} _DB_LIST = {} DB_TEST_FIXTURE_IMPORTED = True @classmethod def add_db(cls, name='softwareheritage-test', dumps=None): cls._DB_DUMP_LIST[name] = dumps @classmethod def setUpClass(cls): for name, dumps in cls._DB_DUMP_LIST.items(): cls._DB_LIST[name] = DbTestContext(name, dumps) cls._DB_LIST[name].__enter__() super().setUpClass() @classmethod def tearDownClass(cls): super().tearDownClass() for name, context in cls._DB_LIST.items(): context.__exit__() def setUp(self, *args, **kwargs): self.test_db = {} for name in self._DB_LIST.keys(): self.test_db[name] = DbTestConn(name) self.test_db[name].__enter__() super().setUp(*args, **kwargs) def tearDown(self): super().tearDown() for name in self._DB_LIST.keys(): self.test_db[name].__exit__() def reset_db_tables(self, name, excluded=None): db = self.test_db[name] conn = db.conn cursor = db.cursor cursor.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public',)) tables = set(table for (table,) in cursor.fetchall()) if excluded is not None: tables -= set(excluded) for table in tables: cursor.execute('truncate table %s cascade' % table) conn.commit() class SingleDbTestFixture(DbTestFixture): """Simplified fixture like DbTest but that can only handle a single DB. Gives access to shortcuts like self.cursor and self.conn. DO NOT use this with other fixtures that need to access databases, like StorageTestFixture. The class can override the following class attributes: TEST_DB_NAME: name of the DB used for testing TEST_DB_DUMP: DB dump to be restored before running test methods; can be set to None if no restore from dump is required. If the dump file name endswith" - '.sql' it will be loaded via psql, - '.dump' it will be loaded via pg_restore. Other file extensions will be ignored. Can be a string or a list of strings; each path will be expanded using glob pattern matching. The test case class will then have the following attributes, accessible via self: dbname: name of the test database conn: psycopg2 connection object cursor: open psycopg2 cursor to the DB """ TEST_DB_NAME = 'softwareheritage-test' TEST_DB_DUMP = None @classmethod def setUpClass(cls): cls.dbname = cls.TEST_DB_NAME # XXX to kill? dump_files = cls.TEST_DB_DUMP if isinstance(dump_files, str): dump_files = [dump_files] all_dump_files = [] for files in dump_files: all_dump_files.extend( sorted(glob.glob(files), key=sortkey)) all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) for x in all_dump_files] cls.add_db(name=cls.TEST_DB_NAME, dumps=all_dump_files) super().setUpClass() def setUp(self, *args, **kwargs): super().setUp(*args, **kwargs) db = self.test_db[self.TEST_DB_NAME] self.conn = db.conn self.cursor = db.cursor diff --git a/swh/core/tests/server_testing.py b/swh/core/tests/server_testing.py index 3187d1f..9a708a4 100644 --- a/swh/core/tests/server_testing.py +++ b/swh/core/tests/server_testing.py @@ -1,149 +1,149 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc -import aiohttp import multiprocessing import os import socket import time - from urllib.request import urlopen +import aiohttp + class ServerTestFixtureBaseClass(metaclass=abc.ABCMeta): """Base class for http client/server testing implementations. Override this class to implement the following methods: - process_config: to do something needed for the server configuration (e.g propagate the configuration to other part) - define_worker_function: define the function that will actually run the server. To ensure test isolation, each test will run in a different server and a different folder. In order to correctly work, the subclass must call the parents class's setUp() and tearDown() methods. """ def setUp(self): super().setUp() self.start_server() def tearDown(self): self.stop_server() super().tearDown() def url(self): return 'http://127.0.0.1:%d/' % self.port def process_config(self): """Process the server's configuration. Do something useful for example, pass along the self.config dictionary inside the self.app. By default, do nothing. """ pass @abc.abstractmethod def define_worker_function(self, app, port): """Define how the actual implementation server will run. """ pass def start_server(self): """ Spawn the API server using multiprocessing. """ self.process = None self.process_config() # Get an available port number sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 0)) self.port = sock.getsockname()[1] sock.close() worker_fn = self.define_worker_function() self.process = multiprocessing.Process( target=worker_fn, args=(self.app, self.port) ) self.process.start() # Wait max 5 seconds for server to spawn i = 0 while i < 500: try: urlopen(self.url()) except Exception: i += 1 time.sleep(0.01) else: return def stop_server(self): """ Terminate the API server's process. """ if self.process: self.process.terminate() class ServerTestFixture(ServerTestFixtureBaseClass): """Base class for http client/server testing (e.g flask). Mix this in a test class in order to have access to an http server running in background. Note that the subclass should define a dictionary in self.config that contains the server config. And an application in self.app that corresponds to the type of server the tested client needs. To ensure test isolation, each test will run in a different server and a different folder. In order to correctly work, the subclass must call the parents class's setUp() and tearDown() methods. """ def process_config(self): # WSGI app configuration for key, value in self.config.items(): self.app.config[key] = value def define_worker_function(self): def worker(app, port): # Make Flask 1.0 stop printing its server banner os.environ['WERKZEUG_RUN_MAIN'] = 'true' return app.run(port=port, use_reloader=False) return worker class ServerTestFixtureAsync(ServerTestFixtureBaseClass): """Base class for http client/server async testing (e.g aiohttp). Mix this in a test class in order to have access to an http server running in background. Note that the subclass should define an application in self.app that corresponds to the type of server the tested client needs. To ensure test isolation, each test will run in a different server and a different folder. In order to correctly work, the subclass must call the parents class's setUp() and tearDown() methods. """ def define_worker_function(self): def worker(app, port): return aiohttp.web.run_app(app, port=int(port), print=lambda *_: None) return worker diff --git a/swh/core/tests/test_logger.py b/swh/core/tests/test_logger.py deleted file mode 100644 index 3d00693..0000000 --- a/swh/core/tests/test_logger.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright (C) 2015-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging -import os - -import pytest - -from swh.core.logger import PostgresHandler - -from swh.core.tests import SQL_DIR - -DUMP_FILE = os.path.join(SQL_DIR, 'log-schema.sql') - - -@pytest.fixture -def swh_db_logger(postgresql_proc, postgresql): - - cursor = postgresql.cursor() - with open(DUMP_FILE) as fobj: - cursor.execute(fobj.read()) - postgresql.commit() - modname = 'swh.core.tests.test_logger' - logger = logging.Logger(modname, logging.DEBUG) - dsn = 'postgresql://{user}@{host}:{port}/{dbname}'.format( - host=postgresql_proc.host, - port=postgresql_proc.port, - user='postgres', - dbname='tests') - logger.addHandler(PostgresHandler(dsn)) - return logger - - -@pytest.mark.db -def test_log(swh_db_logger, postgresql): - logger = swh_db_logger - modname = logger.name - - logger.info('notice', - extra={'swh_type': 'test entry', 'swh_data': 42}) - logger.warning('warning') - - with postgresql.cursor() as cur: - cur.execute('SELECT level, message, data, src_module FROM log') - db_log_entries = cur.fetchall() - - assert ('info', 'notice', {'type': 'test entry', 'data': 42}, - modname) in db_log_entries - assert ('warning', 'warning', {}, modname) in db_log_entries diff --git a/version.txt b/version.txt index ab0de05..05e622d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.59-0-g1d9c0bf \ No newline at end of file +v0.0.60-0-g86eeb30 \ No newline at end of file