diff --git a/ardumont/kibana_fetch_logs.py b/ardumont/kibana_fetch_logs.py
index ec23a04..f0f7004 100755
--- a/ardumont/kibana_fetch_logs.py
+++ b/ardumont/kibana_fetch_logs.py
@@ -1,249 +1,211 @@
#!/usr/bin/env python3

# Copyright (C) 2017-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# Configuration file at ~/.config/swh/kibana/query.yml
# (configuration sample at https://forge.softwareheritage.org/P221)

# Use:
# ./kibana_fetch_logs.py | tee temporary-error-file

import ast
import click
import logging
import requests

from swh.core.config import SWHConfig


class KibanaFetchLog(SWHConfig):
    """Kibana fetch log class to permit log retrieval.

    """
    CONFIG_BASE_FILENAME = 'kibana/query'

    DEFAULT_CONFIG = {
        'server': ('str', 'http://esnode3.internal.softwareheritage.org:9200'),
-        'indexes': ('list[str]', ['swh_workers-2017.05.*', 'swh_workers-2017.06.*']),
+        'indexes': ('list[str]', [
+            'swh_workers-2017.05.*', 'swh_workers-2017.06.*']),
        'types': ('str', 'journal'),
        'size': ('int', 10),
        'from': ('int', 0),
        '_source': ('list[str]', [
            'message',
            'swh_logging_args_args',
            'swh_logging_args_exc',
            'swh_logging_args_kwargs']),
        'sort': ('list', [{
            '@timestamp': 'asc'
        }]),
        'query': ('dict', {
            'bool': {
                'must': [
                    {
                        'match': {
-                            'systemd_unit': {
+                            'systemd_unit.keyword': {
                                'query': 'swh-worker@swh_loader_svn.service',
                            }
                        }
                    },
                    {
                        'term': {
                            'priority': '3'
                        }
                    }
-                ],
-                'must_not': [
-                    {
-                        'match': {
-                            'message': {
-                                'query': "[.*] Property 'svn:externals' detected.",  # noqa
-                            }
-                        }
-                    },
-                    {
-                        'match': {
-                            'message': {
-                                'query': '[.*] Uneventful visit. Detail: file',
-                            }
-                        }
-                    },
-                    {
-                        'match': {
-                            'message': {
-                                'query': '.*Failed to mount the svn dump.*',
-                            }
-                        }
-                    },
-                    {
-                        'match': {
-                            'message': {
-                                'query': '[.*] Loading failure, updating to `partial`',  # noqa
-                            }}},
-                    {
-                        'match': {
-                            'message': {
-                                'query': '[.*] consumer: Cannot connect to amqp.*',  # noqa
-                            }}},
-                    {
-                        'match': {
-                            'message': {
-                                'query': '[.*] pidbox command error.*',
-                            }
-                        }
-                    }
                ]
            }
        }),
    }

    ADDITIONAL_CONFIG = {}

    def __init__(self, config=None):
        self.config = self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])
        if config:  # permit to amend the configuration file if it exists
            self.config.update(config)

        self.server = self.config['server']
        self.indexes = ','.join(self.config['indexes'])
        self.types = self.config['types']
        self.payload = {
            key: self.config[key]
            for key in ['from', '_source', 'query', 'size', 'sort']
        }

        logging.debug('### Server: %s' % self.server)
        logging.debug('### Indexes: %s' % self.indexes)
        logging.debug('### Types: %s' % self.types)

    def _retrieve_data(self, server, indexes, types, start=None):
        """Retrieve information from server looking up through 'indexes' and
        'types'. This returns result of length 'size' (configuration)
        starting from the 'start' position.

        """
        payload = self.payload
        if start:
            payload['search_after'] = [start]

        url = '%s/%s/%s/_search' % (server, indexes, types)
        r = requests.post(url, json=payload)
        logging.debug('Payload: %s' % payload)
        if not r.ok:
            logging.debug('Response: %s' % r.content)
-            raise ValueError("Problem when communicating with server: %s" % r.status_code)
+            raise ValueError("Problem when communicating with server: %s" % (
+                r.status_code, ))

        return r.json()

    def _format_result(self, json):
        """Format result from the server's response"""
        if not json:
            return {}

        hits = json.get('hits')
        if not hits:
            return {}

        total_hits = hits.get('total')
        if not total_hits:
            return {}

        hits = hits.get('hits')
        if not hits:
            return {}

        all_data = []
        last_sort_time = None

        for data in hits:
            last_sort_time = data['sort'][0]
            source = data['_source']
            _data = {}

            swh_logging_args_args = source.get('swh_logging_args_args')
            if swh_logging_args_args:
                _data['args'] = ast.literal_eval(swh_logging_args_args)

            swh_logging_args_kwargs = source.get('swh_logging_args_kwargs')
            if swh_logging_args_kwargs:
                _data['kwargs'] = ast.literal_eval(swh_logging_args_kwargs)

            exception = source.get('swh_logging_args_exc')
            if exception:
                _data['exception'] = exception

            if not _data:
                message = source.get('message')
                if message:
                    _data = {
                        'args': {},
                        'kwargs': {},
                        'exception': message
                    }

            if _data:
                all_data.append(_data)

        return {
            'all': all_data,
            'last_sort_time': last_sort_time,
            'total_hits': total_hits
        }

    def fetch(self):
        """Fetch wanted information (cf. 'query' entry in configuration).

        """
        count = 0
        last_sort_time = None
        total_hits = 1
        server = self.server
        indexes = self.indexes
        types = self.types

        while count < total_hits:
            response = self._retrieve_data(
                server, indexes, types, start=last_sort_time)
            data = self._format_result(response)
            if not data:
                break

            total_hits = data['total_hits']
            last_sort_time = data['last_sort_time']

            for row in data['all']:
                count += 1
                yield row


@click.command()
@click.option('--server', default=None,
              help='Elastic search instance to query against')
@click.option('--indexes', default=None,
              help='ElasticSearch indexes to lookup (csv if many)')
@click.option('--types', default=None,
              help='ElasticSearch types to lookup (csv if many)')
@click.option('--size', default=10, type=click.INT, help='Pagination size')
@click.option('--debug/--nodebug', is_flag=True, default=False)
def main(server, indexes, types, size, debug):
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    config = {}
    if server:
        config['server'] = server
    if indexes:
        config['indexes'] = indexes.split(',')
    if types:
        config['types'] = types
    if size:
        config['size'] = size

    fetcher = KibanaFetchLog(config)
    for entry in fetcher.fetch():
        print(entry)


if __name__ == '__main__':
    main()
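
For context, main() above only turns the command-line options into a config dict and then iterates over KibanaFetchLog.fetch(), printing one formatted log entry per line. The same loop can be driven from Python directly; the sketch below is a hypothetical example, assuming the script is importable as a module named kibana_fetch_logs, that swh.core is installed, and that the configured ElasticSearch endpoint is reachable. The server URL, index pattern and page size are placeholder values, not the production settings.

# Hypothetical programmatic use of KibanaFetchLog, mirroring what main() does.
# The module name, server URL and index pattern are assumptions for this example.
from kibana_fetch_logs import KibanaFetchLog

fetcher = KibanaFetchLog({
    'server': 'http://localhost:9200',      # placeholder ElasticSearch endpoint
    'indexes': ['swh_workers-2017.06.*'],   # same naming scheme as DEFAULT_CONFIG
    'size': 50,                             # results fetched per request
})
for entry in fetcher.fetch():               # yields one dict per matching log entry
    print(entry)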