
diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
index 9980dcb..60e7092 100644
--- a/swh/scheduler/backend_es.py
+++ b/swh/scheduler/backend_es.py
@@ -1,268 +1,268 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Elastic Search backend
"""
import datetime # noqa
import logging
from copy import deepcopy
from typing import Any, Dict
from elasticsearch import helpers
from swh.core import utils
logger = logging.getLogger(__name__)
DEFAULT_CONFIG = {
    'elasticsearch': {
        'cls': 'local',
        'args': {
            'index_name_prefix': 'swh-tasks',
            'storage_nodes': ['localhost:9200'],
            'client_options': {
                'sniff_on_start': False,
                'sniff_on_connection_fail': True,
                'http_compress': False,
                'sniffer_timeout': 60
            },
        },
    }
}
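
# Note: ElasticSearchBackend below merges caller-provided configuration over
# these defaults with a plain dict.update(), so an 'elasticsearch' key passed
# by the caller replaces the whole default entry instead of being deep-merged.
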
def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
    """Instantiate an elastic search instance

    """
    if cls == 'local':
        from elasticsearch import Elasticsearch
    elif cls == 'memory':
-        from .backend_es_memory import MemoryElasticsearch as Elasticsearch # type: ignore # noqa
+        from .elasticsearch_memory import MemoryElasticsearch as Elasticsearch # type: ignore # noqa
    else:
        raise ValueError('Unknown elasticsearch class `%s`' % cls)

    return Elasticsearch(**args)
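
# For instance, get_elasticsearch('memory') returns the in-memory
# implementation used by the test suite, whereas get_elasticsearch('local')
# wraps a regular elasticsearch.Elasticsearch client.
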
class ElasticSearchBackend:
    """ElasticSearch backend to index tasks

    This uses an elasticsearch client to communicate with the
    elasticsearch instance.

    """
    def __init__(self, **config):
        self.config = deepcopy(DEFAULT_CONFIG)
        self.config.update(config)
        es_conf = self.config['elasticsearch']
        args = deepcopy(es_conf['args'])
        self.index_name_prefix = args.pop('index_name_prefix')
        self.storage = get_elasticsearch(
            cls=es_conf['cls'],
            args={
                'storage_nodes': args.get('storage_nodes', []),
                **args.get('client_options', {}),
            }
        )
        # document's index type (cf. /data/elastic-template.json)
        self.doc_type = 'task'
    def initialize(self):
        self.storage.indices.put_mapping(
            index=f"{self.index_name_prefix}-*",
            doc_type=self.doc_type,
            # to allow type definition below
            include_type_name=True,
            # to allow installing the mapping even if no index exists yet
            allow_no_indices=True,
            body={
                "properties": {
                    "task_id": {"type": "double"},
                    "task_policy": {"type": "text"},
                    "task_status": {"type": "text"},
                    "task_run_id": {"type": "double"},
                    "arguments": {
                        "type": "object",
                        "properties": {
                            "args": {
                                "type": "nested",
                                "dynamic": False
                            },
                            "kwargs": {
                                "type": "text"
                            }
                        }
                    },
                    "type": {"type": "text"},
                    "backend_id": {"type": "text"},
                    "metadata": {
                        "type": "object",
                        "enabled": False
                    },
                    "scheduled": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    },
                    "started": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    },
                    "ended": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"  # noqa
                    }
                }
            })
        self.storage.indices.put_settings(
            index=f"{self.index_name_prefix}-*",
            allow_no_indices=True,
            body={
                "index": {
                    "codec": "best_compression",
                    "refresh_interval": "1s",
                    "number_of_shards": 1
                }
            })
    def create(self, index_name) -> None:
        """Create the index `index_name`. Its name must start with the
        configured prefix so that it falls under the mapping installed for
        the `swh-tasks-*` index pattern.

        """
        assert index_name.startswith(self.index_name_prefix)
        self.storage.indices.create(index_name)
    def compute_index_name(self, year, month):
        """Given a year, month, compute the index's name.

        """
        return '%s-%s-%s' % (
            self.index_name_prefix, year, '%02d' % month)
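
    # Example (illustrative): with the default 'swh-tasks' prefix,
    # compute_index_name(2019, 3) returns 'swh-tasks-2019-03'.
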
    def mget(self, index_name, doc_ids, chunk_size=500,
             source=True):
        """Retrieve documents' full content according to their ids, as per
        the `source` setup.

        The `source` parameter allows retrieving only what is of interest,
        e.g.:

        - source=True ; gives back the original indexed data
        - source=False ; returns without the original _source field
        - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_ids (generator): Generator of ids to retrieve
            chunk_size (int): Number of documents chunk to send for retrieval
            source (bool/[str]): Source of information to return

        Yields:
            document indexed as per source's setup

        """
        if isinstance(source, list):
            source = {'_source': ','.join(source)}
        else:
            source = {'_source': str(source).lower()}

        for ids in utils.grouper(doc_ids, n=1000):
            res = self.storage.mget(body={'ids': list(ids)},
                                    index=index_name,
                                    doc_type=self.doc_type,
                                    params=source)
            if not res:
                logger.error('Error during retrieval of data, skipping!')
                continue

            for doc in res['docs']:
                found = doc.get('found')
                if not found:
                    msg = 'Doc id %s not found, not indexed yet' % doc['_id']
                    logger.warning(msg)
                    continue
                yield doc['_source']
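
    # Illustrative call: mget('swh-tasks-2019-03', ids, source=['task_id'])
    # yields dicts holding only the 'task_id' field of each indexed document.
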
    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
        """Bulk index data and yield the identifiers of the successfully
        indexed documents.

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Generator of documents to index
            chunk_size (int): Number of documents chunk to send for indexation

        Yields:
            document id indexed

        """
        actions = ({'_index': index_name,
                    '_op_type': 'index',
                    '_type': self.doc_type,
                    '_source': data} for data in doc_stream)
        for ok, result in helpers.streaming_bulk(client=self.storage,
                                                 actions=actions,
                                                 chunk_size=chunk_size,
                                                 raise_on_error=False,
                                                 raise_on_exception=False):
            if not ok:
                logger.error('Error during %s indexation. Skipping.', result)
                continue
            yield result['index']['_id']
    def is_index_opened(self, index_name: str) -> bool:
        """Determine if an index is opened or not

        """
        try:
            self.storage.indices.stats(index_name)
            return True
        except Exception:
            # fails when the index is closed (no other api call found)
            return False
    def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                       source=True):
        """Bulk index data and yield the successfully indexed data back, as
        per the `source` setup.

        The `source` parameter permits retrieving only what is of interest
        to us, e.g.:

        - source=True ; gives back the original indexed data
        - source=False ; returns without the original _source field
        - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Document generator to index
            chunk_size (int): Number of documents chunk to send
            source (bool, [str]): the information to return

        """
        to_close = False
        # index must exist
        if not self.storage.indices.exists(index_name):
            self.create(index_name)
            # Close that new index (to avoid too many opened indices)
            to_close = True
        # index must be opened
        if not self.is_index_opened(index_name):
            to_close = True
            self.storage.indices.open(index_name)

        try:
            indexed_ids = self._streaming_bulk(
                index_name, doc_stream, chunk_size=chunk_size)
            yield from self.mget(
                index_name, indexed_ids, chunk_size=chunk_size, source=source)
        finally:
            # closing it to stay in the same state as prior to the call
            if to_close:
                self.storage.indices.close(index_name)
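
As a side note, here is a minimal end-to-end sketch of this backend's API, using the in-memory elasticsearch flavour exercised by the tests below. It assumes the in-memory class accepts the constructor arguments the backend forwards to the real client; the index date and the task document are made up for illustration:

from swh.scheduler.backend_es import ElasticSearchBackend

# Back the scheduler index with the in-memory elasticsearch implementation.
backend = ElasticSearchBackend(elasticsearch={
    'cls': 'memory',
    'args': {'index_name_prefix': 'swh-tasks'},
})

# Monthly index, e.g. 'swh-tasks-2019-03' (illustrative date).
index_name = backend.compute_index_name(2019, 3)

# Index a made-up task document and iterate over what was indexed.
tasks = [{'task_id': 1, 'type': 'load-git',
          'arguments': {'args': [], 'kwargs': {}}}]
for doc in backend.streaming_bulk(index_name, tasks):
    print(doc)
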
diff --git a/swh/scheduler/backend_es_memory.py b/swh/scheduler/elasticsearch_memory.py
similarity index 100%
rename from swh/scheduler/backend_es_memory.py
rename to swh/scheduler/elasticsearch_memory.py
diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py
index 41cb61d..dd01546 100644
--- a/swh/scheduler/tests/es/test_backend_es.py
+++ b/swh/scheduler/tests/es/test_backend_es.py
@@ -1,78 +1,78 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import pytest
import elasticsearch
from swh.scheduler.backend_es import get_elasticsearch
from ..common import tasks_from_template, TEMPLATES
def test_get_elasticsearch():
    with pytest.raises(ValueError, match='Unknown elasticsearch class'):
        get_elasticsearch('unknown')

    es = get_elasticsearch('memory')
    assert es
-    from swh.scheduler.backend_es_memory import MemoryElasticsearch
+    from swh.scheduler.elasticsearch_memory import MemoryElasticsearch
    assert isinstance(es, MemoryElasticsearch)

    es = get_elasticsearch('local')
    assert es
    assert isinstance(es, elasticsearch.Elasticsearch)
def test_backend_setup_basic(swh_elasticsearch):
    """The elasticsearch instance should allow creating, closing and
    checking an index

    """
    index_name = 'swh-tasks-2010-01'
    try:
        swh_elasticsearch.storage.indices.get_mapping(index_name)
    except (elasticsearch.exceptions.NotFoundError, KeyError):
        pass

    assert not swh_elasticsearch.storage.indices.exists(index_name)
    swh_elasticsearch.create(index_name)
    assert swh_elasticsearch.storage.indices.exists(index_name)
    assert swh_elasticsearch.is_index_opened(index_name)

    # index exists with a mapping
    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
    assert mapping != {}
def test_backend_setup_index(swh_elasticsearch):
    """The elasticsearch instance should allow bulk indexing

    """
    template_git = TEMPLATES['git']
    next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
    tasks = tasks_from_template(template_git, next_run_date, 1)
    index_name = swh_elasticsearch.compute_index_name(
        next_run_date.year, next_run_date.month)
    assert not swh_elasticsearch.storage.indices.exists(index_name)

    tasks = list(swh_elasticsearch.streaming_bulk(index_name, tasks))
    assert len(tasks) > 0

    for output_task in tasks:
        assert output_task is not None
        assert output_task['type'] == template_git['type']
        assert output_task['arguments'] is not None
        next_run = output_task['next_run']
        if isinstance(next_run, str):  # real elasticsearch
            assert next_run == next_run_date.isoformat()
        else:  # memory implem. does not really index
            assert next_run == next_run_date

    assert swh_elasticsearch.storage.indices.exists(index_name)
    assert not swh_elasticsearch.is_index_opened(index_name)
    mapping = swh_elasticsearch.storage.indices.get_mapping(index_name)
    assert mapping != {}
diff --git a/swh/scheduler/tests/es/test_backend_es_memory.py b/swh/scheduler/tests/es/test_elasticsearch_memory.py
similarity index 98%
rename from swh/scheduler/tests/es/test_backend_es_memory.py
rename to swh/scheduler/tests/es/test_elasticsearch_memory.py
index cd76406..8da4ece 100644
--- a/swh/scheduler/tests/es/test_backend_es_memory.py
+++ b/swh/scheduler/tests/es/test_elasticsearch_memory.py
@@ -1,161 +1,159 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
import logging
import random
import pytest
-from swh.scheduler.backend_es_memory import (
-    BasicSerializer, BasicTransport
-)
+from swh.scheduler.elasticsearch_memory import BasicSerializer, BasicTransport
from ..common import tasks_from_template, TEMPLATES
from typing import Any, Dict
logger = logging.getLogger(__name__)
def test_serializer():
    s = BasicSerializer()
    assert s

    data = {'something': [1, 2, 3], 'cool': {'1': '2'}}
    actual_data = s.dumps(data)

    assert isinstance(actual_data, str)
    assert actual_data == str(data)
def test_basic_transport():
    b = BasicTransport()
    assert b

    assert isinstance(b.serializer, BasicSerializer)
def test_index_manipulation(swh_memory_elasticsearch):
    index_name = 'swh-tasks-xxxx'
    indices = swh_memory_elasticsearch.index

    assert not swh_memory_elasticsearch.exists(index_name)
    assert index_name not in indices

    # so stat raises
    with pytest.raises(Exception):
        swh_memory_elasticsearch.stats(index_name)

    # we create the index
    swh_memory_elasticsearch.create(index_name)

    # now the index exists
    assert swh_memory_elasticsearch.exists(index_name)
    assert index_name in indices
    # it's opened
    assert indices[index_name]['status'] == 'opened'

    # so stats is happy
    swh_memory_elasticsearch.stats(index_name)

    # open the index, nothing changes
    swh_memory_elasticsearch.open(index_name)
    assert indices[index_name]['status'] == 'opened'

    # close the index
    swh_memory_elasticsearch.close(index_name)
    assert indices[index_name]['status'] == 'closed'

    # reopen the index (fun times)
    swh_memory_elasticsearch.open(index_name)
    assert indices[index_name]['status'] == 'opened'
def test_bulk_and_mget(swh_memory_elasticsearch):
    # initialize tasks
    template_git = TEMPLATES['git']
    next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1)

    tasks = tasks_from_template(template_git, next_run_start, 100)

    def compute_id(stask):
        return hashlib.sha1(stask.encode('utf-8')).hexdigest()

    body = []
    ids_to_task = {}
    for task in tasks:
        date = task['next_run']
        index_name = f'swh-tasks-{date.year}-{date.month}'
        idx = {'index': {'_index': index_name}}
        sidx = swh_memory_elasticsearch.transport.serializer.dumps(idx)
        body.append(sidx)

        stask = swh_memory_elasticsearch.transport.serializer.dumps(task)
        body.append(stask)

        _id = compute_id(stask)
        ids_to_task[_id] = task
        logger.debug(f'_id: {_id}, task: {task}')

    # store

    # create the index first
    swh_memory_elasticsearch.create(index_name)

    # then bulk insert new data
    result = swh_memory_elasticsearch.bulk('\n'.join(body))

    # no guarantee in the order
    assert result
    actual_items = result['items']
    assert len(actual_items) == len(ids_to_task)

    def get_id(data: Dict[str, Any]) -> str:
        return data['index']['_id']

    actual_items = sorted(actual_items, key=get_id)

    expected_items = {
        'items': [
            {
                'index': {
                    'status': 200,
                    '_id': _id
                }
            } for _id in list(ids_to_task)
        ]
    }

    expected_items = sorted(expected_items['items'], key=get_id)
    assert actual_items == expected_items

    # retrieve
    nb_docs = 10
    ids = list(ids_to_task)
    random_ids = []

    # add some nonexistent ids
    for i in range(16):
        noisy_id = f'{i}' * 40
        random_ids.append(noisy_id)
    random_ids.extend(random.sample(ids, nb_docs))  # add relevant ids
    for i in range(16, 32):
        noisy_id = f'{i}' * 40
        random_ids.append(noisy_id)

    result = swh_memory_elasticsearch.mget(
        index=index_name, body={'ids': random_ids})
    assert result['docs']
    assert len(result['docs']) == nb_docs, "no nonexistent id should be found"
    for doc in result['docs']:
        assert doc['found']

        actual_task = doc['_source']
        _id = compute_id(str(actual_task))
        expected_task = ids_to_task[_id]
        assert actual_task == expected_task
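
For reference, each pair appended to `body` above is an action line followed by a document line, mirroring the bulk API's newline-delimited layout; BasicSerializer simply uses str() rather than JSON, so one made-up pair boils down to:

action = {'index': {'_index': 'swh-tasks-2019-3'}}
document = {'type': 'load-git', 'arguments': {'args': [], 'kwargs': {}}}
bulk_chunk = '\n'.join([str(action), str(document)])
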
