backend_es.py

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Elastic Search backend
"""
from swh.core import utils
from swh.core.config import SWHConfig
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class SWHElasticSearchClient(SWHConfig):
    CONFIG_BASE_FILENAME = 'backend/elastic'

    DEFAULT_CONFIG = {
        'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]),
        'index_name_prefix': ('str', 'swh-tasks'),
        'client_options': ('dict', {
            'sniff': True,
            'http_compress': False,
        }),
    }

    def __init__(self, **config):
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file()
        options = self.config['client_options']
        sniff = options['sniff']
        self.storage = Elasticsearch(
            # nodes to use by default
            self.config['storage_nodes'],
            # auto-detect the cluster's status
            sniff_on_start=sniff,
            sniff_on_connection_fail=sniff,
            sniffer_timeout=60,
            # whether to compress requests
            http_compress=options['http_compress'])
        self.index_name_prefix = self.config['index_name_prefix']
        # document's index type (cf. ../../data/elastic-template.json)
        self.doc_type = 'task'

    def compute_index_name(self, year, month):
        """Given a year and a month, compute the index's name.

        """
        return '%s-%s-%02d' % (self.index_name_prefix, year, month)

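    # For example (hypothetical values): with the default 'swh-tasks'
    # prefix, compute_index_name(2018, 4) returns 'swh-tasks-2018-04',
    # i.e. one index per month.
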
    def index(self, data):
        """Index the given data into Elasticsearch.

        The 'ended' field of the data is used to compute the name of
        the index to write to.

        """
        date = data['ended']
        index_name = self.compute_index_name(date.year, date.month)
        return self.storage.index(index=index_name,
                                  doc_type=self.doc_type,
                                  body=data)

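    # For example (hypothetical document): indexing {'task_id': 1,
    # 'ended': datetime.datetime(2018, 4, 1)} writes to the
    # 'swh-tasks-2018-04' index (with the default prefix).
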
    def mget(self, index_name, doc_ids, chunk_size=500,
             source=True, log=None):
        """Retrieve documents by their ids, returning as much of their
        content as the `source` setup dictates.

        The `source` parameter allows retrieving only what's of
        interest to us, e.g.:

        - source=True: returns the original indexed data
        - source=False: returns the documents without the original
          _source field
        - source=['task_id']: returns only the task_id key of the
          _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_ids (generator): Generator of ids to retrieve
            chunk_size (int): Number of documents per retrieval chunk
            source (bool/[str]): Source of information to return

        Yields:
            Documents indexed as per the `source` setup

        """
        if isinstance(source, list):
            source = {'_source': ','.join(source)}
        else:
            source = {'_source': str(source).lower()}

        for ids in utils.grouper(doc_ids, n=chunk_size):
            res = self.storage.mget(body={'ids': list(ids)},
                                    index=index_name,
                                    doc_type=self.doc_type,
                                    params=source)
            if not res:
                if log:
                    log.error('Error during data retrieval, skipping!')
                continue
            for doc in res['docs']:
                if not doc.get('found'):
                    if log:
                        log.warning('Doc id %s not found, not indexed yet',
                                    doc['_id'])
                    continue
                yield doc['_source']

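    # For example (hypothetical ids): mget('swh-tasks-2018-04',
    # iter(['abc', 'def']), source=['task_id']) yields dicts holding
    # only the 'task_id' field of each found document.
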
    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                        log=None):
        """Bulk index data, yielding the identifier of each document
        successfully indexed.

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Generator of documents to index
            chunk_size (int): Number of documents per indexation chunk

        Yields:
            Ids of the indexed documents

        """
        actions = ({'_index': index_name,
                    '_op_type': 'index',
                    '_type': self.doc_type,
                    '_source': data} for data in doc_stream)
        for ok, result in helpers.streaming_bulk(client=self.storage,
                                                 actions=actions,
                                                 chunk_size=chunk_size,
                                                 raise_on_error=False,
                                                 raise_on_exception=False):
            if not ok:
                if log:
                    log.error('Error during %s indexation. Skipping.',
                              result)
                continue
            yield result['index']['_id']

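    # Each value yielded above is the Elasticsearch-assigned '_id' of a
    # successfully indexed document; streaming_bulk() below feeds those
    # ids back into mget() to read the documents again.
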
    def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                       source=True, log=None):
        """Bulk index data, yielding back each successfully indexed
        document as per the `source` setup.

        The `source` parameter allows retrieving only what's of
        interest to us, e.g.:

        - source=True: returns the original indexed data
        - source=False: returns the documents without the original
          _source field
        - source=['task_id']: returns only the task_id key of the
          _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Generator of documents to index
            chunk_size (int): Number of documents per chunk
            source (bool/[str]): Source of information to return

        Yields:
            Documents indexed as per the `source` setup

        """
        indexed_ids = self._streaming_bulk(
            index_name, doc_stream, chunk_size=chunk_size, log=log)
        yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size,
                             source=source, log=log)
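
# A minimal usage sketch (not part of the original file). It assumes a
# local Elasticsearch node on port 9200 and an existing index template
# (cf. ../../data/elastic-template.json); the configuration values and
# sample documents below are hypothetical.
if __name__ == '__main__':
    import datetime

    client = SWHElasticSearchClient(
        storage_nodes=[{'host': 'localhost', 'port': 9200}],
        index_name_prefix='swh-tasks',
        client_options={'sniff': False, 'http_compress': True})

    documents = ({'task_id': i, 'ended': datetime.datetime(2018, 4, 1)}
                 for i in range(3))
    index_name = client.compute_index_name(2018, 4)
    # Index the documents, then print back only their 'task_id' field.
    for doc in client.streaming_bulk(index_name, documents,
                                     source=['task_id']):
        print(doc)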
