diff --git a/swh/search/api/server.py b/swh/search/api/server.py
index 29546ef..e0ab43f 100644
--- a/swh/search/api/server.py
+++ b/swh/search/api/server.py
@@ -1,88 +1,90 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import os
 
 from swh.core import config
 from swh.core.api import RPCServerApp
 from swh.core.api import encode_data_server as encode_data
 from swh.core.api import error_handler
 
+from swh.search.metrics import timed
+
 from .. import get_search
 from ..interface import SearchInterface
 
 
 def _get_search():
     global search
     if not search:
         search = get_search(**app.config["search"])
 
     return search
 
 
 app = RPCServerApp(__name__, backend_class=SearchInterface, backend_factory=_get_search)
 search = None
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.route("/")
+@timed
 def index():
     return "SWH Search API server"
 
 
 api_cfg = None
 
 
 def load_and_check_config(config_file, type="elasticsearch"):
     """Check that the minimal configuration is set to run the API,
     or raise an explanatory error.
 
     Args:
         config_file (str): Path to the configuration file to load
         type (str): configuration type. For 'local' type, more checks are done.
 
     Raises:
         Error if the setup is not as expected
 
     Returns:
         configuration as a dict
 
     """
     if not config_file:
         raise EnvironmentError("Configuration file must be defined")
 
     if not os.path.exists(config_file):
         raise FileNotFoundError(
             "Configuration file %s does not exist" % (config_file,)
         )
 
     cfg = config.read(config_file)
     if "search" not in cfg:
         raise KeyError("Missing 'search' configuration")
 
     return cfg
 
 
 def make_app_from_configfile():
     """Run the WSGI app from the webserver, loading the configuration from
     a configuration file.
 
     SWH_CONFIG_FILENAME environment variable defines the
     configuration path to load.
 
     """
     global api_cfg
     if not api_cfg:
         config_file = os.environ.get("SWH_CONFIG_FILENAME")
         api_cfg = load_and_check_config(config_file)
         app.config.update(api_cfg)
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
     return app
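For reference, `load_and_check_config()` above only verifies that the file exists and contains a top-level `search` section; everything under that key is passed through to `get_search()`. A minimal sketch of that behavior, assuming `swh.search` is installed (the backend settings shown under `search` are illustrative, not mandated by this patch):

```python
# Sketch: exercising load_and_check_config(). Only the presence of a
# top-level "search" section is checked; the keys under it are
# illustrative deployment settings.
import tempfile

from swh.search.api.server import load_and_check_config

with tempfile.NamedTemporaryFile("w", suffix=".yml") as f:
    f.write("search:\n  cls: elasticsearch\n  hosts: [localhost:9200]\n")
    f.flush()
    cfg = load_and_check_config(f.name)
    assert "search" in cfg

with tempfile.NamedTemporaryFile("w", suffix=".yml") as f:
    f.write("storage: {}\n")  # no "search" section
    f.flush()
    try:
        load_and_check_config(f.name)
    except KeyError:
        pass  # "Missing 'search' configuration"
```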
diff --git a/swh/search/elasticsearch.py b/swh/search/elasticsearch.py
index fc0aa4b..d4ca21f 100644
--- a/swh/search/elasticsearch.py
+++ b/swh/search/elasticsearch.py
@@ -1,240 +1,253 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import base64
 from typing import Any, Dict, Iterable, Iterator, List, Optional
 
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import bulk, scan
+from elasticsearch import Elasticsearch, helpers
 import msgpack
 
 from swh.indexer import codemeta
 from swh.model import model
 from swh.model.identifiers import origin_identifier
 from swh.search.interface import PagedResult
+from swh.search.metrics import send_metric, timed
 
 
 def _sanitize_origin(origin):
     origin = origin.copy()
 
     # Whitelist fields to be saved in Elasticsearch
     res = {"url": origin.pop("url")}
     for field_name in ("intrinsic_metadata", "has_visits"):
         if field_name in origin:
             res[field_name] = origin.pop(field_name)
 
     # Run the JSON-LD expansion algorithm
     # to normalize the Codemeta metadata.
     # This is required as Elasticsearch needs each field to have a consistent
     # type across documents to be searchable; and non-expanded JSON-LD documents
     # can have various types in the same field. For example, all these are
     # equivalent in JSON-LD:
     # * {"author": "Jane Doe"}
     # * {"author": ["Jane Doe"]}
     # * {"author": {"@value": "Jane Doe"}}
     # * {"author": [{"@value": "Jane Doe"}]}
     # and JSON-LD expansion will convert them all to the last one.
     if "intrinsic_metadata" in res:
         res["intrinsic_metadata"] = codemeta.expand(res["intrinsic_metadata"])
 
     return res
 
 
 def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str:
     """Serialize an index page result from a search as a page_token string"""
     page_token = base64.b64encode(msgpack.dumps(index_to_tokenize))
     return page_token.decode()
 
 
 def token_decode(page_token: str) -> Dict[bytes, Any]:
     """Decode a page_token back into an index page result"""
     return msgpack.loads(base64.b64decode(page_token.encode()), raw=True)
 
 
 class ElasticSearch:
     def __init__(self, hosts: List[str], index_prefix=None):
         self._backend = Elasticsearch(hosts=hosts)
 
         self.index_prefix = index_prefix
 
         self.origin_index = "origin"
 
         if index_prefix:
             self.origin_index = index_prefix + "_" + self.origin_index
 
+    @timed
     def check(self):
         return self._backend.ping()
 
     def deinitialize(self) -> None:
         """Removes all indices from the Elasticsearch backend"""
         self._backend.indices.delete(index="*")
 
     def initialize(self) -> None:
         """Declare Elasticsearch indices and mappings"""
         if not self._backend.indices.exists(index=self.origin_index):
             self._backend.indices.create(index=self.origin_index)
         self._backend.indices.put_mapping(
             index=self.origin_index,
             body={
                 "properties": {
                     # sha1 of the URL; used as the document id
                     "sha1": {"type": "keyword", "doc_values": True,},
                     # Used both to search URLs, and as the result to return
                     # as a response to queries
                     "url": {
                         "type": "text",
                         # To split URLs into tokens on any character
                         # that is not alphanumerical
                         "analyzer": "simple",
                         # 2-gram and partial-3-gram search (i.e. with the end of the
                         # third word potentially missing)
                         "fields": {
                             "as_you_type": {
                                 "type": "search_as_you_type",
                                 "analyzer": "simple",
                             }
                         },
                     },
                     # used to filter out origins that were never visited
                     "has_visits": {"type": "boolean",},
                     "intrinsic_metadata": {
                         "type": "nested",
                         "properties": {
                             "@context": {
                                 # don't bother indexing tokens in these URIs, as they
                                 # are used as namespaces
                                 "type": "keyword",
                             }
                         },
                     },
                 }
             },
         )
 
+    @timed
     def flush(self) -> None:
         self._backend.indices.refresh(index=self.origin_index)
 
+    @timed
     def origin_update(self, documents: Iterable[Dict]) -> None:
         documents = map(_sanitize_origin, documents)
         documents_with_sha1 = (
             (origin_identifier(document), document) for document in documents
         )
         actions = [
             {
                 "_op_type": "update",
                 "_id": sha1,
                 "_index": self.origin_index,
                 "doc": {**document, "sha1": sha1,},
                 "doc_as_upsert": True,
             }
             for (sha1, document) in documents_with_sha1
         ]
-        bulk(self._backend, actions, index=self.origin_index)
+
+        indexed_count, errors = helpers.bulk(
+            self._backend, actions, index=self.origin_index
+        )
+        assert isinstance(errors, List)  # Make mypy happy
+
+        send_metric("document:index", count=indexed_count, method_name="origin_update")
+        send_metric(
+            "document:index_error", count=len(errors), method_name="origin_update"
+        )
 
     def origin_dump(self) -> Iterator[model.Origin]:
-        results = scan(self._backend, index=self.origin_index)
+        results = helpers.scan(self._backend, index=self.origin_index)
         for hit in results:
             yield self._backend.termvectors(
                 index=self.origin_index, id=hit["_id"], fields=["*"]
             )
 
+    @timed
     def origin_search(
         self,
         *,
         url_pattern: Optional[str] = None,
         metadata_pattern: Optional[str] = None,
         with_visit: bool = False,
         page_token: Optional[str] = None,
         limit: int = 50,
     ) -> PagedResult[Dict[str, Any]]:
         query_clauses: List[Dict[str, Any]] = []
 
         if url_pattern:
             query_clauses.append(
                 {
                     "multi_match": {
                         "query": url_pattern,
                         "type": "bool_prefix",
                         "operator": "and",
                         "fields": [
                             "url.as_you_type",
                             "url.as_you_type._2gram",
                             "url.as_you_type._3gram",
                         ],
                     }
                 }
             )
 
         if metadata_pattern:
             query_clauses.append(
                 {
                     "nested": {
                         "path": "intrinsic_metadata",
                         "query": {
                             "multi_match": {
                                 "query": metadata_pattern,
                                 # Makes it so that the "foo bar" query returns
                                 # documents which contain "foo" in a field and "bar"
                                 # in a different field
                                 "type": "cross_fields",
                                 # All keywords must be found in a document for it to
                                 # be considered a match.
                                 # TODO: allow missing keywords?
                                 "operator": "and",
                                 # Searches on all fields of the intrinsic_metadata dict,
                                 # recursively.
                                 "fields": ["intrinsic_metadata.*"],
                             }
                         },
                     }
                 }
             )
 
         if not query_clauses:
             raise ValueError(
                 "At least one of url_pattern and metadata_pattern must be provided."
             )
 
         if with_visit:
             query_clauses.append({"term": {"has_visits": True,}})
 
         body = {
             "query": {"bool": {"must": query_clauses,}},
             "sort": [{"_score": "desc"}, {"sha1": "asc"},],
         }
         if page_token:
             # TODO: use ElasticSearch's scroll API?
             page_token_content = token_decode(page_token)
             body["search_after"] = [
                 page_token_content[b"score"],
                 page_token_content[b"sha1"].decode("ascii"),
             ]
 
         res = self._backend.search(index=self.origin_index, body=body, size=limit)
 
         hits = res["hits"]["hits"]
 
         next_page_token: Optional[str] = None
 
         if len(hits) == limit:
             # There are more results after this page; return a pagination token
             # to get them in a future query
             last_hit = hits[-1]
             next_page_token_content = {
                 b"score": last_hit["_score"],
                 b"sha1": last_hit["_source"]["sha1"],
             }
             next_page_token = token_encode(next_page_token_content)
 
         assert len(hits) <= limit
         return PagedResult(
             results=[{"url": hit["_source"]["url"]} for hit in hits],
             next_page_token=next_page_token,
         )
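The JSON-LD expansion performed by `_sanitize_origin` is what makes the `intrinsic_metadata` mapping workable: expansion gives every equivalent spelling of a field a single canonical shape, so Elasticsearch sees one consistent type. A sketch of the four equivalent forms from the comment, assuming `swh.indexer` is installed and that the documents carry the standard CodeMeta context URL (as indexer-produced metadata does):

```python
# Sketch of the JSON-LD normalization discussed in _sanitize_origin.
# The four inputs are the equivalent forms listed in the comment;
# expansion maps them all to the same representation.
from swh.indexer import codemeta

CONTEXT = "https://doi.org/10.5063/schema/codemeta-2.0"  # CodeMeta context

equivalent_docs = [
    {"@context": CONTEXT, "author": "Jane Doe"},
    {"@context": CONTEXT, "author": ["Jane Doe"]},
    {"@context": CONTEXT, "author": {"@value": "Jane Doe"}},
    {"@context": CONTEXT, "author": [{"@value": "Jane Doe"}]},
]

expanded = [codemeta.expand(doc) for doc in equivalent_docs]
# All four expand to the same canonical form
assert all(e == expanded[0] for e in expanded)
```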
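Likewise, the `page_token` scheme in `origin_search` is just a msgpack-plus-base64 round trip of the last hit's sort key (`_score`, `sha1`). A self-contained sketch mirroring `token_encode`/`token_decode` above, with made-up values:

```python
# Round trip of the page token used by origin_search. The score and
# sha1 values below are dummies, for illustration only.
import base64
from typing import Any, Dict

import msgpack


def token_encode(index_to_tokenize: Dict[bytes, Any]) -> str:
    return base64.b64encode(msgpack.dumps(index_to_tokenize)).decode()


def token_decode(page_token: str) -> Dict[bytes, Any]:
    return msgpack.loads(base64.b64decode(page_token.encode()), raw=True)


last_hit_sort_key = {b"score": 13.37, b"sha1": b"cafe" * 10}  # dummy values
token = token_encode(last_hit_sort_key)
# The token is an opaque ASCII string the client passes back verbatim;
# decoding recovers the search_after sort key exactly.
assert token_decode(token) == last_hit_sort_key
```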
diff --git a/swh/search/metrics.py b/swh/search/metrics.py
new file mode 100644
index 0000000..f8afee0
--- /dev/null
+++ b/swh/search/metrics.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from functools import wraps
+import logging
+
+from swh.core.statsd import statsd
+
+OPERATIONS_METRIC = "swh_search_operations_total"
+DURATION_METRIC = "swh_search_request_duration_seconds"
+
+
+def timed(f):
+    """Time that function!
+
+    """
+
+    @wraps(f)
+    def d(*a, **kw):
+        with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
+            return f(*a, **kw)
+
+    return d
+
+
+def send_metric(metric: str, count: int, method_name: str) -> bool:
+    """Send statsd metric with count for method `method_name`
+
+    If count is 0, the metric is discarded. If the metric is not
+    parseable, the metric is discarded with a log message.
+
+    Args:
+        metric: Metric's name (e.g. document:index, document:index_error)
+        count: Associated value for the metric
+        method_name: Method's name
+
+    Returns:
+        Bool indicating whether the metric was sent or not
+
+    """
+    if count == 0:
+        return False
+
+    metric_type = metric.split(":")
+    _length = len(metric_type)
+    if _length == 2:
+        object_type, operation = metric_type
+        metric_name = OPERATIONS_METRIC
+    else:
+        logging.warning("Skipping unknown metric {%s: %s}" % (metric, count))
+        return False
+
+    statsd.increment(
+        metric_name,
+        count,
+        tags={
+            "endpoint": method_name,
+            "object_type": object_type,
+            "operation": operation,
+        },
+    )
+    return True
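Taken together, the two helpers are used as follows by the backend; the resulting statsd calls are shown as comments. This is a hedged usage sketch: `my_method` is a hypothetical caller, not part of this patch, and statsd emission is fire-and-forget UDP, so it runs without a metrics server listening:

```python
from swh.search.metrics import send_metric, timed


@timed
def my_method():  # hypothetical; any callable works
    # Body runs inside:
    #   statsd.timed("swh_search_request_duration_seconds",
    #                tags={"endpoint": "my_method"})
    return "ok"


my_method()

# A well-formed "object_type:operation" name increments
# swh_search_operations_total with the parsed tags and returns True:
assert send_metric("document:index", count=10, method_name="my_method")

# A zero count is discarded...
assert not send_metric("document:index", count=0, method_name="my_method")

# ...and so is a name that does not split into exactly two parts
# (a warning is logged).
assert not send_metric("document", count=10, method_name="my_method")
```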
diff --git a/swh/search/tests/test_elasticsearch.py b/swh/search/tests/test_elasticsearch.py
index 7fc24ed..1567dd6 100644
--- a/swh/search/tests/test_elasticsearch.py
+++ b/swh/search/tests/test_elasticsearch.py
@@ -1,25 +1,71 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 import pytest
 
+from swh.search.metrics import OPERATIONS_METRIC
+
 from .test_search import CommonSearchTest
 
 
 class BaseElasticsearchTest(unittest.TestCase):
     @pytest.fixture(autouse=True)
-    def _instantiate_search(self, swh_search, elasticsearch_host):
+    def _instantiate_search(self, swh_search, elasticsearch_host, mocker):
         self._elasticsearch_host = elasticsearch_host
         self.search = swh_search
+        self.mocker = mocker
 
     def reset(self):
         self.search.deinitialize()
         self.search.initialize()
 
 
 class TestElasticsearchSearch(CommonSearchTest, BaseElasticsearchTest):
-    pass
+    def test_metrics_update_duration(self):
+        mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+        for url in ["http://foobar.bar", "http://foobar.baz"]:
+            self.search.origin_update([{"url": url}])
+
+        assert mock.call_count == 2
+
+    def test_metrics_search_duration(self):
+        mock = self.mocker.patch("swh.search.metrics.statsd.timing")
+
+        for url_pattern in ["foobar", "foobaz"]:
+            self.search.origin_search(url_pattern=url_pattern, with_visit=True)
+
+        assert mock.call_count == 2
+
+    def test_metrics_indexation_counters(self):
+        mock_es = self.mocker.patch("elasticsearch.helpers.bulk")
+        mock_es.return_value = 2, ["error"]
+
+        mock_metrics = self.mocker.patch("swh.search.metrics.statsd.increment")
+
+        self.search.origin_update([{"url": "http://foobar.baz"}])
+
+        assert mock_metrics.call_count == 2
+
+        mock_metrics.assert_any_call(
+            OPERATIONS_METRIC,
+            2,
+            tags={
+                "endpoint": "origin_update",
+                "object_type": "document",
+                "operation": "index",
+            },
+        )
+        mock_metrics.assert_any_call(
+            OPERATIONS_METRIC,
+            1,
+            tags={
+                "endpoint": "origin_update",
+                "object_type": "document",
+                "operation": "index_error",
+            },
+        )
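For completeness, an end-to-end sketch of the code paths these tests exercise, against a live backend. The `localhost:9200` host is an assumption for illustration; the test suite gets its host from the `elasticsearch_host` fixture instead:

```python
# End-to-end sketch of the instrumented backend, assuming an
# Elasticsearch instance reachable on localhost:9200.
from swh.search.elasticsearch import ElasticSearch

search = ElasticSearch(hosts=["localhost:9200"])
search.initialize()

# Emits swh_search_request_duration_seconds (via @timed) and, through
# send_metric, swh_search_operations_total for document:index and
# document:index_error.
search.origin_update([{"url": "http://foobar.baz", "has_visits": True}])
search.flush()

page = search.origin_search(url_pattern="foobar", with_visit=True)
assert page.results == [{"url": "http://foobar.baz"}]
```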