diff --git a/swh/core/config.py b/swh/core/config.py
index 054299b..2fdd45e 100644
--- a/swh/core/config.py
+++ b/swh/core/config.py
@@ -1,275 +1,276 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import configparser
import os
import yaml
SWH_CONFIG_DIRECTORIES = [
'~/.config/swh',
'~/.swh',
'/etc/softwareheritage',
]
SWH_GLOBAL_CONFIG = 'global.ini'
SWH_DEFAULT_GLOBAL_CONFIG = {
'content_size_limit': ('int', 100 * 1024 * 1024),
'log_db': ('str', 'dbname=softwareheritage-log'),
}
SWH_CONFIG_EXTENSIONS = [
'.yml',
'.ini',
]
# conversion per type
_map_convert_fn = {
'int': int,
'bool': lambda x: x.lower() == 'true',
'list[str]': lambda x: [value.strip() for value in x.split(',')],
'list[int]': lambda x: [int(value.strip()) for value in x.split(',')],
}
_map_check_fn = {
'int': lambda x: isinstance(x, int),
'bool': lambda x: isinstance(x, bool),
'list[str]': lambda x: (isinstance(x, list) and
all(isinstance(y, str) for y in x)),
'list[int]': lambda x: (isinstance(x, list) and
all(isinstance(y, int) for y in x)),
}
def exists_accessible(file):
"""Check whether a file exists, and is accessible.
Returns:
True if the file exists and is accessible
False if the file does not exist
Raises:
PermissionError if the file cannot be read.
"""
try:
os.stat(file)
except PermissionError:
raise
except FileNotFoundError:
return False
else:
if os.access(file, os.R_OK):
return True
else:
raise PermissionError("Permission denied: %r" % file)
def config_basepath(config_path):
"""Return the base path of a configuration file"""
if config_path.endswith(('.ini', '.yml')):
return config_path[:-4]
return config_path
def read_raw_config(base_config_path):
"""Read the raw config corresponding to base_config_path.
Can read yml or ini files.
"""
yml_file = base_config_path + '.yml'
if exists_accessible(yml_file):
with open(yml_file) as f:
return yaml.safe_load(f)
ini_file = base_config_path + '.ini'
if exists_accessible(ini_file):
config = configparser.ConfigParser()
config.read(ini_file)
if 'main' in config._sections:
return config._sections['main']
return {}
def config_exists(config_path):
"""Check whether the given config exists"""
basepath = config_basepath(config_path)
return any(exists_accessible(basepath + extension)
for extension in SWH_CONFIG_EXTENSIONS)
def read(conf_file=None, default_conf=None):
"""Read the user's configuration file.
- Fill in the gap using `default_conf`.
-`default_conf` is similar to this:
-DEFAULT_CONF = {
- 'a': ('str', '/tmp/swh-loader-git/log'),
- 'b': ('str', 'dbname=swhloadergit')
- 'c': ('bool', true)
- 'e': ('bool', None)
- 'd': ('int', 10)
-}
-If conf_file is None, return the default config.
+ Fill in the gaps using `default_conf`. `default_conf` is similar to this::
+
+ DEFAULT_CONF = {
+     'a': ('str', '/tmp/swh-loader-git/log'),
+     'b': ('str', 'dbname=swhloadergit'),
+     'c': ('bool', True),
+     'e': ('bool', None),
+     'd': ('int', 10),
+ }
+
+ If conf_file is None, return the default config.
"""
conf = {}
if conf_file:
base_config_path = config_basepath(os.path.expanduser(conf_file))
conf = read_raw_config(base_config_path)
if not default_conf:
default_conf = {}
# remaining missing default configuration keys are set;
# type conversion is also enforced for the underlying layer
for key in default_conf:
nature_type, default_value = default_conf[key]
val = conf.get(key, None)
if val is None: # fallback to default value
conf[key] = default_value
elif not _map_check_fn.get(nature_type, lambda x: True)(val):
# value present but not in the proper format, force type conversion
conf[key] = _map_convert_fn.get(nature_type, lambda x: x)(val)
return conf
def priority_read(conf_filenames, default_conf=None):
"""Try reading the configuration files from conf_filenames, in order,
and return the configuration from the first one that exists.
default_conf has the same specification as it does in read.
"""
# Try all the files in order
for filename in conf_filenames:
full_filename = os.path.expanduser(filename)
if config_exists(full_filename):
return read(full_filename, default_conf)
# Else, return the default configuration
return read(None, default_conf)
def merge_default_configs(base_config, *other_configs):
"""Merge several default config dictionaries, from left to right"""
full_config = base_config.copy()
for config in other_configs:
full_config.update(config)
return full_config
def swh_config_paths(base_filename):
"""Return the Software Heritage specific configuration paths for the given
filename."""
return [os.path.join(dirname, base_filename)
for dirname in SWH_CONFIG_DIRECTORIES]
def prepare_folders(conf, *keys):
"""Prepare the folder mentioned in config under keys.
"""
def makedir(folder):
if not os.path.exists(folder):
os.makedirs(folder)
for key in keys:
makedir(conf[key])
def load_global_config():
"""Load the global Software Heritage config"""
return priority_read(
swh_config_paths(SWH_GLOBAL_CONFIG),
SWH_DEFAULT_GLOBAL_CONFIG,
)
def load_named_config(name, default_conf=None, global_conf=True):
"""Load the config named `name` from the Software Heritage
configuration paths.
If global_conf is True (default), read the global configuration
too.
"""
conf = {}
if global_conf:
conf.update(load_global_config())
conf.update(priority_read(swh_config_paths(name), default_conf))
return conf
class SWHConfig:
"""Mixin to add configuration parsing abilities to classes
The class should override the class attributes:
- DEFAULT_CONFIG (default configuration to be parsed)
- CONFIG_BASE_FILENAME (the filename of the configuration to be used)
This class defines one classmethod, parse_config_file, which
parses a configuration file using the default config as set in the
class attribute.
"""
DEFAULT_CONFIG = {}
CONFIG_BASE_FILENAME = ''
@classmethod
def parse_config_file(cls, base_filename=None, config_filename=None,
additional_configs=None, global_config=True):
"""Parse the configuration file associated to the current class.
By default, parse_config_file will load the configuration
cls.CONFIG_BASE_FILENAME from one of the Software Heritage
configuration directories, in order, unless it is overridden
by base_filename or config_filename (which shortcuts the file
lookup completely).
Args:
- base_filename (str) overrides the default
cls.CONFIG_BASE_FILENAME
- config_filename (str) sets the file to parse instead of
the defaults set from cls.CONFIG_BASE_FILENAME
- additional_configs (list of default configuration dicts)
allows overriding or extending the configuration set in
cls.DEFAULT_CONFIG.
- global_config (bool): Load the global configuration (default:
True)
"""
if config_filename:
config_filenames = [config_filename]
else:
if not base_filename:
base_filename = cls.CONFIG_BASE_FILENAME
config_filenames = swh_config_paths(base_filename)
if not additional_configs:
additional_configs = []
full_default_config = merge_default_configs(cls.DEFAULT_CONFIG,
*additional_configs)
config = {}
if global_config:
config = load_global_config()
config.update(priority_read(config_filenames, full_default_config))
return config
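
For orientation, a minimal sketch of how a consumer class might use the
SWHConfig mixin above; MyLoader, its configuration keys, and the 'loader'
base filename are illustrative, not part of this diff::

    from swh.core.config import SWHConfig

    class MyLoader(SWHConfig):
        # hypothetical defaults, following the (type, default value)
        # convention documented in read() above
        DEFAULT_CONFIG = {
            'storage_url': ('str', 'http://localhost:5002/'),
            'batch_size': ('int', 100),
        }
        # resolved against SWH_CONFIG_DIRECTORIES as loader.yml or loader.ini
        CONFIG_BASE_FILENAME = 'loader'

    # falls back to DEFAULT_CONFIG (merged with the global config) when no
    # configuration file exists in any of the searched directories
    config = MyLoader.parse_config_file()
    assert config['batch_size'] == 100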
diff --git a/swh/core/logger.py b/swh/core/logger.py
index 30667f2..06e0847 100644
--- a/swh/core/logger.py
+++ b/swh/core/logger.py
@@ -1,192 +1,192 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import os
import socket
import psycopg2
from psycopg2.extras import Json
from systemd.journal import JournalHandler as _JournalHandler, send
try:
from celery import current_task
except ImportError:
current_task = None
EXTRA_LOGDATA_PREFIX = 'swh_'
def db_level_of_py_level(lvl):
"""convert a log level of the logging module to a log level suitable for the
logging Postgres DB
"""
return logging.getLevelName(lvl).lower()
def get_extra_data(record, task_args=True):
"""Get the extra data to insert to the database from the logging record"""
log_data = record.__dict__
extra_data = {k[len(EXTRA_LOGDATA_PREFIX):]: v
for k, v in log_data.items()
if k.startswith(EXTRA_LOGDATA_PREFIX)}
args = log_data.get('args')
if args:
extra_data['logging_args'] = args
# Retrieve Celery task info
if current_task and current_task.request:
extra_data['task'] = {
'id': current_task.request.id,
'name': current_task.name,
}
if task_args:
extra_data['task'].update({
'kwargs': current_task.request.kwargs,
'args': current_task.request.args,
})
return extra_data
def flatten(data, separator='_'):
"""Flatten the data dictionary into a flat structure"""
def inner_flatten(data, prefix):
if isinstance(data, dict):
for key, value in data.items():
yield from inner_flatten(value, prefix + [key])
elif isinstance(data, (list, tuple)):
for key, value in enumerate(data):
yield from inner_flatten(value, prefix + [str(key)])
else:
yield prefix, data
for path, value in inner_flatten(data, []):
yield separator.join(path), value
def stringify(value):
"""Convert value to string"""
if isinstance(value, datetime.datetime):
return value.isoformat()
return str(value)
class PostgresHandler(logging.Handler):
"""log handler that store messages in a Postgres DB
See swh-core/sql/log-schema.sql for the DB schema.
All logging methods can be used as usual. Additionally, arbitrary metadata
can be passed to logging methods, requesting that it be stored in
the DB as a single JSONB value. To do so, pass a dictionary to the 'extra'
kwarg of any logging method; all keys in that dictionary that start with
- EXTRA_LOGDATA_PREFIX (currently: 'swh_') will be extracted to form the
+ EXTRA_LOGDATA_PREFIX (currently: 'swh\_') will be extracted to form the
JSONB dictionary. The prefix will be stripped and not included in the DB.
Note: the logger name will be used to fill the 'module' DB column.
- Sample usage:
+ Sample usage::
logging.basicConfig(level=logging.INFO)
h = PostgresHandler('dbname=softwareheritage-log')
logger = logging.getLogger(__name__)
logger.addHandler(h)
logger.info('not so important notice',
            extra={'swh_type': 'swh_logging_test',
                   'swh_meditation': 'guru'})
logger.warning('something weird just happened, did you see that?')
"""
def __init__(self, connstring):
"""
Create a Postgres log handler.
Args:
connstring: libpq connection string to the log DB
"""
super().__init__()
self.connstring = connstring
self.fqdn = socket.getfqdn() # cache FQDN value
def _connect(self):
return psycopg2.connect(self.connstring)
def emit(self, record):
msg = self.format(record)
extra_data = get_extra_data(record)
if 'task' in extra_data:
task_args = {
'args': extra_data['task']['args'],
'kwargs': extra_data['task']['kwargs'],
}
try:
json_args = Json(task_args).getquoted()
except TypeError:
task_args = {
'args': ['<failed to convert arguments to JSON>'],
'kwargs': {},
}
else:
json_args_length = len(json_args)
if json_args_length >= 1000:
task_args = {
'args': ['<arguments too long>'],
'kwargs': {},
}
extra_data['task'].update(task_args)
log_entry = (db_level_of_py_level(record.levelno), msg,
Json(extra_data), record.name, self.fqdn,
os.getpid())
db = self._connect()
with db.cursor() as cur:
cur.execute('INSERT INTO log '
'(level, message, data, src_module, src_host, src_pid)'
'VALUES (%s, %s, %s, %s, %s, %s)',
log_entry)
db.commit()
db.close()
class JournalHandler(_JournalHandler):
def emit(self, record):
"""Write `record` as a journal event.
MESSAGE is taken from the message provided by the user, and PRIORITY,
LOGGER, THREAD_NAME, CODE_{FILE,LINE,FUNC} fields are appended
automatically. In addition, record.MESSAGE_ID will be used if present.
"""
try:
extra_data = flatten(get_extra_data(record, task_args=False))
extra_data = {
(EXTRA_LOGDATA_PREFIX + key).upper(): stringify(value)
for key, value in extra_data
}
msg = self.format(record)
pri = self.mapPriority(record.levelno)
send(msg,
PRIORITY=format(pri),
LOGGER=record.name,
THREAD_NAME=record.threadName,
CODE_FILE=record.pathname,
CODE_LINE=record.lineno,
CODE_FUNC=record.funcName,
**extra_data)
except Exception:
self.handleError(record)
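
As an aside, a small sketch of what flatten() and stringify() above produce
for a nested payload; the sample data is made up::

    import datetime

    from swh.core.logger import flatten, stringify

    data = {'task': {'id': 42, 'args': ['origin', 7]}}
    # keys are joined with '_' at each nesting level, list indices included
    print(dict(flatten(data)))
    # {'task_id': 42, 'task_args_0': 'origin', 'task_args_1': 7}

    # datetimes are rendered as ISO8601, everything else goes through str()
    print(stringify(datetime.datetime(2015, 1, 1)))  # '2015-01-01T00:00:00'

This is the normalization JournalHandler.emit applies to the extra log data
before uppercasing the keys and sending the result to the journal.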
diff --git a/swh/core/serializers.py b/swh/core/serializers.py
index 437fc4f..acaf522 100644
--- a/swh/core/serializers.py
+++ b/swh/core/serializers.py
@@ -1,148 +1,152 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import datetime
from json import JSONDecoder, JSONEncoder
import types
from uuid import UUID
import dateutil.parser
import msgpack
def encode_data_client(data):
try:
return msgpack_dumps(data)
except OverflowError as e:
raise ValueError('Limits were reached. Please check your input.\n' +
str(e))
def decode_response(response):
content_type = response.headers['content-type']
if content_type.startswith('application/x-msgpack'):
r = msgpack_loads(response.content)
elif content_type.startswith('application/json'):
r = response.json(cls=SWHJSONDecoder)
else:
raise ValueError('Wrong content type `%s` for API response'
% content_type)
return r
class SWHJSONEncoder(JSONEncoder):
"""JSON encoder for data structures generated by Software Heritage.
This JSON encoder extends the default Python JSON encoder and adds
awareness for the following specific types:
- - bytes (get encoded as a Base85 string);
- - datetime.datetime (get encoded as an ISO8601 string).
+
+ - bytes (get encoded as a Base85 string);
+ - datetime.datetime (get encoded as an ISO8601 string).
Non-standard types get encoded as a dictionary with two keys:
- - swhtype with value 'bytes' or 'datetime';
- - d containing the encoded value.
+
+ - swhtype with value 'bytes' or 'datetime';
+ - d containing the encoded value.
SWHJSONEncoder also encodes arbitrary iterables as a list
(allowing serialization of generators).
Caveats: Limitations in the JSONEncoder extension mechanism
prevent us from "escaping" dictionaries that only contain the
swhtype and d keys, and therefore arbitrary data structures can't
be round-tripped through SWHJSONEncoder and SWHJSONDecoder.
"""
def default(self, o):
if isinstance(o, bytes):
return {
'swhtype': 'bytes',
'd': base64.b85encode(o).decode('ascii'),
}
elif isinstance(o, datetime.datetime):
return {
'swhtype': 'datetime',
'd': o.isoformat(),
}
elif isinstance(o, UUID):
return {
'swhtype': 'uuid',
'd': str(o),
}
try:
return super().default(o)
except TypeError as e:
try:
iterable = iter(o)
except TypeError:
raise e from None
else:
return list(iterable)
class SWHJSONDecoder(JSONDecoder):
"""JSON decoder for data structures encoded with SWHJSONEncoder.
This JSON decoder extends the default Python JSON decoder,
allowing the decoding of:
- - bytes (encoded as a Base85 string);
- - datetime.datetime (encoded as an ISO8601 string).
+
+ - bytes (encoded as a Base85 string);
+ - datetime.datetime (encoded as an ISO8601 string).
Non-standard types must be encoded as a dictionary with exactly
two keys:
- - swhtype with value 'bytes' or 'datetime';
- - d containing the encoded value.
+
+ - swhtype with value 'bytes' or 'datetime';
+ - d containing the encoded value.
To limit the impact of our encoding, if the swhtype key doesn't
contain a known value, the dictionary is decoded as-is.
"""
def decode_data(self, o):
if isinstance(o, dict):
if set(o.keys()) == {'d', 'swhtype'}:
datatype = o['swhtype']
if datatype == 'bytes':
return base64.b85decode(o['d'])
elif datatype == 'datetime':
return dateutil.parser.parse(o['d'])
elif datatype == 'uuid':
return UUID(o['d'])
return {key: self.decode_data(value) for key, value in o.items()}
if isinstance(o, list):
return [self.decode_data(value) for value in o]
else:
return o
def raw_decode(self, s, idx=0):
data, index = super().raw_decode(s, idx)
return self.decode_data(data), index
def msgpack_dumps(data):
"""Write data as a msgpack stream"""
def encode_types(obj):
if isinstance(obj, datetime.datetime):
return {b'__datetime__': True, b's': obj.isoformat()}
if isinstance(obj, types.GeneratorType):
return list(obj)
if isinstance(obj, UUID):
return {b'__uuid__': True, b's': str(obj)}
return obj
return msgpack.packb(data, use_bin_type=True, default=encode_types)
def msgpack_loads(data):
"""Read data as a msgpack stream"""
def decode_types(obj):
if b'__datetime__' in obj and obj[b'__datetime__']:
return dateutil.parser.parse(obj[b's'])
if b'__uuid__' in obj and obj[b'__uuid__']:
return UUID(obj[b's'])
return obj
return msgpack.unpackb(data, encoding='utf-8', object_hook=decode_types)
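
To make the symmetry of the two codecs above concrete, a round-trip sketch;
it assumes the msgpack version this module targets (one that still accepts
the encoding= keyword of unpackb)::

    import datetime
    import json
    from uuid import UUID

    from swh.core.serializers import (SWHJSONDecoder, SWHJSONEncoder,
                                      msgpack_dumps, msgpack_loads)

    original = {
        'id': UUID('12345678-1234-5678-1234-567812345678'),
        'data': b'\x00\x01\x02',
        'seen': datetime.datetime(2015, 6, 1, 12, 0),
    }

    # JSON: bytes, datetime and UUID travel as {'swhtype': ..., 'd': ...}
    as_json = json.dumps(original, cls=SWHJSONEncoder)
    assert json.loads(as_json, cls=SWHJSONDecoder) == original

    # msgpack: same idea, with b'__datetime__' / b'__uuid__' marker dicts
    assert msgpack_loads(msgpack_dumps(original)) == original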