diff --git a/PKG-INFO b/PKG-INFO index dd31e6c..32f52da 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,59 +1,59 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.44 +Version: 0.0.45 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Defines also a celery application to run concurrency tasks Celery use ---------- ### configuration file worker.ini file which looks like: [main] task_broker = amqp://guest@localhost// task_modules = swh.loader.dir.tasks, swh.loader.tar.tasks, swh.loader.git.tasks task_queues = swh_loader_tar, swh_loader_git, swh_loader_dir task_soft_time_limit = 0 This file can be set in the following location: - ~/.swh - ~/.config/swh - /etc/softwareheritage ### run celery worker Sample command: celery worker --app=swh.core.worker \ --pool=prefork \ --autoscale=2,2 \ -Ofair \ --loglevel=info 2>&1 | tee -a swh-core-worker.log Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..afa4cf3 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +norecursedirs = docs diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index dd31e6c..32f52da 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,59 +1,59 @@ Metadata-Version: 2.1 Name: swh.core -Version: 0.0.44 +Version: 0.0.45 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-core Description: swh-core ======== core library for swh's modules: - config parser - hash computations - serialization - logging mechanism Defines also a celery application to run concurrency tasks Celery use ---------- ### configuration file worker.ini file which looks like: [main] task_broker = amqp://guest@localhost// task_modules = swh.loader.dir.tasks, swh.loader.tar.tasks, swh.loader.git.tasks task_queues = swh_loader_tar, swh_loader_git, swh_loader_dir task_soft_time_limit = 0 This file can be set in the following location: - ~/.swh - ~/.config/swh - /etc/softwareheritage ### run celery worker Sample command: celery worker --app=swh.core.worker \ --pool=prefork \ --autoscale=2,2 \ -Ofair \ --loglevel=info 2>&1 | tee -a swh-core-worker.log Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt index 6893a6b..29a61cb 100644 --- a/swh.core.egg-info/SOURCES.txt +++ b/swh.core.egg-info/SOURCES.txt @@ -1,49 +1,44 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile README.md +pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.py tox.ini version.txt -debian/changelog -debian/compat -debian/control -debian/copyright -debian/rules -debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.core.egg-info/PKG-INFO swh.core.egg-info/SOURCES.txt swh.core.egg-info/dependency_links.txt swh.core.egg-info/entry_points.txt swh.core.egg-info/requires.txt swh.core.egg-info/top_level.txt swh/core/__init__.py swh/core/api.py swh/core/api_async.py swh/core/cli.py swh/core/config.py swh/core/logger.py swh/core/serializers.py swh/core/tarball.py swh/core/utils.py swh/core/sql/log-schema.sql swh/core/tests/__init__.py swh/core/tests/db_testing.py swh/core/tests/server_testing.py swh/core/tests/test_api.py swh/core/tests/test_config.py swh/core/tests/test_logger.py swh/core/tests/test_serializers.py swh/core/tests/test_utils.py \ No newline at end of file diff --git a/swh/core/logger.py b/swh/core/logger.py index ecced63..4198503 100644 --- a/swh/core/logger.py +++ b/swh/core/logger.py @@ -1,192 +1,192 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import os import socket import psycopg2 from psycopg2.extras import Json from systemd.journal import JournalHandler as _JournalHandler, send try: from celery import current_task except ImportError: current_task = None EXTRA_LOGDATA_PREFIX = 'swh_' def db_level_of_py_level(lvl): """convert a log level of the logging module to a log level suitable for the logging Postgres DB """ return logging.getLevelName(lvl).lower() def get_extra_data(record, task_args=True): """Get the extra data to insert to the database from the logging record""" log_data = record.__dict__ extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v for k, v in log_data.items() if k.startswith(EXTRA_LOGDATA_PREFIX)} args = log_data.get('args') if args: extra_data['logging_args'] = args # Retrieve Celery task info if current_task and current_task.request: extra_data['task'] = { 'id': current_task.request.id, 'name': current_task.name, } if task_args: extra_data['task'].update({ 'kwargs': current_task.request.kwargs, 'args': current_task.request.args, }) return extra_data def flatten(data, separator='_'): """Flatten the data dictionary into a flat structure""" def inner_flatten(data, prefix): if isinstance(data, dict): for key, value in data.items(): yield from inner_flatten(value, prefix + [key]) elif isinstance(data, (list, tuple)): for key, value in enumerate(data): yield from inner_flatten(value, prefix + [str(key)]) else: yield prefix, data for path, value in inner_flatten(data, []): yield separator.join(path), value def stringify(value): """Convert value to string""" if isinstance(value, datetime.datetime): return value.isoformat() return str(value) class PostgresHandler(logging.Handler): """log handler that store messages in a Postgres DB See swh-core/swh/core/sql/log-schema.sql for the DB schema. All logging methods can be used as usual. Additionally, arbitrary metadata can be passed to logging methods, requesting that they will be stored in the DB as a single JSONB value. To do so, pass a dictionary to the 'extra' kwarg of any logging method; all keys in that dictionary that start with - EXTRA_LOGDATA_PREFIX (currently: 'swh\_') will be extracted to form the + EXTRA_LOGDATA_PREFIX (currently: ``swh_``) will be extracted to form the JSONB dictionary. The prefix will be stripped and not included in the DB. Note: the logger name will be used to fill the 'module' DB column. Sample usage:: logging.basicConfig(level=logging.INFO) h = PostgresHandler('dbname=softwareheritage-log') logging.getLogger().addHandler(h) logger.info('not so important notice', extra={'swh_type': 'swh_logging_test', 'swh_meditation': 'guru'}) logger.warn('something weird just happened, did you see that?') """ def __init__(self, connstring): """ Create a Postgres log handler. Args: config: configuration dictionary, with a key "log_db" containing a libpq connection string to the log DB """ super().__init__() self.connstring = connstring self.fqdn = socket.getfqdn() # cache FQDN value def _connect(self): return psycopg2.connect(self.connstring) def emit(self, record): msg = self.format(record) extra_data = get_extra_data(record) if 'task' in extra_data: task_args = { 'args': extra_data['task']['args'], 'kwargs': extra_data['task']['kwargs'], } try: json_args = Json(task_args).getquoted() except TypeError: task_args = { 'args': [''], 'kwargs': {}, } else: json_args_length = len(json_args) if json_args_length >= 1000: task_args = { 'args': [''], 'kwargs': {}, } extra_data['task'].update(task_args) log_entry = (db_level_of_py_level(record.levelno), msg, Json(extra_data), record.name, self.fqdn, os.getpid()) db = self._connect() with db.cursor() as cur: cur.execute('INSERT INTO log ' '(level, message, data, src_module, src_host, src_pid)' 'VALUES (%s, %s, %s, %s, %s, %s)', log_entry) db.commit() db.close() class JournalHandler(_JournalHandler): def emit(self, record): """Write `record` as a journal event. MESSAGE is taken from the message provided by the user, and PRIORITY, LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended automatically. In addition, record.MESSAGE_ID will be used if present. """ try: extra_data = flatten(get_extra_data(record, task_args=False)) extra_data = { (EXTRA_LOGDATA_PREFIX + key).upper(): stringify(value) for key, value in extra_data } msg = self.format(record) pri = self.mapPriority(record.levelno) send(msg, PRIORITY=format(pri), LOGGER=record.name, THREAD_NAME=record.threadName, CODE_FILE=record.pathname, CODE_LINE=record.lineno, CODE_FUNC=record.funcName, **extra_data) except Exception: self.handleError(record) diff --git a/swh/core/serializers.py b/swh/core/serializers.py index 49bd7e0..cc634f6 100644 --- a/swh/core/serializers.py +++ b/swh/core/serializers.py @@ -1,186 +1,191 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import base64 import datetime from json import JSONDecoder, JSONEncoder import types from uuid import UUID import dateutil.parser import msgpack def encode_data_client(data): try: return msgpack_dumps(data) except OverflowError as e: raise ValueError('Limits were reached. Please, check your input.\n' + str(e)) def decode_response(response): content_type = response.headers['content-type'] if content_type.startswith('application/x-msgpack'): r = msgpack_loads(response.content) elif content_type.startswith('application/json'): r = response.json(cls=SWHJSONDecoder) else: raise ValueError('Wrong content type `%s` for API response' % content_type) return r class SWHJSONEncoder(JSONEncoder): """JSON encoder for data structures generated by Software Heritage. This JSON encoder extends the default Python JSON encoder and adds awareness for the following specific types: - bytes (get encoded as a Base85 string); - datetime.datetime (get encoded as an ISO8601 string). Non-standard types get encoded as a a dictionary with two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. SWHJSONEncoder also encodes arbitrary iterables as a list (allowing serialization of generators). Caveats: Limitations in the JSONEncoder extension mechanism prevent us from "escaping" dictionaries that only contain the swhtype and d keys, and therefore arbitrary data structures can't be round-tripped through SWHJSONEncoder and SWHJSONDecoder. """ def default(self, o): if isinstance(o, bytes): return { 'swhtype': 'bytes', 'd': base64.b85encode(o).decode('ascii'), } elif isinstance(o, datetime.datetime): return { 'swhtype': 'datetime', 'd': o.isoformat(), } elif isinstance(o, UUID): return { 'swhtype': 'uuid', 'd': str(o), } elif isinstance(o, datetime.timedelta): return { 'swhtype': 'timedelta', 'd': { 'days': o.days, 'seconds': o.seconds, 'microseconds': o.microseconds, }, } elif isinstance(o, arrow.Arrow): return { 'swhtype': 'arrow', 'd': o.isoformat(), } try: return super().default(o) except TypeError as e: try: iterable = iter(o) except TypeError: raise e from None else: return list(iterable) class SWHJSONDecoder(JSONDecoder): """JSON decoder for data structures encoded with SWHJSONEncoder. This JSON decoder extends the default Python JSON decoder, allowing the decoding of: - bytes (encoded as a Base85 string); - datetime.datetime (encoded as an ISO8601 string). Non-standard types must be encoded as a a dictionary with exactly two keys: - swhtype with value 'bytes' or 'datetime'; - d containing the encoded value. To limit the impact our encoding, if the swhtype key doesn't contain a known value, the dictionary is decoded as-is. """ def decode_data(self, o): if isinstance(o, dict): if set(o.keys()) == {'d', 'swhtype'}: datatype = o['swhtype'] if datatype == 'bytes': return base64.b85decode(o['d']) elif datatype == 'datetime': return dateutil.parser.parse(o['d']) elif datatype == 'uuid': return UUID(o['d']) elif datatype == 'timedelta': return datetime.timedelta(**o['d']) elif datatype == 'arrow': return arrow.get(o['d']) return {key: self.decode_data(value) for key, value in o.items()} if isinstance(o, list): return [self.decode_data(value) for value in o] else: return o def raw_decode(self, s, idx=0): data, index = super().raw_decode(s, idx) return self.decode_data(data), index def msgpack_dumps(data): """Write data as a msgpack stream""" def encode_types(obj): if isinstance(obj, datetime.datetime): return {b'__datetime__': True, b's': obj.isoformat()} if isinstance(obj, types.GeneratorType): return list(obj) if isinstance(obj, UUID): return {b'__uuid__': True, b's': str(obj)} if isinstance(obj, datetime.timedelta): return { b'__timedelta__': True, b's': { 'days': obj.days, 'seconds': obj.seconds, 'microseconds': obj.microseconds, }, } if isinstance(obj, arrow.Arrow): return {b'__arrow__': True, b's': obj.isoformat()} return obj return msgpack.packb(data, use_bin_type=True, default=encode_types) def msgpack_loads(data): """Read data as a msgpack stream""" def decode_types(obj): if b'__datetime__' in obj and obj[b'__datetime__']: return dateutil.parser.parse(obj[b's']) if b'__uuid__' in obj and obj[b'__uuid__']: return UUID(obj[b's']) if b'__timedelta__' in obj and obj[b'__timedelta__']: return datetime.timedelta(**obj[b's']) if b'__arrow__' in obj and obj[b'__arrow__']: return arrow.get(obj[b's']) return obj - return msgpack.unpackb(data, encoding='utf-8', object_hook=decode_types) + try: + return msgpack.unpackb(data, raw=False, + object_hook=decode_types) + except TypeError: # msgpack < 0.5.2 + return msgpack.unpackb(data, encoding='utf-8', + object_hook=decode_types) diff --git a/swh/core/tests/db_testing.py b/swh/core/tests/db_testing.py index 7c7971d..a0ae407 100644 --- a/swh/core/tests/db_testing.py +++ b/swh/core/tests/db_testing.py @@ -1,281 +1,281 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import glob import psycopg2 import subprocess from swh.core.utils import numfile_sortkey as sortkey DB_DUMP_TYPES = {'.sql': 'psql', '.dump': 'pg_dump'} def pg_restore(dbname, dumpfile, dumptype='pg_dump'): """ Args: dbname: name of the DB to restore into dumpfile: path fo the dump file dumptype: one of 'pg_dump' (for binary dumps), 'psql' (for SQL dumps) """ assert dumptype in ['pg_dump', 'psql'] if dumptype == 'pg_dump': subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges', '--dbname', dbname, dumpfile]) elif dumptype == 'psql': subprocess.check_call(['psql', '--quiet', '--no-psqlrc', '-v', 'ON_ERROR_STOP=1', '-f', dumpfile, dbname]) def pg_dump(dbname, dumpfile): subprocess.check_call(['pg_dump', '--no-owner', '--no-privileges', '-Fc', '-f', dumpfile, dbname]) def pg_dropdb(dbname): subprocess.check_call(['dropdb', dbname]) def pg_createdb(dbname): subprocess.check_call(['createdb', dbname]) def db_create(dbname, dumps=None): """create the test DB and load the test data dumps into it dumps is an iterable of couples (dump_file, dump_type). context: setUpClass """ try: pg_createdb(dbname) except subprocess.CalledProcessError: # try recovering once, in case pg_dropdb(dbname) # the db already existed pg_createdb(dbname) for dump, dtype in dumps: pg_restore(dbname, dump, dtype) return dbname def db_destroy(dbname): """destroy the test DB context: tearDownClass """ pg_dropdb(dbname) def db_connect(dbname): """connect to the test DB and open a cursor context: setUp """ conn = psycopg2.connect('dbname=' + dbname) return { 'conn': conn, 'cursor': conn.cursor() } def db_close(conn): """rollback current transaction and disconnect from the test DB context: tearDown """ if not conn.closed: conn.rollback() conn.close() class DbTestConn: def __init__(self, dbname): self.dbname = dbname def __enter__(self): self.db_setup = db_connect(self.dbname) self.conn = self.db_setup['conn'] self.cursor = self.db_setup['cursor'] return self def __exit__(self, *_): db_close(self.conn) class DbTestContext: def __init__(self, name='softwareheritage-test', dumps=None): self.dbname = name self.dumps = dumps def __enter__(self): db_create(dbname=self.dbname, dumps=self.dumps) return self def __exit__(self, *_): db_destroy(self.dbname) class DbTestFixture: """Mix this in a test subject class to get DB testing support. Use the class method add_db() to add a new database to be tested. Using this will create a DbTestConn entry in the `test_db` dictionary for all the tests, indexed by the name of the database. Example: class TestDb(DbTestFixture, unittest.TestCase): @classmethod def setUpClass(cls): - super().setUpClass() cls.add_db('db_name', DUMP) + super().setUpClass() def setUp(self): db = self.test_db['db_name'] print('conn: {}, cursor: {}'.format(db.conn, db.cursor)) To ensure test isolation, each test method of the test case class will execute in its own connection, cursor, and transaction. Note that if you want to define setup/teardown methods, you need to explicitly call super() to ensure that the fixture setup/teardown methods are invoked. Here is an example where all setup/teardown methods are defined in a test case: class TestDb(DbTestFixture, unittest.TestCase): @classmethod def setUpClass(cls): # your add_db() calls here super().setUpClass() # your class setup code here def setUp(self): super().setUp() # your instance setup code here def tearDown(self): # your instance teardown code here super().tearDown() @classmethod def tearDownClass(cls): # your class teardown code here super().tearDownClass() """ _DB_DUMP_LIST = {} _DB_LIST = {} DB_TEST_FIXTURE_IMPORTED = True @classmethod def add_db(cls, name='softwareheritage-test', dumps=None): cls._DB_DUMP_LIST[name] = dumps @classmethod def setUpClass(cls): for name, dumps in cls._DB_DUMP_LIST.items(): cls._DB_LIST[name] = DbTestContext(name, dumps) cls._DB_LIST[name].__enter__() super().setUpClass() @classmethod def tearDownClass(cls): super().tearDownClass() for name, context in cls._DB_LIST.items(): context.__exit__() def setUp(self): self.test_db = {} for name in self._DB_LIST.keys(): self.test_db[name] = DbTestConn(name) self.test_db[name].__enter__() super().setUp() def tearDown(self): super().tearDown() for name in self._DB_LIST.keys(): self.test_db[name].__exit__() def reset_db_tables(self, name, excluded=None): db = self.test_db[name] conn = db.conn cursor = db.cursor cursor.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public',)) tables = set(table for (table,) in cursor.fetchall()) if excluded is not None: tables -= set(excluded) for table in tables: cursor.execute('truncate table %s cascade' % table) conn.commit() class SingleDbTestFixture(DbTestFixture): """Simplified fixture like DbTest but that can only handle a single DB. Gives access to shortcuts like self.cursor and self.conn. DO NOT use this with other fixtures that need to access databases, like StorageTestFixture. The class can override the following class attributes: TEST_DB_NAME: name of the DB used for testing TEST_DB_DUMP: DB dump to be restored before running test methods; can be set to None if no restore from dump is required. If the dump file name endswith" - '.sql' it will be loaded via psql, - '.dump' it will be loaded via pg_restore. Other file extensions will be ignored. Can be a string or a list of strings; each path will be expanded using glob pattern matching. The test case class will then have the following attributes, accessible via self: dbname: name of the test database conn: psycopg2 connection object cursor: open psycopg2 cursor to the DB """ TEST_DB_NAME = 'softwareheritage-test' TEST_DB_DUMP = None @classmethod def setUpClass(cls): cls.dbname = cls.TEST_DB_NAME # XXX to kill? dump_files = cls.TEST_DB_DUMP if isinstance(dump_files, str): dump_files = [dump_files] all_dump_files = [] for files in dump_files: all_dump_files.extend( sorted(glob.glob(files), key=sortkey)) all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) for x in all_dump_files] cls.add_db(name=cls.TEST_DB_NAME, dumps=all_dump_files) super().setUpClass() def setUp(self): super().setUp() db = self.test_db[self.TEST_DB_NAME] self.conn = db.conn self.cursor = db.cursor diff --git a/tox.ini b/tox.ini index c1e2f8b..70265ee 100644 --- a/tox.ini +++ b/tox.ini @@ -1,17 +1,17 @@ [tox] envlist=flake8,py3 [testenv:py3] deps = - -r requirements-test.txt + .[testing] pytest-cov pifpaf commands = pifpaf run postgresql -- pytest --cov=swh --cov-branch {posargs} [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 diff --git a/version.txt b/version.txt index 2595a75..0ecc223 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.44-0-ge118202 \ No newline at end of file +v0.0.45-0-gda3d0ab \ No newline at end of file