#!/usr/bin/env python3

# Copyright (C) 2017-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# Configuration file at ~/.config/swh/kibana/query.yml
# (configuration sample at https://forge.softwareheritage.org/P221)

# Use:
# ./kibana_fetch_logs.py | tee temporary-error-file

import ast
import click
import logging
import requests

from swh.core.config import SWHConfig


logger = logging.getLogger(__name__)


def old_task_information_keys_p(keys):
    """Tell whether any key uses the old `swh_logging_` naming.

    Args:
        keys: iterable of field names (strings) of one log record

    Returns:
        True if at least one key contains 'swh_logging_', False otherwise

    """
    return any('swh_logging_' in k for k in keys)


def old_parse_task_arguments(source, keys):
    """Parse the task arguments of a record in the old format.

    The 'swh_logging_args_args' and 'swh_logging_args_kwargs' fields are
    evaluated with `ast.literal_eval`.

    Args:
        source: dict of one log record
        keys: unused, kept for signature parity with `parse_task_arguments`

    Returns:
        Tuple (args, kwargs, exception). args defaults to [] and kwargs to {}
        when the corresponding field is missing or empty; exception is the
        'swh_logging_args_exc' field or None.

    """
    # Defaults are required: a record may carry only some of the old fields,
    # and both names are returned unconditionally below.
    args = []
    kwargs = {}

    raw_args = source.get('swh_logging_args_args')
    if raw_args:
        args = ast.literal_eval(raw_args)

    raw_kwargs = source.get('swh_logging_args_kwargs')
    if raw_kwargs:
        kwargs = ast.literal_eval(raw_kwargs)

    exception = source.get('swh_logging_args_exc')
    return args, kwargs, exception


def task_information_keys_p(keys):
    """Are there `swh_task_` keys in the set

    Args:
        keys: iterable of field names (strings) of one log record

    Returns:
        True if at least one key contains 'swh_task_', False otherwise

    """
    for k in keys:
        logger.debug('#### task_info_keys_p: key %s', k)
        if 'swh_task_' in k:
            return True
    return False


def parse_task_arguments(source, keys):
    """Parse the task arguments as args, kwargs and None (no exception to
       parse).

    Args:
        source: dict of one log record
        keys: field names to consider (all of source's keys when empty)

    Returns:
        Tuple (args, kwargs, None)

    """
    task_args = parse_task_args(source, keys)
    task_kwargs = parse_task_kwargs(source, keys)
    exception = None
    return task_args, task_kwargs, exception


def parse_task_args(source, keys=None):
    """Parse task args

    Positional arguments are stored one per field, named
    'swh_task_args_<index>'. They are returned ordered by their integer
    index, whatever the iteration order of the keys.

    Args:
        source: dict of one log record
        keys: field names to consider (all of source's keys when empty)

    Returns:
        List of the positional argument values

    Raises:
        ValueError: if a matching key has a non-integer suffix

    >>> source = {'swh_task_args_0': 1, 'swh_task_args_1': 2, 'swh_task_kwargs_some': 'one'}  # noqa
    >>> parse_task_args(source)
    [1, 2]
    >>> source = {'swh_task_args_2': 'c', 'swh_task_args_1': 'b', 'swh_task_args_0': 'a'}  # noqa
    >>> parse_task_args(source)
    ['a', 'b', 'c']

    """
    if not keys:
        keys = source.keys()
    prefix_key = 'swh_task_args_'
    logger.debug('parse-task-args: source: %s', source)

    indexed_values = []
    for k in keys:
        if not k.startswith(prefix_key):
            continue
        logger.debug('parse-task-args: %s', k)
        index = int(k[len(prefix_key):])
        indexed_values.append((index, source[k]))

    # Sort on the index only: list.insert(index, value) on a partially
    # filled list misplaces values when keys come in descending order.
    indexed_values.sort(key=lambda pair: pair[0])
    return [value for _, value in indexed_values]


def parse_task_kwargs(source, keys=None):
    """Parse task kwargs

    Keyword arguments are stored one per field, named
    'swh_task_kwargs_<name>'.

    Args:
        source: dict of one log record
        keys: field names to consider (all of source's keys when empty)

    Returns:
        Dict mapping each keyword argument name to its value

    >>> source = {'swh_task_args_0': 1, 'swh_task_args_1': 2, 'swh_task_kwargs_no': 'one'}  # noqa
    >>> parse_task_kwargs(source)
    {'no': 'one'}

    """
    if not keys:
        keys = source.keys()
    prefix_key = 'swh_task_kwargs_'

    kwargs = {}
    for k in keys:
        if not k.startswith(prefix_key):
            continue
        logger.debug('parse-task-kwargs: %s', k)
        # Strip the leading prefix only, so a name that itself contains
        # the prefix is kept whole.
        kwargs[k[len(prefix_key):]] = source[k]

    return kwargs


