diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py --- a/swh/search/elasticsearch.py +++ b/swh/search/elasticsearch.py @@ -15,6 +15,16 @@ from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult from swh.search.metrics import send_metric, timed +INDEX_NAME_PARAM = "index" +READ_ALIAS_PARAM = "read_alias" +WRITE_ALIAS_PARAM = "write_alias" + +ORIGIN_DEFAULT_CONFIG = { + INDEX_NAME_PARAM: "origin", + READ_ALIAS_PARAM: "origin-read", + WRITE_ALIAS_PARAM: "origin-write", +} + def _sanitize_origin(origin): origin = origin.copy() @@ -59,14 +69,21 @@ class ElasticSearch: - def __init__(self, hosts: List[str], index_prefix=None): + def __init__(self, hosts: List[str], indexes: Dict[str, Dict[str, str]] = None): self._backend = Elasticsearch(hosts=hosts) - self.index_prefix = index_prefix - self.origin_index = "origin" + # Overlay "origin" settings on the defaults; a missing/None config keeps them + origin_config = (indexes or {}).get("origin", {}) + self.origin_config = {**ORIGIN_DEFAULT_CONFIG, **origin_config} + + def _get_origin_index(self) -> str: + return self.origin_config[INDEX_NAME_PARAM] - if index_prefix: - self.origin_index = index_prefix + "_" + self.origin_index + def _get_origin_read_alias(self) -> str: + return self.origin_config[READ_ALIAS_PARAM] + + def _get_origin_write_alias(self) -> str: + return self.origin_config[WRITE_ALIAS_PARAM] @timed def check(self): @@ -77,11 +94,23 @@ self._backend.indices.delete(index="*") def initialize(self) -> None: - """Declare Elasticsearch indices and mappings""" - if not self._backend.indices.exists(index=self.origin_index): - self._backend.indices.create(index=self.origin_index) + """Declare Elasticsearch indices, aliases and mappings""" + + if not self._backend.indices.exists(index=self._get_origin_index()): + self._backend.indices.create(index=self._get_origin_index()) + + if not self._backend.indices.exists_alias(self._get_origin_read_alias()): + self._backend.indices.put_alias( + index=self._get_origin_index(), 
name=self._get_origin_read_alias() + ) + + if not self._backend.indices.exists_alias(self._get_origin_write_alias()): + self._backend.indices.put_alias( + index=self._get_origin_index(), name=self._get_origin_write_alias() + ) + self._backend.indices.put_mapping( - index=self.origin_index, + index=self._get_origin_index(), body={ "date_detection": False, "properties": { @@ -122,10 +151,11 @@ @timed def flush(self) -> None: - self._backend.indices.refresh(index=self.origin_index) + self._backend.indices.refresh(index=self._get_origin_write_alias()) @timed def origin_update(self, documents: Iterable[OriginDict]) -> None: + write_index = self._get_origin_write_alias() documents = map(_sanitize_origin, documents) documents_with_sha1 = ( (origin_identifier(document), document) for document in documents @@ -152,7 +182,7 @@ { "_op_type": "update", "_id": sha1, - "_index": self.origin_index, + "_index": write_index, "scripted_upsert": True, "upsert": {**document, "sha1": sha1,}, "script": { @@ -164,9 +194,7 @@ for (sha1, document) in documents_with_sha1 ] - indexed_count, errors = helpers.bulk( - self._backend, actions, index=self.origin_index - ) + indexed_count, errors = helpers.bulk(self._backend, actions, index=write_index) assert isinstance(errors, List) # Make mypy happy send_metric("document:index", count=indexed_count, method_name="origin_update") @@ -175,10 +203,10 @@ ) def origin_dump(self) -> Iterator[model.Origin]: - results = helpers.scan(self._backend, index=self.origin_index) + results = helpers.scan(self._backend, index=self._get_origin_read_alias()) for hit in results: yield self._backend.termvectors( - index=self.origin_index, id=hit["_id"], fields=["*"] + index=self._get_origin_read_alias(), id=hit["_id"], fields=["*"] ) @timed @@ -258,7 +286,9 @@ page_token_content[b"sha1"].decode("ascii"), ] - res = self._backend.search(index=self.origin_index, body=body, size=limit) + res = self._backend.search( + index=self._get_origin_read_alias(), body=body, 
size=limit + ) hits = res["hits"]["hits"] diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py --- a/swh/search/tests/conftest.py +++ b/swh/search/tests/conftest.py @@ -124,7 +124,15 @@ """ logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host) search = get_search( - "elasticsearch", hosts=[elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[elasticsearch_host], + indexes={ + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, ) search.deinitialize() # To reset internal state from previous runs search.initialize() # install required index diff --git a/swh/search/tests/test_api_client.py b/swh/search/tests/test_api_client.py --- a/swh/search/tests/test_api_client.py +++ b/swh/search/tests/test_api_client.py @@ -23,7 +23,16 @@ self.config = { "search": { "cls": "elasticsearch", - "args": {"hosts": [self._elasticsearch_host], "index_prefix": "test"}, + "args": { + "hosts": [self._elasticsearch_host], + "indexes": { + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, + }, } } self.app = app @@ -33,7 +42,15 @@ def reset(self): search = get_search( - "elasticsearch", hosts=[self._elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[self._elasticsearch_host], + indexes={ + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, ) search.deinitialize() search.initialize() diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -21,7 +21,11 @@ cls: elasticsearch hosts: - '%(elasticsearch_host)s' - index_prefix: test + indexes: + origin: + index: test + read_alias: test-read + write_alias: test-write """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ @@ -392,19 +396,45 @@ ) -def test__initialize__with_prefix(elasticsearch_host): - """Initializing the index with a prefix should create an 
_origin index""" +def test__initialize__with_index_name(elasticsearch_host): + """Initializing the index with an index name should create the right index""" search = get_search( - "elasticsearch", hosts=[elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[elasticsearch_host], + indexes={"origin": {"index": "test"}}, ) - assert search.origin_index == "test_origin" + assert search._get_origin_index() == "test" + assert search._get_origin_read_alias() == "origin-read" + assert search._get_origin_write_alias() == "origin-write" -def test__initialize__without_prefix(elasticsearch_host): - """Initializing the index without a prefix should create an origin index""" +def test__initialize__with_read_alias(elasticsearch_host): + """Initializing the index with a search alias name should create + the right search alias""" - search = get_search("elasticsearch", hosts=[elasticsearch_host]) + search = get_search( + "elasticsearch", + hosts=[elasticsearch_host], + indexes={"origin": {"read_alias": "test"}}, + ) + + assert search._get_origin_index() == "origin" + assert search._get_origin_read_alias() == "test" + assert search._get_origin_write_alias() == "origin-write" + + +def test__initialize__with_write_alias(elasticsearch_host): + """Initializing the index with an indexing alias name should create + the right indexing alias""" + + search = get_search( + "elasticsearch", + hosts=[elasticsearch_host], + indexes={"origin": {"write_alias": "test"}}, + ) - assert search.origin_index == "origin" + assert search._get_origin_index() == "origin" + assert search._get_origin_read_alias() == "origin-read" + assert search._get_origin_write_alias() == "test" diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py --- a/swh/search/tests/test_elasticsearch.py +++ b/swh/search/tests/test_elasticsearch.py @@ -69,3 +69,19 @@ "operation": "index_error", }, ) + + def test_write_alias_usage(self): + mock = 
self.mocker.patch("elasticsearch.helpers.bulk") + mock.return_value = 2, ["result"] + + self.search.origin_update([{"url": "http://foobar.baz"}]) + + assert mock.call_args[1]["index"] == "test-write" + + def test_read_alias_usage(self): + mock = self.mocker.patch("elasticsearch.Elasticsearch.search") + mock.return_value = {"hits": {"hits": []}} + + self.search.origin_search(url_pattern="foobar.baz") + + assert mock.call_args[1]["index"] == "test-read" diff --git a/swh/search/tests/test_init.py b/swh/search/tests/test_init.py --- a/swh/search/tests/test_init.py +++ b/swh/search/tests/test_init.py @@ -15,7 +15,11 @@ SEARCH_IMPLEMENTATIONS_KWARGS = [ ("remote", RemoteSearch, {"url": "localhost"}), - ("elasticsearch", ElasticSearch, {"hosts": ["localhost"], "index_prefix": "test"}), + ( + "elasticsearch", + ElasticSearch, + {"hosts": ["localhost"], "indexes": {"origin": {"index": "test"}}}, + ), ] SEARCH_IMPLEMENTATIONS = SEARCH_IMPLEMENTATIONS_KWARGS + [ diff --git a/swh/search/tests/test_server.py b/swh/search/tests/test_server.py --- a/swh/search/tests/test_server.py +++ b/swh/search/tests/test_server.py @@ -14,18 +14,45 @@ from swh.search.api.server import load_and_check_config, make_app_from_configfile +def _write_config_file(tmp_path, monkeypatch, content): + conf_path = os.path.join(str(tmp_path), "search.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(content)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + @pytest.fixture -def swh_search_server_config() -> Dict[str, Any]: +def swh_search_server_config_without_indexes() -> Dict[str, Any]: return {"search": {"cls": "elasticsearch", "hosts": ["es1"],}} @pytest.fixture -def swh_search_config(monkeypatch, swh_search_server_config, tmp_path): - conf_path = os.path.join(str(tmp_path), "search.yml") - with open(conf_path, "w") as f: - f.write(yaml.dump(swh_search_server_config)) - monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) - return conf_path +def 
swh_search_server_config_with_indexes( + swh_search_server_config_without_indexes, +) -> Dict[str, Any]: + search = swh_search_server_config_without_indexes["search"] + return {  # "indexes" sits beside "hosts", inside the "search" section + "search": {**search, "indexes": {"origin": {"index": "test"}}}, + } + + +@pytest.fixture +def swh_search_config_without_indexes( + monkeypatch, swh_search_server_config_without_indexes, tmp_path +): + return _write_config_file( + tmp_path, monkeypatch, swh_search_server_config_without_indexes + ) + + +@pytest.fixture +def swh_search_config_with_indexes( + monkeypatch, swh_search_server_config_with_indexes, tmp_path +): + return _write_config_file( + tmp_path, monkeypatch, swh_search_server_config_with_indexes + ) def prepare_config_file(tmpdir, config_dict: Dict, name: str = "config.yml") -> str: @@ -69,14 +96,30 @@ load_and_check_config(config_path) -def test_load_and_check_config_local_config_fine(swh_search_server_config, tmpdir): +def test_load_and_check_config_local_config_fine( + swh_search_server_config_with_indexes, tmpdir +): """'local' complete configuration is fine""" - config_path = prepare_config_file(tmpdir, swh_search_server_config) + config_path = prepare_config_file(tmpdir, swh_search_server_config_with_indexes) cfg = load_and_check_config(config_path) - assert cfg == swh_search_server_config + assert cfg == swh_search_server_config_with_indexes + + +def test_server_make_app_from_config_file_without_indexes( + swh_search_config_without_indexes, +): + app = make_app_from_configfile() + expected_cfg = load_from_envvar() + + assert app is not None + assert isinstance(app, RPCServerApp) + assert app.config["search"] == expected_cfg["search"] + + app2 = make_app_from_configfile() + assert app is app2 -def test_server_make_app_from_config_file(swh_search_config): +def test_server_make_app_from_config_file_with_indexes(swh_search_config_with_indexes,): app = make_app_from_configfile() expected_cfg = load_from_envvar()