diff --git a/setup.py b/setup.py
index 0cd5f8f..261ef13 100755
--- a/setup.py
+++ b/setup.py
@@ -1,71 +1,71 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path
 from io import open
+from os import path
+
+from setuptools import find_packages, setup
 
 here = path.abspath(path.dirname(__file__))
 
 # Get the long description from the README file
 with open(path.join(here, "README.md"), encoding="utf-8") as f:
     long_description = f.read()
 
 
 def parse_requirements(name=None):
     if name:
         reqf = "requirements-%s.txt" % name
     else:
         reqf = "requirements.txt"
 
     requirements = []
     if not path.exists(reqf):
         return requirements
 
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith("#"):
                 continue
             requirements.append(line)
     return requirements
 
 
 setup(
     name="swh.search",
     description="Software Heritage search service",
     long_description=long_description,
     long_description_content_type="text/markdown",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DSEA",
     packages=find_packages(),  # packages's modules
     install_requires=parse_requirements() + parse_requirements("swh"),
     tests_require=parse_requirements("test"),
     entry_points="""
         [swh.cli.subcommands]
         search=swh.search.cli:cli
     """,
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
     extras_require={"testing": parse_requirements("test")},
     include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 3 - Alpha",
     ],
     project_urls={
         "Bug Reports": "https://forge.softwareheritage.org/maniphest",
         "Funding": "https://www.softwareheritage.org/donate",
         "Source": "https://forge.softwareheritage.org/source/swh-search",
         "Documentation": "https://docs.softwareheritage.org/devel/swh-search/",
     },
 )
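Reviewer note: `parse_requirements()` above silently skips blank lines and `#` comments. A minimal inline replica of that loop, run on made-up file content, shows the accepted format:

```python
import io

# Same filtering logic as parse_requirements() in setup.py, applied to a
# sample requirements file (the content below is illustrative only).
sample = io.StringIO("# runtime deps\n\nelasticsearch>=7.0.0\nmsgpack\n")
requirements = []
for line in sample:
    line = line.strip()
    if not line or line.startswith("#"):
        continue
    requirements.append(line)
assert requirements == ["elasticsearch>=7.0.0", "msgpack"]
```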
""" if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] class_path = SEARCH_IMPLEMENTATIONS.get(cls) if class_path is None: raise ValueError( "Unknown search class `%s`. Supported: %s" % (cls, ", ".join(SEARCH_IMPLEMENTATIONS)) ) (module_path, class_name) = class_path.rsplit(".", 1) module = importlib.import_module(module_path, package=__package__) Search = getattr(module, class_name) return Search(**kwargs) diff --git a/swh/search/api/server.py b/swh/search/api/server.py index 6d16853..29546ef 100644 --- a/swh/search/api/server.py +++ b/swh/search/api/server.py @@ -1,86 +1,88 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from swh.core import config -from swh.core.api import RPCServerApp, error_handler, encode_data_server as encode_data +from swh.core.api import RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler from .. import get_search from ..interface import SearchInterface def _get_search(): global search if not search: search = get_search(**app.config["search"]) return search app = RPCServerApp(__name__, backend_class=SearchInterface, backend_factory=_get_search) search = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.route("/") def index(): return "SWH Search API server" api_cfg = None def load_and_check_config(config_file, type="elasticsearch"): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) if "search" not in cfg: raise KeyError("Missing 'search' configuration") return cfg def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. 
""" global api_cfg if not api_cfg: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app diff --git a/swh/search/cli.py b/swh/search/cli.py index c28294d..1c05b57 100644 --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -1,106 +1,108 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS @click.group(name="search", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): """Software Heritage Search tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @cli.command("initialize") @click.pass_context def initialize(ctx): """Creates Elasticsearch indices.""" from . import get_search search = get_search(**ctx.obj["config"]["search"]) search.initialize() print("Done.") @cli.group("journal-client") @click.pass_context def journal_client(ctx): """""" pass @journal_client.command("objects") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client_objects(ctx, stop_after_objects): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin) on these new objects.""" import functools + from swh.journal.client import get_journal_client + from . import get_search from .journal_client import process_journal_objects config = ctx.obj["config"] journal_cfg = config["journal"] client = get_journal_client( cls="kafka", object_types=["origin", "origin_visit"], stop_after_objects=stop_after_objects, **journal_cfg, ) search = get_search(**config["search"]) worker_fn = functools.partial(process_journal_objects, search=search,) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." 
diff --git a/swh/search/cli.py b/swh/search/cli.py
index c28294d..1c05b57 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,106 +1,108 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import click
 
 from swh.core.cli import CONTEXT_SETTINGS
 
 
 @click.group(name="search", context_settings=CONTEXT_SETTINGS)
 @click.option(
     "--config-file",
     "-C",
     default=None,
     type=click.Path(exists=True, dir_okay=False,),
     help="Configuration file.",
 )
 @click.pass_context
 def cli(ctx, config_file):
     """Software Heritage Search tools."""
     from swh.core import config
 
     ctx.ensure_object(dict)
 
     conf = config.read(config_file)
     ctx.obj["config"] = conf
 
 
 @cli.command("initialize")
 @click.pass_context
 def initialize(ctx):
     """Creates Elasticsearch indices."""
     from . import get_search
 
     search = get_search(**ctx.obj["config"]["search"])
     search.initialize()
     print("Done.")
 
 
 @cli.group("journal-client")
 @click.pass_context
 def journal_client(ctx):
     """"""
     pass
 
 
 @journal_client.command("objects")
 @click.option(
     "--stop-after-objects",
     "-m",
     default=None,
     type=int,
     help="Maximum number of objects to replay. Default is to run forever.",
 )
 @click.pass_context
 def journal_client_objects(ctx, stop_after_objects):
     """Listens for new objects from the SWH Journal, and schedules tasks
     to run relevant indexers (currently, only origin)
     on these new objects."""
     import functools
 
+    from swh.journal.client import get_journal_client
+
     from . import get_search
     from .journal_client import process_journal_objects
 
     config = ctx.obj["config"]
     journal_cfg = config["journal"]
 
     client = get_journal_client(
         cls="kafka",
         object_types=["origin", "origin_visit"],
         stop_after_objects=stop_after_objects,
         **journal_cfg,
     )
     search = get_search(**config["search"])
 
     worker_fn = functools.partial(process_journal_objects, search=search,)
     nb_messages = 0
     try:
         nb_messages = client.process(worker_fn)
         print("Processed %d messages." % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
         client.close()
 
 
 @cli.command("rpc-serve")
 @click.argument("config-path", required=True)
 @click.option("--host", default="0.0.0.0", help="Host to run the server")
 @click.option("--port", default=5010, type=click.INT, help="Binding port of the server")
 @click.option(
     "--debug/--nodebug",
     default=True,
     help="Indicates if the server should run in debug mode",
 )
 def rpc_server(config_path, host, port, debug):
     """Starts a Software Heritage Indexer RPC HTTP server."""
-    from .api.server import load_and_check_config, app
+    from .api.server import app, load_and_check_config
 
     api_cfg = load_and_check_config(config_path, type="any")
     app.config.update(api_cfg)
     app.run(host, port=int(port), debug=bool(debug))
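Reviewer note: the group can be exercised without a real server through Click's test runner, mirroring what `swh/search/tests/test_cli.py` does; `config.yml` is a placeholder path that must exist:

```python
from click.testing import CliRunner

from swh.search.cli import cli

runner = CliRunner()
result = runner.invoke(cli, ["-C", "config.yml", "initialize"])
print(result.output)  # prints "Done." once the indices exist
```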
"doc": {**document, "sha1": sha1,}, "doc_as_upsert": True, } for (sha1, document) in documents_with_sha1 ] bulk(self._backend, actions, index="origin") def origin_dump(self) -> Iterator[model.Origin]: results = scan(self._backend, index="*") for hit in results: yield self._backend.termvectors(index="origin", id=hit["_id"], fields=["*"]) def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, page_token: Optional[str] = None, limit: int = 50, ) -> PagedResult[Dict[str, Any]]: query_clauses: List[Dict[str, Any]] = [] if url_pattern: query_clauses.append( { "multi_match": { "query": url_pattern, "type": "bool_prefix", "operator": "and", "fields": [ "url.as_you_type", "url.as_you_type._2gram", "url.as_you_type._3gram", ], } } ) if metadata_pattern: query_clauses.append( { "nested": { "path": "intrinsic_metadata", "query": { "multi_match": { "query": metadata_pattern, "operator": "and", "fields": ["intrinsic_metadata.*"], } }, } } ) if not query_clauses: raise ValueError( "At least one of url_pattern and metadata_pattern must be provided." ) next_page_token: Optional[str] = None if with_visit: query_clauses.append({"term": {"has_visits": True,}}) body = { "query": {"bool": {"must": query_clauses,}}, "sort": [{"_score": "desc"}, {"sha1": "asc"},], } if page_token: # TODO: use ElasticSearch's scroll API? page_token_content = token_decode(page_token) body["search_after"] = [ page_token_content[b"score"], page_token_content[b"sha1"].decode("ascii"), ] res = self._backend.search(index="origin", body=body, size=limit) hits = res["hits"]["hits"] if len(hits) == limit: last_hit = hits[-1] next_page_token_content = { b"score": last_hit["_score"], b"sha1": last_hit["_source"]["sha1"], } next_page_token = token_encode(next_page_token_content) assert len(hits) <= limit return PagedResult( results=[{"url": hit["_source"]["url"]} for hit in hits], next_page_token=next_page_token, ) diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py index 7858a7f..578b08b 100644 --- a/swh/search/in_memory.py +++ b/swh/search/in_memory.py @@ -1,107 +1,105 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import defaultdict import itertools import re - -from collections import defaultdict from typing import Any, Dict, Iterable, Iterator, List, Optional from swh.model.identifiers import origin_identifier - from swh.search.interface import PagedResult class InMemorySearch: def __init__(self): pass def check(self): return True def deinitialize(self) -> None: if hasattr(self, "_origins"): del self._origins del self._origin_ids def initialize(self) -> None: self._origins: Dict[str, Dict[str, Any]] = defaultdict(dict) self._origin_ids: List[str] = [] def flush(self) -> None: pass _url_splitter = re.compile(r"\W") def origin_update(self, documents: Iterable[Dict]) -> None: for document in documents: document = document.copy() id_ = origin_identifier(document) if "url" in document: document["_url_tokens"] = set(self._url_splitter.split(document["url"])) self._origins[id_].update(document) if id_ not in self._origin_ids: self._origin_ids.append(id_) def origin_search( self, *, url_pattern: Optional[str] = None, metadata_pattern: Optional[str] = None, with_visit: bool = False, page_token: Optional[str] = None, limit: int = 50, ) -> 
diff --git a/swh/search/in_memory.py b/swh/search/in_memory.py
index 7858a7f..578b08b 100644
--- a/swh/search/in_memory.py
+++ b/swh/search/in_memory.py
@@ -1,107 +1,105 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import defaultdict
 import itertools
 import re
-
-from collections import defaultdict
 from typing import Any, Dict, Iterable, Iterator, List, Optional
 
 from swh.model.identifiers import origin_identifier
-
 from swh.search.interface import PagedResult
 
 
 class InMemorySearch:
     def __init__(self):
         pass
 
     def check(self):
         return True
 
     def deinitialize(self) -> None:
         if hasattr(self, "_origins"):
             del self._origins
             del self._origin_ids
 
     def initialize(self) -> None:
         self._origins: Dict[str, Dict[str, Any]] = defaultdict(dict)
         self._origin_ids: List[str] = []
 
     def flush(self) -> None:
         pass
 
     _url_splitter = re.compile(r"\W")
 
     def origin_update(self, documents: Iterable[Dict]) -> None:
         for document in documents:
             document = document.copy()
             id_ = origin_identifier(document)
             if "url" in document:
                 document["_url_tokens"] = set(self._url_splitter.split(document["url"]))
             self._origins[id_].update(document)
             if id_ not in self._origin_ids:
                 self._origin_ids.append(id_)
 
     def origin_search(
         self,
         *,
         url_pattern: Optional[str] = None,
         metadata_pattern: Optional[str] = None,
         with_visit: bool = False,
         page_token: Optional[str] = None,
         limit: int = 50,
     ) -> PagedResult[Dict[str, Any]]:
         hits: Iterator[Dict[str, Any]] = (
             self._origins[id_] for id_ in self._origin_ids
         )
 
         if url_pattern:
             tokens = set(self._url_splitter.split(url_pattern))
 
             def predicate(match):
                 missing_tokens = tokens - match["_url_tokens"]
                 if len(missing_tokens) == 0:
                     return True
                 elif len(missing_tokens) > 1:
                     return False
                 else:
                     # There is one missing token, look up by prefix.
                     (missing_token,) = missing_tokens
                     return any(
                         token.startswith(missing_token)
                         for token in match["_url_tokens"]
                     )
 
             hits = filter(predicate, hits)
 
         if metadata_pattern:
             raise NotImplementedError(
                 "Metadata search is not implemented in the in-memory backend."
             )
 
         if not url_pattern and not metadata_pattern:
             raise ValueError(
                 "At least one of url_pattern and metadata_pattern must be provided."
             )
 
         next_page_token: Optional[str] = None
 
         if with_visit:
             hits = filter(lambda o: o.get("has_visits"), hits)
 
         start_at_index = int(page_token) if page_token else 0
 
         origins = [
             {"url": hit["url"]}
             for hit in itertools.islice(hits, start_at_index, start_at_index + limit)
         ]
 
         if len(origins) == limit:
             next_page_token = str(start_at_index + limit)
 
         assert len(origins) <= limit
         return PagedResult(results=origins, next_page_token=next_page_token,)
diff --git a/swh/search/interface.py b/swh/search/interface.py
index 57c7181..7fdc8ca 100644
--- a/swh/search/interface.py
+++ b/swh/search/interface.py
@@ -1,64 +1,62 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, Iterable, Optional, TypeVar
 
-from swh.core.api.classes import PagedResult as CorePagedResult
-
 from swh.core.api import remote_api_endpoint
-
+from swh.core.api.classes import PagedResult as CorePagedResult
 
 TResult = TypeVar("TResult")
 PagedResult = CorePagedResult[TResult, str]
 
 
 class SearchInterface:
     @remote_api_endpoint("check")
     def check(self):
         """Dedicated method to execute some specific check per implementation.
 
         """
         ...
 
     @remote_api_endpoint("flush")
     def flush(self) -> None:
         """Blocks until all previous calls to _update() are completely
         applied.
 
         """
         ...
 
     @remote_api_endpoint("origin/update")
     def origin_update(self, documents: Iterable[Dict]) -> None:
         """Persist documents to the search backend.
 
         """
         ...
 
     @remote_api_endpoint("origin/search")
     def origin_search(
         self,
         *,
         url_pattern: Optional[str] = None,
         metadata_pattern: Optional[str] = None,
         with_visit: bool = False,
         page_token: Optional[str] = None,
         limit: int = 50,
     ) -> PagedResult[Dict[str, Any]]:
         """Searches for origins matching the `url_pattern`.
 
         Args:
             url_pattern: Part of the URL to search for
             with_visit: Whether origins with no visit are to be filtered out
             page_token: Opaque value used for pagination
             limit: number of results to return
 
         Returns:
             PagedResult of origin dicts matching the search criteria. If
             next_page_token is None, there is no longer data to retrieve.
 
         """
         ...
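Reviewer note: the `predicate` in the in-memory backend above tolerates exactly one missing token, so the last word the user typed can still match as a prefix. A quick check:

```python
from swh.search import get_search

search = get_search("memory")
search.initialize()
search.origin_update([{"url": "http://foobar.baz"}])

# "foob" is a prefix of the "foobar" token -> hit
assert search.origin_search(url_pattern="foob").results == [
    {"url": "http://foobar.baz"}
]
# "oobar" is not a prefix of any token -> no hit
assert search.origin_search(url_pattern="oobar").results == []
```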
diff --git a/swh/search/journal_client.py b/swh/search/journal_client.py
index 660a0f9..2c131fc 100644
--- a/swh/search/journal_client.py
+++ b/swh/search/journal_client.py
@@ -1,63 +1,62 @@
 # Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
-
 EXPECTED_MESSAGE_TYPES = {
     "origin",
     "origin_visit",
     "origin_intrinsic_metadata",
 }
 
 
 def process_journal_objects(messages, *, search):
     """Worker function for `JournalClient.process(worker_fn)`, after
     currification of `scheduler` and `task_names`."""
     assert set(messages) <= EXPECTED_MESSAGE_TYPES, set(messages)
 
     if "origin" in messages:
         process_origins(messages["origin"], search)
 
     if "origin_visit" in messages:
         process_origin_visits(messages["origin_visit"], search)
 
     if "origin_intrinsic_metadata" in messages:
         process_origin_intrinsic_metadata(messages["origin_intrinsic_metadata"], search)
 
 
 def process_origins(origins, search):
     logging.debug("processing origins %r", origins)
 
     search.origin_update(origins)
 
 
 def process_origin_visits(visits, search):
     logging.debug("processing origin visits %r", visits)
 
     search.origin_update(
         [
             {
                 "url": (
                     visit["origin"]
                     if isinstance(visit["origin"], str)
                     else visit["origin"]["url"]
                 ),
                 "has_visits": True,
             }
             for visit in visits
         ]
     )
 
 
 def process_origin_intrinsic_metadata(origin_metadata, search):
     logging.debug("processing origin intrinsic_metadata %r", origin_metadata)
 
     origin_metadata = [
         {"url": item["origin_url"], "intrinsic_metadata": item["metadata"],}
         for item in origin_metadata
     ]
 
     search.origin_update(origin_metadata)
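Reviewer note: the worker's input is a mapping from journal object type to a batch of messages; sketched here against the in-memory backend (the URL is made up):

```python
from swh.search import get_search
from swh.search.journal_client import process_journal_objects

search = get_search("memory")
search.initialize()

process_journal_objects(
    {"origin_visit": [{"origin": "http://example.org/repo"}]}, search=search
)
# The visit is indexed with has_visits=True, so a with_visit search finds it.
assert search.origin_search(url_pattern="example", with_visit=True).results == [
    {"url": "http://example.org/repo"}
]
```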
diff --git a/swh/search/tests/conftest.py b/swh/search/tests/conftest.py
index 5c9f5b9..b14e443 100644
--- a/swh/search/tests/conftest.py
+++ b/swh/search/tests/conftest.py
@@ -1,130 +1,129 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import socket
 import subprocess
 import time
 
 import elasticsearch
 import pytest
 
 from swh.search import get_search
 
-
 logger = logging.getLogger(__name__)
 
 
 def free_port():
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.bind(("127.0.0.1", 0))
     port = sock.getsockname()[1]
     sock.close()
     return port
 
 
 def wait_for_peer(addr, port):
     while True:
         try:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             sock.connect((addr, port))
         except ConnectionRefusedError:
             time.sleep(0.1)
         else:
             sock.close()
             break
 
 
 CONFIG_TEMPLATE = """
 node.name: node-1
 path.data: {data}
 path.logs: {logs}
 network.host: 127.0.0.1
 http.port: {http_port}
 transport.port: {transport_port}
 """
 
 
 def _run_elasticsearch(conf_dir, data_dir, logs_dir, http_port, transport_port):
     es_home = "/usr/share/elasticsearch"
 
     with open(conf_dir + "/elasticsearch.yml", "w") as fd:
         fd.write(
             CONFIG_TEMPLATE.format(
                 data=data_dir,
                 logs=logs_dir,
                 http_port=http_port,
                 transport_port=transport_port,
             )
         )
 
     with open(conf_dir + "/log4j2.properties", "w") as fd:
         pass
 
     cmd = [
         "/usr/share/elasticsearch/jdk/bin/java",
         "-Des.path.home={}".format(es_home),
         "-Des.path.conf={}".format(conf_dir),
         "-Des.bundled_jdk=true",
         "-Dlog4j2.disable.jmx=true",
         "-cp",
         "{}/lib/*".format(es_home),
         "org.elasticsearch.bootstrap.Elasticsearch",
     ]
 
     host = "127.0.0.1:{}".format(http_port)
 
     with open(logs_dir + "/output.txt", "w") as fd:
         p = subprocess.Popen(cmd)
 
     wait_for_peer("127.0.0.1", http_port)
 
     client = elasticsearch.Elasticsearch([host])
     assert client.ping()
 
     return p
 
 
 @pytest.fixture(scope="session")
 def elasticsearch_session(tmpdir_factory):
     tmpdir = tmpdir_factory.mktemp("elasticsearch")
     es_conf = tmpdir.mkdir("conf")
 
     http_port = free_port()
     transport_port = free_port()
 
     p = _run_elasticsearch(
         conf_dir=str(es_conf),
         data_dir=str(tmpdir.mkdir("data")),
         logs_dir=str(tmpdir.mkdir("logs")),
         http_port=http_port,
         transport_port=transport_port,
     )
 
     yield "127.0.0.1:{}".format(http_port)
 
     # Check ES didn't stop
     assert p.returncode is None, p.returncode
 
     p.kill()
     p.wait()
 
 
 @pytest.fixture(scope="class")
 def elasticsearch_host(elasticsearch_session):
     yield elasticsearch_session
 
 
 @pytest.fixture
 def swh_search(elasticsearch_host):
     """Instantiate a search client, initialize the elasticsearch instance,
     and returns it
 
     """
     logger.debug("swh_search: elasticsearch_host: %s", elasticsearch_host)
     search = get_search("elasticsearch", hosts=[elasticsearch_host],)
     search.deinitialize()  # To reset internal state from previous runs
     search.initialize()  # install required index
     yield search
diff --git a/swh/search/tests/test_api_client.py b/swh/search/tests/test_api_client.py
index c766e5e..a173ebe 100644
--- a/swh/search/tests/test_api_client.py
+++ b/swh/search/tests/test_api_client.py
@@ -1,43 +1,43 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 import pytest
 
 from swh.core.api.tests.server_testing import ServerTestFixture
-
 from swh.search import get_search
 from swh.search.api.server import app
+
 from .test_search import CommonSearchTest
 
 
 class TestRemoteSearch(CommonSearchTest, ServerTestFixture, unittest.TestCase):
     @pytest.fixture(autouse=True)
     def _instantiate_search(self, elasticsearch_host):
         self._elasticsearch_host = elasticsearch_host
 
     def setUp(self):
         self.config = {
             "search": {
                 "cls": "elasticsearch",
                 "args": {"hosts": [self._elasticsearch_host],},
             }
         }
         self.app = app
         super().setUp()
         self.reset()
         self.search = get_search("remote", url=self.url(),)
 
     def reset(self):
         search = get_search("elasticsearch", hosts=[self._elasticsearch_host],)
         search.deinitialize()
         search.initialize()
 
     @pytest.mark.skip(
         "Elasticsearch also returns close matches, so this test would fail"
     )
     def test_origin_url_paging(self, count):
         pass
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
index 6d3419e..308f8d7 100644
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -1,175 +1,172 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import copy
 import tempfile
 
-import yaml
-
-import pytest
-from confluent_kafka import Producer
 from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
 
 from swh.journal.serializers import value_to_kafka
-
 from swh.search.cli import cli
 
-
 CLI_CONFIG = """
 search:
     cls: elasticsearch
     args:
         hosts:
         - '%(elasticsearch_host)s'
 """
 
 JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
 journal:
     brokers:
         - {broker}
     prefix: {prefix}
     group_id: {group_id}
 """
 
 
 def invoke(catch_exceptions, args, config="", *, elasticsearch_host):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         config_fd.write(
             (CLI_CONFIG + config) % {"elasticsearch_host": elasticsearch_host}
         )
         config_fd.seek(0)
         result = runner.invoke(cli, ["-C" + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test__journal_client__origin(
     swh_search, elasticsearch_host: str, kafka_prefix: str, kafka_server
 ):
     """Tests the re-indexing when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test search origin producer",
             "acks": "all",
         }
     )
     origin_foobar_baz = {
         "url": "http://foobar.baz",
     }
     value = value_to_kafka(origin_foobar_baz)
     topic = f"{kafka_prefix}.origin"
     producer.produce(topic=topic, key=b"bogus-origin", value=value)
 
     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
         broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
     )
     result = invoke(
         False,
         ["journal-client", "objects", "--stop-after-objects", "1",],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )
 
     # Check the output
     expected_output = "Processed 1 messages.\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     swh_search.flush()
 
     # searching origin without visit as requirement
     actual_page = swh_search.origin_search(url_pattern="foobar")
     # We find it
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar_baz]
 
     # It's an origin with no visit, searching for it with visit
     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)
     # returns nothing
     assert actual_page.next_page_token is None
     assert actual_page.results == []
 
 
 def test__journal_client__origin_visit(
     swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
 ):
     """Tests the re-indexing when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     origin_foobar = {"url": "http://baz.foobar"}
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test search origin visit producer",
             "acks": "all",
         }
     )
     topic = f"{kafka_prefix}.origin_visit"
     value = value_to_kafka({"origin": origin_foobar["url"]})
     producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
 
     journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
         broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
     )
     result = invoke(
         False,
         ["journal-client", "objects", "--stop-after-objects", "1",],
         journal_objects_config,
         elasticsearch_host=elasticsearch_host,
     )
 
     # Check the output
     expected_output = "Processed 1 messages.\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     swh_search.flush()
 
     # Both search returns the visit
     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=False)
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar]
 
     actual_page = swh_search.origin_search(url_pattern="foobar", with_visit=True)
     assert actual_page.next_page_token is None
     assert actual_page.results == [origin_foobar]
 
 
 def test__journal_client__missing_main_journal_config_key(elasticsearch_host):
     """Missing configuration on journal should raise"""
     with pytest.raises(KeyError, match="journal"):
         invoke(
             catch_exceptions=False,
             args=["journal-client", "objects", "--stop-after-objects", "1",],
             config="",  # missing config will make it raise
             elasticsearch_host=elasticsearch_host,
         )
 
 
 def test__journal_client__missing_journal_config_keys(elasticsearch_host):
"""Missing configuration on mandatory journal keys should raise""" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix="swh.journal.objects", group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=["journal-client", "objects", "--stop-after-objects", "1",], config=yaml_cfg, # incomplete config will make the cli raise elasticsearch_host=elasticsearch_host, ) diff --git a/swh/search/tests/test_in_memory.py b/swh/search/tests/test_in_memory.py index 427f4c3..b26e4a5 100644 --- a/swh/search/tests/test_in_memory.py +++ b/swh/search/tests/test_in_memory.py @@ -1,40 +1,41 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import pytest from swh.search import get_search + from .test_search import CommonSearchTest class InmemorySearchTest(unittest.TestCase, CommonSearchTest): @pytest.fixture(autouse=True) def _instantiate_search(self): self.search = get_search("memory") def setUp(self): self.reset() def reset(self): self.search.deinitialize() self.search.initialize() @pytest.mark.skip("Not implemented in the in-memory search") def test_origin_intrinsic_metadata_description(self): pass @pytest.mark.skip("Not implemented in the in-memory search") def test_origin_intrinsic_metadata_all_terms(self): pass @pytest.mark.skip("Not implemented in the in-memory search") def test_origin_intrinsic_metadata_nested(self): pass @pytest.mark.skip("Not implemented in the in-memory search") def test_origin_intrinsic_metadata_paging(self): pass diff --git a/swh/search/tests/test_init.py b/swh/search/tests/test_init.py index 2ea535f..8008a48 100644 --- a/swh/search/tests/test_init.py +++ b/swh/search/tests/test_init.py @@ -1,86 +1,84 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import inspect import pytest from swh.search import get_search - -from swh.search.interface import SearchInterface -from swh.search.elasticsearch import ElasticSearch from swh.search.api.client import RemoteSearch +from swh.search.elasticsearch import ElasticSearch from swh.search.in_memory import InMemorySearch - +from swh.search.interface import SearchInterface SEARCH_IMPLEMENTATIONS_KWARGS = [ ("remote", RemoteSearch, {"url": "localhost"}), ("elasticsearch", ElasticSearch, {"hosts": ["localhost"]}), ] SEARCH_IMPLEMENTATIONS = SEARCH_IMPLEMENTATIONS_KWARGS + [ ("memory", InMemorySearch, None), ] def test_get_search_failure(): with pytest.raises(ValueError, match="Unknown search class"): get_search("unknown-search") @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS) def test_get_search(mocker, class_, expected_class, kwargs): mocker.patch("swh.search.elasticsearch.Elasticsearch") if kwargs: concrete_search = get_search(class_, **kwargs) else: concrete_search = get_search(class_) assert isinstance(concrete_search, expected_class) 
 @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS_KWARGS)
 def test_get_search_deprecation_warning(mocker, class_, expected_class, kwargs):
     with pytest.warns(DeprecationWarning):
         concrete_search = get_search(class_, args=kwargs)
     assert isinstance(concrete_search, expected_class)
 
 
 @pytest.mark.parametrize("class_,expected_class,kwargs", SEARCH_IMPLEMENTATIONS)
 def test_types(mocker, class_, expected_class, kwargs):
     """Checks all methods of SearchInterface are implemented by this
     backend, and that they have the same signature.
 
     """
     mocker.patch("swh.search.elasticsearch.Elasticsearch")
     if kwargs:
         concrete_search = get_search(class_, **kwargs)
     else:
         concrete_search = get_search(class_)
 
     # Create an instance of the protocol (which cannot be instantiated
     # directly, so this creates a subclass, then instantiates it)
     interface = type("_", (SearchInterface,), {})()
 
     for meth_name in dir(interface):
         if meth_name.startswith("_"):
             continue
         interface_meth = getattr(interface, meth_name)
 
         missing_methods = []
 
         try:
             concrete_meth = getattr(concrete_search, meth_name)
         except AttributeError:
             if not getattr(interface_meth, "deprecated_endpoint", False):
                 # The backend is missing a (non-deprecated) endpoint
                 missing_methods.append(meth_name)
             continue
 
         expected_signature = inspect.signature(interface_meth)
         actual_signature = inspect.signature(concrete_meth)
 
         assert expected_signature == actual_signature, meth_name
 
     assert missing_methods == []
diff --git a/swh/search/tests/test_search.py b/swh/search/tests/test_search.py
index 6ee727a..3ea9911 100644
--- a/swh/search/tests/test_search.py
+++ b/swh/search/tests/test_search.py
@@ -1,296 +1,296 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from hypothesis import given, strategies, settings
+from hypothesis import given, settings, strategies
 
 from swh.core.api.classes import stream_results
 
 
 class CommonSearchTest:
     def test_origin_url_unique_word_prefix(self):
         origin_foobar_baz = {"url": "http://foobar.baz"}
         origin_barbaz_qux = {"url": "http://barbaz.qux"}
         origin_qux_quux = {"url": "http://qux.quux"}
         origins = [origin_foobar_baz, origin_barbaz_qux, origin_qux_quux]
 
         self.search.origin_update(origins)
         self.search.flush()
 
         actual_page = self.search.origin_search(url_pattern="foobar")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_foobar_baz]
 
         actual_page = self.search.origin_search(url_pattern="barb")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_barbaz_qux]
 
         # 'bar' is part of 'foobar', but is not the beginning of it
         actual_page = self.search.origin_search(url_pattern="bar")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_barbaz_qux]
 
         actual_page = self.search.origin_search(url_pattern="barbaz")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_barbaz_qux]
 
     def test_origin_url_unique_word_prefix_multiple_results(self):
         origin_foobar_baz = {"url": "http://foobar.baz"}
         origin_barbaz_qux = {"url": "http://barbaz.qux"}
         origin_qux_quux = {"url": "http://qux.quux"}
 
         self.search.origin_update(
             [origin_foobar_baz, origin_barbaz_qux, origin_qux_quux]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(url_pattern="qu")
         assert actual_page.next_page_token is None
         results = [r["url"] for r in actual_page.results]
         expected_results = [o["url"] for o in [origin_qux_quux, origin_barbaz_qux]]
         assert sorted(results) == sorted(expected_results)
 
         actual_page = self.search.origin_search(url_pattern="qux")
         assert actual_page.next_page_token is None
         results = [r["url"] for r in actual_page.results]
         expected_results = [o["url"] for o in [origin_qux_quux, origin_barbaz_qux]]
         assert sorted(results) == sorted(expected_results)
 
     def test_origin_url_all_terms(self):
         origin_foo_bar_baz = {"url": "http://foo.bar/baz"}
         origin_foo_bar_foo_bar = {"url": "http://foo.bar/foo.bar"}
         origins = [origin_foo_bar_baz, origin_foo_bar_foo_bar]
 
         self.search.origin_update(origins)
         self.search.flush()
 
         # Only results containing all terms should be returned.
         actual_page = self.search.origin_search(url_pattern="foo bar baz")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_foo_bar_baz]
 
     def test_origin_with_visit(self):
         origin_foobar_baz = {"url": "http://foobar/baz"}
 
         self.search.origin_update(
             [{**o, "has_visits": True} for o in [origin_foobar_baz]]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(url_pattern="foobar", with_visit=True)
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_foobar_baz]
 
     def test_origin_with_visit_added(self):
         origin_foobar_baz = {"url": "http://foobar.baz"}
 
         self.search.origin_update([origin_foobar_baz])
         self.search.flush()
 
         actual_page = self.search.origin_search(url_pattern="foobar", with_visit=True)
         assert actual_page.next_page_token is None
         assert actual_page.results == []
 
         self.search.origin_update(
             [{**o, "has_visits": True} for o in [origin_foobar_baz]]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(url_pattern="foobar", with_visit=True)
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin_foobar_baz]
 
     def test_origin_intrinsic_metadata_description(self):
         origin1_nothin = {"url": "http://origin1"}
         origin2_foobar = {"url": "http://origin2"}
         origin3_barbaz = {"url": "http://origin3"}
 
         self.search.origin_update(
             [
                 {**origin1_nothin, "intrinsic_metadata": {},},
                 {
                     **origin2_foobar,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "description": "foo bar",
                     },
                 },
                 {
                     **origin3_barbaz,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "description": "bar baz",
                     },
                 },
             ]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(metadata_pattern="foo")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin2_foobar]
 
         actual_page = self.search.origin_search(metadata_pattern="foo bar")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin2_foobar]
 
         actual_page = self.search.origin_search(metadata_pattern="bar baz")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin3_barbaz]
 
     def test_origin_intrinsic_metadata_all_terms(self):
         origin1_foobarfoobar = {"url": "http://origin1"}
         origin3_foobarbaz = {"url": "http://origin2"}
 
         self.search.origin_update(
             [
                 {
                     **origin1_foobarfoobar,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "description": "foo bar foo bar",
                     },
                 },
                 {
                     **origin3_foobarbaz,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "description": "foo bar baz",
                     },
                 },
             ]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(metadata_pattern="foo bar baz")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin3_foobarbaz]
 
     def test_origin_intrinsic_metadata_nested(self):
         origin1_nothin = {"url": "http://origin1"}
         origin2_foobar = {"url": "http://origin2"}
         origin3_barbaz = {"url": "http://origin3"}
 
         self.search.origin_update(
             [
                 {**origin1_nothin, "intrinsic_metadata": {},},
                 {
                     **origin2_foobar,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "keywords": ["foo", "bar"],
                     },
                 },
                 {
                     **origin3_barbaz,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "keywords": ["bar", "baz"],
                     },
                 },
             ]
         )
         self.search.flush()
 
         actual_page = self.search.origin_search(metadata_pattern="foo")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin2_foobar]
 
         actual_page = self.search.origin_search(metadata_pattern="foo bar")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin2_foobar]
 
         actual_page = self.search.origin_search(metadata_pattern="bar baz")
         assert actual_page.next_page_token is None
         assert actual_page.results == [origin3_barbaz]
 
     # TODO: add more tests with more codemeta terms
 
     # TODO: add more tests with edge cases
 
     @settings(deadline=None)
     @given(strategies.integers(min_value=1, max_value=4))
     def test_origin_url_paging(self, limit):
         # TODO: no hypothesis
         origin1_foo = {"url": "http://origin1/foo"}
         origin2_foobar = {"url": "http://origin2/foo/bar"}
         origin3_foobarbaz = {"url": "http://origin3/foo/bar/baz"}
 
         self.reset()
         self.search.origin_update([origin1_foo, origin2_foobar, origin3_foobarbaz])
         self.search.flush()
 
         results = stream_results(
             self.search.origin_search, url_pattern="foo bar baz", limit=limit
         )
         results = [res["url"] for res in results]
         expected_results = [o["url"] for o in [origin3_foobarbaz]]
         assert sorted(results[0 : len(expected_results)]) == sorted(expected_results)
 
         results = stream_results(
             self.search.origin_search, url_pattern="foo bar", limit=limit
         )
         results = [res["url"] for res in results]
         expected_results = [o["url"] for o in [origin2_foobar, origin3_foobarbaz]]
         assert sorted(results[0 : len(expected_results)]) == sorted(expected_results)
 
         results = stream_results(
             self.search.origin_search, url_pattern="foo", limit=limit
         )
         results = [res["url"] for res in results]
         expected_results = [
             o["url"] for o in [origin1_foo, origin2_foobar, origin3_foobarbaz]
         ]
         assert sorted(results[0 : len(expected_results)]) == sorted(expected_results)
 
     @settings(deadline=None)
     @given(strategies.integers(min_value=1, max_value=4))
     def test_origin_intrinsic_metadata_paging(self, limit):
         # TODO: no hypothesis
         origin1_foo = {"url": "http://origin1"}
         origin2_foobar = {"url": "http://origin2"}
         origin3_foobarbaz = {"url": "http://origin3"}
 
         self.reset()
         self.search.origin_update(
             [
                 {
                     **origin1_foo,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "keywords": ["foo"],
                     },
                 },
                 {
                     **origin2_foobar,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "keywords": ["foo", "bar"],
                     },
                 },
                 {
                     **origin3_foobarbaz,
                     "intrinsic_metadata": {
                         "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                         "keywords": ["foo", "bar", "baz"],
                     },
                 },
             ]
         )
         self.search.flush()
 
         results = stream_results(
             self.search.origin_search, metadata_pattern="foo bar baz", limit=limit
         )
         assert list(results) == [origin3_foobarbaz]
 
         results = stream_results(
             self.search.origin_search, metadata_pattern="foo bar", limit=limit
         )
         assert list(results) == [origin2_foobar, origin3_foobarbaz]
 
         results = stream_results(
             self.search.origin_search, metadata_pattern="foo", limit=limit
         )
         assert list(results) == [origin1_foo, origin2_foobar, origin3_foobarbaz]
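Reviewer note: the paging contract these tests rely on (and which `stream_results` wraps) is simply to feed `next_page_token` back until it comes back `None`. A minimal manual loop against the in-memory backend:

```python
from swh.search import get_search

search = get_search("memory")
search.initialize()
search.origin_update([{"url": "http://origin%d/foo" % i} for i in range(5)])

urls, page_token = [], None
while True:
    page = search.origin_search(url_pattern="foo", page_token=page_token, limit=2)
    urls.extend(r["url"] for r in page.results)
    page_token = page.next_page_token
    if page_token is None:
        break

assert len(urls) == 5  # three pages: 2 + 2 + 1
```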