diff --git a/PKG-INFO b/PKG-INFO index 45fd492..922684c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,52 +1,52 @@ Metadata-Version: 2.1 Name: swh.search -Version: 0.6.1 +Version: 0.7.0 Summary: Software Heritage search service Home-page: https://forge.softwareheritage.org/diffusion/DSEA Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-search Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-search/ Description: swh-search ========== Search service for the Software Heritage archive. It is similar to swh-storage in what it contains, but provides different ways to query it: while swh-storage is mostly a key-value store that returns an object from a primary key, swh-search is focused on reverse indices, to allow finding objects that match some criteria; for example full-text search. Currently uses ElasticSearch, and provides only origin search (by URL and metadata). # Dependencies Python tests for this module include tests that cannot be run without a local ElasticSearch instance, so you need the ElasticSearch server executable on your machine (no need to have a running ElasticSearch server). ## Debian-like host The elasticsearch package is required. As it's not part of debian-stable, [an additional Debian repository must be configured](https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html#deb-repo). ## Non Debian-like host The tests expect: - `/usr/share/elasticsearch/jdk/bin/java` to exist. - `org.elasticsearch.bootstrap.Elasticsearch` to be in Java's classpath. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements.txt b/requirements.txt index 60e006a..12608a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click elasticsearch>=7.0.0,<8.0.0 +typing-extensions diff --git a/swh.search.egg-info/PKG-INFO b/swh.search.egg-info/PKG-INFO index 45fd492..922684c 100644 --- a/swh.search.egg-info/PKG-INFO +++ b/swh.search.egg-info/PKG-INFO @@ -1,52 +1,52 @@ Metadata-Version: 2.1 Name: swh.search -Version: 0.6.1 +Version: 0.7.0 Summary: Software Heritage search service Home-page: https://forge.softwareheritage.org/diffusion/DSEA Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-search Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-search/ Description: swh-search ========== Search service for the Software Heritage archive.
It is similar to swh-storage in what it contains, but provides different ways to query it: while swh-storage is mostly a key-value store that returns an object from a primary key, swh-search is focused on reverse indices, to allow finding objects that match some criteria; for example full-text search. Currently uses ElasticSearch, and provides only origin search (by URL and metadata). # Dependencies Python tests for this module include tests that cannot be run without a local ElasticSearch instance, so you need the ElasticSearch server executable on your machine (no need to have a running ElasticSearch server). ## Debian-like host The elasticsearch package is required. As it's not part of debian-stable, [an additional Debian repository must be configured](https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html#deb-repo). ## Non Debian-like host The tests expect: - `/usr/share/elasticsearch/jdk/bin/java` to exist. - `org.elasticsearch.bootstrap.Elasticsearch` to be in Java's classpath. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.search.egg-info/SOURCES.txt b/swh.search.egg-info/SOURCES.txt index ac8c7c4..b61efaa 100644 --- a/swh.search.egg-info/SOURCES.txt +++ b/swh.search.egg-info/SOURCES.txt @@ -1,57 +1,57 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder es_config/elasticsearch.keystore es_config/elasticsearch.yml es_config/jvm.options es_config/log4j2.properties swh/__init__.py swh.search.egg-info/PKG-INFO swh.search.egg-info/SOURCES.txt swh.search.egg-info/dependency_links.txt swh.search.egg-info/entry_points.txt swh.search.egg-info/requires.txt swh.search.egg-info/top_level.txt swh/search/__init__.py swh/search/cli.py swh/search/elasticsearch.py swh/search/in_memory.py swh/search/interface.py swh/search/journal_client.py swh/search/metrics.py swh/search/py.typed swh/search/api/__init__.py swh/search/api/client.py swh/search/api/server.py -swh/search/api/wsgi.py swh/search/tests/__init__.py swh/search/tests/conftest.py swh/search/tests/test_api_client.py swh/search/tests/test_cli.py swh/search/tests/test_elasticsearch.py swh/search/tests/test_in_memory.py swh/search/tests/test_init.py swh/search/tests/test_journal_client.py -swh/search/tests/test_search.py \ No newline at end of file +swh/search/tests/test_search.py +swh/search/tests/test_server.py \ No newline at end of file diff --git a/swh.search.egg-info/requires.txt b/swh.search.egg-info/requires.txt index 061ae77..e5f5d2b 100644 --- a/swh.search.egg-info/requires.txt +++ b/swh.search.egg-info/requires.txt @@ -1,11 +1,12 @@ click elasticsearch<8.0.0,>=7.0.0 +typing-extensions swh.core[http]>=0.3.0 swh.indexer swh.journal>=0.1.0 swh.model [testing] pytest pytest-mock confluent-kafka diff --git a/swh/search/api/server.py b/swh/search/api/server.py index e0ab43f..112c903 100644 --- a/swh/search/api/server.py +++ b/swh/search/api/server.py @@ -1,90 +1,100 @@ -# Copyright (C)
2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os +from typing import Any, Dict from swh.core import config from swh.core.api import RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler from swh.search.metrics import timed from .. import get_search from ..interface import SearchInterface +logger = logging.getLogger(__name__) + def _get_search(): global search if not search: search = get_search(**app.config["search"]) return search app = RPCServerApp(__name__, backend_class=SearchInterface, backend_factory=_get_search) search = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.route("/") @timed def index(): return "SWH Search API server" +@app.before_first_request +def initialized_index(): + search = _get_search() + logger.info("Initializing indexes with configuration: %s", search.origin_config) + search.initialize() + + api_cfg = None -def load_and_check_config(config_file, type="elasticsearch"): +def load_and_check_config(config_file: str) -> Dict[str, Any]: """Check that the minimal configuration is set to run the API, or raise an error explaining what is missing. Args: - config_file (str): Path to the configuration file to load - type (str): configuration type. For 'local' type, more - checks are done. + config_file: Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) if "search" not in cfg: raise KeyError("Missing 'search' configuration") return cfg def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load.
""" global api_cfg if not api_cfg: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app diff --git a/swh/search/api/wsgi.py b/swh/search/api/wsgi.py deleted file mode 100644 index 02c4901..0000000 --- a/swh/search/api/wsgi.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from .server import make_app_from_configfile - -application = make_app_from_configfile() diff --git a/swh/search/cli.py b/swh/search/cli.py index ae67b1c..1819446 100644 --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -1,130 +1,109 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group @swh_cli_group.group(name="search", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def search_cli_group(ctx, config_file): """Software Heritage Search tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @search_cli_group.command("initialize") @click.pass_context def initialize(ctx): """Creates Elasticsearch indices.""" from . import get_search search = get_search(**ctx.obj["config"]["search"]) search.initialize() print("Done.") @search_cli_group.group("journal-client") @click.pass_context def journal_client(ctx): """""" pass @journal_client.command("objects") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.option( "--object-type", "-o", multiple=True, help="Default list of object types to subscribe to", ) @click.option( "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.indexed)", ) @click.pass_context def journal_client_objects(ctx, stop_after_objects, object_type, prefix): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, origin and origin_visit) on these new objects. """ import functools from swh.journal.client import get_journal_client from . 
import get_search from .journal_client import process_journal_objects config = ctx.obj["config"] journal_cfg = config["journal"] journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", []) journal_cfg["prefix"] = prefix or journal_cfg.get("prefix") journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( "stop_after_objects" ) if len(journal_cfg["object_types"]) == 0: raise ValueError("'object_types' must be specified by cli or configuration") if journal_cfg["prefix"] is None: raise ValueError("'prefix' must be specified by cli or configuration") client = get_journal_client(cls="kafka", **journal_cfg,) search = get_search(**config["search"]) worker_fn = functools.partial(process_journal_objects, search=search,) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() - - -@search_cli_group.command("rpc-serve") -@click.argument("config-path", required=True) -@click.option("--host", default="0.0.0.0", help="Host to run the server") -@click.option("--port", default=5010, type=click.INT, help="Binding port of the server") -@click.option( - "--index-prefix", required=False, help="The prefix to add before the index names" -) -@click.option( - "--debug/--nodebug", - default=True, - help="Indicates if the server should run in debug mode", -) -def rpc_server(config_path, host, port, index_prefix, debug): - """Starts a Software Heritage Indexer RPC HTTP server.""" - from .api.server import app, load_and_check_config - - api_cfg = load_and_check_config(config_path, type="any") - app.config.update(api_cfg) - app.run(host, port=int(port), index_prefix=index_prefix, debug=bool(debug)) diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py index 24f74da..b90885d 100644 --- a/swh/search/elasticsearch.py +++ b/swh/search/elasticsearch.py @@ -1,282 +1,312 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 from typing import Any, Dict, Iterable, Iterator, List, Optional from elasticsearch import Elasticsearch, helpers import msgpack from swh.indexer import codemeta from swh.model import model from swh.model.identifiers import origin_identifier -from swh.search.interface import PagedResult +from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult from swh.search.metrics import send_metric, timed +INDEX_NAME_PARAM = "index" +READ_ALIAS_PARAM = "read_alias" +WRITE_ALIAS_PARAM = "write_alias" + +ORIGIN_DEFAULT_CONFIG = { + INDEX_NAME_PARAM: "origin", + READ_ALIAS_PARAM: "origin-read", + WRITE_ALIAS_PARAM: "origin-write", +} + def _sanitize_origin(origin): origin = origin.copy() # Whitelist fields to be saved in Elasticsearch res = {"url": origin.pop("url")} for field_name in ("intrinsic_metadata", "has_visits", "visit_types"): if field_name in origin: res[field_name] = origin.pop(field_name) # Run the JSON-LD expansion algorithm # # to normalize the Codemeta metadata. # This is required as Elasticsearch needs each field to have a consistent # type across documents to be searchable; and non-expanded JSON-LD documents # can have various types in the same field.
For example, all these are # equivalent in JSON-LD: # * {"author": "Jane Doe"} # * {"author": ["Jane Doe"]} # * {"author": {"@value": "Jane Doe"}} # * {"author": [{"@value": "Jane Doe"}]} # and JSON-LD expansion will convert them all to the last one. if "intrinsic_metadata" in res: res["intrinsic_metadata"] = codemeta.expand(res["intrinsic_metadata"]) return res def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str: """Encode an index page result from a search as an opaque page token string """ page_token = base64.b64encode(msgpack.dumps(index_to_tokenize)) return page_token.decode() def token_decode(page_token: str) -> Dict[bytes, Any]: """Decode an opaque page token back into an index page result """ return msgpack.loads(base64.b64decode(page_token.encode()), raw=True) class ElasticSearch: - def __init__(self, hosts: List[str], index_prefix=None): + def __init__(self, hosts: List[str], indexes: Dict[str, Dict[str, str]]): self._backend = Elasticsearch(hosts=hosts) - self.index_prefix = index_prefix - self.origin_index = "origin" + # Merge current configuration with default values + origin_config = indexes.get("origin", {}) + self.origin_config = {**ORIGIN_DEFAULT_CONFIG, **origin_config} + + def _get_origin_index(self) -> str: + return self.origin_config[INDEX_NAME_PARAM] - if index_prefix: - self.origin_index = index_prefix + "_" + self.origin_index + def _get_origin_read_alias(self) -> str: + return self.origin_config[READ_ALIAS_PARAM] + + def _get_origin_write_alias(self) -> str: + return self.origin_config[WRITE_ALIAS_PARAM] @timed def check(self): return self._backend.ping() def deinitialize(self) -> None: """Removes all indices from the Elasticsearch backend""" self._backend.indices.delete(index="*") def initialize(self) -> None: - """Declare Elasticsearch indices and mappings""" - if not self._backend.indices.exists(index=self.origin_index): - self._backend.indices.create(index=self.origin_index) + """Declare Elasticsearch indices, aliases and mappings""" + + if not self._backend.indices.exists(index=self._get_origin_index()): + self._backend.indices.create(index=self._get_origin_index()) + + if not self._backend.indices.exists_alias(self._get_origin_read_alias()): + self._backend.indices.put_alias( + index=self._get_origin_index(), name=self._get_origin_read_alias() + ) + + if not self._backend.indices.exists_alias(self._get_origin_write_alias()): + self._backend.indices.put_alias( + index=self._get_origin_index(), name=self._get_origin_write_alias() + ) + self._backend.indices.put_mapping( - index=self.origin_index, + index=self._get_origin_index(), body={ "date_detection": False, "properties": { # sha1 of the URL; used as the document id "sha1": {"type": "keyword", "doc_values": True,}, # Used both to search URLs, and as the result to return # as a response to queries "url": { "type": "text", # To split URLs into tokens on any character # that is not alphanumeric "analyzer": "simple", # 2-gram and partial-3-gram search (i.e.
with the end of the # third word potentially missing) "fields": { "as_you_type": { "type": "search_as_you_type", "analyzer": "simple", } }, }, "visit_types": {"type": "keyword"}, # used to filter out origins that were never visited "has_visits": {"type": "boolean",}, "intrinsic_metadata": { "type": "nested", "properties": { "@context": { # don't bother indexing tokens in these URIs, as they # are used as namespaces "type": "keyword", } }, }, }, }, ) @timed def flush(self) -> None: - self._backend.indices.refresh(index=self.origin_index) + self._backend.indices.refresh(index=self._get_origin_write_alias()) @timed - def origin_update(self, documents: Iterable[Dict]) -> None: + def origin_update(self, documents: Iterable[OriginDict]) -> None: + write_index = self._get_origin_write_alias() documents = map(_sanitize_origin, documents) documents_with_sha1 = ( (origin_identifier(document), document) for document in documents ) # painless script that will be executed when updating an origin document update_script = """ // backup current visit_types field value List visit_types = ctx._source.getOrDefault("visit_types", []); // update origin document with new field values ctx._source.putAll(params); // restore previous visit types after visit_types field overriding if (ctx._source.containsKey("visit_types")) { for (int i = 0; i < visit_types.length; ++i) { if (!ctx._source.visit_types.contains(visit_types[i])) { ctx._source.visit_types.add(visit_types[i]); } } } """ actions = [ { "_op_type": "update", "_id": sha1, - "_index": self.origin_index, + "_index": write_index, "scripted_upsert": True, "upsert": {**document, "sha1": sha1,}, "script": { "source": update_script, "lang": "painless", "params": document, }, } for (sha1, document) in documents_with_sha1 ] - indexed_count, errors = helpers.bulk( - self._backend, actions, index=self.origin_index - ) + indexed_count, errors = helpers.bulk(self._backend, actions, index=write_index) assert isinstance(errors, List) # Make mypy happy send_metric("document:index", count=indexed_count, method_name="origin_update") send_metric( "document:index_error", count=len(errors), method_name="origin_update" ) def origin_dump(self) -> Iterator[model.Origin]: - results = helpers.scan(self._backend, index=self.origin_index) + results = helpers.scan(self._backend, index=self._get_origin_read_alias()) for hit in results: yield self._backend.termvectors( - index=self.origin_index, id=hit["_id"], fields=["*"] + index=self._get_origin_read_alias(), id=hit["_id"], fields=["*"] ) @timed def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: query_clauses: List[Dict[str, Any]] = [] if url_pattern: query_clauses.append( { "multi_match": { "query": url_pattern, "type": "bool_prefix", "operator": "and", "fields": [ "url.as_you_type", "url.as_you_type._2gram", "url.as_you_type._3gram", ], } } ) if metadata_pattern: query_clauses.append( { "nested": { "path": "intrinsic_metadata", "query": { "multi_match": { "query": metadata_pattern, # Makes it so that the "foo bar" query returns # documents which contain "foo" in a field and "bar" # in a different field "type": "cross_fields", # All keywords must be found in a document for it to # be considered a match. # TODO: allow missing keywords?
"operator": "and", # Searches on all fields of the intrinsic_metadata dict, # recursively. "fields": ["intrinsic_metadata.*"], } }, } } ) if not query_clauses: raise ValueError( "At least one of url_pattern and metadata_pattern must be provided." ) if with_visit: query_clauses.append({"term": {"has_visits": True,}}) if visit_types is not None: query_clauses.append({"terms": {"visit_types": visit_types}}) body = { "query": {"bool": {"must": query_clauses,}}, "sort": [{"_score": "desc"}, {"sha1": "asc"},], } if page_token: # TODO: use ElasticSearch's scroll API? page_token_content = token_decode(page_token) body["search_after"] = [ page_token_content[b"score"], page_token_content[b"sha1"].decode("ascii"), ] - res = self._backend.search(index=self.origin_index, body=body, size=limit) + res = self._backend.search( + index=self._get_origin_read_alias(), body=body, size=limit + ) hits = res["hits"]["hits"] next_page_token: Optional[str] = None if len(hits) == limit: # There are more results after this page; return a pagination token # to get them in a future query last_hit = hits[-1] next_page_token_content = { b"score": last_hit["_score"], b"sha1": last_hit["_source"]["sha1"], } next_page_token = token_encode(next_page_token_content) assert len(hits) <= limit return PagedResult( results=[{"url": hit["_source"]["url"]} for hit in hits], next_page_token=next_page_token, ) diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py index c668a48..1c400c2 100644 --- a/swh/search/in_memory.py +++ b/swh/search/in_memory.py @@ -1,148 +1,150 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import itertools import re from typing import Any, Dict, Iterable, Iterator, List, Optional from swh.model.identifiers import origin_identifier -from swh.search.interface import PagedResult +from swh.search.interface import MinimalOriginDict, OriginDict, PagedResult _words_regexp = re.compile(r"\w+") def _dict_words_set(d): """Recursively extract set of words from dict content.""" values = set() def extract(obj, words): if isinstance(obj, dict): for k, v in obj.items(): extract(v, words) elif isinstance(obj, list): for item in obj: extract(item, words) else: words.update(_words_regexp.findall(str(obj).lower())) return words return extract(d, values) class InMemorySearch: def __init__(self): pass def check(self): return True def deinitialize(self) -> None: if hasattr(self, "_origins"): del self._origins del self._origin_ids def initialize(self) -> None: self._origins: Dict[str, Dict[str, Any]] = defaultdict(dict) self._origin_ids: List[str] = [] def flush(self) -> None: pass _url_splitter = re.compile(r"\W") - def origin_update(self, documents: Iterable[Dict]) -> None: - for document in documents: - document = document.copy() + def origin_update(self, documents: Iterable[OriginDict]) -> None: + for source_document in documents: + document: Dict[str, Any] = dict(source_document) id_ = origin_identifier(document) if "url" in document: - document["_url_tokens"] = set(self._url_splitter.split(document["url"])) + document["_url_tokens"] = set( + self._url_splitter.split(source_document["url"]) + ) if "visit_types" in document: - document["visit_types"] = set(document["visit_types"]) + document["visit_types"] = set(source_document["visit_types"]) if "visit_types" in self._origins[id_]: 
document["visit_types"].update(self._origins[id_]["visit_types"]) self._origins[id_].update(document) if id_ not in self._origin_ids: self._origin_ids.append(id_) def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: hits: Iterator[Dict[str, Any]] = ( self._origins[id_] for id_ in self._origin_ids ) if url_pattern: tokens = set(self._url_splitter.split(url_pattern)) def predicate(match): missing_tokens = tokens - match["_url_tokens"] if len(missing_tokens) == 0: return True elif len(missing_tokens) > 1: return False else: # There is one missing token, look up by prefix. (missing_token,) = missing_tokens return any( token.startswith(missing_token) for token in match["_url_tokens"] ) hits = filter(predicate, hits) if metadata_pattern: metadata_pattern_words = set( _words_regexp.findall(metadata_pattern.lower()) ) def predicate(match): if "intrinsic_metadata" not in match: return False return metadata_pattern_words.issubset( _dict_words_set(match["intrinsic_metadata"]) ) hits = filter(predicate, hits) if not url_pattern and not metadata_pattern: raise ValueError( "At least one of url_pattern and metadata_pattern must be provided." ) next_page_token: Optional[str] = None if with_visit: hits = filter(lambda o: o.get("has_visits"), hits) if visit_types is not None: visit_types_set = set(visit_types) hits = filter( lambda o: visit_types_set.intersection(o.get("visit_types", set())), hits, ) start_at_index = int(page_token) if page_token else 0 origins = [ {"url": hit["url"]} for hit in itertools.islice(hits, start_at_index, start_at_index + limit) ] if len(origins) == limit: next_page_token = str(start_at_index + limit) assert len(origins) <= limit return PagedResult(results=origins, next_page_token=next_page_token,) diff --git a/swh/search/interface.py b/swh/search/interface.py index b0c3e82..86dbb07 100644 --- a/swh/search/interface.py +++ b/swh/search/interface.py @@ -1,65 +1,80 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List, Optional, TypeVar +from typing import Iterable, List, Optional, TypeVar + +from typing_extensions import TypedDict from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult as CorePagedResult TResult = TypeVar("TResult") PagedResult = CorePagedResult[TResult, str] +class MinimalOriginDict(TypedDict): + """Mandatory keys of an :cls:`OriginDict`""" + + url: str + + +class OriginDict(MinimalOriginDict, total=False): + """Argument passed to :meth:`SearchInterface.origin_update`.""" + + visit_types: List[str] + has_visits: bool + + class SearchInterface: @remote_api_endpoint("check") def check(self): """Dedicated method to execute some specific check per implementation. """ ... @remote_api_endpoint("flush") def flush(self) -> None: """Blocks until all previous calls to _update() are completely applied. """ ... @remote_api_endpoint("origin/update") - def origin_update(self, documents: Iterable[Dict]) -> None: + def origin_update(self, documents: Iterable[OriginDict]) -> None: """Persist documents to the search backend. """ ... 
@remote_api_endpoint("origin/search") def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, visit_types: Optional[List[str]] = None, page_token: Optional[str] = None, limit: int = 50, - ) -> PagedResult[Dict[str, Any]]: + ) -> PagedResult[MinimalOriginDict]: """Searches for origins matching the `url_pattern`. Args: url_pattern: Part of the URL to search for with_visit: Whether origins with no visit are to be filtered out visit_types: Only origins having any of the provided visit types (e.g. git, svn, pypi) will be returned page_token: Opaque value used for pagination limit: number of results to return Returns: PagedResult of origin dicts matching the search criteria. If next_page_token is None, there is no longer data to retrieve. """ ... diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py index 084c1b2..62de0a2 100644 --- a/swh/search/tests/conftest.py +++ b/swh/search/tests/conftest.py @@ -1,131 +1,139 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import socket import subprocess import time import elasticsearch import pytest from swh.search import get_search logger = logging.getLogger(__name__) def free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("127.0.0.1", 0)) port = sock.getsockname()[1] sock.close() return port def wait_for_peer(addr, port): while True: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((addr, port)) except ConnectionRefusedError: time.sleep(0.1) else: sock.close() break CONFIG_TEMPLATE = """ node.name: node-1 path.data: {data} path.logs: {logs} network.host: 127.0.0.1 http.port: {http_port} transport.port: {transport_port} """ def _run_elasticsearch(conf_dir, data_dir, logs_dir, http_port, transport_port): es_home = "/usr/share/elasticsearch" with open(conf_dir + "/elasticsearch.yml", "w") as fd: fd.write( CONFIG_TEMPLATE.format( data=data_dir, logs=logs_dir, http_port=http_port, transport_port=transport_port, ) ) with open(conf_dir + "/log4j2.properties", "w") as fd: pass cmd = [ "/usr/share/elasticsearch/jdk/bin/java", "-Des.path.home={}".format(es_home), "-Des.path.conf={}".format(conf_dir), "-Des.bundled_jdk=true", "-Dlog4j2.disable.jmx=true", "-cp", "{}/lib/*".format(es_home), "org.elasticsearch.bootstrap.Elasticsearch", ] host = "127.0.0.1:{}".format(http_port) with open(logs_dir + "/output.txt", "w") as fd: p = subprocess.Popen(cmd) wait_for_peer("127.0.0.1", http_port) client = elasticsearch.Elasticsearch([host]) assert client.ping() return p @pytest.fixture(scope="session") def elasticsearch_session(tmpdir_factory): tmpdir = tmpdir_factory.mktemp("elasticsearch") es_conf = tmpdir.mkdir("conf") http_port = free_port() transport_port = free_port() p = _run_elasticsearch( conf_dir=str(es_conf), data_dir=str(tmpdir.mkdir("data")), logs_dir=str(tmpdir.mkdir("logs")), http_port=http_port, transport_port=transport_port, ) yield "127.0.0.1:{}".format(http_port) # Check ES didn't stop assert p.returncode is None, p.returncode p.kill() p.wait() @pytest.fixture(scope="class") def elasticsearch_host(elasticsearch_session): yield elasticsearch_session @pytest.fixture def swh_search(elasticsearch_host): """Instantiate a search client, initialize the elasticsearch instance, and returns it """ 
logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host) search = get_search( - "elasticsearch", hosts=[elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[elasticsearch_host], + indexes={ + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, ) search.deinitialize() # To reset internal state from previous runs search.initialize() # install required index yield search diff --git a/swh/search/tests/test_api_client.py b/swh/search/tests/test_api_client.py index 25a8b90..b33fe0e 100644 --- a/swh/search/tests/test_api_client.py +++ b/swh/search/tests/test_api_client.py @@ -1,45 +1,62 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import pytest from swh.core.api.tests.server_testing import ServerTestFixture from swh.search import get_search from swh.search.api.server import app from .test_search import CommonSearchTest class TestRemoteSearch(CommonSearchTest, ServerTestFixture, unittest.TestCase): @pytest.fixture(autouse=True) def _instantiate_search(self, elasticsearch_host): self._elasticsearch_host = elasticsearch_host def setUp(self): self.config = { "search": { "cls": "elasticsearch", - "args": {"hosts": [self._elasticsearch_host], "index_prefix": "test"}, + "args": { + "hosts": [self._elasticsearch_host], + "indexes": { + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, + }, } } self.app = app super().setUp() self.reset() self.search = get_search("remote", url=self.url(),) def reset(self): search = get_search( - "elasticsearch", hosts=[self._elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[self._elasticsearch_host], + indexes={ + "origin": { + "index": "test", + "read_alias": "test-read", + "write_alias": "test-write", + } + }, ) search.deinitialize() search.initialize() @pytest.mark.skip( "Elasticsearch also returns close matches, so this test would fail" ) def test_origin_url_paging(self, count): pass diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py index 4e03d06..1ef5787 100644 --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -1,410 +1,440 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import value_to_kafka from swh.model.hashutil import hash_to_bytes from swh.search import get_search from swh.search.cli import search_cli_group CLI_CONFIG = """ search: cls: elasticsearch hosts: - '%(elasticsearch_host)s' - index_prefix: test + indexes: + origin: + index: test + read_alias: test-read + write_alias: test-write """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, elasticsearch_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write( (CLI_CONFIG + config) % {"elasticsearch_host": elasticsearch_host} ) config_fd.seek(0) result = runner.invoke(search_cli_group, 
["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__origin( swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin producer", "acks": "all", } ) origin_foobar_baz = { "url": "http://foobar.baz", } value = value_to_kafka(origin_foobar_baz) topic = f"{kafka_prefix}.origin" producer.produce(topic=topic, key=b"bogus-origin", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "objects", "--stop-after-objects", "1", "--object-type", "origin", "--prefix", kafka_prefix, ], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # searching origin without visit as requirement actual_page = swh_search.origin_search(url_pattern="foobar") # We find it assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar_baz] # It's an origin with no visit, searching for it with visit actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True) # returns nothing assert actual_page.next_page_token is None assert actual_page.results == [] def test__journal_client__origin_visit( swh_search, elasticsearch_host, kafka_prefix: str, kafka_server ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" origin_foobar = {"url": "http://baz.foobar"} producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin visit producer", "acks": "all", } ) topic = f"{kafka_prefix}.origin_visit" value = value_to_kafka({"origin": origin_foobar["url"], "type": "git"}) producer.produce(topic=topic, key=b"bogus-origin-visit", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "objects", "--stop-after-objects", "1", "--object-type", "origin_visit", ], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] # Not considered visited unless the visit is full actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True) assert actual_page.next_page_token is None assert actual_page.results == [] def test__journal_client__origin_visit_status( swh_search, elasticsearch_host, kafka_prefix: str, kafka_server ): """Subscribing to origin-visit-status should result in swh-search indexation """ origin_foobar = {"url": "http://baz.foobar"} producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin visit status producer", "acks": "all", } ) topic = f"{kafka_prefix}.origin_visit_status" value = value_to_kafka( { "origin": origin_foobar["url"], "visit": 1, "snapshot": None, "status": "full", } 
) producer.produce(topic=topic, key=b"bogus-origin-visit-status", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "objects", "--stop-after-objects", "1", "--prefix", kafka_prefix, "--object-type", "origin_visit_status", ], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # Both searches return the visit actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] def test__journal_client__origin_intrinsic_metadata( swh_search, elasticsearch_host, kafka_prefix: str, kafka_server ): """Subscribing to origin-intrinsic-metadata should result in swh-search indexation """ origin_foobar = {"url": "https://github.com/clojure/clojure"} origin_intrinsic_metadata = { "id": origin_foobar["url"], "metadata": { "name": "clojure", "type": "SoftwareSourceCode", "license": "http://opensource.org/licenses/eclipse-1.0.php", "version": "1.10.2-master-SNAPSHOT", "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "identifier": "org.clojure", "description": "Clojure core environment and runtime library.", "codeRepository": "https://repo.maven.apache.org/maven2/org/clojure/clojure", # noqa }, "indexer_configuration_id": 1, "from_revision": hash_to_bytes("f47c139e20970ee0852166f48ee2a4626632b86e"), "mappings": ["maven"], } producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test search origin intrinsic metadata producer", "acks": "all", } ) topic = f"{kafka_prefix}.origin_intrinsic_metadata" value = value_to_kafka(origin_intrinsic_metadata) producer.produce(topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value) journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "objects", "--stop-after-objects", "1", "--object-type", "origin_intrinsic_metadata", ], journal_objects_config, elasticsearch_host=elasticsearch_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output swh_search.flush() # search without visit returns the metadata actual_page = swh_search.origin_search(url_pattern="clojure", with_visit=False) assert actual_page.next_page_token is None assert actual_page.results == [origin_foobar] # no visit associated so it does not return anything actual_page = swh_search.origin_search(url_pattern="clojure", with_visit=True) assert actual_page.next_page_token is None assert actual_page.results == [] def test__journal_client__missing_main_journal_config_key(elasticsearch_host): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, args=["journal-client", "objects", "--stop-after-objects", "1",], config="", # missing config will make it raise elasticsearch_host=elasticsearch_host, ) def test__journal_client__missing_journal_config_keys(elasticsearch_host): """Missing configuration on
mandatory journal keys should raise""" kafka_prefix = "swh.journal.objects" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=[ "journal-client", "objects", "--stop-after-objects", "1", "--prefix", kafka_prefix, "--object-type", "origin_visit_status", ], config=yaml_cfg, # incomplete config will make the cli raise elasticsearch_host=elasticsearch_host, ) def test__journal_client__missing_prefix_config_key( swh_search, elasticsearch_host, kafka_server ): """Missing configuration on mandatory prefix key should raise""" journal_cfg_template = """ journal: brokers: - {broker} group_id: {group_id} """ journal_cfg = journal_cfg_template.format( broker=kafka_server, group_id="test-consumer" ) with pytest.raises(ValueError, match="prefix"): invoke( False, # Missing --prefix (and no config key) will make the cli raise [ "journal-client", "objects", "--stop-after-objects", "1", "--object-type", "origin_visit_status", ], journal_cfg, elasticsearch_host=elasticsearch_host, ) def test__journal_client__missing_object_types_config_key( swh_search, elasticsearch_host, kafka_server ): """Missing configuration on mandatory object-types key should raise""" journal_cfg_template = """ journal: brokers: - {broker} prefix: swh.journal.objects group_id: {group_id} """ journal_cfg = journal_cfg_template.format( broker=kafka_server, group_id="test-consumer" ) with pytest.raises(ValueError, match="object_types"): invoke( False, # Missing --object-types (and no config key) will make the cli raise ["journal-client", "objects", "--stop-after-objects", "1"], journal_cfg, elasticsearch_host=elasticsearch_host, ) -def test__initialize__with_prefix(elasticsearch_host): - """Initializing the index with a prefix should create an _origin index""" +def test__initialize__with_index_name(elasticsearch_host): + """Initializing the index with an index name should create the right index""" search = get_search( - "elasticsearch", hosts=[elasticsearch_host], index_prefix="test" + "elasticsearch", + hosts=[elasticsearch_host], + indexes={"origin": {"index": "test"}}, ) - assert search.origin_index == "test_origin" + assert search._get_origin_index() == "test" + assert search._get_origin_read_alias() == "origin-read" + assert search._get_origin_write_alias() == "origin-write" -def test__initialize__without_prefix(elasticsearch_host): - """Initializing the index without a prefix should create an origin index""" +def test__initialize__with_read_alias(elasticsearch_host): + """Initializing the index with a search alias name should create + the right search alias""" - search = get_search("elasticsearch", hosts=[elasticsearch_host]) + search = get_search( + "elasticsearch", + hosts=[elasticsearch_host], + indexes={"origin": {"read_alias": "test"}}, + ) + + assert search._get_origin_index() == "origin" + assert search._get_origin_read_alias() == "test" + assert search._get_origin_write_alias() == "origin-write" + + +def test__initialize__with_write_alias(elasticsearch_host): + """Initializing the index with an indexing alias name should create + the right indexing alias""" + + search = get_search( + "elasticsearch", + 
hosts=[elasticsearch_host], + indexes={"origin": {"write_alias": "test"}}, + ) - assert search.origin_index == "origin" + assert search._get_origin_index() == "origin" + assert search._get_origin_read_alias() == "origin-read" + assert search._get_origin_write_alias() == "test" diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py index 1567dd6..90dc1f1 100644 --- a/swh/search/tests/test_elasticsearch.py +++ b/swh/search/tests/test_elasticsearch.py @@ -1,71 +1,87 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import pytest from swh.search.metrics import OPERATIONS_METRIC from .test_search import CommonSearchTest class BaseElasticsearchTest(unittest.TestCase): @pytest.fixture(autouse=True) def _instantiate_search(self, swh_search, elasticsearch_host, mocker): self._elasticsearch_host = elasticsearch_host self.search = swh_search self.mocker = mocker def reset(self): self.search.deinitialize() self.search.initialize() class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest): def test_metrics_update_duration(self): mock = self.mocker.patch("swh.search.metrics.statsd.timing") for url in ["http://foobar.bar", "http://foobar.baz"]: self.search.origin_update([{"url": url}]) assert mock.call_count == 2 def test_metrics_search_duration(self): mock = self.mocker.patch("swh.search.metrics.statsd.timing") for url_pattern in ["foobar", "foobaz"]: self.search.origin_search(url_pattern=url_pattern, with_visit=True) assert mock.call_count == 2 def test_metrics_indexation_counters(self): mock_es = self.mocker.patch("elasticsearch.helpers.bulk") mock_es.return_value = 2, ["error"] mock_metrics = self.mocker.patch("swh.search.metrics.statsd.increment") self.search.origin_update([{"url": "http://foobar.baz"}]) assert mock_metrics.call_count == 2 mock_metrics.assert_any_call( OPERATIONS_METRIC, 2, tags={ "endpoint": "origin_update", "object_type": "document", "operation": "index", }, ) mock_metrics.assert_any_call( OPERATIONS_METRIC, 1, tags={ "endpoint": "origin_update", "object_type": "document", "operation": "index_error", }, ) + + def test_write_alias_usage(self): + mock = self.mocker.patch("elasticsearch.helpers.bulk") + mock.return_value = 2, ["result"] + + self.search.origin_update([{"url": "http://foobar.baz"}]) + + assert mock.call_args[1]["index"] == "test-write" + + def test_read_alias_usage(self): + mock = self.mocker.patch("elasticsearch.Elasticsearch.search") + mock.return_value = {"hits": {"hits": []}} + + self.search.origin_search(url_pattern="foobar.baz") + + assert mock.call_args[1]["index"] == "test-read" diff --git a/swh/search/tests/test_init.py b/swh/search/tests/test_init.py index 153309c..5de2513 100644 --- a/swh/search/tests/test_init.py +++ b/swh/search/tests/test_init.py @@ -1,84 +1,88 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import inspect import pytest from swh.search import get_search from swh.search.api.client import RemoteSearch from swh.search.elasticsearch import ElasticSearch from swh.search.in_memory import InMemorySearch from swh.search.interface import SearchInterface SEARCH_IMPLEMENTATIONS_KWARGS = [ 
("remote", RemoteSearch, {"url": "localhost"}), - ("elasticsearch", ElasticSearch, {"hosts": ["localhost"], "index_prefix": "test"}), + ( + "elasticsearch", + ElasticSearch, + {"hosts": ["localhost"], "indexes": {"origin": {"index": "test"}}}, + ), ] SEARCH_IMPLEMENTATIONS = SEARCH_IMPLEMENTATIONS_KWARGS + [ ("memory", InMemorySearch, None), ] def test_get_search_failure(): with pytest.raises(ValueError, match="Unknown search class"): get_search("unknown-search") @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS) def test_get_search(mocker, class_, expected_class, kwargs): mocker.patch("swh.search.elasticsearch.Elasticsearch") if kwargs: concrete_search = get_search(class_, **kwargs) else: concrete_search = get_search(class_) assert isinstance(concrete_search, expected_class) @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS_KWARGS) def test_get_search_deprecation_warning(mocker, class_, expected_class, kwargs): with pytest.warns(DeprecationWarning): concrete_search = get_search(class_, args=kwargs) assert isinstance(concrete_search, expected_class) @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS) def test_types(mocker, class_, expected_class, kwargs): """Checks all methods of SearchInterface are implemented by this backend, and that they have the same signature. """ mocker.patch("swh.search.elasticsearch.Elasticsearch") if kwargs: concrete_search = get_search(class_, **kwargs) else: concrete_search = get_search(class_) # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SearchInterface,), {})() for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) missing_methods = [] try: concrete_meth = getattr(concrete_search, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] diff --git a/swh/search/tests/test_server.py b/swh/search/tests/test_server.py new file mode 100644 index 0000000..15b1b65 --- /dev/null +++ b/swh/search/tests/test_server.py @@ -0,0 +1,156 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Any, Dict + +import pytest +import yaml + +from swh.core.api import RPCServerApp +from swh.core.config import load_from_envvar +from swh.search.api import server +from swh.search.api.server import load_and_check_config, make_app_from_configfile + + +def teardown_function(): + # Ensure there is no configuration loaded from a previous test + server.api_cfg = None + + +def _write_config_file(tmp_path, monkeypatch, content): + conf_path = os.path.join(str(tmp_path), "search.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(content)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + +@pytest.fixture +def swh_search_server_config_without_indexes() -> Dict[str, Any]: + return {"search": {"cls": "elasticsearch", "hosts": ["es1"],}} + + +@pytest.fixture 
+def swh_search_server_config_with_indexes( + swh_search_server_config_without_indexes, +) -> Dict[str, Any]: + return { + "search": { + **{"indexes": {"origin": {"index": "test"}}}, + **swh_search_server_config_without_indexes["search"], + } + } + + +@pytest.fixture +def swh_search_config_without_indexes( + monkeypatch, swh_search_server_config_without_indexes, tmp_path +): + return _write_config_file( + tmp_path, monkeypatch, swh_search_server_config_without_indexes + ) + + +@pytest.fixture +def swh_search_config_with_indexes( + monkeypatch, swh_search_server_config_with_indexes, tmp_path +): + return _write_config_file( + tmp_path, monkeypatch, swh_search_server_config_with_indexes + ) + + +def prepare_config_file(tmpdir, config_dict: Dict, name: str = "config.yml") -> str: + """Prepare configuration file in `$tmpdir/name` with content `config_dict`. + + Args: + tmpdir (LocalPath): root directory + config_dict: Content of the file as a dict, converted into a yaml string. + name: configuration filename + + Returns: + path of the prepared configuration file. + + """ + config_path = tmpdir / name + config_path.write_text(yaml.dump(config_dict), encoding="utf-8") + # convert the LocalPath to a plain string for the caller + return str(config_path) + + +@pytest.mark.parametrize("config_file", [None, ""]) +def test_load_and_check_config_no_configuration(config_file): + """An undefined or empty configuration file raises""" + with pytest.raises(EnvironmentError, match="Configuration file must be defined"): + load_and_check_config(config_file) + + +def test_load_and_check_config_inexistent_file(): + config_path = "/some/inexistent/config.yml" + expected_error = f"Configuration file {config_path} does not exist" + with pytest.raises(EnvironmentError, match=expected_error): + load_and_check_config(config_path) + + +def test_load_and_check_config_wrong_configuration(tmpdir): + """Wrong configuration raises""" + config_path = prepare_config_file(tmpdir, {"something": "useless"}) + with pytest.raises(KeyError, match="Missing 'search' configuration"): + load_and_check_config(config_path) + + +def test_load_and_check_config_local_config_fine( + swh_search_server_config_with_indexes, tmpdir +): + """A complete configuration is fine""" + config_path = prepare_config_file(tmpdir, swh_search_server_config_with_indexes) + cfg = load_and_check_config(config_path) + assert cfg == swh_search_server_config_with_indexes + + +def test_server_make_app_from_config_file_without_indexes( + swh_search_config_without_indexes, +): + app = make_app_from_configfile() + expected_cfg = load_from_envvar() + + assert app is not None + assert isinstance(app, RPCServerApp) + assert app.config["search"] == expected_cfg["search"] + + app2 = make_app_from_configfile() + assert app is app2 + + +def test_server_make_app_from_config_file_with_indexes(swh_search_config_with_indexes,): + app = make_app_from_configfile() + expected_cfg = load_from_envvar() + assert app is not None + assert isinstance(app, RPCServerApp) + assert app.config["search"] == expected_cfg["search"] + + app2 = make_app_from_configfile() + assert app is app2 + + +def test_server_first_call_initialize_elasticsearch( + swh_search_config_with_indexes, mocker +): + """Test that the initialize method is called on the first request to the + server, and only on the first""" + mock = mocker.patch("swh.search.elasticsearch.ElasticSearch.initialize") + + app = make_app_from_configfile() + app.config["TESTING"] = True +
tc = app.test_client() + + tc.get("/") + assert mock.call_count == 1 + + tc.get("/") + assert mock.call_count == 1
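To round off, a sketch of how the lazy initialization added in this change behaves outside the test suite (the configuration path below is an assumption; the file must exist and contain a `search` section, as `load_and_check_config` enforces):

```python
import os

from swh.search.api.server import make_app_from_configfile

# make_app_from_configfile() reads the configuration path from the
# SWH_CONFIG_FILENAME environment variable (path shown is illustrative).
os.environ["SWH_CONFIG_FILENAME"] = "/etc/softwareheritage/search.yml"

app = make_app_from_configfile()

# Index, alias and mapping creation is deferred to the first request,
# via the before_first_request hook: the first request triggers
# ElasticSearch.initialize(), later requests do not.
with app.test_client() as client:
    client.get("/")
    client.get("/")
```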