diff --git a/PKG-INFO b/PKG-INFO index cead7a6..f90bab7 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.6 +Version: 0.0.7 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/rules b/debian/rules index 4fbd2bf..0015df0 100755 --- a/debian/rules +++ b/debian/rules @@ -1,14 +1,14 @@ #!/usr/bin/make -f # This file was automatically generated by stdeb 0.8.5 at # Tue, 22 Sep 2015 12:05:09 +0200 export PYBUILD_NAME=swh-core %: dh $@ --with python3 --buildsystem=pybuild override_dh_auto_test: PYBUILD_SYSTEM=custom \ - PYBUILD_TEST_ARGS="python{version} -m nose -sv swh.core" \ + PYBUILD_TEST_ARGS="python{version} -m nose -sv swh.core -a '!db'" \ dh_auto_test diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO index cead7a6..f90bab7 100644 --- a/swh.core.egg-info/PKG-INFO +++ b/swh.core.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.core -Version: 0.0.6 +Version: 0.0.7 Summary: Software Heritage core utilities Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt index 08b0a2e..46457a3 100644 --- a/swh.core.egg-info/SOURCES.txt +++ b/swh.core.egg-info/SOURCES.txt @@ -1,30 +1,32 @@ .gitignore MANIFEST.in Makefile requirements.txt setup.py version.txt bin/swh-hashdir bin/swh-hashfile debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format sql/log-schema.sql swh.core.egg-info/PKG-INFO swh.core.egg-info/SOURCES.txt swh.core.egg-info/dependency_links.txt swh.core.egg-info/requires.txt swh.core.egg-info/top_level.txt swh/core/config.py swh/core/hashutil.py swh/core/logger.py swh/core/scheduling.py swh/core/serializers.py +swh/core/worker.py swh/core/tests/db_testing.py swh/core/tests/test_config.py swh/core/tests/test_hashutil.py +swh/core/tests/test_logger.py swh/core/tests/test_scheduling.py swh/core/tests/test_serializers.py \ No newline at end of file diff --git a/swh/core/config.py b/swh/core/config.py index 0e285ee..8bb4c04 100644 --- a/swh/core/config.py +++ b/swh/core/config.py @@ -1,185 +1,203 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import configparser import os SWH_CONFIG_DIRECTORIES = [ '~/.config/swh', '~/.swh', '/etc/softwareheritage', ] SWH_GLOBAL_CONFIG = 'global.ini' SWH_DEFAULT_GLOBAL_CONFIG = { 'content_size_limit': ('int', 100 * 1024 * 1024), 'log_db': ('str', 'dbname=softwareheritage-log'), } # conversion per type _map_convert_fn = { 'int': int, 'bool': lambda x: x.lower() == 'true', 'list[str]': lambda x: [value.strip() for value in x.split(',')], 'list[int]': lambda x: [int(value.strip()) for value in x.split(',')], } def read(conf_file=None, default_conf=None): """Read the user's configuration file. Fill in the gap using `default_conf`. `default_conf` is similar to this: DEFAULT_CONF = { 'a': ('string', '/tmp/swh-loader-git/log'), 'b': ('string', 'dbname=swhloadergit') 'c': ('bool', true) 'e': ('bool', None) 'd': ('int', 10) } If conf_file is None, return the default config. """ conf = {} if conf_file: config_path = os.path.expanduser(conf_file) if os.path.exists(config_path): config = configparser.ConfigParser(defaults=default_conf) config.read(os.path.expanduser(conf_file)) if 'main' in config._sections: conf = config._sections['main'] if not default_conf: default_conf = {} # remaining missing default configuration key are set # also type conversion is enforced for underneath layer for key in default_conf: nature_type, default_value = default_conf[key] val = conf.get(key, None) if not val: # fallback to default value conf[key] = default_value else: # value present but in string format, force type conversion conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val) return conf def priority_read(conf_filenames, default_conf=None): """Try reading the configuration files from conf_filenames, in order, and return the configuration from the first one that exists. default_conf has the same specification as it does in read. """ # Try all the files in order for filename in conf_filenames: full_filename = os.path.expanduser(filename) if os.path.exists(full_filename): return read(full_filename, default_conf) # Else, return the default configuration return read(None, default_conf) def merge_default_configs(base_config, *other_configs): """Merge several default config dictionaries, from left to right""" full_config = base_config.copy() for config in other_configs: full_config.update(config) return full_config def swh_config_paths(base_filename): """Return the Software Heritage specific configuration paths for the given filename.""" return [os.path.join(dirname, base_filename) for dirname in SWH_CONFIG_DIRECTORIES] def prepare_folders(conf, *keys): """Prepare the folder mentioned in config under keys. """ def makedir(folder): if not os.path.exists(folder): os.makedirs(folder) for key in keys: makedir(conf[key]) def load_global_config(): """Load the global Software Heritage config""" return priority_read( swh_config_paths(SWH_GLOBAL_CONFIG), SWH_DEFAULT_GLOBAL_CONFIG, ) +def load_named_config(name, default_conf=None, global_conf=True): + """Load the config named `name` from the Software Heritage + configuration paths. + + If global_conf is True (default), read the global configuration + too. + """ + + conf = {} + + if global_conf: + conf.update(load_global_config()) + + conf.update(priority_read(swh_config_paths(name), default_conf)) + + return conf + + class SWHConfig: """Mixin to add configuration parsing abilities to classes The class should override the class attributes: - DEFAULT_CONFIG (default configuration to be parsed) - - CONFIG_FILENAME (the filename of the configuration to be used) + - CONFIG_BASE_FILENAME (the filename of the configuration to be used) This class defines one classmethod, parse_config_file, which parses a configuration file using the default config as set in the class attribute. """ DEFAULT_CONFIG = {} CONFIG_BASE_FILENAME = '' @classmethod def parse_config_file(cls, base_filename=None, config_filename=None, additional_configs=None, global_config=True): """Parse the configuration file associated to the current class. By default, parse_config_file will load the configuration cls.CONFIG_BASE_FILENAME from one of the Software Heritage configuration directories, in order, unless it is overridden by base_filename or config_filename (which shortcuts the file lookup completely). Args: - base_filename (str) overrides the default cls.CONFIG_BASE_FILENAME - config_filename (str) sets the file to parse instead of the defaults set from cls.CONFIG_BASE_FILENAME - additional_configs (list of default configuration dicts) allows to override or extend the configuration set in cls.DEFAULT_CONFIG. - global_config (bool): Load the global configuration (default: True) """ if config_filename: config_filenames = [config_filename] else: if not base_filename: base_filename = cls.CONFIG_BASE_FILENAME config_filenames = swh_config_paths(base_filename) if not additional_configs: additional_configs = [] full_default_config = merge_default_configs(cls.DEFAULT_CONFIG, *additional_configs) config = {} if global_config: config = load_global_config() config.update(priority_read(config_filenames, full_default_config)) return config diff --git a/swh/core/logger.py b/swh/core/logger.py index 0c2298a..96355a3 100644 --- a/swh/core/logger.py +++ b/swh/core/logger.py @@ -1,83 +1,87 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import psycopg2 import socket from psycopg2.extras import Json EXTRA_LOGDATA_PREFIX = 'swh_' def db_level_of_py_level(lvl): """convert a log level of the logging module to a log level suitable for the logging Postgres DB """ return logging.getLevelName(lvl).lower() class PostgresHandler(logging.Handler): """log handler that store messages in a Postgres DB See swh-core/sql/log-schema.sql for the DB schema. All logging methods can be used as usual. Additionally, arbitrary metadata can be passed to logging methods, requesting that they will be stored in the DB as a single JSONB value. To do so, pass a dictionary to the 'extra' kwarg of any logging method; all keys in that dictionary that start with EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to form the JSONB dictionary. The prefix will be stripped and not included in the DB. + Note: the logger name will be used to fill the 'module' DB column. + Sample usage: logging.basicConfig(level=logging.INFO) - h = PostgresHandler({'log_db': 'dbname=softwareheritage-log'}) + h = PostgresHandler('dbname=softwareheritage-log') logging.getLogger().addHandler(h) logger.info('not so important notice', extra={'swh_type': 'swh_logging_test', 'swh_meditation': 'guru'}) logger.warn('something weird just happened, did you see that?') """ - def __init__(self, config): + def __init__(self, connstring): """ Create a Postgres log handler. Args: config: configuration dictionary, with a key "log_db" containing a libpq connection string to the log DB """ super().__init__() - self.config = config - self.conn = psycopg2.connect(self.config['log_db']) + self.conn = psycopg2.connect(connstring) self.fqdn = socket.getfqdn() # cache FQDN value + def close(self): + self.conn.close() + super().close() + def emit(self, record): log_data = record.__dict__ + msg = self.format(record) + extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v for k, v in log_data.items() if k.startswith(EXTRA_LOGDATA_PREFIX)} - log_entry = (db_level_of_py_level(log_data['levelno']), - log_data['msg'], - Json(extra_data), - log_data['module'], - self.fqdn, + log_entry = (db_level_of_py_level(log_data['levelno']), msg, + Json(extra_data), log_data['name'], self.fqdn, os.getpid()) with self.conn.cursor() as cur: cur.execute('INSERT INTO log ' '(level, message, data, src_module, src_host, src_pid)' 'VALUES (%s, %s, %s, %s, %s, %s)', log_entry) self.conn.commit() diff --git a/swh/core/scheduling.py b/swh/core/scheduling.py index 8c4f52b..16f307f 100644 --- a/swh/core/scheduling.py +++ b/swh/core/scheduling.py @@ -1,20 +1,31 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery +from celery.utils.log import get_task_logger class Task(celery.Task): """a schedulable task (abstract class) Sub-classes must implement the run() method Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ + + abstract = True + def run(self, *args, **kwargs): raise NotImplementedError('tasks must implement the run() method') + + @property + def log(self): + if not hasattr(self, '__log'): + self.__log = get_task_logger('%s.%s' % + (__name__, self.__class__.__name__)) + return self.__log diff --git a/swh/core/tests/db_testing.py b/swh/core/tests/db_testing.py index 8cba731..1117eb4 100644 --- a/swh/core/tests/db_testing.py +++ b/swh/core/tests/db_testing.py @@ -1,140 +1,155 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os import psycopg2 import subprocess -TEST_DB_NAME = 'softwareheritage-test' -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') -TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') - - -def pg_restore(dbname, dumpfile): - subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges', - '--dbname', dbname, dumpfile]) +def pg_restore(dbname, dumpfile, dumptype='pg_dump'): + """ + Args: + dbname: name of the DB to restore into + dumpfile: path fo the dump file + dumptype: one of 'pg_dump' (for binary dumps), 'psql' (for SQL dumps) + """ + assert dumptype in ['pg_dump', 'psql'] + if dumptype == 'pg_dump': + subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges', + '--dbname', dbname, dumpfile]) + elif dumptype == 'psql': + subprocess.check_call(['psql', '--quiet', '-f', dumpfile, dbname]) def pg_dump(dbname, dumpfile): subprocess.check_call(['pg_dump', '--no-owner', '--no-privileges', '-Fc', '-f', dumpfile, dbname]) def pg_dropdb(dbname): subprocess.check_call(['dropdb', dbname]) def pg_createdb(dbname): subprocess.check_call(['createdb', dbname]) -def db_create(test_subj, dbname=TEST_DB_NAME, dbdump=TEST_DB_DUMP): +def db_create(test_subj, dbname, dump=None, dumptype='pg_dump'): """create the test DB and load the test data dump into it context: setUpClass """ try: pg_createdb(dbname) except subprocess.CalledProcessError: # try recovering once, in case pg_dropdb(dbname) # the db already existed pg_createdb(dbname) - pg_restore(dbname, dbdump) + if dump: + pg_restore(dbname, dump, dumptype) test_subj.dbname = dbname def db_destroy(test_subj): """destroy the test DB context: tearDownClass """ pg_dropdb(test_subj.dbname) def db_connect(test_subj): """connect to the test DB and open a cursor context: setUp """ test_subj.conn = psycopg2.connect('dbname=' + test_subj.dbname) test_subj.cursor = test_subj.conn.cursor() def db_close(test_subj): """rollback current transaction and disconnet from the test DB context: tearDown """ if not test_subj.conn.closed: test_subj.conn.rollback() test_subj.conn.close() class DbTestFixture(): """Mix this in a test subject class to get DB testing support. + The class can override the following class attributes: + TEST_DB_NAME: name of the DB used for testing + TEST_DB_DUMP: DB dump to be restored before running test methods; can + be set to None if no restore from dump is required + TEST_DB_DUMP_TYPE: one of 'pg_dump' (binary dump) or 'psql' (SQL dump) + The test case class will then have the following attributes, accessible via self: dbname: name of the test database conn: psycopg2 connection object cursor: open psycopg2 cursor to the DB To ensure test isolation, each test method of the test case class will execute in its own connection, cursor, and transaction. To ensure setup/teardown methods are called, in case of multiple inheritance DbTestFixture should be the first class in the inheritance hierarchy. Note that if you want to define setup/teardown methods, you need to explicitly call super() to ensure that the fixture setup/teardown methods are invoked. Here is an example where all setup/teardown methods are defined in a test case: class TestDb(DbTestFixture, unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() # your class setup code here def setUp(self): super().setUp() # your instance setup code here def tearDown(self): # your instance teardown code here super().tearDown() @classmethod def tearDownClass(cls): # your class teardown code here super().tearDownClass() """ + TEST_DB_NAME = 'softwareheritage-test' + TEST_DB_DUMP = None + TEST_DB_DUMP_TYPE = 'pg_dump' + @classmethod def setUpClass(cls): - db_create(cls) + db_create(cls, dbname=cls.TEST_DB_NAME, + dump=cls.TEST_DB_DUMP, dumptype=cls.TEST_DB_DUMP_TYPE) super().setUpClass() def setUp(self): db_connect(self) super().setUp() def tearDown(self): super().tearDown() db_close(self) @classmethod def tearDownClass(cls): super().tearDownClass() db_destroy(cls) diff --git a/swh/core/tests/test_logger.py b/swh/core/tests/test_logger.py new file mode 100644 index 0000000..4c16b27 --- /dev/null +++ b/swh/core/tests/test_logger.py @@ -0,0 +1,50 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import os +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core.logger import PostgresHandler +from swh.core.tests.db_testing import DbTestFixture + + +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +SQL_DIR = os.path.join(TEST_DIR, '../../../sql') + + +@attr('db') +class PgLogHandler(DbTestFixture, unittest.TestCase): + + TEST_DB_DUMP = os.path.join(SQL_DIR, 'log-schema.sql') + TEST_DB_DUMP_TYPE = 'psql' + + def setUp(self): + super().setUp() + self.modname = 'swh.core.tests.test_logger' + self.logger = logging.Logger(self.modname, logging.DEBUG) + self.logger.addHandler(PostgresHandler('dbname=' + self.TEST_DB_NAME)) + + def tearDown(self): + logging.shutdown() + super().tearDown() + + @istest + def log(self): + self.logger.info('notice', + extra={'swh_type': 'test entry', 'swh_data': 42}) + self.logger.warn('warning') + + with self.conn.cursor() as cur: + cur.execute('SELECT level, message, data, src_module FROM log') + db_log_entries = cur.fetchall() + + self.assertIn(('info', 'notice', {'type': 'test entry', 'data': 42}, + self.modname), + db_log_entries) + self.assertIn(('warning', 'warning', {}, self.modname), db_log_entries) diff --git a/swh/core/worker.py b/swh/core/worker.py new file mode 100644 index 0000000..fd30c52 --- /dev/null +++ b/swh/core/worker.py @@ -0,0 +1,98 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +from celery import Celery +from celery.signals import after_setup_logger, after_setup_task_logger +from kombu import Exchange, Queue + +from swh.core.config import load_named_config +from swh.core.logger import PostgresHandler + +CONFIG_NAME = 'worker.ini' +DEFAULT_CONFIG = { + 'task_broker': ('str', 'amqp://guest@localhost//'), + 'task_modules': ('list[str]', []), + 'task_queues': ('list[str]', []), + 'task_soft_time_limit': ('int', 0), +} + + +@after_setup_logger.connect +@after_setup_task_logger.connect +def setup_log_handler(sender=None, logger=None, loglevel=None, + logfile=None, format=None, + colorize=None, **kwds): + """Setup the postgresql log handler""" + + handler = PostgresHandler(CONFIG['log_db']) + handler.setFormatter(logging.Formatter(format)) + handler.setLevel(logging.DEBUG) + logger.addHandler(handler) + +# Load the Celery config +CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) + +# Celery Queues +CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] + +for queue in CONFIG['task_queues']: + CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) + +# Instantiate the Celery app +app = Celery() +app.conf.update( + # The broker + BROKER_URL=CONFIG['task_broker'], + # Timezone configuration: all in UTC + CELERY_ENABLE_UTC=True, + CELERY_TIMEZONE='UTC', + # Imported modules + CELERY_IMPORTS=CONFIG['task_modules'], + # Time (in seconds, or a timedelta object) for when after stored task + # tombstones will be deleted. + CELERY_TASK_RESULT_EXPIRES=3600, + # Late ack means the task messages will be acknowledged after the task has + # been executed, not just before, which is the default behavior. + CELERY_ACKS_LATE=True, + # A string identifying the default serialization method to use. + # Can be pickle (default), json, yaml, msgpack or any custom serialization + # methods that have been registered with kombu.serialization.registry + CELERY_ACCEPT_CONTENT=['msgpack', 'pickle', 'json'], + # If True the task will report its status as “started” + # when the task is executed by a worker. + CELERY_TRACK_STARTED=True, + # Default compression used for task messages. Can be gzip, bzip2 + # (if available), or any custom compression schemes registered + # in the Kombu compression registry. + # CELERY_MESSAGE_COMPRESSION='bzip2', + # Disable all rate limits, even if tasks has explicit rate limits set. + # (Disabling rate limits altogether is recommended if you don’t have any + # tasks using them.) + CELERY_DISABLE_RATE_LIMITS=True, + # Task hard time limit in seconds. The worker processing the task will be + # killed and replaced with a new one when this is exceeded. + # CELERYD_TASK_TIME_LIMIT=3600, + # Task soft time limit in seconds. + # The SoftTimeLimitExceeded exception will be raised when this is exceeded. + # The task can catch this to e.g. clean up before the hard time limit + # comes. + CELERYD_TASK_SOFT_TIME_LIMIT=CONFIG['task_soft_time_limit'], + # Task routing + CELERY_ROUTES={ + 'swh.loader.git.tasks.LoadGitRepository': { + 'queue': 'swh_loader_git', + }, + 'swh.loader.git.tasks.LoadGitHubRepository': { + 'queue': 'swh_loader_git', + }, + 'swh.cloner.git.worker.tasks.execute_with_measure': { + 'queue': 'swh_cloner_git', + }, + }, + # Task queues this worker will consume from + CELERY_QUEUES=CELERY_QUEUES, +) diff --git a/version.txt b/version.txt index a78aec6..60b4f3d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.6-0-gbe6d313 \ No newline at end of file +v0.0.7-0-g6163fdb \ No newline at end of file