Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend_es.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Elastic Search backend | """Elastic Search backend | ||||
""" | """ | ||||
from copy import deepcopy | |||||
from swh.core import utils | from swh.core import utils | ||||
from swh.core.config import SWHConfig | |||||
from elasticsearch import Elasticsearch | from elasticsearch import Elasticsearch | ||||
from elasticsearch import helpers | from elasticsearch import helpers | ||||
class SWHElasticSearchClient(SWHConfig): | |||||
CONFIG_BASE_FILENAME = 'backend/elastic' | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]), | 'elastic_search': { | ||||
'index_name_prefix': ('str', 'swh-tasks'), | 'storage_nodes': {'host': 'localhost', 'port': 9200}, | ||||
'client_options': ('dict', { | 'index_name_prefix': 'swh-tasks', | ||||
'client_options': { | |||||
'sniff_on_start': False, | 'sniff_on_start': False, | ||||
'sniff_on_connection_fail': True, | 'sniff_on_connection_fail': True, | ||||
'http_compress': False, | 'http_compress': False, | ||||
}) | }, | ||||
}, | |||||
} | } | ||||
def __init__(self, **config): | |||||
if config: | |||||
self.config = config | |||||
else: | |||||
self.config = self.parse_config_file() | |||||
options = self.config['client_options'] | class SWHElasticSearchClient: | ||||
def __init__(self, **config): | |||||
self.config = deepcopy(DEFAULT_CONFIG) | |||||
self.config.update(config) | |||||
es_conf = self.config['elastic_search'] | |||||
options = es_conf.get('client_options', {}) | |||||
self.storage = Elasticsearch( | self.storage = Elasticsearch( | ||||
# nodes to use by default | # nodes to use by default | ||||
self.config['storage_nodes'], | es_conf['storage_nodes'], | ||||
# auto detect cluster's status | # auto detect cluster's status | ||||
sniff_on_start=options['sniff_on_start'], | sniff_on_start=options['sniff_on_start'], | ||||
sniff_on_connection_fail=options['sniff_on_connection_fail'], | sniff_on_connection_fail=options['sniff_on_connection_fail'], | ||||
sniffer_timeout=60, | sniffer_timeout=60, | ||||
# compression or not | # compression or not | ||||
http_compress=options['http_compress']) | http_compress=options['http_compress']) | ||||
self.index_name_prefix = self.config['index_name_prefix'] | self.index_name_prefix = es_conf['index_name_prefix'] | ||||
# document's index type (cf. ../../data/elastic-template.json) | # document's index type (cf. ../../data/elastic-template.json) | ||||
self.doc_type = 'task' | self.doc_type = 'task' | ||||
def compute_index_name(self, year, month): | def compute_index_name(self, year, month): | ||||
"""Given a year, month, compute the index's name. | """Given a year, month, compute the index's name. | ||||
""" | """ | ||||
return '%s-%s-%s' % ( | return '%s-%s-%s' % ( | ||||
▲ Show 20 Lines • Show All 113 Lines • Show Last 20 Lines |