diff --git a/ardumont/group_by_exception.py b/ardumont/group_by_exception.py index c1b405d..1df2606 100755 --- a/ardumont/group_by_exception.py +++ b/ardumont/group_by_exception.py @@ -1,88 +1,95 @@ #!/usr/bin/env python3 # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Use: # ./kibana_fetch_logs.py | tee temporary-error-file | \ # ./group_by_exception.py | jq > temporary-error-file-groupby-exception import ast import click import json import operator +import re import sys from collections import defaultdict, OrderedDict LOADER_TYPES = ['git', 'svn'] def work_on_exception_msg(exception): - return exception[0:50] + exception_msg = None + if exception.startswith('['): + exception_msg = re.sub(r'\[.*\]', '', exception).lstrip() + else: + exception_msg = exception + return exception_msg[0:50] def group_by(origin_types, loader_type): group = {ori_type: defaultdict(list) for ori_type in origin_types} if loader_type == 'svn': # args = ('path-to-archive', 'some-origin-url') origin_key_to_lookup = 1 elif loader_type == 'git': # args = {'origin_url: 'some-origin-url} origin_key_to_lookup = 'origin_url' for line in sys.stdin: origin_type = None line = line.strip() data = ast.literal_eval(line) for ori_type in origin_types: args = data['args'] - if ori_type in args[origin_key_to_lookup]: + if args and ori_type in args[origin_key_to_lookup]: origin_type = ori_type break if not origin_type: - continue + origin_type = 'unknown' reworked_exception_msg = work_on_exception_msg(data['exception']) group[origin_type][reworked_exception_msg].append(data['args']) return group @click.command() @click.option('--origin-types', default=['gitorious', 'googlecode'], help='Default types of origin to lookup') @click.option('--loader-type', default='svn', help="Type of loader (git, svn)") def main(origin_types, loader_type): if loader_type not in LOADER_TYPES: raise 
ValueError('Bad input, loader type is one of %s' % LOADER_TYPES) + origin_types = origin_types + ['unknown'] group = group_by(origin_types, loader_type) result = {} for ori_type in origin_types: _map = {} total = 0 for k, v in group[ori_type].items(): l = len(v) _map[k] = l total += l out = sorted(_map.items(), key=operator.itemgetter(1), reverse=True) result[ori_type] = { 'total': total, 'errors': OrderedDict(out), } print(json.dumps(result)) if __name__ == '__main__': main() diff --git a/ardumont/kibana_fetch_logs.py b/ardumont/kibana_fetch_logs.py index fe1d126..82bbbfe 100755 --- a/ardumont/kibana_fetch_logs.py +++ b/ardumont/kibana_fetch_logs.py @@ -1,165 +1,176 @@ #!/usr/bin/env python3 # Use: # ./kibana_fetch_logs.py | tee temporary-error-file # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import click import requests PAYLOAD_REQUESTS = { "from": 0, "_source": [ + "message", "swh_logging_args_args", "swh_logging_args_exc", "swh_logging_args_kwargs" ], "sort": [ { "@timestamp": "asc" } ], "query": { "bool": { "must": [ { "match": { "systemd_unit": { "query": "swh-worker@swh_loader_svn.service", "type": "phrase" } } }, { "term": { "priority": "3" } } ], "must_not": [ { "match": { "message": { "query": "[.*] consumer: Cannot connect to amqp.*", "type": "phrase" } } }, { "match": { "message": { "query": "[.*] pidbox command error.*", "type": "phrase" } } } ] } } } def retrieve_data(server, indexes, types, size, start=None): """Retrieve information from server looking up through 'indexes' and 'types'. This returns result of length 'size' starting from the 'start' position. 
""" payload = PAYLOAD_REQUESTS.copy() payload['size'] = size if start: payload['search_after'] = [start] url = '%s/%s/%s/_search' % (server, indexes, types) r = requests.post(url, json=payload) if r.ok: return r.json() def format_result(json): """Format result from the server's response""" if not json: return {} hits = json.get('hits') if not hits: return {} total_hits = hits.get('total') if not total_hits: return {} hits = hits.get('hits') if not hits: return {} all_data = [] last_sort_time = None for data in hits: last_sort_time = data['sort'][0] source = data['_source'] _data = {} swh_logging_args_args = source.get('swh_logging_args_args') if swh_logging_args_args: _data['args'] = ast.literal_eval(swh_logging_args_args) swh_logging_args_kwargs = source.get('swh_logging_args_kwargs') if swh_logging_args_kwargs: _data['kwargs'] = ast.literal_eval(swh_logging_args_kwargs) exception = source.get('swh_logging_args_exc') if exception: _data['exception'] = exception + if not _data: + message = source.get('message') + if message: + _data = { + 'args': {}, + 'kwargs': {}, + 'exception': message + } + if _data: all_data.append(_data) return { 'all': all_data, 'last_sort_time': last_sort_time, 'total_hits': total_hits } def query_log_server(server, indexes, types, size): count = 0 last_sort_time = None total_hits = 1 while count < total_hits: response = retrieve_data(server, indexes, types, size, start=last_sort_time) data = format_result(response) if not data: break total_hits = data['total_hits'] + last_sort_time = data['last_sort_time'] for row in data['all']: count += 1 yield row @click.command() @click.option('--server', default='http://banco.internal.softwareheritage.org:9200', help='Elastic search instance to query against') @click.option('--indexes', default='logstash-2017.05.*,logstash-2017.06.*', help='ElasticSearch indexes to lookup (csv if many)') @click.option('--types', default='journal', help='ElasticSearch types to lookup (csv if many)') 
@click.option('--size', default=10, type=click.INT, help='Pagination size') def main(server, indexes, types, size): for entry in query_log_server(server, indexes, types, size): print(entry) if __name__ == '__main__': main() diff --git a/ardumont/reschedule_errors.py b/ardumont/reschedule_errors.py index 4064ae7..050a276 100755 --- a/ardumont/reschedule_errors.py +++ b/ardumont/reschedule_errors.py @@ -1,72 +1,90 @@ #!/usr/bin/env python3 # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import click import sys from swh.scheduler.utils import get_task _MAP_ORIGIN_QUEUE = { - 'gitorious': 'swh.loader.git.tasks.LoadDiskGitRepository', - 'googlecode': 'swh.loader.git.tasks.UncompressAndLoadDiskGitRepository', + 'git-gitorious': 'swh.loader.git.tasks.LoadDiskGitRepository', + 'git-googlecode': 'swh.loader.git.tasks.UncompressAndLoadDiskGitRepository', + 'svn-googlecode': 'swh.loader.svn.tasks.MountAndLoadSvnRepositoryTsk' } _BLACK_LISTED_EXCEPTIONS = [ "NotGitRepository('No git repository was found at /", "ValueError('Failed to uncompress archive /srv/stor", ] +"""Loader types""" +LOADER_TYPES = ['git', 'svn'] + def work_on_exception_msg(exception): return exception[0:40] @click.command() -@click.option('--origins', default=['gitorious', 'googlecode']) +@click.option('--origins', + help='Origin concerned by scheduling back', + default=['gitorious', 'googlecode']) +@click.option('--loader-type', default='svn', help='Type of loader (git, svn)') @click.option('--dry-run/--no-dry-run', help='Do nothing but print.') -def main(origins, dry_run): +def main(origins, loader_type, dry_run): if dry_run: print('*** DRY RUN ***') tasks = {k: get_task(v) for k, v in _MAP_ORIGIN_QUEUE.items()} black_listed_exceptions = list(map(work_on_exception_msg, _BLACK_LISTED_EXCEPTIONS)) + if loader_type == 'svn': + # args = 
('path-to-archive', 'some-origin-url') + origin_key_to_lookup = 1 + elif loader_type == 'git': + # args = {'origin_url': 'some-origin-url'} + origin_key_to_lookup = 'origin_url' + for line in sys.stdin: line = line.rstrip() data = ast.literal_eval(line) args = data['args'] exception = work_on_exception_msg(data['exception']) if exception in black_listed_exceptions: continue ori_type = None for ori in origins: - if ori in args['origin_url']: - ori_type = ori + url = args[origin_key_to_lookup] + if ori in url: + ori_type = '-'.join([loader_type, ori]) break if not ori_type: print('origin type \'%s\' unknown' % ori_type) continue print('%s %s' % (ori_type, data['args'])) if dry_run: continue - tasks[ori_type].delay(**data['args']) + if loader_type == 'svn': + tasks[ori_type].delay(*data['args']) + else: + tasks[ori_type].delay(**data['args']) if __name__ == '__main__': main()