diff --git a/PKG-INFO b/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,71 +1,71 @@
 Metadata-Version: 2.1
 Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
 Summary: Software Heritage Content Indexer
 Home-page: https://forge.softwareheritage.org/diffusion/78/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
 Description: swh-indexer
         ============
         
         Tools to compute multiple indexes on SWH's raw contents:
         
         - content:
           - mimetype
           - ctags
           - language
           - fossology-license
           - metadata
         - revision:
           - metadata
         
         An indexer is in charge of:
         
         - looking up objects
         - extracting information from those objects
         - storing that information in the swh-indexer db
         
         There are multiple indexers working on different object types:
         
         - content indexer: works with content sha1 hashes
         - revision indexer: works with revision sha1 hashes
         - origin indexer: works with origin identifiers
         
         Indexation procedure:
         
         - receive a batch of ids
         - retrieve the associated data depending on object type
         - compute some index for that object
         - store the result in swh's storage
         
         Current content indexers:
         
         - mimetype (queue swh_indexer_content_mimetype): detect the encoding
           and mimetype
         - language (queue swh_indexer_content_language): detect the
           programming language
         - ctags (queue swh_indexer_content_ctags): compute tags information
         - fossology-license (queue swh_indexer_fossology_license): compute
           the license
         - metadata: translate file into translated_metadata dict
         
         Current revision indexers:
         
         - metadata: detects files containing metadata and retrieves
           translated_metadata from the content_metadata table in storage,
           or runs the content indexer to translate files.
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO
index 2ab19e4..8f413ad 100644
--- a/swh.indexer.egg-info/PKG-INFO
+++ b/swh.indexer.egg-info/PKG-INFO
@@ -1,71 +1,71 @@
 Metadata-Version: 2.1
 Name: swh.indexer
-Version: 0.4.0
+Version: 0.4.1
 Summary: Software Heritage Content Indexer
 Home-page: https://forge.softwareheritage.org/diffusion/78/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/
 Description: swh-indexer
         ============
         
         Tools to compute multiple indexes on SWH's raw contents:
         
         - content:
           - mimetype
           - ctags
           - language
           - fossology-license
           - metadata
         - revision:
           - metadata
         
         An indexer is in charge of:
         
         - looking up objects
         - extracting information from those objects
         - storing that information in the swh-indexer db
         
         There are multiple indexers working on different object types:
         
         - content indexer: works with content sha1 hashes
         - revision indexer: works with revision sha1 hashes
         - origin indexer: works with origin identifiers
         
         Indexation procedure:
         
         - receive a batch of ids
         - retrieve the associated data depending on object type
         - compute some index for that object
         - store the result in swh's storage
         
         Current content indexers:
         
         - mimetype (queue swh_indexer_content_mimetype): detect the encoding
           and mimetype
         - language (queue swh_indexer_content_language): detect the
           programming language
         - ctags (queue swh_indexer_content_ctags): compute tags information
         - fossology-license (queue swh_indexer_fossology_license): compute
           the license
         - metadata: translate file into translated_metadata dict
         
         Current revision indexers:
         
         - metadata: detects files containing metadata and retrieves
           translated_metadata from the content_metadata table in storage,
           or runs the content indexer to translate files.
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
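The package description above outlines the indexation procedure (receive a batch of ids, retrieve the associated data, compute an index, store the result). As a rough illustration of that loop, here is a minimal, self-contained sketch; the class and method names (`SketchContentIndexer`, the dict-based `objstorage` and result store) are hypothetical stand-ins, not swh's actual API:

```python
from typing import Any, Dict, Iterable, List


class SketchContentIndexer:
    """Hypothetical indexer skeleton: batch of ids in, index rows out."""

    def __init__(self, objstorage: Dict[bytes, bytes], results: List[Dict[str, Any]]):
        self.objstorage = objstorage  # stand-in for the raw-content store
        self.results = results        # stand-in for the swh-indexer db

    def run(self, ids: Iterable[bytes]) -> None:
        rows = []
        for sha1 in ids:                        # receive a batch of ids
            raw = self.objstorage[sha1]         # retrieve the associated data
            rows.append(self.index(sha1, raw))  # compute some index for it
        self.store(rows)                        # store the result

    def index(self, sha1: bytes, raw: bytes) -> Dict[str, Any]:
        # A real content indexer (e.g. mimetype) would detect encoding
        # and mimetype here; we record a trivial property instead.
        return {"id": sha1, "length": len(raw)}

    def store(self, rows: List[Dict[str, Any]]) -> None:
        self.results.extend(rows)


contents = {b"\x01" * 20: b"#!/bin/sh\necho hello\n"}
out: List[Dict[str, Any]] = []
SketchContentIndexer(contents, out).run(contents)
assert out[0]["length"] == 21
```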
diff --git a/swh/indexer/storage/api/server.py b/swh/indexer/storage/api/server.py
index 09979ca..a6d6f32 100644
--- a/swh/indexer/storage/api/server.py
+++ b/swh/indexer/storage/api/server.py
@@ -1,118 +1,120 @@
-# Copyright (C) 2015-2019  The Software Heritage developers
+# Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import os
+from typing import Any, Dict, Optional
 
 from swh.core import config
 from swh.core.api import RPCServerApp
 from swh.core.api import encode_data_server as encode_data
 from swh.core.api import error_handler
 from swh.indexer.storage import INDEXER_CFG_KEY, get_indexer_storage
 from swh.indexer.storage.exc import IndexerStorageArgumentException
 from swh.indexer.storage.interface import IndexerStorageInterface
 
 from .serializers import DECODERS, ENCODERS
 
 
 def get_storage():
     global storage
     if not storage:
         storage = get_indexer_storage(**app.config[INDEXER_CFG_KEY])
 
     return storage
 
 
 class IndexerStorageServerApp(RPCServerApp):
     extra_type_decoders = DECODERS
     extra_type_encoders = ENCODERS
 
 
 app = IndexerStorageServerApp(
     __name__, backend_class=IndexerStorageInterface, backend_factory=get_storage
 )
 storage = None
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.errorhandler(IndexerStorageArgumentException)
 def argument_error_handler(exception):
     return error_handler(exception, encode_data, status_code=400)
 
 
 @app.route("/")
 def index():
     return "SWH Indexer Storage API server"
 
 
 api_cfg = None
 
 
-def load_and_check_config(config_file, type="local"):
+def load_and_check_config(
+    config_path: Optional[str], type: str = "local"
+) -> Dict[str, Any]:
     """Check that the minimal configuration is set to run the api, or raise
        an explanatory error.
 
     Args:
-        config_file (str): Path to the configuration file to load
-        type (str): configuration type. For 'local' type, more
-          checks are done.
+        config_path: Path to the configuration file to load
+        type: configuration type. For 'local' type, more
+            checks are done.
 
     Raises:
         Error if the setup is not as expected
 
     Returns:
         configuration as a dict
 
     """
-    if not config_file:
+    if not config_path:
         raise EnvironmentError("Configuration file must be defined")
 
-    if not os.path.exists(config_file):
-        raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
+    if not os.path.exists(config_path):
+        raise FileNotFoundError(f"Configuration file {config_path} does not exist")
 
-    cfg = config.read(config_file)
+    cfg = config.read(config_path)
     if "indexer_storage" not in cfg:
         raise KeyError("Missing '%indexer_storage' configuration")
 
     if type == "local":
         vcfg = cfg["indexer_storage"]
         cls = vcfg.get("cls")
         if cls != "local":
             raise ValueError(
                 "The indexer_storage backend can only be started with a "
                 "'local' configuration"
             )
 
-        args = vcfg["args"]
-        if not args.get("db"):
+        if not vcfg.get("db"):
             raise ValueError("Invalid configuration; missing 'db' config entry")
 
     return cfg
 
 
 def make_app_from_configfile():
     """Run the WSGI app from the webserver, loading the configuration from
        a configuration file.
 
     The SWH_CONFIG_FILENAME environment variable defines the
     configuration path to load.
 
     """
     global api_cfg
     if not api_cfg:
-        config_file = os.environ.get("SWH_CONFIG_FILENAME")
-        api_cfg = load_and_check_config(config_file)
+        config_path = os.environ.get("SWH_CONFIG_FILENAME")
+        api_cfg = load_and_check_config(config_path)
         app.config.update(api_cfg)
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
     return app
 
 
 if __name__ == "__main__":
     print("Deprecated. Use swh-indexer")
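For context, `load_and_check_config` is what the WSGI entry point runs at startup. A short usage sketch follows; the configuration file path is an assumption for illustration, and the `db` value mirrors the new flat layout (no `args` nesting) that this patch migrates to and the tests below exercise:

```python
import os

from swh.indexer.storage.api.server import (
    load_and_check_config,
    make_app_from_configfile,
)

# Assumed path; the file would contain the new flat layout, e.g.:
#   indexer_storage:
#       cls: local
#       db: "dbname=softwareheritage-indexer-dev"
cfg = load_and_check_config("/etc/softwareheritage/indexer/server.yml")

# The WSGI entry point reads the same path from the environment:
os.environ["SWH_CONFIG_FILENAME"] = "/etc/softwareheritage/indexer/server.yml"
app = make_app_from_configfile()
```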
""" config_path = tmpdir / name if isinstance(content, dict): # convert if needed content = yaml.dump(content) config_path.write_text(content, encoding="utf-8") # pytest on python3.5 does not support LocalPath manipulation, so # convert path to string return str(config_path) -def test_load_and_check_config_no_configuration() -> None: - """Inexistent configuration files raises""" - with pytest.raises(EnvironmentError) as e: - load_and_check_config(None) +@pytest.mark.parametrize("config_path", [None, ""]) +def test_load_and_check_config_no_configuration(config_path) -> None: + """Irrelevant configuration file path raises""" + with pytest.raises(EnvironmentError, match="Configuration file must be defined"): + load_and_check_config(config_path) - assert e.value.args[0] == "Configuration file must be defined" +def test_load_and_check_inexistent_config_path() -> None: + """Inexistent configuration file raises""" config_path = "/indexer/inexistent/config.yml" - with pytest.raises(FileNotFoundError) as e: + expected_error = f"Configuration file {config_path} does not exist" + with pytest.raises(FileNotFoundError, match=expected_error): load_and_check_config(config_path) - assert e.value.args[0] == "Configuration file %s does not exist" % (config_path,) - def test_load_and_check_config_wrong_configuration(tmpdir) -> None: """Wrong configuration raises""" config_path = prepare_config_file(tmpdir, "something: useless") - with pytest.raises(KeyError) as e: + with pytest.raises(KeyError, match="Missing '%indexer_storage' configuration"): load_and_check_config(config_path) - assert e.value.args[0] == "Missing '%indexer_storage' configuration" - -def test_load_and_check_config_remote_config_local_type_raise(tmpdir) -> None: - """'local' configuration without 'local' storage raises""" - config = {"indexer_storage": {"cls": "remote", "args": {}}} - config_path = prepare_config_file(tmpdir, config) - with pytest.raises(ValueError) as e: - load_and_check_config(config_path, type="local") +@pytest.mark.parametrize("class_storage", ["remote", "memory"]) +def test_load_and_check_config_remote_config_local_type_raise( + class_storage, tmpdir +) -> None: + """Any other configuration than 'local' (the default) is rejected""" + assert class_storage != "local" + incompatible_config = {"indexer_storage": {"cls": class_storage}} + config_path = prepare_config_file(tmpdir, incompatible_config) - assert ( - e.value.args[0] - == "The indexer_storage backend can only be started with a 'local' " + expected_error = ( + "The indexer_storage backend can only be started with a 'local' " "configuration" ) + with pytest.raises(ValueError, match=expected_error): + load_and_check_config(config_path) + with pytest.raises(ValueError, match=expected_error): + load_and_check_config(config_path, type="local") + + +def test_load_and_check_config_remote_config_fine(tmpdir) -> None: + """'Remote configuration is fine (when changing the default type)""" + config = {"indexer_storage": {"cls": "remote"}} + config_path = prepare_config_file(tmpdir, config) + cfg = load_and_check_config(config_path, type="any") + + assert cfg == config def test_load_and_check_config_local_incomplete_configuration(tmpdir) -> None: """Incomplete 'local' configuration should raise""" - config = {"indexer_storage": {"cls": "local", "args": {}}} + config = {"indexer_storage": {"cls": "local"}} + expected_error = "Invalid configuration; missing 'db' config entry" config_path = prepare_config_file(tmpdir, config) - with pytest.raises(ValueError) as e: + with 
pytest.raises(ValueError, match=expected_error): load_and_check_config(config_path) - assert e.value.args[0] == "Invalid configuration; missing 'db' config entry" - def test_load_and_check_config_local_config_fine(tmpdir) -> None: - """'Remote configuration is fine""" - config = {"indexer_storage": {"cls": "local", "args": {"db": "db",}}} + """'Complete 'local' configuration is fine""" + config = {"indexer_storage": {"cls": "local", "db": "db",}} config_path = prepare_config_file(tmpdir, config) cfg = load_and_check_config(config_path, type="local") assert cfg == config - - -def test_load_and_check_config_remote_config_fine(tmpdir) -> None: - """'Remote configuration is fine""" - config = {"indexer_storage": {"cls": "remote", "args": {}}} - config_path = prepare_config_file(tmpdir, config) - cfg = load_and_check_config(config_path, type="any") - - assert cfg == config diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 6da0e7b..59c9528 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,386 +1,385 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import reduce import re import tempfile from typing import Any, Dict, List from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Consumer, Producer from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.journal.serializers import value_to_kafka from swh.model.hashutil import hash_to_bytes CLI_CONFIG = """ scheduler: cls: foo args: {} storage: cls: memory indexer_storage: cls: memory - args: {} """ def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ {"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},} for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] revision_metadata = [ RevisionIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>4}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {"policy_update": "update-dups"} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert 
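The recurring change in this test module is the move from asserting on `e.value.args[0]` after the block to `pytest.raises(..., match=...)`, where `match` is a regex searched against the string representation of the raised exception. A self-contained sketch of the pattern, with a hypothetical `check` helper:

```python
import pytest


def check(value):
    # Hypothetical validator reusing an error message from the patch above.
    if not value:
        raise ValueError("Invalid configuration; missing 'db' config entry")


def test_check_rejects_empty():
    # `match` is applied with re.search, so a substring of the message works.
    with pytest.raises(ValueError, match="missing 'db' config entry"):
        check(None)
```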
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 6da0e7b..59c9528 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,386 +1,385 @@
 # Copyright (C) 2019-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from functools import reduce
 import re
 import tempfile
 from typing import Any, Dict, List
 from unittest.mock import patch
 
 from click.testing import CliRunner
 from confluent_kafka import Consumer, Producer
 
 from swh.indexer.cli import indexer_cli_group
 from swh.indexer.storage.interface import IndexerStorageInterface
 from swh.indexer.storage.model import (
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
 from swh.journal.serializers import value_to_kafka
 from swh.model.hashutil import hash_to_bytes
 
 CLI_CONFIG = """
 scheduler:
     cls: foo
     args: {}
 storage:
     cls: memory
 indexer_storage:
     cls: memory
-    args: {}
 """
 
 
 def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
     tools: List[Dict[str, Any]] = [
         {"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},}
         for i in range(2)
     ]
     tools = idx_storage.indexer_configuration_add(tools)
 
     origin_metadata = [
         OriginIntrinsicMetadataRow(
             id="file://dev/%04d" % origin_id,
             from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)),
             indexer_configuration_id=tools[origin_id % 2]["id"],
             metadata={"name": "origin %d" % origin_id},
             mappings=["mapping%d" % (origin_id % 10)],
         )
         for origin_id in range(nb_rows)
     ]
     revision_metadata = [
         RevisionIntrinsicMetadataRow(
             id=hash_to_bytes("abcd{:0>4}".format(origin_id)),
             indexer_configuration_id=tools[origin_id % 2]["id"],
             metadata={"name": "origin %d" % origin_id},
             mappings=["mapping%d" % (origin_id % 10)],
         )
         for origin_id in range(nb_rows)
     ]
 
     idx_storage.revision_intrinsic_metadata_add(revision_metadata)
     idx_storage.origin_intrinsic_metadata_add(origin_metadata)
 
     return [tool["id"] for tool in tools]
 
 
 def _origins_in_task_args(tasks):
     """Returns the set of origins contained in the arguments of the
     provided tasks (assumed to be of type index-origin-metadata)."""
     return reduce(
         set.union, (set(task["arguments"]["args"][0]) for task in tasks), set()
     )
 
 
 def _assert_tasks_for_origins(tasks, origins):
     expected_kwargs = {"policy_update": "update-dups"}
     assert {task["type"] for task in tasks} == {"index-origin-metadata"}
     assert all(len(task["arguments"]["args"]) == 1 for task in tasks)
     for task in tasks:
         assert task["arguments"]["kwargs"] == expected_kwargs, task
     assert _origins_in_task_args(tasks) == set(
         ["file://dev/%04d" % i for i in origins]
     )
 
 
 def invoke(scheduler, catch_exceptions, args):
     runner = CliRunner()
     with patch(
         "swh.scheduler.get_scheduler"
     ) as get_scheduler_mock, tempfile.NamedTemporaryFile(
         "a", suffix=".yml"
     ) as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         get_scheduler_mock.return_value = scheduler
         result = runner.invoke(indexer_cli_group, ["-C" + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test_mapping_list(indexer_scheduler):
     result = invoke(indexer_scheduler, False, ["mapping", "list",])
     expected_output = "\n".join(
         ["codemeta", "gemspec", "maven", "npm", "pkg-info", "",]
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
 
 def test_mapping_list_terms(indexer_scheduler):
     result = invoke(indexer_scheduler, False, ["mapping", "list-terms",])
     assert result.exit_code == 0, result.output
     assert re.search(r"http://schema.org/url:\n.*npm", result.output)
     assert re.search(r"http://schema.org/url:\n.*codemeta", result.output)
     assert re.search(
         r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
         result.output,
     )
 
 
 def test_mapping_list_terms_exclude(indexer_scheduler):
     result = invoke(
         indexer_scheduler,
         False,
         ["mapping", "list-terms", "--exclude-mapping", "codemeta"],
     )
     assert result.exit_code == 0, result.output
     assert re.search(r"http://schema.org/url:\n.*npm", result.output)
     assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output)
     assert not re.search(
         r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta",
         result.output,
     )
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage):
     result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
     expected_output = "Nothing to do (no origin metadata matched the criteria).\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 0
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     fill_idx_storage(idx_storage, 90)
 
     result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
 
     # Check the output
     expected_output = (
         "Scheduled 3 tasks (30 origins).\n"
         "Scheduled 6 tasks (60 origins).\n"
         "Scheduled 9 tasks (90 origins).\n"
         "Done.\n"
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 9
     _assert_tasks_for_origins(tasks, range(90))
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage):
     """Tests that a dry run reports the scheduling sequence but
     schedules no tasks."""
     fill_idx_storage(idx_storage, 90)
 
     result = invoke(
         indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",]
     )
 
     # Check the output
     expected_output = (
         "Scheduled 3 tasks (30 origins).\n"
         "Scheduled 6 tasks (60 origins).\n"
         "Scheduled 9 tasks (90 origins).\n"
         "Done.\n"
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 0
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing when neither origin_batch_size nor
     task_batch_size is a divisor of nb_origins."""
     fill_idx_storage(idx_storage, 70)
 
     result = invoke(
         indexer_scheduler,
         False,
         ["schedule", "reindex_origin_metadata", "--batch-size", "20",],
     )
 
     # Check the output
     expected_output = (
         "Scheduled 3 tasks (60 origins).\n"
         "Scheduled 4 tasks (70 origins).\n"
         "Done.\n"
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 4
     _assert_tasks_for_origins(tasks, range(70))
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_filter_one_mapping(
     indexer_scheduler, idx_storage, storage
 ):
     """Tests the re-indexing when filtering on a single mapping."""
     fill_idx_storage(idx_storage, 110)
 
     result = invoke(
         indexer_scheduler,
         False,
         ["schedule", "reindex_origin_metadata", "--mapping", "mapping1",],
     )
 
     # Check the output
     expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 2
     _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_filter_two_mappings(
     indexer_scheduler, idx_storage, storage
 ):
     """Tests the re-indexing when filtering on two mappings."""
     fill_idx_storage(idx_storage, 110)
 
     result = invoke(
         indexer_scheduler,
         False,
         [
             "schedule",
             "reindex_origin_metadata",
             "--mapping",
             "mapping1",
             "--mapping",
             "mapping2",
         ],
     )
 
     # Check the output
     expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 3
     _assert_tasks_for_origins(
         tasks,
         [
             1,
             11,
             21,
             31,
             41,
             51,
             61,
             71,
             81,
             91,
             101,
             2,
             12,
             22,
             32,
             42,
             52,
             62,
             72,
             82,
             92,
             102,
         ],
     )
 
 
 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
 @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
 def test_origin_metadata_reindex_filter_one_tool(
     indexer_scheduler, idx_storage, storage
 ):
     """Tests the re-indexing when filtering on a single tool."""
     tool_ids = fill_idx_storage(idx_storage, 110)
 
     result = invoke(
         indexer_scheduler,
         False,
         ["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),],
     )
 
     # Check the output
     expected_output = (
         "Scheduled 3 tasks (30 origins).\n"
         "Scheduled 6 tasks (55 origins).\n"
         "Done.\n"
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 6
     _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
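The expected outputs in these tests follow from the batching arithmetic: origins are cut into `--batch-size` chunks (10 by default here), one task per chunk, with a progress line after every `TASK_BATCH_SIZE` tasks (patched to 3) and a final line for any remainder. The sketch below models that cadence; the helper name and the exact reporting rule are assumptions inferred from the asserted outputs, which it reproduces:

```python
from typing import List


def schedule_progress(
    nb_origins: int, batch_size: int, task_batch_size: int = 3
) -> List[str]:
    """Model of the progress lines printed while scheduling tasks."""
    lines = []
    tasks = scheduled = 0
    for start in range(0, nb_origins, batch_size):
        tasks += 1
        scheduled += min(batch_size, nb_origins - start)
        if tasks % task_batch_size == 0:  # report every task_batch_size tasks
            lines.append(f"Scheduled {tasks} tasks ({scheduled} origins).")
    if tasks % task_batch_size != 0:  # report the trailing partial batch
        lines.append(f"Scheduled {tasks} tasks ({scheduled} origins).")
    lines.append("Done.")
    return lines


# 70 origins with --batch-size 20 -> 4 tasks, reported as 3 then 4:
assert schedule_progress(70, 20) == [
    "Scheduled 3 tasks (60 origins).",
    "Scheduled 4 tasks (70 origins).",
    "Done.",
]
# 90 origins with the default batch size of 10 -> reported at 3, 6, 9 tasks:
assert schedule_progress(90, 10) == [
    "Scheduled 3 tasks (30 origins).",
    "Scheduled 6 tasks (60 origins).",
    "Scheduled 9 tasks (90 origins).",
    "Done.",
]
```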
 
 
 def test_journal_client(
     storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
 ):
     """Test the 'swh indexer journal-client' cli tool."""
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
 
     STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
     producer.produce(
         topic=kafka_prefix + ".origin_visit",
         key=b"bogus",
         value=value_to_kafka(STATUS),
     )
 
     result = invoke(
         indexer_scheduler,
         False,
         [
             "journal-client",
             "--stop-after-objects",
             "1",
             "--broker",
             kafka_server,
             "--prefix",
             kafka_prefix,
             "--group-id",
             "test-consumer",
         ],
     )
 
     # Check the output
     expected_output = "Done.\n"
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 1
     _assert_tasks_for_origins(tasks, [0])
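To reproduce by hand what this test does, one can publish a single origin-visit message and let the journal client consume it. The sketch below mirrors the test; the broker address and topic prefix are assumptions for a local setup:

```python
from confluent_kafka import Producer

from swh.journal.serializers import value_to_kafka

producer = Producer(
    {
        "bootstrap.servers": "localhost:9092",  # assumed local broker
        "client.id": "example producer",
        "acks": "all",
    }
)
status = {"status": "full", "origin": {"url": "file://dev/0000"}}
producer.produce(
    topic="swh.journal.objects.origin_visit",  # assumed topic prefix
    key=b"bogus",
    value=value_to_kafka(status),
)
producer.flush()  # ensure delivery before the journal client starts

# Then, from a shell:
#   swh indexer journal-client --stop-after-objects 1 \
#       --broker localhost:9092 --prefix swh.journal.objects \
#       --group-id example-consumer
```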