diff --git a/ardumont/group_by_exception.py b/ardumont/group_by_exception.py
index 609b671..c1b405d 100755
--- a/ardumont/group_by_exception.py
+++ b/ardumont/group_by_exception.py
@@ -1,72 +1,88 @@
 #!/usr/bin/env python3
 
 # Copyright (C) 2017 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # Use:
 # ./kibana_fetch_logs.py | tee temporary-error-file | \
 # ./group_by_exception.py | jq > temporary-error-file-groupby-exception
 
 import ast
 import click
 import json
 import operator
 import sys
 
 from collections import defaultdict, OrderedDict
 
+LOADER_TYPES = ['git', 'svn']
+
+
 def work_on_exception_msg(exception):
     return exception[0:50]
 
 
-def group_by(origin_types):
+def group_by(origin_types, loader_type):
     group = {ori_type: defaultdict(list) for ori_type in origin_types}
 
+    if loader_type == 'svn':
+        # args = ('path-to-archive', 'some-origin-url')
+        origin_key_to_lookup = 1
+    elif loader_type == 'git':
+        # args = {'origin_url': 'some-origin-url'}
+        origin_key_to_lookup = 'origin_url'
+
     for line in sys.stdin:
         origin_type = None
         line = line.strip()
         data = ast.literal_eval(line)
 
         for ori_type in origin_types:
-            if ori_type in data['args']['origin_url']:
+            args = data['args']
+            if ori_type in args[origin_key_to_lookup]:
                 origin_type = ori_type
                 break
 
         if not origin_type:
             continue
 
         reworked_exception_msg = work_on_exception_msg(data['exception'])
         group[origin_type][reworked_exception_msg].append(data['args'])
 
     return group
 
 
 @click.command()
 @click.option('--origin-types',
               default=['gitorious', 'googlecode'],
               help='Default types of origin to lookup')
-def main(origin_types):
-    group = group_by(origin_types)
+@click.option('--loader-type', default='svn',
+              help="Type of loader (git, svn)")
+def main(origin_types, loader_type):
+    if loader_type not in LOADER_TYPES:
+        raise ValueError('Bad input, loader type is one of %s' % LOADER_TYPES)
+
+    group = group_by(origin_types, loader_type)
 
     result = {}
     for ori_type in origin_types:
         _map = {}
         total = 0
         for k, v in group[ori_type].items():
             l = len(v)
             _map[k] = l
             total += l
 
         out = sorted(_map.items(), key=operator.itemgetter(1), reverse=True)
 
         result[ori_type] = {
             'total': total,
             'errors': OrderedDict(out),
         }
 
     print(json.dumps(result))
 
 
 if __name__ == '__main__':
     main()
diff --git a/ardumont/kibana_fetch_logs.py b/ardumont/kibana_fetch_logs.py
index 6c2bb7f..fe1d126 100755
--- a/ardumont/kibana_fetch_logs.py
+++ b/ardumont/kibana_fetch_logs.py
@@ -1,128 +1,166 @@
 #!/usr/bin/env python3
 
 # Use:
 # ./kibana_fetch_logs.py | tee temporary-error-file
 
 # Copyright (C) 2017 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import ast
 import click
 
 import requests
 
 
 PAYLOAD_REQUESTS = {
     "from": 0,
     "_source": [
+        "swh_logging_args_args",
         "swh_logging_args_exc",
         "swh_logging_args_kwargs"
     ],
     "sort": [
         {
             "@timestamp": "asc"
         }
     ],
     "query": {
         "bool": {
             "must": [
                 {
                     "match": {
                         "systemd_unit": {
-                            "query": "swh-worker@swh_loader_git_disk.service",
+                            "query": "swh-worker@swh_loader_svn.service",
                             "type": "phrase"
                         }
                     }
                 },
                 {
                     "term": {
                         "priority": "3"
                     }
                 }
             ],
             "must_not": [
                 {
                     "match": {
                         "message": {
                             "query": "[.*] consumer: Cannot connect to amqp.*",
                             "type": "phrase"
                         }
                     }
                 },
                 {
                     "match": {
                         "message": {
                             "query": "[.*] pidbox command error.*",
                             "type": "phrase"
                         }
                     }
                 }
             ]
         }
     }
 }
 
 
 def retrieve_data(server, indexes, types, size, start=None):
     """Retrieve information from server looking up through 'indexes' and
        'types'. This returns result of length 'size' starting from the
        'start' position.
 
     """
     payload = PAYLOAD_REQUESTS.copy()
     payload['size'] = size
     if start:
         payload['search_after'] = [start]
 
     url = '%s/%s/%s/_search' % (server, indexes, types)
+
     r = requests.post(url, json=payload)
     if r.ok:
         return r.json()
 
 
 def format_result(json):
     """Format result from the server's response"""
+    if not json:
+        return {}
+
+    hits = json.get('hits')
+    if not hits:
+        return {}
+
+    total_hits = hits.get('total')
+    if not total_hits:
+        return {}
+
+    hits = hits.get('hits')
+    if not hits:
+        return {}
+
     all_data = []
-    total_hits = json['hits']['total']
-    for data in json['hits']['hits']:
+    last_sort_time = None
+    for data in hits:
         last_sort_time = data['sort'][0]
         source = data['_source']
-        all_data.append({
-            'args': ast.literal_eval(source['swh_logging_args_kwargs']),
-            'exception': source['swh_logging_args_exc']
-        })
+        _data = {}
 
-    return all_data, last_sort_time, total_hits
+        swh_logging_args_args = source.get('swh_logging_args_args')
+        if swh_logging_args_args:
+            _data['args'] = ast.literal_eval(swh_logging_args_args)
+
+        swh_logging_args_kwargs = source.get('swh_logging_args_kwargs')
+        if swh_logging_args_kwargs:
+            _data['kwargs'] = ast.literal_eval(swh_logging_args_kwargs)
+
+        exception = source.get('swh_logging_args_exc')
+        if exception:
+            _data['exception'] = exception
+
+        if _data:
+            all_data.append(_data)
+
+    return {
+        'all': all_data,
+        'last_sort_time': last_sort_time,
+        'total_hits': total_hits
+    }
 
 
 def query_log_server(server, indexes, types, size):
     count = 0
     last_sort_time = None
     total_hits = 1
     while count < total_hits:
         response = retrieve_data(server, indexes, types, size,
                                  start=last_sort_time)
-        all_data, last_sort_time, total_hits = format_result(response)
+        data = format_result(response)
+        if not data:
+            break
+
+        total_hits = data['total_hits']
+        last_sort_time = data['last_sort_time']
 
-        for row in all_data:
+        for row in data['all']:
             count += 1
             yield row
 
 
 @click.command()
 @click.option('--server',
               default='http://banco.internal.softwareheritage.org:9200',
               help='Elastic search instance to query against')
 @click.option('--indexes', default='logstash-2017.05.*,logstash-2017.06.*',
               help='ElasticSearch indexes to lookup (csv if many)')
 @click.option('--types', default='journal',
               help='ElasticSearch types to lookup (csv if many)')
 @click.option('--size', default=10, type=click.INT, help='Pagination size')
 def main(server, indexes, types, size):
     for entry in query_log_server(server, indexes, types, size):
         print(entry)
 
 
 if __name__ == '__main__':
     main()