D6772.id.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,6 @@
attrs-strict
celery >= 4.3, != 5.0.3
click < 8.0
-elasticsearch > 5.4
flask
humanize
pika >= 1.1.0
diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py
deleted file mode 100644
--- a/swh/scheduler/backend_es.py
+++ /dev/null
@@ -1,269 +0,0 @@
-# Copyright (C) 2018-2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Elastic Search backend
-
-"""
-
-from copy import deepcopy
-import datetime # noqa
-import logging
-from typing import Any, Dict
-
-from elasticsearch import helpers
-
-from swh.core import utils
-
-logger = logging.getLogger(__name__)
-
-
-DEFAULT_CONFIG = {
- "elasticsearch": {
- "cls": "local",
- "args": {
- "index_name_prefix": "swh-tasks",
- "storage_nodes": ["localhost:9200"],
- "client_options": {
- "sniff_on_start": False,
- "sniff_on_connection_fail": True,
- "http_compress": False,
- "sniffer_timeout": 60,
- },
- },
- }
-}
-
-
-def get_elasticsearch(cls: str, args: Dict[str, Any] = {}):
- """Instantiate an elastic search instance
-
- """
- if cls == "local":
- from elasticsearch import Elasticsearch
- elif cls == "memory":
- from .elasticsearch_memory import ( # type: ignore # noqa
- MemoryElasticsearch as Elasticsearch,
- )
- else:
- raise ValueError("Unknown elasticsearch class `%s`" % cls)
-
- return Elasticsearch(**args)
-
-
-class ElasticSearchBackend:
- """ElasticSearch backend to index tasks
-
- This uses an elasticsearch client to actually discuss with the
- elasticsearch instance.
-
- """
-
- def __init__(self, **config):
- self.config = deepcopy(DEFAULT_CONFIG)
- self.config.update(config)
- es_conf = self.config["elasticsearch"]
- args = deepcopy(es_conf["args"])
- self.index_name_prefix = args.pop("index_name_prefix")
- self.storage = get_elasticsearch(
- cls=es_conf["cls"],
- args={
- "hosts": args.get("storage_nodes", []),
- **args.get("client_options", {}),
- },
- )
- # document's index type (cf. /data/elastic-template.json)
- self.doc_type = "task"
-
- def initialize(self):
- self.storage.indices.put_mapping(
- index=f"{self.index_name_prefix}-*",
- doc_type=self.doc_type,
- # to allow type definition below
- include_type_name=True,
- # to allow install mapping even if no index yet
- allow_no_indices=True,
- body={
- "properties": {
- "task_id": {"type": "double"},
- "task_policy": {"type": "text"},
- "task_status": {"type": "text"},
- "task_run_id": {"type": "double"},
- "arguments": {
- "type": "object",
- "properties": {
- "args": {"type": "nested", "dynamic": False},
- "kwargs": {"type": "text"},
- },
- },
- "type": {"type": "text"},
- "backend_id": {"type": "text"},
- "metadata": {"type": "object", "enabled": False},
- "scheduled": {
- "type": "date",
- "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis", # noqa
- },
- "started": {
- "type": "date",
- "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis", # noqa
- },
- "ended": {
- "type": "date",
- "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis", # noqa
- },
- }
- },
- )
-
- self.storage.indices.put_settings(
- index=f"{self.index_name_prefix}-*",
- allow_no_indices=True,
- body={
- "index": {
- "codec": "best_compression",
- "refresh_interval": "1s",
- "number_of_shards": 1,
- }
- },
- )
-
- def create(self, index_name) -> None:
- """Create and initialize index_name with mapping for all indices
- matching `swh-tasks-` pattern
-
- """
- assert index_name.startswith(self.index_name_prefix)
- self.storage.indices.create(index_name)
-
- def compute_index_name(self, year, month):
- """Given a year, month, compute the index's name.
-
- """
- return "%s-%s-%s" % (self.index_name_prefix, year, "%02d" % month)
-
- def mget(self, index_name, doc_ids, chunk_size=500, source=True):
- """Retrieve document's full content according to their ids as per
- source's setup.
-
- The `source` allows to retrieve only what's interesting, e.g:
- - source=True ; gives back the original indexed data
- - source=False ; returns without the original _source field
- - source=['task_id'] ; returns only task_id in the _source field
-
- Args:
- index_name (str): Name of the concerned index.
- doc_ids (generator): Generator of ids to retrieve
- chunk_size (int): Number of documents chunk to send for retrieval
- source (bool/[str]): Source of information to return
-
- Yields:
- document indexed as per source's setup
-
- """
- if isinstance(source, list):
- source = {"_source": ",".join(source)}
- else:
- source = {"_source": str(source).lower()}
-
- for ids in utils.grouper(doc_ids, n=1000):
- res = self.storage.mget(
- body={"ids": list(ids)},
- index=index_name,
- doc_type=self.doc_type,
- params=source,
- )
- if not res:
- logger.error("Error during retrieval of data, skipping!")
- continue
-
- for doc in res["docs"]:
- found = doc.get("found")
- if not found:
- msg = "Doc id %s not found, not indexed yet" % doc["_id"]
- logger.warning(msg)
- continue
- yield doc["_source"]
-
- def _streaming_bulk(self, index_name, doc_stream, chunk_size=500):
- """Bulk index data and returns the successful indexed data's
- identifier.
-
- Args:
- index_name (str): Name of the concerned index.
- doc_stream (generator): Generator of documents to index
- chunk_size (int): Number of documents chunk to send for indexation
-
- Yields:
- document id indexed
-
- """
- actions = (
- {
- "_index": index_name,
- "_op_type": "index",
- "_type": self.doc_type,
- "_source": data,
- }
- for data in doc_stream
- )
- for ok, result in helpers.streaming_bulk(
- client=self.storage,
- actions=actions,
- chunk_size=chunk_size,
- raise_on_error=False,
- raise_on_exception=False,
- ):
- if not ok:
- logger.error("Error during %s indexation. Skipping.", result)
- continue
- yield result["index"]["_id"]
-
- def is_index_opened(self, index_name: str) -> bool:
- """Determine if an index is opened or not
-
- """
- try:
- self.storage.indices.stats(index_name)
- return True
- except Exception:
- # fails when indice is closed (no other api call found)
- return False
-
- def streaming_bulk(self, index_name, doc_stream, chunk_size=500, source=True):
- """Bulk index data and returns the successful indexed data as per
- source's setup.
-
- the `source` permits to retrieve only what's of interest to
- us, e.g:
-
- - source=True ; gives back the original indexed data
- - source=False ; returns without the original _source field
- - source=['task_id'] ; returns only task_id in the _source field
-
- Note that:
- - if the index is closed, it will be opened
- - if the index does not exist, it will be created and opened
-
- This keeps the index opened for performance reasons.
-
- Args:
- index_name (str): Name of the concerned index.
- doc_stream (generator): Document generator to index
- chunk_size (int): Number of documents chunk to send
- source (bool, [str]): the information to return
-
- """
- # index must exist
- if not self.storage.indices.exists(index_name):
- self.create(index_name)
- # index must be opened
- if not self.is_index_opened(index_name):
- self.storage.indices.open(index_name)
-
- indexed_ids = self._streaming_bulk(
- index_name, doc_stream, chunk_size=chunk_size
- )
- yield from self.mget(
- index_name, indexed_ids, chunk_size=chunk_size, source=source
- )
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -8,8 +8,7 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import locale
-import logging
-from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
+from typing import TYPE_CHECKING, Iterator, List, Optional
import click
@@ -589,166 +588,3 @@
output.append("Respawn tasks %s\n" % (task_ids_int,))
click.echo("\n".join(output))
-
-
-@task.command("archive")
-@click.option(
- "--before",
- "-b",
- default=None,
- help="""Task whose ended date is anterior will be archived.
- Default to current month's first day.""",
-)
-@click.option(
- "--after",
- "-a",
- default=None,
- help="""Task whose ended date is after the specified date will
- be archived. Default to prior month's first day.""",
-)
-@click.option(
- "--batch-index",
- default=1000,
- type=click.INT,
- help="Batch size of tasks to read from db to archive",
-)
-@click.option(
- "--bulk-index",
- default=200,
- type=click.INT,
- help="Batch size of tasks to bulk index",
-)
-@click.option(
- "--batch-clean",
- default=1000,
- type=click.INT,
- help="Batch size of task to clean after archival",
-)
-@click.option(
- "--dry-run/--no-dry-run",
- is_flag=True,
- default=False,
- help="Default to list only what would be archived.",
-)
-@click.option("--verbose", is_flag=True, default=False, help="Verbose mode")
-@click.option(
- "--cleanup/--no-cleanup",
- is_flag=True,
- default=True,
- help="Clean up archived tasks (default)",
-)
-@click.option(
- "--start-from",
- type=click.STRING,
- default=None,
- help="(Optional) default page to start from.",
-)
-@click.pass_context
-def archive_tasks(
- ctx,
- before,
- after,
- batch_index,
- bulk_index,
- batch_clean,
- dry_run,
- verbose,
- cleanup,
- start_from,
-):
- """Archive task/task_run whose (task_type is 'oneshot' and task_status
- is 'completed') or (task_type is 'recurring' and task_status is
- 'disabled').
-
- With --dry-run flag set (default), only list those.
-
- """
- from itertools import groupby
-
- from swh.core.utils import grouper
- from swh.scheduler.backend_es import ElasticSearchBackend
- from swh.scheduler.utils import utcnow
-
- config = ctx.obj["config"]
- scheduler = ctx.obj["scheduler"]
-
- if not scheduler:
- raise ValueError("Scheduler class (local/remote) must be instantiated")
-
- logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
- logger = logging.getLogger(__name__)
- logging.getLogger("urllib3").setLevel(logging.WARN)
- logging.getLogger("elasticsearch").setLevel(logging.ERROR)
- if dry_run:
- logger.info("**DRY-RUN** (only reading db)")
- if not cleanup:
- logger.info("**NO CLEANUP**")
-
- es_storage = ElasticSearchBackend(**config)
- now = utcnow()
-
- # Default to archive tasks from a rolling month starting the week
- # prior to the current one
- if not before:
- before = now.shift(weeks=-1).format("YYYY-MM-DD")
-
- if not after:
- after = now.shift(weeks=-1).shift(months=-1).format("YYYY-MM-DD")
-
- logger.debug(
- "index: %s; cleanup: %s; period: [%s ; %s]"
- % (not dry_run, not dry_run and cleanup, after, before)
- )
-
- def get_index_name(
- data: Dict[str, Any], es_storage: ElasticSearchBackend = es_storage
- ) -> str:
- """Given a data record, determine the index's name through its ending
- date. This varies greatly depending on the task_run's
- status.
-
- """
- date = data.get("started")
- if not date:
- date = data["scheduled"]
- return es_storage.compute_index_name(date.year, date.month)
-
- def index_data(before, page_token, batch_index):
- while True:
- result = scheduler.filter_task_to_archive(
- after, before, page_token=page_token, limit=batch_index
- )
- tasks_sorted = sorted(result["tasks"], key=get_index_name)
- groups = groupby(tasks_sorted, key=get_index_name)
- for index_name, tasks_group in groups:
- logger.debug("Index tasks to %s" % index_name)
- if dry_run:
- for task in tasks_group:
- yield task
- continue
-
- yield from es_storage.streaming_bulk(
- index_name,
- tasks_group,
- source=["task_id", "task_run_id"],
- chunk_size=bulk_index,
- )
-
- page_token = result.get("next_page_token")
- if page_token is None:
- break
-
- gen = index_data(before, page_token=start_from, batch_index=batch_index)
- if cleanup:
- for task_ids in grouper(gen, n=batch_clean):
- task_ids = list(task_ids)
- logger.info("Clean up %s tasks: [%s, ...]" % (len(task_ids), task_ids[0]))
- if dry_run: # no clean up
- continue
- ctx.obj["scheduler"].delete_archived_tasks(task_ids)
- else:
- for task_ids in grouper(gen, n=batch_index):
- task_ids = list(task_ids)
- logger.info("Indexed %s tasks: [%s, ...]" % (len(task_ids), task_ids[0]))
-
- logger.debug("Done!")
diff --git a/swh/scheduler/elasticsearch_memory.py b/swh/scheduler/elasticsearch_memory.py
deleted file mode 100644
--- a/swh/scheduler/elasticsearch_memory.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Memory Elastic Search backend
-
-"""
-
-from ast import literal_eval
-import datetime # noqa serialization purposes
-import hashlib
-import logging
-from typing import Optional
-
-import psycopg2 # noqa serialization purposes
-
-logger = logging.getLogger(__name__)
-
-
-class BasicSerializer:
- """For memory elastic search implementation (not for production)
-
- """
-
- def __init__(self, *args, **kwargs):
- pass
-
- def dumps(self, *args, **kwargs):
- return str(*args)
-
-
-class BasicTransport:
- """For memory elastic search implementation, (not for production)
-
- """
-
- def __init__(self, *args, **kwargs):
- self.serializer = BasicSerializer()
-
-
-class MemoryElasticsearch:
- """Memory Elasticsearch instance (for test purposes)
-
- Partial implementation oriented towards index storage (and not search)
-
- For now, its sole client is the scheduler for task archival purposes.
-
- """
-
- def __init__(self, *args, **kwargs):
- self.index = {}
- self.mapping = {}
- self.settings = {}
- self.indices = self # HACK
- self.main_mapping_key: Optional[str] = None
- self.main_settings_key: Optional[str] = None
- self.transport = BasicTransport()
-
- def create(self, index, **kwargs):
- logger.debug(f"create index {index}")
- logger.debug(f"indices: {self.index}")
- logger.debug(f"mapping: {self.mapping}")
- logger.debug(f"settings: {self.settings}")
- self.index[index] = {
- "status": "opened",
- "data": {},
- "mapping": self.get_mapping(self.main_mapping_key),
- "settings": self.get_settings(self.main_settings_key),
- }
- logger.debug(f"index {index} created")
-
- def close(self, index, **kwargs):
- """Close index"""
- idx = self.index.get(index)
- if idx:
- idx["status"] = "closed"
-
- def open(self, index, **kwargs):
- """Open index"""
- idx = self.index.get(index)
- if idx:
- idx["status"] = "opened"
-
- def bulk(self, body, **kwargs):
- """Bulk insert document in index"""
- assert isinstance(body, str)
- all_data = body.split("\n")
- if all_data[-1] == "":
- all_data = all_data[:-1] # drop the empty line if any
- ids = []
- # data is sent as tuple (index, data-to-index)
- for i in range(0, len(all_data), 2):
- # The first entry is about the index to use
- # not about a data to index
- # find the index
- index_data = literal_eval(all_data[i])
- idx_name = index_data["index"]["_index"]
- # associated data to index
- data = all_data[i + 1]
- _id = hashlib.sha1(data.encode("utf-8")).hexdigest()
- parsed_data = eval(data) # for datetime
- self.index[idx_name]["data"][_id] = parsed_data
- ids.append(_id)
-
- # everything is indexed fine
- return {"items": [{"index": {"status": 200, "_id": _id,}} for _id in ids]}
-
- def mget(self, *args, body, index, **kwargs):
- """Bulk indexed documents retrieval"""
- idx = self.index[index]
- docs = []
- idx_docs = idx["data"]
- for _id in body["ids"]:
- doc = idx_docs.get(_id)
- if doc:
- d = {
- "found": True,
- "_source": doc,
- }
- docs.append(d)
- return {"docs": docs}
-
- def stats(self, index, **kwargs):
- idx = self.index[index] # will raise if it does not exist
- if not idx or idx["status"] == "closed":
- raise ValueError("Closed index") # simulate issue if index closed
-
- def exists(self, index, **kwargs):
- return self.index.get(index) is not None
-
- def put_mapping(self, index, body, **kwargs):
- self.mapping[index] = body
- self.main_mapping_key = index
-
- def get_mapping(self, index, **kwargs):
- return self.mapping.get(index) or self.index.get(index, {}).get("mapping", {})
-
- def put_settings(self, index, body, **kwargs):
- self.settings[index] = body
- self.main_settings_key = index
-
- def get_settings(self, index, **kwargs):
- return self.settings.get(index) or self.index.get(index, {}).get("settings", {})
diff --git a/swh/scheduler/tests/es/__init__.py b/swh/scheduler/tests/es/__init__.py
deleted file mode 100644
diff --git a/swh/scheduler/tests/es/conftest.py b/swh/scheduler/tests/es/conftest.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/conftest.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import pytest
-import yaml
-
-from swh.scheduler import get_scheduler
-
-
-@pytest.fixture
-def swh_sched_config(swh_scheduler_config):
- return {
- "scheduler": {"cls": "local", **swh_scheduler_config,},
- "elasticsearch": {
- "cls": "memory",
- "args": {"index_name_prefix": "swh-tasks",},
- },
- }
-
-
-@pytest.fixture
-def swh_sched_config_file(swh_sched_config, monkeypatch, tmp_path):
- conffile = str(tmp_path / "elastic.yml")
- with open(conffile, "w") as f:
- f.write(yaml.dump(swh_sched_config))
- monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile)
- return conffile
-
-
-@pytest.fixture
-def swh_sched(swh_sched_config):
- return get_scheduler(**swh_sched_config["scheduler"])
-
-
-@pytest.fixture
-def swh_elasticsearch_backend(swh_sched_config):
- from swh.scheduler.backend_es import ElasticSearchBackend
-
- backend = ElasticSearchBackend(**swh_sched_config)
- backend.initialize()
- return backend
-
-
-@pytest.fixture
-def swh_elasticsearch_memory(swh_elasticsearch_backend):
- return swh_elasticsearch_backend.storage
diff --git a/swh/scheduler/tests/es/test_backend_es.py b/swh/scheduler/tests/es/test_backend_es.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_backend_es.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-
-import elasticsearch
-import pytest
-
-from swh.scheduler.backend_es import get_elasticsearch
-
-from ..common import TEMPLATES, tasks_from_template
-
-
-def test_get_elasticsearch():
- with pytest.raises(ValueError, match="Unknown elasticsearch class"):
- get_elasticsearch("unknown")
-
- es = get_elasticsearch("memory")
- assert es
- from swh.scheduler.elasticsearch_memory import MemoryElasticsearch
-
- assert isinstance(es, MemoryElasticsearch)
-
- es = get_elasticsearch("local")
- assert es
- assert isinstance(es, elasticsearch.Elasticsearch)
-
-
-def test_backend_setup_basic(swh_elasticsearch_backend):
- """Elastic search instance should allow to create/close/check index
-
- """
- index_name = "swh-tasks-2010-01"
- try:
- swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
- except (elasticsearch.exceptions.NotFoundError, KeyError):
- pass
-
- assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
- swh_elasticsearch_backend.create(index_name)
- assert swh_elasticsearch_backend.storage.indices.exists(index_name)
- assert swh_elasticsearch_backend.is_index_opened(index_name)
-
- # index exists with a mapping
- mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
- assert mapping != {}
-
-
-def test_backend_setup_index(swh_elasticsearch_backend):
- """Elastic search instance should allow to bulk index
-
- """
- template_git = TEMPLATES["git"]
- next_run_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
- tasks = tasks_from_template(template_git, next_run_date, 1)
- index_name = swh_elasticsearch_backend.compute_index_name(
- next_run_date.year, next_run_date.month
- )
- assert not swh_elasticsearch_backend.storage.indices.exists(index_name)
-
- tasks = list(swh_elasticsearch_backend.streaming_bulk(index_name, tasks))
- assert len(tasks) > 0
-
- for output_task in tasks:
- assert output_task is not None
- assert output_task["type"] == template_git["type"]
- assert output_task["arguments"] is not None
- next_run = output_task["next_run"]
- if isinstance(next_run, str): # real elasticsearch
- assert next_run == next_run_date.isoformat()
- else: # memory implem. does not really index
- assert next_run == next_run_date
-
- assert swh_elasticsearch_backend.storage.indices.exists(index_name)
- assert swh_elasticsearch_backend.is_index_opened(index_name)
- mapping = swh_elasticsearch_backend.storage.indices.get_mapping(index_name)
- assert mapping != {}
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import logging
-import random
-import uuid
-
-from click.testing import CliRunner
-import pytest
-
-from swh.scheduler.cli import cli
-from swh.scheduler.utils import utcnow
-
-from ..common import TASK_TYPES, TEMPLATES, tasks_from_template
-
-logger = logging.getLogger(__name__)
-
-
-@pytest.mark.usefixtures("swh_elasticsearch_backend")
-def test_cli_archive_tasks(swh_sched, swh_sched_config_file):
- scheduler = swh_sched
- template_git = TEMPLATES["git"]
- template_hg = TEMPLATES["hg"]
- # first initialize scheduler's db (is this still needed?)
- for tt in TASK_TYPES.values():
- scheduler.create_task_type(tt)
-
- next_run_start = utcnow() - datetime.timedelta(days=1)
-
- recurring = tasks_from_template(template_git, next_run_start, 100)
- oneshots = tasks_from_template(
- template_hg, next_run_start - datetime.timedelta(days=1), 50
- )
-
- past_time = next_run_start - datetime.timedelta(days=7)
-
- all_tasks = recurring + oneshots
- result = scheduler.create_tasks(all_tasks)
- assert len(result) == len(all_tasks)
-
- # simulate task run
- backend_tasks = [
- {
- "task": task["id"],
- "backend_id": str(uuid.uuid4()),
- "scheduled": next_run_start - datetime.timedelta(minutes=i % 60),
- }
- for i, task in enumerate(result)
- ]
- scheduler.mass_schedule_task_runs(backend_tasks)
-
- # Disable some tasks
- tasks_to_disable = set()
- for task in result:
- status = random.choice(["disabled", "completed"])
- if status == "disabled":
- tasks_to_disable.add(task["id"])
-
- scheduler.disable_tasks(tasks_to_disable)
-
- git_tasks = scheduler.search_tasks(task_type=template_git["type"])
- hg_tasks = scheduler.search_tasks(task_type=template_hg["type"])
- assert len(git_tasks) + len(hg_tasks) == len(all_tasks)
-
- # Ensure the task_run are in expected state
- task_runs = scheduler.get_task_runs([t["id"] for t in git_tasks + hg_tasks])
-
- # Same for the tasks
- for t in git_tasks + hg_tasks:
- if t["id"] in tasks_to_disable:
- assert t["status"] == "disabled"
-
- future_time = next_run_start + datetime.timedelta(days=1)
- for tr in task_runs:
- assert past_time <= tr["scheduled"]
- assert tr["scheduled"] < future_time
-
- runner = CliRunner()
- result = runner.invoke(
- cli,
- [
- "--config-file",
- swh_sched_config_file,
- "task",
- "archive",
- "--after",
- past_time.isoformat(),
- "--before",
- future_time.isoformat(),
- "--cleanup",
- ],
- obj={"log_level": logging.DEBUG,},
- )
-
- assert result.exit_code == 0, result.output
-
- # disabled tasks should no longer be in the scheduler
- git_tasks = scheduler.search_tasks(task_type=template_git["type"])
- hg_tasks = scheduler.search_tasks(task_type=template_hg["type"])
- remaining_tasks = git_tasks + hg_tasks
- count_disabled = 0
- for task in remaining_tasks:
- logger.debug(f"task status: {task['status']}")
- if task["status"] == "disabled":
- count_disabled += 1
-
- assert count_disabled == 0
- assert len(remaining_tasks) == len(all_tasks) - len(tasks_to_disable)
diff --git a/swh/scheduler/tests/es/test_elasticsearch_memory.py b/swh/scheduler/tests/es/test_elasticsearch_memory.py
deleted file mode 100644
--- a/swh/scheduler/tests/es/test_elasticsearch_memory.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-import hashlib
-import logging
-import random
-from typing import Any, Dict
-
-import pytest
-
-from swh.scheduler.elasticsearch_memory import BasicSerializer, BasicTransport
-
-from ..common import TEMPLATES, tasks_from_template
-
-logger = logging.getLogger(__name__)
-
-
-def test_serializer():
- s = BasicSerializer()
- assert s
-
- data = {"something": [1, 2, 3], "cool": {"1": "2"}}
- actual_data = s.dumps(data)
-
- assert isinstance(actual_data, str)
- assert actual_data == str(data)
-
-
-def test_basic_transport():
- b = BasicTransport()
- assert b
-
- assert isinstance(b.serializer, BasicSerializer)
-
-
-def test_index_manipulation(swh_elasticsearch_memory):
- index_name = "swh-tasks-xxxx"
- indices = swh_elasticsearch_memory.index
-
- assert not swh_elasticsearch_memory.exists(index_name)
- assert index_name not in indices
-
- # so stat raises
- with pytest.raises(Exception):
- swh_elasticsearch_memory.stats(index_name)
-
- # we create the index
- swh_elasticsearch_memory.create(index_name)
-
- # now the index exists
- assert swh_elasticsearch_memory.exists(index_name)
- assert index_name in indices
- # it's opened
- assert indices[index_name]["status"] == "opened"
-
- # so stats is happy
- swh_elasticsearch_memory.stats(index_name)
-
- # open the index, nothing changes
- swh_elasticsearch_memory.open(index_name)
- assert indices[index_name]["status"] == "opened"
-
- # close the index
- swh_elasticsearch_memory.close(index_name)
-
- assert indices[index_name]["status"] == "closed"
-
- # reopen the index (fun times)
- swh_elasticsearch_memory.open(index_name)
- assert indices[index_name]["status"] == "opened"
-
-
-def test_bulk_and_mget(swh_elasticsearch_memory):
- # initialize tasks
- template_git = TEMPLATES["git"]
- next_run_start = datetime.datetime.utcnow() - datetime.timedelta(days=1)
-
- tasks = tasks_from_template(template_git, next_run_start, 100)
-
- def compute_id(stask):
- return hashlib.sha1(stask.encode("utf-8")).hexdigest()
-
- body = []
- ids_to_task = {}
- for task in tasks:
- date = task["next_run"]
- index_name = f"swh-tasks-{date.year}-{date.month}"
- idx = {"index": {"_index": index_name}}
- sidx = swh_elasticsearch_memory.transport.serializer.dumps(idx)
- body.append(sidx)
-
- stask = swh_elasticsearch_memory.transport.serializer.dumps(task)
- body.append(stask)
-
- _id = compute_id(stask)
- ids_to_task[_id] = task
- logger.debug(f"_id: {_id}, task: {task}")
-
- # store
-
- # create the index first
- swh_elasticsearch_memory.create(index_name)
-
- # then bulk insert new data
- result = swh_elasticsearch_memory.bulk("\n".join(body))
-
- # no guarantee in the order
- assert result
- actual_items = result["items"]
- assert len(actual_items) == len(ids_to_task)
-
- def get_id(data: Dict[str, Any]) -> str:
- return data["index"]["_id"]
-
- actual_items = sorted(actual_items, key=get_id)
-
- expected_items = {
- "items": [{"index": {"status": 200, "_id": _id}} for _id in list(ids_to_task)]
- }
-
- expected_items = sorted(expected_items["items"], key=get_id)
- assert actual_items == expected_items
-
- # retrieve
-
- nb_docs = 10
- ids = list(ids_to_task)
- random_ids = []
- # add some inexistent ids
- for i in range(16):
- noisy_id = f"{i}" * 40
- random_ids.append(noisy_id)
- random_ids.extend(random.sample(ids, nb_docs)) # add relevant ids
- for i in range(16, 32):
- noisy_id = f"{i}" * 40
- random_ids.append(noisy_id)
-
- result = swh_elasticsearch_memory.mget(index=index_name, body={"ids": random_ids})
- assert result["docs"]
- assert len(result["docs"]) == nb_docs, "no random and inexistent id found"
- for doc in result["docs"]:
- assert doc["found"]
-
- actual_task = doc["_source"]
- _id = compute_id(str(actual_task))
- expected_task = ids_to_task[_id]
- assert actual_task == expected_task
