diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
deleted file mode 100644
--- a/swh/scheduler/backend_es.py
+++ /dev/null
@@ -1,269 +0,0 @@
-# Copyright (C) 2018-2020  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Elastic Search backend
-
-"""
-
-from copy import deepcopy
-import datetime  # noqa
-import logging
-from typing import Any, Dict
-
-from elasticsearch import helpers
-
-from swh.core import utils
-
-logger = logging.getLogger(__name__)
-
-
-DEFAULT_CONFIG = {
-    "elasticsearch": {
-        "cls": "local",
-        "args": {
-            "index_name_prefix": "swh-tasks",
-            "storage_nodes": ["localhost:9200"],
-            "client_options": {
-                "sniff_on_start": False,
-                "sniff_on_connection_fail": True,
-                "http_compress": False,
-                "sniffer_timeout": 60,
-            },
-        },
-    }
-}
-
-
-def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
-    """Instantiate an elastic search instance
-
-    """
-    if cls == "local":
-        from elasticsearch import Elasticsearch
-    elif cls == "memory":
-        from .elasticsearch_memory import (  # type: ignore  # noqa
-            MemoryElasticsearch as Elasticsearch,
-        )
-    else:
-        raise ValueError("Unknown elasticsearch class `%s`" % cls)
-
-    return Elasticsearch(**args)
-
-
-class ElasticSearchBackend:
-    """ElasticSearch backend to index tasks
-
-    This uses an elasticsearch client to actually discuss with the
-    elasticsearch instance.
-
-    """
-
-    def __init__(self, **config):
-        self.config = deepcopy(DEFAULT_CONFIG)
-        self.config.update(config)
-        es_conf = self.config["elasticsearch"]
-        args = deepcopy(es_conf["args"])
-        self.index_name_prefix = args.pop("index_name_prefix")
-        self.storage = get_elasticsearch(
-            cls=es_conf["cls"],
-            args={
-                "hosts": args.get("storage_nodes", []),
-                **args.get("client_options", {}),
-            },
-        )
-        # document's index type (cf. /data/elastic-template.json)
-        self.doc_type = "task"
-
-    def initialize(self):
-        self.storage.indices.put_mapping(
-            index=f"{self.index_name_prefix}-*",
-            doc_type=self.doc_type,
-            # to allow type definition below
-            include_type_name=True,
-            # to allow install mapping even if no index yet
-            allow_no_indices=True,
-            body={
-                "properties": {
-                    "task_id": {"type": "double"},
-                    "task_policy": {"type": "text"},
-                    "task_status": {"type": "text"},
-                    "task_run_id": {"type": "double"},
-                    "arguments": {
-                        "type": "object",
-                        "properties": {
-                            "args": {"type": "nested", "dynamic": False},
-                            "kwargs": {"type": "text"},
-                        },
-                    },
-                    "type": {"type": "text"},
-                    "backend_id": {"type": "text"},
-                    "metadata": {"type": "object", "enabled": False},
-                    "scheduled": {
-                        "type": "date",
-                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis",  # noqa
-                    },
-                    "started": {
-                        "type": "date",
-                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis",  # noqa
-                    },
-                    "ended": {
-                        "type": "date",
-                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis",  # noqa
-                    },
-                }
-            },
-        )
-
-        self.storage.indices.put_settings(
-            index=f"{self.index_name_prefix}-*",
-            allow_no_indices=True,
-            body={
-                "index": {
-                    "codec": "best_compression",
-                    "refresh_interval": "1s",
-                    "number_of_shards": 1,
-                }
-            },
-        )
-
-    def create(self, index_name) -> None:
-        """Create and initialize index_name with mapping for all indices
-        matching `swh-tasks-` pattern
-
-        """
-        assert index_name.startswith(self.index_name_prefix)
-        self.storage.indices.create(index_name)
-
-    def compute_index_name(self, year, month):
-        """Given a year, month, compute the index's name.
-
-        """
-        return "%s-%s-%s" % (self.index_name_prefix, year, "%02d" % month)
-
-    def mget(self, index_name, doc_ids, chunk_size=500, source=True):
-        """Retrieve document's full content according to their ids as per
-        source's setup.
-
-        The `source` allows to retrieve only what's interesting, e.g:
-        - source=True ; gives back the original indexed data
-        - source=False ; returns without the original _source field
-        - source=['task_id'] ; returns only task_id in the _source field
-
-        Args:
-            index_name (str): Name of the concerned index.
-            doc_ids (generator): Generator of ids to retrieve
-            chunk_size (int): Number of documents chunk to send for retrieval
-            source (bool/[str]): Source of information to return
-
-        Yields:
-            document indexed as per source's setup
-
-        """
-        if isinstance(source, list):
-            source = {"_source": ",".join(source)}
-        else:
-            source = {"_source": str(source).lower()}
-
-        for ids in utils.grouper(doc_ids, n=1000):
-            res = self.storage.mget(
-                body={"ids": list(ids)},
-                index=index_name,
-                doc_type=self.doc_type,
-                params=source,
-            )
-            if not res:
-                logger.error("Error during retrieval of data, skipping!")
-                continue
-
-            for doc in res["docs"]:
-                found = doc.get("found")
-                if not found:
-                    msg = "Doc id %s not found, not indexed yet" % doc["_id"]
-                    logger.warning(msg)
-                    continue
-                yield doc["_source"]
-
-    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
-        """Bulk index data and returns the successful indexed data's
-        identifier.
-
-        Args:
-            index_name (str): Name of the concerned index.
-            doc_stream (generator): Generator of documents to index
-            chunk_size (int): Number of documents chunk to send for indexation
-
-        Yields:
-            document id indexed
-
-        """
-        actions = (
-            {
-                "_index": index_name,
-                "_op_type": "index",
-                "_type": self.doc_type,
-                "_source": data,
-            }
-            for data in doc_stream
-        )
-        for ok, result in helpers.streaming_bulk(
-            client=self.storage,
-            actions=actions,
-            chunk_size=chunk_size,
-            raise_on_error=False,
-            raise_on_exception=False,
-        ):
-            if not ok:
-                logger.error("Error during %s indexation. Skipping.", result)
-                continue
-            yield result["index"]["_id"]
-
-    def is_index_opened(self, index_name: str) -> bool:
-        """Determine if an index is opened or not
-
-        """
-        try:
-            self.storage.indices.stats(index_name)
-            return True
-        except Exception:
-            # fails when indice is closed (no other api call found)
-            return False
-
-    def streaming_bulk(self, index_name, doc_stream, chunk_size=500, source=True):
-        """Bulk index data and returns the successful indexed data as per
-        source's setup.
-
-        the `source` permits to retrieve only what's of interest to
-        us, e.g:
-
-        - source=True ; gives back the original indexed data
-        - source=False ; returns without the original _source field
-        - source=['task_id'] ; returns only task_id in the _source field
-
-        Note that:
-        - if the index is closed, it will be opened
-        - if the index does not exist, it will be created and opened
-
-        This keeps the index opened for performance reasons.
-
-        Args:
-            index_name (str): Name of the concerned index.
-            doc_stream (generator): Document generator to index
-            chunk_size (int): Number of documents chunk to send
-            source (bool, [str]): the information to return
-
-        """
-        # index must exist
-        if not self.storage.indices.exists(index_name):
-            self.create(index_name)
-        # index must be opened
-        if not self.is_index_opened(index_name):
-            self.storage.indices.open(index_name)
-
-        indexed_ids = self._streaming_bulk(
-            index_name, doc_stream, chunk_size=chunk_size
-        )
-        yield from self.mget(
-            index_name, indexed_ids, chunk_size=chunk_size, source=source
-        )
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -8,8 +8,7 @@
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import locale
-import logging
-from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
+from typing import TYPE_CHECKING, Iterator, List, Optional
 
 import click
 
@@ -589,166 +588,3 @@
 
     output.append("Respawn tasks %s\n" % (task_ids_int,))
     click.echo("\n".join(output))
-
-
-@task.command("archive")
-@click.option(
-    "--before",
-    "-b",
-    default=None,
-    help="""Task whose ended date is anterior will be archived.
-            Default to current month's first day.""",
-)
-@click.option(
-    "--after",
-    "-a",
-    default=None,
-    help="""Task whose ended date is after the specified date will
-            be archived. Default to prior month's first day.""",
-)
-@click.option(
-    "--batch-index",
-    default=1000,
-    type=click.INT,
-    help="Batch size of tasks to read from db to archive",
-)
-@click.option(
-    "--bulk-index",
-    default=200,
-    type=click.INT,
-    help="Batch size of tasks to bulk index",
-)
-@click.option(
-    "--batch-clean",
-    default=1000,
-    type=click.INT,
-    help="Batch size of task to clean after archival",
-)
-@click.option(
-    "--dry-run/--no-dry-run",
-    is_flag=True,
-    default=False,
-    help="Default to list only what would be archived.",
-)
-@click.option("--verbose", is_flag=True, default=False, help="Verbose mode")
-@click.option(
-    "--cleanup/--no-cleanup",
-    is_flag=True,
-    default=True,
-    help="Clean up archived tasks (default)",
-)
-@click.option(
-    "--start-from",
-    type=click.STRING,
-    default=None,
-    help="(Optional) default page to start from.",
-)
-@click.pass_context
-def archive_tasks(
-    ctx,
-    before,
-    after,
-    batch_index,
-    bulk_index,
-    batch_clean,
-    dry_run,
-    verbose,
-    cleanup,
-    start_from,
-):
-    """Archive task/task_run whose (task_type is 'oneshot' and task_status
-    is 'completed') or (task_type is 'recurring' and task_status is
-    'disabled').
-
-    With --dry-run flag set (default), only list those.
-
-    """
-    from itertools import groupby
-
-    from swh.core.utils import grouper
-    from swh.scheduler.backend_es import ElasticSearchBackend
-    from swh.scheduler.utils import utcnow
-
-    config = ctx.obj["config"]
-    scheduler = ctx.obj["scheduler"]
-
-    if not scheduler:
-        raise ValueError("Scheduler class (local/remote) must be instantiated")
-
-    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
-    logger = logging.getLogger(__name__)
-    logging.getLogger("urllib3").setLevel(logging.WARN)
-    logging.getLogger("elasticsearch").setLevel(logging.ERROR)
-    if dry_run:
-        logger.info("**DRY-RUN** (only reading db)")
-    if not cleanup:
-        logger.info("**NO CLEANUP**")
-
-    es_storage = ElasticSearchBackend(**config)
-    now = utcnow()
-
-    # Default to archive tasks from a rolling month starting the week
-    # prior to the current one
-    if not before:
-        before = now.shift(weeks=-1).format("YYYY-MM-DD")
-
-    if not after:
-        after = now.shift(weeks=-1).shift(months=-1).format("YYYY-MM-DD")
-
-    logger.debug(
-        "index: %s; cleanup: %s; period: [%s ; %s]"
-        % (not dry_run, not dry_run and cleanup, after, before)
-    )
-
-    def get_index_name(
-        data: Dict[str, Any], es_storage: ElasticSearchBackend = es_storage
-    ) -> str:
-        """Given a data record, determine the index's name through its ending
-        date. This varies greatly depending on the task_run's
-        status.
-
-        """
-        date = data.get("started")
-        if not date:
-            date = data["scheduled"]
-        return es_storage.compute_index_name(date.year, date.month)
-
-    def index_data(before, page_token, batch_index):
-        while True:
-            result = scheduler.filter_task_to_archive(
-                after, before, page_token=page_token, limit=batch_index
-            )
-            tasks_sorted = sorted(result["tasks"], key=get_index_name)
-            groups = groupby(tasks_sorted, key=get_index_name)
-            for index_name, tasks_group in groups:
-                logger.debug("Index tasks to %s" % index_name)
-                if dry_run:
-                    for task in tasks_group:
-                        yield task
-                    continue
-
-                yield from es_storage.streaming_bulk(
-                    index_name,
-                    tasks_group,
-                    source=["task_id", "task_run_id"],
-                    chunk_size=bulk_index,
-                )
-
-            page_token = result.get("next_page_token")
-            if page_token is None:
-                break
-
-    gen = index_data(before, page_token=start_from, batch_index=batch_index)
-    if cleanup:
-        for task_ids in grouper(gen, n=batch_clean):
-            task_ids = list(task_ids)
-            logger.info("Clean up %s tasks: [%s, ...]" % (len(task_ids), task_ids[0]))
-            if dry_run:  # no clean up
-                continue
-            ctx.obj["scheduler"].delete_archived_tasks(task_ids)
-    else:
-        for task_ids in grouper(gen, n=batch_index):
-            task_ids = list(task_ids)
-            logger.info("Indexed %s tasks: [%s, ...]" % (len(task_ids), task_ids[0]))
-
-    logger.debug("Done!")
diff --git a/swh/scheduler/elasticsearch_memory.py b/swh/scheduler/elasticsearch_memory.py
deleted file mode 100644
--- a/swh/scheduler/elasticsearch_memory.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Copyright (C) 2018-2019  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Memory Elastic Search backend
-
-"""
-
-from ast import literal_eval
-import datetime  # noqa serialization purposes
-import hashlib
-import logging
-from typing import Optional
-
-import psycopg2  # noqa serialization purposes
-
-logger = logging.getLogger(__name__)
-
-
-class BasicSerializer:
-    """For memory elastic search implementation (not for production)
-
-    """
-
-    def __init__(self, *args, **kwargs):
-        pass
-
-    def dumps(self, *args, **kwargs):
-        return str(*args)
-
-
-class BasicTransport:
-    """For memory elastic search implementation, (not for production)
-
-    """
-
-    def __init__(self, *args, **kwargs):
-        self.serializer = BasicSerializer()
-
-
-class MemoryElasticsearch:
-    """Memory Elasticsearch instance (for test purposes)
-
-    Partial implementation oriented towards index storage (and not search)
-
-    For now, its sole client is the scheduler for task archival purposes.
-
-    """
-
-    def __init__(self, *args, **kwargs):
-        self.index = {}
-        self.mapping = {}
-        self.settings = {}
-        self.indices = self  # HACK
-        self.main_mapping_key: Optional[str] = None
-        self.main_settings_key: Optional[str] = None
-        self.transport = BasicTransport()
-
-    def create(self, index, **kwargs):
-        logger.debug(f"create index {index}")
-        logger.debug(f"indices: {self.index}")
-        logger.debug(f"mapping: {self.mapping}")
-        logger.debug(f"settings: {self.settings}")
-        self.index[index] = {
-            "status": "opened",
-            "data": {},
-            "mapping": self.get_mapping(self.main_mapping_key),
-            "settings": self.get_settings(self.main_settings_key),
-        }
-        logger.debug(f"index {index} created")
-
-    def close(self, index, **kwargs):
-        """Close index"""
-        idx = self.index.get(index)
-        if idx:
-            idx["status"] = "closed"
-
-    def open(self, index, **kwargs):
-        """Open index"""
-        idx = self.index.get(index)
-        if idx:
-            idx["status"] = "opened"
-
-    def bulk(self, body, **kwargs):
-        """Bulk insert document in index"""
-        assert isinstance(body, str)
-        all_data = body.split("\n")
-        if all_data[-1] == "":
-            all_data = all_data[:-1]  # drop the empty line if any
-        ids = []
-        # data is sent as tuple (index, data-to-index)
-        for i in range(0, len(all_data), 2):
-            # The first entry is about the index to use
-            # not about a data to index
-            # find the index
-            index_data = literal_eval(all_data[i])
-            idx_name = index_data["index"]["_index"]
-            # associated data to index
-            data = all_data[i + 1]
-            _id = hashlib.sha1(data.encode("utf-8")).hexdigest()
-            parsed_data = eval(data)  # for datetime
-            self.index[idx_name]["data"][_id] = parsed_data
-            ids.append(_id)
-
-        # everything is indexed fine
-        return {"items": [{"index": {"status": 200, "_id": _id,}} for _id in ids]}
-
-    def mget(self, *args, body, index, **kwargs):
-        """Bulk indexed documents retrieval"""
-        idx = self.index[index]
-        docs = []
-        idx_docs = idx["data"]
-        for _id in body["ids"]:
-            doc = idx_docs.get(_id)
-            if doc:
-                d = {
-                    "found": True,
-                    "_source": doc,
-                }
-                docs.append(d)
-        return {"docs": docs}
-
-    def stats(self, index, **kwargs):
-        idx = self.index[index]  # will raise if it does not exist
-        if not idx or idx["status"] == "closed":
-            raise ValueError("Closed index")  # simulate issue if index closed
-
-    def exists(self, index, **kwargs):
-        return self.index.get(index) is not None
-
-    def put_mapping(self, index, body, **kwargs):
-        self.mapping[index] = body
-        self.main_mapping_key = index
-
-    def get_mapping(self, index, **kwargs):
-        return self.mapping.get(index) or self.index.get(index, {}).get("mapping", {})
-
-    def put_settings(self, index, body, **kwargs):
-        self.settings[index] = body
-        self.main_settings_key = index
-
-    def get_settings(self, index, **kwargs):
-        return self.settings.get(index) or self.index.get(index, {}).get("settings", {})
diff --git a/swh/scheduler/tests/es/__init__.py b/swh/scheduler/tests/es/__init__.py
deleted file mode 100644
diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/conftest.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (C) 2019  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import pytest
-import yaml
-
-from swh.scheduler import get_scheduler
-
-
-@pytest.fixture
-def swh_sched_config(swh_scheduler_config):
-    return {
-        "scheduler": {"cls": "local", **swh_scheduler_config,},
-        "elasticsearch": {
-            "cls": "memory",
-            "args": {"index_name_prefix": "swh-tasks",},
-        },
-    }
-
-
-@pytest.fixture
-def swh_sched_config_file(swh_sched_config, monkeypatch, tmp_path):
-    conffile = str(tmp_path / "elastic.yml")
-    with open(conffile, "w") as f:
-        f.write(yaml.dump(swh_sched_config))
-    monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile)
-    return conffile
-
-
-@pytest.fixture
-def swh_sched(swh_sched_config):
-    return get_scheduler(**swh_sched_config["scheduler"])
-
-
-@pytest.fixture
-def swh_elasticsearch_backend(swh_sched_config):
-    from swh.scheduler.backend_es import ElasticSearchBackend
-
-    backend = ElasticSearchBackend(**swh_sched_config)
-    backend.initialize()
-    return backend
-
-
-@pytest.fixture
-def swh_elasticsearch_memory(swh_elasticsearch_backend):
-    return swh_elasticsearch_backend.storage
diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_backend_es.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright (C) 2019-2020  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-
-import elasticsearch
-import pytest
-
-from swh.scheduler.backend_es import get_elasticsearch
-
-from ..common import TEMPLATES, tasks_from_template
-
-
-def test_get_elasticsearch():
-    with pytest.raises(ValueError, match="Unknown elasticsearch class"):
-        get_elasticsearch("unknown")
-
-    es = get_elasticsearch("memory")
-    assert es
-    from swh.scheduler.elasticsearch_memory import MemoryElasticsearch
-
-    assert isinstance(es, MemoryElasticsearch)
-
-    es = get_elasticsearch("local")
-    assert es
-    assert isinstance(es, elasticsearch.Elasticsearch)
-
-
-def test_backend_setup_basic(swh_elasticsearch_backend):
-    """Elastic search instance should allow to create/close/check index
-
-    """
-    index_name = "swh-tasks-2010-01"
-    try:
-        swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
-    except (elasticsearch.exceptions.NotFoundError, KeyError):
-        pass
-
-    assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
-    swh_elasticsearch_backend.create(index_name)
-    assert swh_elasticsearch_backend.storage.indices.exists(index_name)
-    assert swh_elasticsearch_backend.is_index_opened(index_name)
-
-    # index exists with a mapping
-    mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
-    assert mapping != {}
-
-
-def test_backend_setup_index(swh_elasticsearch_backend):
-    """Elastic search instance should allow to bulk index
-
-    """
-    template_git = TEMPLATES["git"]
-    next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
-    tasks = tasks_from_template(template_git, next_run_date, 1)
-    index_name = swh_elasticsearch_backend.compute_index_name(
-        next_run_date.year, next_run_date.month
-    )
-    assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
-
-    tasks = list(swh_elasticsearch_backend.streaming_bulk(index_name, tasks))
-    assert len(tasks) > 0
-
-    for output_task in tasks:
-        assert output_task is not None
-        assert output_task["type"] == template_git["type"]
-        assert output_task["arguments"] is not None
-        next_run = output_task["next_run"]
-        if isinstance(next_run, str):  # real elasticsearch
-            assert next_run == next_run_date.isoformat()
-        else:  # memory implem. does not really index
-            assert next_run == next_run_date
-
-    assert swh_elasticsearch_backend.storage.indices.exists(index_name)
-    assert swh_elasticsearch_backend.is_index_opened(index_name)
-    mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
-    assert mapping != {}
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright (C) 2019  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import logging
-import random
-import uuid
-
-from click.testing import CliRunner
-import pytest
-
-from swh.scheduler.cli import cli
-from swh.scheduler.utils import utcnow
-
-from ..common import TASK_TYPES, TEMPLATES, tasks_from_template
-
-logger = logging.getLogger(__name__)
-
-
-@pytest.mark.usefixtures("swh_elasticsearch_backend")
-def test_cli_archive_tasks(swh_sched, swh_sched_config_file):
-    scheduler = swh_sched
-    template_git = TEMPLATES["git"]
-    template_hg = TEMPLATES["hg"]
-    # first initialize scheduler's db (is this still needed?)
-    for tt in TASK_TYPES.values():
-        scheduler.create_task_type(tt)
-
-    next_run_start = utcnow() - datetime.timedelta(days=1)
-
-    recurring = tasks_from_template(template_git, next_run_start, 100)
-    oneshots = tasks_from_template(
-        template_hg, next_run_start - datetime.timedelta(days=1), 50
-    )
-
-    past_time = next_run_start - datetime.timedelta(days=7)
-
-    all_tasks = recurring + oneshots
-    result = scheduler.create_tasks(all_tasks)
-    assert len(result) == len(all_tasks)
-
-    # simulate task run
-    backend_tasks = [
-        {
-            "task": task["id"],
-            "backend_id": str(uuid.uuid4()),
-            "scheduled": next_run_start - datetime.timedelta(minutes=i % 60),
-        }
-        for i, task in enumerate(result)
-    ]
-    scheduler.mass_schedule_task_runs(backend_tasks)
-
-    # Disable some tasks
-    tasks_to_disable = set()
-    for task in result:
-        status = random.choice(["disabled", "completed"])
-        if status == "disabled":
-            tasks_to_disable.add(task["id"])
-
-    scheduler.disable_tasks(tasks_to_disable)
-
-    git_tasks = scheduler.search_tasks(task_type=template_git["type"])
-    hg_tasks = scheduler.search_tasks(task_type=template_hg["type"])
-    assert len(git_tasks) + len(hg_tasks) == len(all_tasks)
-
-    # Ensure the task_run are in expected state
-    task_runs = scheduler.get_task_runs([t["id"] for t in git_tasks + hg_tasks])
-
-    # Same for the tasks
-    for t in git_tasks + hg_tasks:
-        if t["id"] in tasks_to_disable:
-            assert t["status"] == "disabled"
-
-    future_time = next_run_start + datetime.timedelta(days=1)
-    for tr in task_runs:
-        assert past_time <= tr["scheduled"]
-        assert tr["scheduled"] < future_time
-
-    runner = CliRunner()
-    result = runner.invoke(
-        cli,
-        [
-            "--config-file",
-            swh_sched_config_file,
-            "task",
-            "archive",
-            "--after",
-            past_time.isoformat(),
-            "--before",
-            future_time.isoformat(),
-            "--cleanup",
-        ],
-        obj={"log_level": logging.DEBUG,},
-    )
-
-    assert result.exit_code == 0, result.output
-
-    # disabled tasks should no longer be in the scheduler
-    git_tasks = scheduler.search_tasks(task_type=template_git["type"])
-    hg_tasks = scheduler.search_tasks(task_type=template_hg["type"])
-    remaining_tasks = git_tasks + hg_tasks
-    count_disabled = 0
-    for task in remaining_tasks:
-        logger.debug(f"task status: {task['status']}")
-        if task["status"] == "disabled":
-            count_disabled += 1
-
-    assert count_disabled == 0
-    assert len(remaining_tasks) == len(all_tasks) - len(tasks_to_disable)
diff --git a/swh/scheduler/tests/es/test_elasticsearch_memory.py b/swh/scheduler/tests/es/test_elasticsearch_memory.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_elasticsearch_memory.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# Copyright (C) 2019  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import hashlib
-import logging
-import random
-from typing import Any, Dict
-
-import pytest
-
-from swh.scheduler.elasticsearch_memory import BasicSerializer, BasicTransport
-
-from ..common import TEMPLATES, tasks_from_template
-
-logger = logging.getLogger(__name__)
-
-
-def test_serializer():
-    s = BasicSerializer()
-    assert s
-
-    data = {"something": [1, 2, 3], "cool": {"1": "2"}}
-    actual_data = s.dumps(data)
-
-    assert isinstance(actual_data, str)
-    assert actual_data == str(data)
-
-
-def test_basic_transport():
-    b = BasicTransport()
-    assert b
-
-    assert isinstance(b.serializer, BasicSerializer)
-
-
-def test_index_manipulation(swh_elasticsearch_memory):
-    index_name = "swh-tasks-xxxx"
-    indices = swh_elasticsearch_memory.index
-
-    assert not swh_elasticsearch_memory.exists(index_name)
-    assert index_name not in indices
-
-    # so stat raises
-    with pytest.raises(Exception):
-        swh_elasticsearch_memory.stats(index_name)
-
-    # we create the index
-    swh_elasticsearch_memory.create(index_name)
-
-    # now the index exists
-    assert swh_elasticsearch_memory.exists(index_name)
-    assert index_name in indices
-    # it's opened
-    assert indices[index_name]["status"] == "opened"
-
-    # so stats is happy
-    swh_elasticsearch_memory.stats(index_name)
-
-    # open the index, nothing changes
-    swh_elasticsearch_memory.open(index_name)
-    assert indices[index_name]["status"] == "opened"
-
-    # close the index
-    swh_elasticsearch_memory.close(index_name)
-
-    assert indices[index_name]["status"] == "closed"
-
-    # reopen the index (fun times)
-    swh_elasticsearch_memory.open(index_name)
-    assert indices[index_name]["status"] == "opened"
-
-
-def test_bulk_and_mget(swh_elasticsearch_memory):
-    # initialize tasks
-    template_git = TEMPLATES["git"]
-    next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1)
-
-    tasks = tasks_from_template(template_git, next_run_start, 100)
-
-    def compute_id(stask):
-        return hashlib.sha1(stask.encode("utf-8")).hexdigest()
-
-    body = []
-    ids_to_task = {}
-    for task in tasks:
-        date = task["next_run"]
-        index_name = f"swh-tasks-{date.year}-{date.month}"
-        idx = {"index": {"_index": index_name}}
-        sidx = swh_elasticsearch_memory.transport.serializer.dumps(idx)
-        body.append(sidx)
-
-        stask = swh_elasticsearch_memory.transport.serializer.dumps(task)
-        body.append(stask)
-
-        _id = compute_id(stask)
-        ids_to_task[_id] = task
-        logger.debug(f"_id: {_id}, task: {task}")
-
-    # store
-
-    # create the index first
-    swh_elasticsearch_memory.create(index_name)
-
-    # then bulk insert new data
-    result = swh_elasticsearch_memory.bulk("\n".join(body))
-
-    # no guarantee in the order
-    assert result
-    actual_items = result["items"]
-    assert len(actual_items) == len(ids_to_task)
-
-    def get_id(data: Dict[str, Any]) -> str:
-        return data["index"]["_id"]
-
-    actual_items = sorted(actual_items, key=get_id)
-
-    expected_items = {
-        "items": [{"index": {"status": 200, "_id": _id}} for _id in list(ids_to_task)]
-    }
-
-    expected_items = sorted(expected_items["items"], key=get_id)
-    assert actual_items == expected_items
-
-    # retrieve
-
-    nb_docs = 10
-    ids = list(ids_to_task)
-    random_ids = []
-    # add some inexistent ids
-    for i in range(16):
-        noisy_id = f"{i}" * 40
-        random_ids.append(noisy_id)
-    random_ids.extend(random.sample(ids, nb_docs))  # add relevant ids
-    for i in range(16, 32):
-        noisy_id = f"{i}" * 40
-        random_ids.append(noisy_id)
-
-    result = swh_elasticsearch_memory.mget(index=index_name, body={"ids": random_ids})
-    assert result["docs"]
-    assert len(result["docs"]) == nb_docs, "no random and inexistent id found"
-    for doc in result["docs"]:
-        assert doc["found"]
-
-        actual_task = doc["_source"]
-        _id = compute_id(str(actual_task))
-        expected_task = ids_to_task[_id]
-        assert actual_task == expected_task