class KibanaFetchLog(SWHConfig):
    """Kibana fetch log class to permit log retrieval.

    """
    CONFIG_BASE_FILENAME = 'kibana/query'

    DEFAULT_CONFIG = {
        'server': ('str', 'http://esnode3.internal.softwareheritage.org:9200'),
        'indexes': ('list[str]', [
            'swh_workers-2017.05.*', 'swh_workers-2017.06.*']),
        'size': ('int', 10),
        'from': ('int', 0),
        '_source': ('list[str]', [
            'message',
            'swh_task_args_0',
        ]),
        'sort': ('list', [{
            '@timestamp': 'asc'
        }]),
        'query': ('dict', {
            'bool': {
                'must': [
                    {
                        'match': {
                            'systemd_unit.keyword': {
                                'query': 'swh-worker@loader_svn.service',
                            }
                        }
                    },
                    {
                        'term': {
                            'priority': '3'
                        }
                    }
                ]
            }
        }),
    }

    ADDITIONAL_CONFIG = {}

    def __init__(self, config=None):
        """Load the configuration and prepare the search payload.

        Args:
            config: optional dict whose entries override the parsed
                configuration

        """
        self.config = self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])
        if config:  # permit to amend the configuration file if it exists
            self.config.update(config)

        self.server = self.config['server']
        self.indexes = ','.join(self.config['indexes'])
        self.payload = {
            key: self.config[key]
            for key in ['from', '_source', 'query', 'size', 'sort']
        }
        logger.debug('### Server: %s', self.server)
        logger.debug('### Indexes: %s', self.indexes)

    def _retrieve_data(self, server, indexes, start=None):
        """Retrieve information from server looking up through 'indexes'. This
        returns result of length 'size' (configuration) starting from
        the 'start' position.

        Args:
            server: base url of the server
            indexes: comma-separated index names
            start: sort value of the last record already read, or None for
                the first page

        Returns:
            The decoded json response

        Raises:
            ValueError: when the server answers with an error status

        """
        # Work on a copy: 'search_after' must not stick to self.payload,
        # otherwise a later fetch() would start from a stale position.
        payload = dict(self.payload)
        if start is not None:
            payload['search_after'] = [start]

        url = '%s/%s/_search' % (server, indexes)
        r = requests.post(url, json=payload)
        logger.debug('Payload: %s', payload)
        if not r.ok:
            logger.debug('Response: %s', r.content)
            raise ValueError("Problem when communicating with server: %s" % (
                r.status_code, ))

        return r.json()

    def _format_result(self, json):
        """Format result from the server's response

        Args:
            json: decoded response of a search request

        Returns:
            {} when the response holds no hit, otherwise a dict with keys:
            - 'all': list of dicts with keys 'args', 'kwargs', 'exception'
            - 'last_sort_time': first sort value of the last hit
            - 'total_hits': total number of hits reported by the server

        """
        if not json:
            return {}

        hits = json.get('hits')
        if not hits:
            return {}

        total_hits = hits.get('total')
        if isinstance(total_hits, dict):
            # Elasticsearch >= 7 reports an object ({'value': N, ...})
            # instead of a plain integer.
            total_hits = total_hits.get('value')
        if not total_hits:
            return {}

        hits = hits.get('hits')
        if not hits:
            return {}

        all_data = []
        last_sort_time = None

        for data in hits:
            last_sort_time = data['sort'][0]
            source = data['_source']
            logger.debug('#### data: %s', data)
            logger.debug('#### source: %s', source)
            source_keys = source.keys()
            args = []
            kwargs = {}
            exception = None

            if task_information_keys_p(source_keys):
                args, kwargs, exception = parse_task_arguments(
                    source, source_keys)
            elif old_task_information_keys_p(source_keys):  # old logs format
                args, kwargs, exception = old_parse_task_arguments(
                    source, source_keys)
            else:
                # NOTE(review): the record is still emitted below, with
                # empty args/kwargs, despite the 'skipping' wording.
                logger.warning('Record format unknown: %s, skipping', source)

            all_data.append({
                'args': args,
                'kwargs': kwargs,
                # Fall back on the log message; None when it is missing too.
                'exception': exception if exception else source.get(
                    'message'),
            })

        return {
            'all': all_data,
            'last_sort_time': last_sort_time,
            'total_hits': total_hits
        }

    def fetch(self):
        """Fetch wanted information (cf. 'query' entry in configuration).

        Yields:
            One dict per record (keys 'args', 'kwargs', 'exception'), page
            after page, until as many records as the reported total were
            read or a page comes back empty.

        """
        count = 0
        last_sort_time = None
        total_hits = 1
        server = self.server
        indexes = self.indexes

        while count < total_hits:
            response = self._retrieve_data(
                server, indexes, start=last_sort_time)
            data = self._format_result(response)
            if not data:
                break

            total_hits = data['total_hits']
            last_sort_time = data['last_sort_time']

            for row in data['all']:
                count += 1
                yield row


@click.command()
@click.option('--server', default=None,
              help='Elastic search instance to query against')
@click.option('--indexes', default=None,
              help='ElasticSearch indexes to lookup (csv if many)')
@click.option('--size', default=10, type=click.INT, help='Pagination size')
@click.option('--debug/--nodebug', is_flag=True, default=False)
def main(server, indexes, size, debug):
    """Print every record matching the configured query, one per line."""
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
    config = {}
    if server:
        config['server'] = server
    if indexes:
        config['indexes'] = indexes.split(',')
    # NOTE(review): --size defaults to 10, so this always overrides the
    # 'size' entry of the configuration file - confirm this is intended.
    if size:
        config['size'] = size

    fetcher = KibanaFetchLog(config)
    for entry in fetcher.fetch():
        print(entry)


if __name__ == '__main__':
    main()