diff --git a/PKG-INFO b/PKG-INFO index 87a7e3d..ff967a3 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,71 +1,71 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.6.3 +Version: 0.6.4 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/ Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - storing this information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexing procedure: - receive a batch of ids - retrieve the associated data depending on the object type - compute an index for that object - store the result in swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate a file into a translated_metadata dict Current revision indexers: - metadata: detects files containing metadata, and retrieves the translated_metadata from the content_metadata table in storage, or runs the content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/conftest.py b/conftest.py index 1e93ce8..8392690 100644 --- a/conftest.py +++ b/conftest.py @@ -1,27 +1,31 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import settings import pytest # define tests profile.
Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles settings.register_profile("fast", max_examples=5, deadline=5000) settings.register_profile("slow", max_examples=20, deadline=5000) # Ignore the following modules because wsgi module fails as no # configuration file is found (--doctest-modules forces the module # loading) collect_ignore = ["swh/indexer/storage/api/wsgi.py"] -# we use the swh_scheduler fixture -pytest_plugins = ["swh.scheduler.pytest_plugin"] +# we use the various swh fixtures +pytest_plugins = [ + "swh.scheduler.pytest_plugin", + "swh.storage.pytest_plugin", + "swh.core.db.pytest_plugin", +] @pytest.fixture(scope="session") def swh_scheduler_celery_includes(swh_scheduler_celery_includes): return swh_scheduler_celery_includes + [ "swh.indexer.tasks", ] diff --git a/docs/cli.rst b/docs/cli.rst new file mode 100644 index 0000000..2ad5a52 --- /dev/null +++ b/docs/cli.rst @@ -0,0 +1,8 @@ +.. _swh-indexer-cli: + +Command-line interface +====================== + +.. click:: swh.indexer.cli:indexer_cli_group + :prog: swh indexer + :nested: full diff --git a/docs/index.rst b/docs/index.rst index b80d6f4..ed79b46 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,25 +1,26 @@ .. _swh-indexer: Software Heritage - Indexer =========================== Tools and workers used to mine the content of the archive and extract derived information from archive source code artifacts. .. toctree:: :maxdepth: 1 :caption: Contents: README.md dev-info.rst metadata-workflow.rst Reference Documentation ----------------------- .. toctree:: :maxdepth: 2 + cli /apidoc/swh.indexer diff --git a/requirements-swh.txt b/requirements-swh.txt index 40b5ff6..854fbce 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,6 +1,6 @@ -swh.core[db,http] >= 0.3 +swh.core[db,http] >= 0.9.1 swh.model >= 0.0.15 swh.objstorage >= 0.2.2 swh.scheduler >= 0.5.2 swh.storage >= 0.12.0 swh.journal >= 0.1.0 diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO index 87a7e3d..ff967a3 100644 --- a/swh.indexer.egg-info/PKG-INFO +++ b/swh.indexer.egg-info/PKG-INFO @@ -1,71 +1,71 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.6.3 +Version: 0.6.4 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/ Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - storing this information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexing procedure: - receive a batch of ids - retrieve the associated data depending on the object type - compute an index for that object - store the result in swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language
(queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate a file into a translated_metadata dict Current revision indexers: - metadata: detects files containing metadata, and retrieves the translated_metadata from the content_metadata table in storage, or runs the content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/SOURCES.txt b/swh.indexer.egg-info/SOURCES.txt index 88bcdbe..9f4e312 100644 --- a/swh.indexer.egg-info/SOURCES.txt +++ b/swh.indexer.egg-info/SOURCES.txt @@ -1,141 +1,142 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile Makefile.local README.md codemeta.json conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/Makefile.local docs/README.md +docs/cli.rst docs/conf.py docs/dev-info.rst docs/index.rst docs/metadata-workflow.rst docs/_static/.placeholder docs/_templates/.placeholder docs/images/.gitignore docs/images/Makefile docs/images/tasks-metadata-indexers.uml sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/indexer_configuration.tool_configuration.schema.json sql/doc/json/revision_metadata.translated_metadata.json sql/json/.gitignore sql/json/Makefile sql/json/indexer_configuration.tool_configuration.schema.json sql/json/revision_metadata.translated_metadata.json sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql sql/upgrades/123.sql sql/upgrades/124.sql sql/upgrades/125.sql sql/upgrades/126.sql sql/upgrades/127.sql sql/upgrades/128.sql sql/upgrades/129.sql sql/upgrades/130.sql sql/upgrades/131.sql sql/upgrades/132.sql sql/upgrades/133.sql swh/__init__.py swh.indexer.egg-info/PKG-INFO swh.indexer.egg-info/SOURCES.txt swh.indexer.egg-info/dependency_links.txt swh.indexer.egg-info/entry_points.txt swh.indexer.egg-info/requires.txt swh.indexer.egg-info/top_level.txt swh/indexer/__init__.py swh/indexer/cli.py swh/indexer/codemeta.py swh/indexer/ctags.py swh/indexer/fossology_license.py swh/indexer/indexer.py swh/indexer/journal_client.py swh/indexer/metadata.py swh/indexer/metadata_detector.py swh/indexer/mimetype.py swh/indexer/origin_head.py swh/indexer/py.typed swh/indexer/rehash.py swh/indexer/tasks.py swh/indexer/data/codemeta/CITATION swh/indexer/data/codemeta/LICENSE swh/indexer/data/codemeta/codemeta.jsonld swh/indexer/data/codemeta/crosswalk.csv swh/indexer/metadata_dictionary/__init__.py swh/indexer/metadata_dictionary/base.py swh/indexer/metadata_dictionary/codemeta.py swh/indexer/metadata_dictionary/maven.py swh/indexer/metadata_dictionary/npm.py swh/indexer/metadata_dictionary/python.py swh/indexer/metadata_dictionary/ruby.py swh/indexer/sql/10-superuser-init.sql swh/indexer/sql/20-enums.sql swh/indexer/sql/30-schema.sql swh/indexer/sql/50-data.sql
swh/indexer/sql/50-func.sql swh/indexer/sql/60-indexes.sql swh/indexer/storage/__init__.py swh/indexer/storage/converters.py swh/indexer/storage/db.py swh/indexer/storage/exc.py swh/indexer/storage/in_memory.py swh/indexer/storage/interface.py swh/indexer/storage/metrics.py swh/indexer/storage/model.py swh/indexer/storage/writer.py swh/indexer/storage/api/__init__.py swh/indexer/storage/api/client.py swh/indexer/storage/api/serializers.py swh/indexer/storage/api/server.py swh/indexer/tests/__init__.py swh/indexer/tests/conftest.py swh/indexer/tests/tasks.py swh/indexer/tests/test_cli.py swh/indexer/tests/test_codemeta.py swh/indexer/tests/test_ctags.py swh/indexer/tests/test_fossology_license.py swh/indexer/tests/test_indexer.py swh/indexer/tests/test_journal_client.py swh/indexer/tests/test_metadata.py swh/indexer/tests/test_mimetype.py swh/indexer/tests/test_origin_head.py swh/indexer/tests/test_origin_metadata.py swh/indexer/tests/test_tasks.py swh/indexer/tests/utils.py swh/indexer/tests/storage/__init__.py swh/indexer/tests/storage/conftest.py swh/indexer/tests/storage/generate_data_test.py swh/indexer/tests/storage/test_api_client.py swh/indexer/tests/storage/test_converters.py swh/indexer/tests/storage/test_in_memory.py swh/indexer/tests/storage/test_init.py swh/indexer/tests/storage/test_metrics.py swh/indexer/tests/storage/test_model.py swh/indexer/tests/storage/test_server.py swh/indexer/tests/storage/test_storage.py \ No newline at end of file diff --git a/swh.indexer.egg-info/requires.txt b/swh.indexer.egg-info/requires.txt index 25feb9c..c834180 100644 --- a/swh.indexer.egg-info/requires.txt +++ b/swh.indexer.egg-info/requires.txt @@ -1,19 +1,19 @@ click python-magic>=0.4.13 pyld xmltodict typing-extensions -swh.core[db,http]>=0.3 +swh.core[db,http]>=0.9.1 swh.model>=0.0.15 swh.objstorage>=0.2.2 swh.scheduler>=0.5.2 swh.storage>=0.12.0 swh.journal>=0.1.0 [testing] confluent-kafka pytest pytest-mock hypothesis>=3.11.0 swh.scheduler[testing]>=0.5.0 swh.storage[testing]>=0.10.0 diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py index e423ccf..ab8b8b3 100644 --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -1,185 +1,184 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import subprocess -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterable, List, Optional +from swh.core.api.classes import stream_results from swh.core.config import merge_configs -from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1 +from swh.indexer.storage.interface import IndexerStorageInterface, Sha1 from swh.indexer.storage.model import ContentLicenseRow from swh.model import hashutil from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp logger = logging.getLogger(__name__) def compute_license(path) -> Dict: """Determine license from file at path. 
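A hedged usage example (not part of this change): judging from the parsing code below, the ``nomossa`` binary is assumed to print ``<path> contains license(s) <name>[,<name>...]``, so a call would behave roughly like::

    >>> compute_license("/tmp/content")  # nomossa prints "/tmp/content contains license(s) GPL-2.0,MIT"
    {'licenses': ['GPL-2.0', 'MIT'], 'path': '/tmp/content'}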
Args: path: filepath to determine the license Returns: dict: A dict with the following keys: - licenses ([str]): licenses detected for the path - path (bytes): content filepath """ try: properties = subprocess.check_output(["nomossa", path], universal_newlines=True) if properties: res = properties.rstrip().split(" contains license(s) ") licenses = res[1].split(",") else: licenses = [] return { "licenses": licenses, "path": path, } except subprocess.CalledProcessError: from os import path as __path logger.exception( "Problem during license detection for sha1 %s" % __path.basename(path) ) return { "licenses": [], "path": path, } DEFAULT_CONFIG: Dict[str, Any] = { "workdir": "/tmp/swh/indexer.fossology.license", "tools": { "name": "nomos", "version": "3.1.0rc2-31-ga2cbb8c", "configuration": {"command_line": "nomossa ",}, }, "write_batch_size": 1000, } class MixinFossologyLicenseIndexer: """Mixin fossology license indexer. See :class:`FossologyLicenseIndexer` and :class:`FossologyLicensePartitionIndexer` """ tool: Any idx_storage: IndexerStorageInterface def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) self.working_directory = self.config["workdir"] def index( self, id: Sha1, data: Optional[bytes] = None, **kwargs ) -> List[ContentLicenseRow]: """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: list: content_license rows, one per detected license, with keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path - indexer_configuration_id (int): tool used to compute the output """ assert data is not None with write_to_temp( filename=hashutil.hash_to_hex(id), # use the id as pathname data=data, working_directory=self.working_directory, ) as content_path: properties = compute_license(path=content_path) return [ ContentLicenseRow( id=id, indexer_configuration_id=self.tool["id"], license=license, ) for license in properties["licenses"] ] def persist_index_computations( self, results: List[ContentLicenseRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_license dict with the following keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path """ return self.idx_storage.content_fossology_license_add(results) class FossologyLicenseIndexer( MixinFossologyLicenseIndexer, ContentIndexer[ContentLicenseRow] ): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing the license from that content - storing the result in storage """ def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_fossology_license_missing( ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids) ) class FossologyLicensePartitionIndexer( MixinFossologyLicenseIndexer, ContentPartitionIndexer[ContentLicenseRow] ): """FossologyLicense Range Indexer working on range/partition of content identifiers.
- filters out non-textual content - (optionally) filters out content already indexed (cf :meth:`.indexed_contents_in_partition`) - reads content from objstorage per the content's id (sha1) - computes the license from that content - stores the result in storage """ def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None - ) -> PagedResult[Sha1]: + ) -> Iterable[Sha1]: """Retrieve indexed content ids within the partition. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination - - Returns: - PagedResult of Sha1. If next_page_token is None, there is no more data - to fetch - """ - return self.idx_storage.content_fossology_license_get_partition( - self.tool["id"], partition_id, nb_partitions, page_token=page_token + return stream_results( + self.idx_storage.content_fossology_license_get_partition, + self.tool["id"], + partition_id, + nb_partitions, ) diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 984ffbf..85c0d72 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,613 +1,615 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from contextlib import contextmanager import logging import os import shutil import tempfile -from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union +from typing import ( + Any, + Dict, + Generic, + Iterable, + Iterator, + List, + Optional, + Set, + TypeVar, + Union, +) import warnings from swh.core import utils from swh.core.config import load_from_envvar, merge_configs -from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage +from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil from swh.model.model import Revision, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage from swh.storage.interface import StorageInterface @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: filename: one of sha1's many filenames data: the sha1's content to write in a temporary file working_directory: the directory into which the file is written Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, "wb") as f: f.write(data) yield content_path shutil.rmtree(temp_dir) DEFAULT_CONFIG = { INDEXER_CFG_KEY: {"cls": "memory"}, "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, } TId = TypeVar("TId") """type of the ids of index()ed objects.""" TData = TypeVar("TData") """type of the objects passed to index().""" TResult = TypeVar("TResult") """return type of index()""" class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): """Base class for indexers to inherit from.
The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :meth:`~BaseIndexer.run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :meth:`~BaseIndexer.filter`: filter out data already indexed (in storage). :meth:`~BaseIndexer.index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :meth:`~BaseIndexer.persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :meth:`~BaseIndexer.prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :meth:`~BaseIndexer.check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :meth:`~BaseIndexer.register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ results: List[TResult] USE_TOOLS = True catch_exceptions = True """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" scheduler: Any storage: StorageInterface objstorage: Any idx_storage: IndexerStorageInterface def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run. """ super().__init__() if config is not None: self.config = config elif SWH_CONFIG: self.config = SWH_CONFIG.copy() else: self.config = load_from_envvar() self.config = merge_configs(DEFAULT_CONFIG, self.config) self.prepare() self.check() self.log.debug("%s: config=%s", self, self.config) def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ config_storage = self.config.get("storage") if config_storage: self.storage = get_storage(**config_storage) self.objstorage = get_objstorage(**self.config["objstorage"]) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.log = logging.getLogger("swh.indexer") if self.USE_TOOLS: self.tools = list(self.register_tools(self.config.get("tools", []))) self.results = [] @property def tool(self) -> Dict: return self.tools[0] def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if self.USE_TOOLS and not self.tools: raise ValueError("Tools %s is unknown, cannot continue" % self.tools) def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api. 
""" return {"tool_%s" % key: value for key, value in tool.items()} def register_tools( self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. Raises: ValueError: if not a list nor a dict. """ if isinstance(tools, list): tools = list(map(self._prepare_tool, tools)) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError("Configuration tool(s) must be a dict or list!") if tools: return self.idx_storage.indexer_configuration_add(tools) else: return [] def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: """Index computation for the id and associated raw data. Args: id: identifier or Dict object data: id's data from storage or objstorage depending on object type Returns: dict: a dict that makes sense for the :meth:`.persist_index_computations` method. """ raise NotImplementedError() def filter(self, ids: List[TId]) -> Iterator[TId]: """Filter missing ids for that particular indexer. Args: ids: list of ids Yields: iterator of missing ids """ yield from ids @abc.abstractmethod def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. Returns: a summary dict of what has been inserted in the storage """ return {} class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on a list of ids directly. To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] results = [] summary: Dict = {"status": "uneventful"} try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning( "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1) ) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when reading contents metadata.") summary["status"] = "failed" return summary class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content partition indexer. This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition. To work on a list of ids, use the :class:`ContentIndexer` instead. 
Note: :class:`ContentPartitionIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ @abc.abstractmethod def indexed_contents_in_partition( - self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None - ) -> PagedResult[Sha1]: + self, partition_id: int, nb_partitions: int + ) -> Iterable[Sha1]: """Retrieve the indexed content ids within the given partition. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into - Returns: - PagedResult of Sha1. If next_page_token is None, there is no more data - to fetch - """ pass def _list_contents_to_index( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1] - ) -> Iterator[Sha1]: + ) -> Iterable[Sha1]: """Compute from storage the new contents to index in partition_id. The already indexed contents are skipped. Args: partition_id: Index of the partition to fetch data from nb_partitions: Total number of partitions indexed: Set of content already indexed. Yields: Sha1 id (bytes) of contents to index """ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int): raise TypeError( f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}." ) next_page_token = None while True: result = self.storage.content_get_partition( partition_id, nb_partitions, page_token=next_page_token ) contents = result.results for c in contents: _id = hashutil.hash_to_bytes(c.sha1) if _id in indexed: continue yield _id next_page_token = result.next_page_token if next_page_token is None: break def _index_contents( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any ) -> Iterator[TResult]: """Index the contents within the partition_id. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into indexed: Set of content already indexed. Yields: indexing result as dict to persist in the indexer backend """ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning(f"Content {sha1.hex()} not found in objstorage") continue yield from self.index(sha1, raw_content, **kwargs) def _index_with_skipping_already_done( self, partition_id: int, nb_partitions: int ) -> Iterator[TResult]: """Index the contents within partition_id that have not been indexed yet. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into Yields: indexing result as dict to persist in the indexer backend """ - next_page_token = None - contents = set() - while True: - indexed_page = self.indexed_contents_in_partition( - partition_id, nb_partitions, page_token=next_page_token - ) - for sha1 in indexed_page.results: - contents.add(sha1) - yield from self._index_contents(partition_id, nb_partitions, contents) - next_page_token = indexed_page.next_page_token - if next_page_token is None: - break + already_indexed_contents = set( + self.indexed_contents_in_partition(partition_id, nb_partitions) + ) + + return self._index_contents( + partition_id, nb_partitions, already_indexed_contents + ) def run( self, partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs, ) -> Dict: """Given a partition of content ids, index the contents within. Either the indexer is incremental (filter out existing computed data) or it computes everything from scratch.
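For context, ``stream_results`` (from ``swh.core.api.classes``), as used in the new code above, flattens a paginated endpoint into a plain iterable; behaviorally it is equivalent to the page-token loop it replaces. A sketch, not the real implementation::

    def stream_results_sketch(f, *args, **kwargs):
        # keep requesting pages until the endpoint reports no next page
        page_token = None
        while True:
            page = f(*args, page_token=page_token, **kwargs)
            yield from page.results
            page_token = page.next_page_token
            if page_token is None:
                return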
Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: dict with the indexing task status """ summary: Dict[str, Any] = {"status": "uneventful"} count = 0 try: if skip_existing: gen = self._index_with_skipping_already_done( partition_id, nb_partitions ) else: gen = self._index_contents(partition_id, nb_partitions, indexed=set([])) count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] if count > 0: summary["status"] = "eventful" except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when computing metadata.") summary["status"] = "failed" if count > 0 and count_object_added_key: summary[count_object_added_key] = count return summary class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - store the results Args: origin_urls: list of origin urls. **kwargs: passed to the `index` method """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} try: results = self.index_list(origin_urls, **kwargs) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index_list(self, origin_urls: List[str], **kwargs) -> List[TResult]: results = [] for origin_url in origin_urls: try: results.extend(self.index(origin_url, **kwargs)) except Exception: self.log.exception("Problem when processing origin %s", origin_url) raise return results class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
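A hedged sketch of a minimal concrete :class:`OriginIndexer` subclass (the class name and the empty index body are illustrative only)::

    from swh.indexer.storage.model import OriginIntrinsicMetadataRow

    class MyOriginIndexer(OriginIndexer[OriginIntrinsicMetadataRow]):
        def index(self, origin_url: str, **kwargs) -> List[OriginIntrinsicMetadataRow]:
            return []  # build metadata rows for this origin here

        def persist_index_computations(self, results):
            return self.idx_storage.origin_intrinsic_metadata_add(results)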
""" def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results Args: ids: sha1_git's identifier list """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} results = [] revision_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): if not rev: # TODO: call self.index() with rev=None? self.log.warning( "Revision %s not found in storage", hashutil.hash_to_hex(rev_id) ) continue try: results.extend(self.index(rev_id, rev)) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing revision") summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) self.results = results return summary diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py index 3185534..aa5fb4e 100644 --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,164 +1,163 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterable, List, Optional import magic +from swh.core.api.classes import stream_results from swh.core.config import merge_configs -from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1 +from swh.indexer.storage.interface import IndexerStorageInterface, Sha1 from swh.indexer.storage.model import ContentMimetypeRow from .indexer import ContentIndexer, ContentPartitionIndexer if not hasattr(magic.Magic, "from_buffer"): raise ImportError( 'Expected "import magic" to import python-magic, but file_magic ' "was imported instead." ) def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, str]: """Determine mimetype and encoding from the raw content. Args: raw_content: content's raw data Returns: dict: mimetype and encoding key and corresponding values. """ m = magic.Magic(mime=True, mime_encoding=True) res = m.from_buffer(raw_content) try: mimetype, encoding = res.split("; charset=") except ValueError: mimetype, encoding = res, "" return { "mimetype": mimetype, "encoding": encoding, } DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "file", "version": "1:5.30-1+deb9u1", "configuration": {"type": "library", "debian-package": "python3-magic"}, }, "write_batch_size": 1000, } class MixinMimetypeIndexer: """Mixin mimetype indexer. See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer` """ tool: Any idx_storage: IndexerStorageInterface def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def index( self, id: Sha1, data: Optional[bytes] = None, **kwargs ) -> List[ContentMimetypeRow]: """Index sha1s' content and store result. 
Args: id: content's identifier data: raw content in bytes Returns: list: a single content_mimetype row, with fields: - id: content's identifier (sha1) - mimetype: mimetype in bytes - encoding: encoding in bytes """ assert data is not None properties = compute_mimetype_encoding(data) return [ ContentMimetypeRow( id=id, indexer_configuration_id=self.tool["id"], mimetype=properties["mimetype"], encoding=properties["encoding"], ) ] def persist_index_computations( self, results: List[ContentMimetypeRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content's mimetype dicts (see :meth:`.index`) """ return self.idx_storage.content_mimetype_add(results) class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer[ContentMimetypeRow]): """Mimetype Indexer working on a list of content identifiers. It: - (optionally) filters out content already indexed (cf. :meth:`.filter`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores the result in storage """ def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_mimetype_missing( ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids) ) class MimetypePartitionIndexer( MixinMimetypeIndexer, ContentPartitionIndexer[ContentMimetypeRow] ): """Mimetype Range Indexer working on a range of content identifiers. It: - (optionally) filters out content already indexed (cf :meth:`.indexed_contents_in_partition`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores the result in storage """ def indexed_contents_in_partition( - self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, - ) -> PagedResult[Sha1]: + self, partition_id: int, nb_partitions: int, + ) -> Iterable[Sha1]: """Retrieve indexed content ids within partition_id. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into - - Returns: - PagedResult of Sha1. If next_page_token is None, there is no more data - to fetch - """ - return self.idx_storage.content_mimetype_get_partition( - self.tool["id"], partition_id, nb_partitions, page_token=page_token + return stream_results( + self.idx_storage.content_mimetype_get_partition, + self.tool["id"], + partition_id, + nb_partitions, ) diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py index ec2e084..6f41d44 100644 --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -1,550 +1,550 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Iterable, Iterator, List from swh.core.db import BaseDb from swh.core.db.db_utils import execute_values_generator, stored_procedure from swh.model import hashutil from .interface import Sha1 class Db(BaseDb): """Proxy to the SWH Indexer DB, with wrappers around stored procedures """ content_mimetype_hash_keys = ["id", "indexer_configuration_id"] def _missing_from_list( self, table: str, data: Iterable[Dict], hash_keys: List[str], cur=None ): """Read from table the data with hash_keys that are missing. Args: table: Table name (e.g. content_mimetype, content_language, etc...)
data: Dict of data to read from hash_keys: List of keys to read in the data dict. Yields: The data which is missing from the db. """ cur = self._cursor(cur) keys = ", ".join(hash_keys) equality = " AND ".join(("t.%s = c.%s" % (key, key)) for key in hash_keys) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(%s) where not exists ( select 1 from %s c where %s ) """ % (keys, keys, table, equality), (tuple(m[k] for k in hash_keys) for m in data), ) def content_mimetype_missing_from_list( self, mimetypes: Iterable[Dict], cur=None ) -> Iterator[Sha1]: """List missing mimetypes. """ yield from self._missing_from_list( "content_mimetype", mimetypes, self.content_mimetype_hash_keys, cur=cur ) content_mimetype_cols = [ "id", "mimetype", "encoding", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_mimetype") def mktemp_content_mimetype(self, cur=None): pass def content_mimetype_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_mimetype_add()") return cur.fetchone()[0] def _convert_key(self, key, main_table="c"): """Convert keys according to specific use in the module. Args: key (str): Key expression to change according to the alias used in the query main_table (str): Alias to use for the main table. Default to c for content_{something}. Expected: Tables content_{something} being aliased as 'c' (something in {language, mimetype, ...}), table indexer_configuration being aliased as 'i'. """ if key == "id": return "%s.id" % main_table elif key == "tool_id": return "i.id as tool_id" elif key == "license": return ( """ ( select name from fossology_license where id = %s.license_id ) as licenses""" % main_table ) return key def _get_from_list(self, table, ids, cols, cur=None, id_col="id"): """Fetches entries from the `table` such that their `id` field (or whatever is given to `id_col`) is in `ids`. Returns the columns `cols`. The `cur` parameter is used to connect to the database. """ cur = self._cursor(cur) keys = map(self._convert_key, cols) query = """ select {keys} from (values %s) as t(id) inner join {table} c on c.{id_col}=t.id inner join indexer_configuration i on c.indexer_configuration_id=i.id; """.format( keys=", ".join(keys), id_col=id_col, table=table ) yield from execute_values_generator(cur, query, ((_id,) for _id in ids)) content_indexer_names = { "mimetype": "content_mimetype", "fossology_license": "content_fossology_license", } def content_get_range( self, content_type, start, end, indexer_configuration_id, limit=1000, with_textual_data=False, cur=None, ): """Retrieve contents with content_type, within range [start, end] bound by limit and associated to the given indexer configuration id. When asking to work on textual content, that filters on the mimetype table with any mimetype that is not binary. 
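An illustrative call (the DSN and tool id are hypothetical), listing textual contents indexed by tool 1 over the full sha1 range::

    db = Db.connect("dbname=softwareheritage-indexer")
    for (sha1,) in db.content_get_range(
        "mimetype",
        start=b"\x00" * 20,
        end=b"\xff" * 20,
        indexer_configuration_id=1,
        with_textual_data=True,
    ):
        ...  # process each indexed textual content id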
""" cur = self._cursor(cur) table = self.content_indexer_names[content_type] if with_textual_data: extra = """inner join content_mimetype cm on (t.id=cm.id and cm.mimetype like 'text/%%' and %(start)s <= cm.id and cm.id <= %(end)s) """ else: extra = "" query = f"""select t.id from {table} t {extra} where t.indexer_configuration_id=%(tool_id)s and %(start)s <= t.id and t.id <= %(end)s order by t.indexer_configuration_id, t.id limit %(limit)s""" cur.execute( query, { "start": start, "end": end, "tool_id": indexer_configuration_id, "limit": limit, }, ) yield from cur def content_mimetype_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_mimetype", ids, self.content_mimetype_cols, cur=cur ) content_language_hash_keys = ["id", "indexer_configuration_id"] def content_language_missing_from_list(self, languages, cur=None): """List missing languages. """ yield from self._missing_from_list( "content_language", languages, self.content_language_hash_keys, cur=cur ) content_language_cols = [ "id", "lang", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_language") def mktemp_content_language(self, cur=None): pass def content_language_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_language_add()") return cur.fetchone()[0] def content_language_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_language", ids, self.content_language_cols, cur=cur ) content_ctags_hash_keys = ["id", "indexer_configuration_id"] def content_ctags_missing_from_list(self, ctags, cur=None): """List missing ctags. """ yield from self._missing_from_list( "content_ctags", ctags, self.content_ctags_hash_keys, cur=cur ) content_ctags_cols = [ "id", "name", "kind", "line", "lang", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_ctags") def mktemp_content_ctags(self, cur=None): pass def content_ctags_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_ctags_add()") return cur.fetchone()[0] def content_ctags_get_from_list(self, ids, cur=None): cur = self._cursor(cur) keys = map(self._convert_key, self.content_ctags_cols) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(id) inner join content_ctags c on c.id=t.id inner join indexer_configuration i on c.indexer_configuration_id=i.id order by line """ % ", ".join(keys), ((_id,) for _id in ids), ) def content_ctags_search(self, expression, last_sha1, limit, cur=None): cur = self._cursor(cur) if not last_sha1: query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s)""" % ( ",".join(self.content_ctags_cols) ) cur.execute(query, (expression, limit)) else: if last_sha1 and isinstance(last_sha1, bytes): last_sha1 = "\\x%s" % hashutil.hash_to_hex(last_sha1) elif last_sha1: last_sha1 = "\\x%s" % last_sha1 query = """SELECT %s FROM swh_content_ctags_search(%%s, %%s, %%s)""" % ( ",".join(self.content_ctags_cols) ) cur.execute(query, (expression, limit, last_sha1)) yield from cur content_fossology_license_cols = [ "id", "tool_id", "tool_name", "tool_version", "tool_configuration", "license", ] @stored_procedure("swh_mktemp_content_fossology_license") def mktemp_content_fossology_license(self, cur=None): pass def content_fossology_license_add_from_temp(self, cur=None): """Add new licenses per content. 
""" cur = self._cursor(cur) cur.execute("select * from swh_content_fossology_license_add()") return cur.fetchone()[0] def content_fossology_license_get_from_list(self, ids, cur=None): """Retrieve licenses per id. """ cur = self._cursor(cur) keys = map(self._convert_key, self.content_fossology_license_cols) yield from execute_values_generator( cur, """ select %s from (values %%s) as t(id) inner join content_fossology_license c on t.id=c.id inner join indexer_configuration i on i.id=c.indexer_configuration_id """ % ", ".join(keys), ((_id,) for _id in ids), ) content_metadata_hash_keys = ["id", "indexer_configuration_id"] def content_metadata_missing_from_list(self, metadata, cur=None): """List missing metadata. """ yield from self._missing_from_list( "content_metadata", metadata, self.content_metadata_hash_keys, cur=cur ) content_metadata_cols = [ "id", "metadata", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_content_metadata") def mktemp_content_metadata(self, cur=None): pass def content_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_content_metadata_add()") return cur.fetchone()[0] def content_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "content_metadata", ids, self.content_metadata_cols, cur=cur ) revision_intrinsic_metadata_hash_keys = ["id", "indexer_configuration_id"] def revision_intrinsic_metadata_missing_from_list(self, metadata, cur=None): """List missing metadata. """ yield from self._missing_from_list( "revision_intrinsic_metadata", metadata, self.revision_intrinsic_metadata_hash_keys, cur=cur, ) revision_intrinsic_metadata_cols = [ "id", "metadata", "mappings", "tool_id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_revision_intrinsic_metadata") def mktemp_revision_intrinsic_metadata(self, cur=None): pass def revision_intrinsic_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_revision_intrinsic_metadata_add()") return cur.fetchone()[0] def revision_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "revision_intrinsic_metadata", ids, self.revision_intrinsic_metadata_cols, cur=cur, ) origin_intrinsic_metadata_cols = [ "id", "metadata", "from_revision", "mappings", "tool_id", "tool_name", "tool_version", "tool_configuration", ] origin_intrinsic_metadata_regconfig = "pg_catalog.simple" """The dictionary used to normalize 'metadata' and queries. 'pg_catalog.simple' provides no stopword, so it should be suitable for proper names and non-English content. 
When updating this value, make sure to add a new index on origin_intrinsic_metadata.metadata.""" @stored_procedure("swh_mktemp_origin_intrinsic_metadata") def mktemp_origin_intrinsic_metadata(self, cur=None): pass def origin_intrinsic_metadata_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("select * from swh_origin_intrinsic_metadata_add()") return cur.fetchone()[0] def origin_intrinsic_metadata_get_from_list(self, ids, cur=None): yield from self._get_from_list( "origin_intrinsic_metadata", ids, self.origin_intrinsic_metadata_cols, cur=cur, id_col="id", ) def origin_intrinsic_metadata_search_fulltext(self, terms, *, limit, cur): regconfig = self.origin_intrinsic_metadata_regconfig tsquery_template = " && ".join( "plainto_tsquery('%s', %%s)" % regconfig for _ in terms ) tsquery_args = [(term,) for term in terms] keys = ( self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols ) query = ( "SELECT {keys} FROM origin_intrinsic_metadata AS oim " "INNER JOIN indexer_configuration AS i " "ON oim.indexer_configuration_id=i.id " "JOIN LATERAL (SELECT {tsquery_template}) AS s(tsq) ON true " "WHERE oim.metadata_tsvector @@ tsq " "ORDER BY ts_rank(oim.metadata_tsvector, tsq, 1) DESC " "LIMIT %s;" ).format(keys=", ".join(keys), tsquery_template=tsquery_template) cur.execute(query, tsquery_args + [limit]) yield from cur def origin_intrinsic_metadata_search_by_producer( self, last, limit, ids_only, mappings, tool_ids, cur ): if ids_only: keys = "oim.id" else: keys = ", ".join( ( self._convert_key(col, "oim") for col in self.origin_intrinsic_metadata_cols ) ) query_parts = [ "SELECT %s" % keys, "FROM origin_intrinsic_metadata AS oim", "INNER JOIN indexer_configuration AS i", "ON oim.indexer_configuration_id=i.id", ] args = [] where = [] if last: where.append("oim.id > %s") args.append(last) if mappings is not None: where.append("oim.mappings && %s") - args.append(mappings) + args.append(list(mappings)) if tool_ids is not None: where.append("oim.indexer_configuration_id = ANY(%s)") - args.append(tool_ids) + args.append(list(tool_ids)) if where: query_parts.append("WHERE") query_parts.append(" AND ".join(where)) if limit: query_parts.append("LIMIT %s") args.append(limit) cur.execute(" ".join(query_parts), args) yield from cur indexer_configuration_cols = [ "id", "tool_name", "tool_version", "tool_configuration", ] @stored_procedure("swh_mktemp_indexer_configuration") def mktemp_indexer_configuration(self, cur=None): pass def indexer_configuration_add_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute( "SELECT %s from swh_indexer_configuration_add()" % (",".join(self.indexer_configuration_cols),) ) yield from cur def indexer_configuration_get( self, tool_name, tool_version, tool_configuration, cur=None ): cur = self._cursor(cur) cur.execute( """select %s from indexer_configuration where tool_name=%%s and tool_version=%%s and tool_configuration=%%s""" % (",".join(self.indexer_configuration_cols)), (tool_name, tool_version, tool_configuration), ) return cur.fetchone() def indexer_configuration_get_from_id(self, id_, cur=None): cur = self._cursor(cur) cur.execute( """select %s from indexer_configuration where id=%%s""" % (",".join(self.indexer_configuration_cols)), (id_,), ) return cur.fetchone() diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py index e16588b..270812d 100644 --- a/swh/indexer/tests/conftest.py +++ b/swh/indexer/tests/conftest.py @@ -1,105 +1,127 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the 
AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import os +from os import path from typing import List, Tuple from unittest.mock import patch import pytest import yaml +from swh.core.db.pytest_plugin import postgresql_fact +import swh.indexer from swh.indexer.storage import get_indexer_storage from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .utils import fill_obj_storage, fill_storage TASK_NAMES: List[Tuple[str, str]] = [ # (scheduler-task-type, task-class-test-name) ("index-revision-metadata", "revision_intrinsic_metadata"), ("index-origin-metadata", "origin_intrinsic_metadata"), ] +SQL_FILES = path.join(path.dirname(swh.indexer.__file__), "sql", "*.sql") + + +idx_storage_postgresql = postgresql_fact( + "postgresql_proc", db_name="indexer_storage", dump_files=SQL_FILES, +) + + @pytest.fixture def indexer_scheduler(swh_scheduler): # Insert the expected task types within the scheduler for task_name, task_class_name in TASK_NAMES: swh_scheduler.create_task_type( { "type": task_name, "description": f"The {task_class_name} indexer testing task", "backend_name": f"swh.indexer.tests.tasks.{task_class_name}", "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), "num_retries": 3, } ) return swh_scheduler @pytest.fixture -def idx_storage(): +def idx_storage_backend_config(idx_storage_postgresql): + """Basic pg storage configuration with no journal collaborator for the indexer + storage (to avoid pulling optional dependency on clients of this fixture) + + """ + return { + "cls": "local", + "db": idx_storage_postgresql.dsn, + } + + +@pytest.fixture +def swh_indexer_config( + swh_storage_backend_config, idx_storage_backend_config, swh_scheduler_config +): + return { + "storage": swh_storage_backend_config, + "objstorage": {"cls": "memory"}, + "indexer_storage": idx_storage_backend_config, + "scheduler": {"cls": "local", **swh_scheduler_config}, + "tools": { + "name": "file", + "version": "1:5.30-1+deb9u1", + "configuration": {"type": "library", "debian-package": "python3-magic"}, + }, + "compute_checksums": ["blake2b512"], # for rehash indexer + } + + +@pytest.fixture +def idx_storage(swh_indexer_config): """An instance of in-memory indexer storage that gets injected into all indexers classes. """ - idx_storage = get_indexer_storage("memory") - with patch("swh.indexer.storage.in_memory.IndexerStorage") as idx_storage_mock: - idx_storage_mock.return_value = idx_storage - yield idx_storage + idx_storage_config = swh_indexer_config["indexer_storage"] + return get_indexer_storage(**idx_storage_config) @pytest.fixture -def storage(): +def storage(swh_indexer_config): """An instance of in-memory storage that gets injected into all indexers classes. """ - storage = get_storage(cls="memory") + storage = get_storage(**swh_indexer_config["storage"]) fill_storage(storage) - with patch("swh.storage.in_memory.InMemoryStorage") as storage_mock: - storage_mock.return_value = storage - yield storage + return storage @pytest.fixture -def obj_storage(): +def obj_storage(swh_indexer_config): """An instance of in-memory objstorage that gets injected into all indexers classes. 
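A hedged example of a test consuming the fixtures defined in this conftest (the test name and assertions are illustrative only)::

    def test_tools_roundtrip(idx_storage):
        tools = idx_storage.indexer_configuration_add(
            [{"tool_name": "nomos",
              "tool_version": "3.1.0rc2-31-ga2cbb8c",
              "tool_configuration": {}}]
        )
        assert tools and tools[0]["id"] is not None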
""" - objstorage = get_objstorage("memory") + objstorage = get_objstorage(**swh_indexer_config["objstorage"]) fill_obj_storage(objstorage) with patch.dict( "swh.objstorage.factory._STORAGE_CLASSES", {"memory": lambda: objstorage} ): yield objstorage -@pytest.fixture -def swh_indexer_config(): - return { - "storage": {"cls": "memory"}, - "objstorage": {"cls": "memory"}, - "indexer_storage": {"cls": "memory"}, - "tools": { - "name": "file", - "version": "1:5.30-1+deb9u1", - "configuration": {"type": "library", "debian-package": "python3-magic"}, - }, - "compute_checksums": ["blake2b512"], # for rehash indexer - } - - @pytest.fixture def swh_config(swh_indexer_config, monkeypatch, tmp_path): conffile = os.path.join(str(tmp_path), "indexer.yml") with open(conffile, "w") as f: f.write(yaml.dump(swh_indexer_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) return conffile diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 3deca15..265fbea 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,397 +1,493 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime from functools import reduce import re -import tempfile from typing import Any, Dict, List from unittest.mock import patch from click.testing import CliRunner -from confluent_kafka import Consumer, Producer +from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) -from swh.journal.serializers import value_to_kafka +from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes - -CLI_CONFIG = """ -scheduler: - cls: foo - args: {} -storage: - cls: memory -indexer_storage: - cls: memory -""" +from swh.model.model import OriginVisitStatus def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ {"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},} for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, - from_revision=hash_to_bytes("abcd{:0>4}".format(origin_id)), + from_revision=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] revision_metadata = [ RevisionIntrinsicMetadataRow( - id=hash_to_bytes("abcd{:0>4}".format(origin_id)), + id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def 
_assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) -def invoke(scheduler, catch_exceptions, args): - runner = CliRunner() - with patch( - "swh.scheduler.get_scheduler" - ) as get_scheduler_mock, tempfile.NamedTemporaryFile( - "a", suffix=".yml" - ) as config_fd: - config_fd.write(CLI_CONFIG) - config_fd.seek(0) - get_scheduler_mock.return_value = scheduler - result = runner.invoke(indexer_cli_group, ["-C" + config_fd.name] + args) - if not catch_exceptions and result.exception: - print(result.output) - raise result.exception - return result - - -def test_mapping_list(indexer_scheduler): - result = invoke(indexer_scheduler, False, ["mapping", "list",]) +@pytest.fixture +def cli_runner(): + return CliRunner() + + +def test_cli_mapping_list(cli_runner, swh_config): + result = cli_runner.invoke( + indexer_cli_group, + ["-C", swh_config, "mapping", "list"], + catch_exceptions=False, + ) expected_output = "\n".join( ["codemeta", "gemspec", "maven", "npm", "pkg-info", "",] ) assert result.exit_code == 0, result.output assert result.output == expected_output -def test_mapping_list_terms(indexer_scheduler): - result = invoke(indexer_scheduler, False, ["mapping", "list-terms",]) +def test_cli_mapping_list_terms(cli_runner, swh_config): + result = cli_runner.invoke( + indexer_cli_group, + ["-C", swh_config, "mapping", "list-terms"], + catch_exceptions=False, + ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) -def test_mapping_list_terms_exclude(indexer_scheduler): - result = invoke( - indexer_scheduler, - False, - ["mapping", "list-terms", "--exclude-mapping", "codemeta"], +def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): + result = cli_runner.invoke( + indexer_cli_group, + ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], + catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) -def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage): - result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",]) +def test_cli_origin_metadata_reindex_empty_db( + cli_runner, swh_config, indexer_scheduler, idx_storage, storage +): + result = cli_runner.invoke( + indexer_cli_group, + ["-C", swh_config, "schedule", "reindex_origin_metadata",], + catch_exceptions=False, + ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) -def 
test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_divisor(
+    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
    """Tests the re-indexing when origin_batch_size*task_batch_size is a
    divisor of nb_origins."""
    fill_idx_storage(idx_storage, 90)

-    result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",])
+    result = cli_runner.invoke(
+        indexer_cli_group,
+        ["-C", swh_config, "schedule", "reindex_origin_metadata",],
+        catch_exceptions=False,
+    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (30 origins).\n"
        "Scheduled 6 tasks (60 origins).\n"
        "Scheduled 9 tasks (90 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 9
    _assert_tasks_for_origins(tasks, range(90))


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_dry_run(
+    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
    """Tests that a --dry-run re-indexing reports progress but schedules no
    tasks."""
    fill_idx_storage(idx_storage, 90)

-    result = invoke(
-        indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",]
+    result = cli_runner.invoke(
+        indexer_cli_group,
+        ["-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata",],
+        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (30 origins).\n"
        "Scheduled 6 tasks (60 origins).\n"
        "Scheduled 9 tasks (90 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 0


@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
@patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3)
-def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage):
+def test_cli_origin_metadata_reindex_nondivisor(
+    cli_runner, swh_config, indexer_scheduler, idx_storage, storage
+):
    """Tests the re-indexing when neither origin_batch_size nor
    task_batch_size is a divisor of nb_origins."""
    fill_idx_storage(idx_storage, 70)

-    result = invoke(
-        indexer_scheduler,
-        False,
-        ["schedule", "reindex_origin_metadata", "--batch-size", "20",],
+    result = cli_runner.invoke(
+        indexer_cli_group,
+        [
+            "-C",
+            swh_config,
+            "schedule",
+            "reindex_origin_metadata",
+            "--batch-size",
+            "20",
+        ],
+        catch_exceptions=False,
    )

    # Check the output
    expected_output = (
        "Scheduled 3 tasks (60 origins).\n"
        "Scheduled 4 tasks (70 origins).\n"
        "Done.\n"
    )
    assert result.exit_code == 0, result.output
    assert result.output == expected_output

    # Check scheduled tasks
    tasks = indexer_scheduler.search_tasks()
    assert len(tasks) == 4
    _assert_tasks_for_origins(tasks, range(70))
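
The expected outputs above follow from the patched TASK_BATCH_SIZE of 3 combined with the per-task origin batch size (10 by default, as implied by the 90-origins/9-tasks assertion; 20 when --batch-size 20 is passed): a progress line is printed every 3 flushed tasks. A small standalone sketch of that arithmetic, illustrative only and not part of the diff:

def expected_progress(nb_origins, origins_per_task=10, task_batch_size=3):
    # progress is reported every task_batch_size tasks, i.e. every
    # origins_per_task * task_batch_size origins
    lines = []
    step = origins_per_task * task_batch_size
    scheduled = 0
    while scheduled < nb_origins:
        scheduled = min(scheduled + step, nb_origins)
        nb_tasks = -(-scheduled // origins_per_task)  # ceiling division
        lines.append(f"Scheduled {nb_tasks} tasks ({scheduled} origins).")
    return lines + ["Done."]

assert expected_progress(90) == [
    "Scheduled 3 tasks (30 origins).",
    "Scheduled 6 tasks (60 origins).",
    "Scheduled 9 tasks (90 origins).",
    "Done.",
]
assert expected_progress(70, origins_per_task=20) == [
    "Scheduled 3 tasks (60 origins).",
    "Scheduled 4 tasks (70 origins).",
    "Done.",
]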
["schedule", "reindex_origin_metadata", "--mapping", "mapping1",], + result = cli_runner.invoke( + indexer_cli_group, + [ + "-C", + swh_config, + "schedule", + "reindex_origin_metadata", + "--mapping", + "mapping1", + ], + catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) -def test_origin_metadata_reindex_filter_two_mappings( - indexer_scheduler, idx_storage, storage +def test_cli_origin_metadata_reindex_filter_two_mappings( + cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) - result = invoke( - indexer_scheduler, - False, + result = cli_runner.invoke( + indexer_cli_group, [ + "--config-file", + swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], + catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) -def test_origin_metadata_reindex_filter_one_tool( - indexer_scheduler, idx_storage, storage +def test_cli_origin_metadata_reindex_filter_one_tool( + cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) - result = invoke( - indexer_scheduler, - False, - ["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),], + result = cli_runner.invoke( + indexer_cli_group, + [ + "-C", + swh_config, + "schedule", + "reindex_origin_metadata", + "--tool-id", + str(tool_ids[0]), + ], + catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) -def test_journal_client( - storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer +def now(): + return datetime.datetime.now(tz=datetime.timezone.utc) + + +def test_cli_journal_client( + cli_runner, + swh_config, + indexer_scheduler, + kafka_prefix: str, + kafka_server, + consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "acks": "all", - } + journal_writer = get_journal_writer( + "kafka", + brokers=[kafka_server], + prefix=kafka_prefix, + client_id="test producer", + value_sanitizer=lambda object_type, value: value, + flush_timeout=3, # fail early if something is going wrong ) - STATUS = 
{"status": "full", "origin": {"url": "file://dev/0000",}} - producer.produce( - topic=f"{kafka_prefix}.origin_visit_status", - key=b"bogus", - value=value_to_kafka(STATUS), - ) + visit_statuses = [ + OriginVisitStatus( + origin="file:///dev/zero", + visit=1, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///dev/foobar", + visit=2, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///tmp/spamegg", + visit=3, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///dev/0002", + visit=6, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( # will be filtered out due to its 'partial' status + origin="file:///dev/0000", + visit=4, + date=now(), + status="partial", + snapshot=None, + ), + OriginVisitStatus( # will be filtered out due to its 'ongoing' status + origin="file:///dev/0001", + visit=5, + date=now(), + status="ongoing", + snapshot=None, + ), + ] + + journal_writer.write_additions("origin_visit_status", visit_statuses) + visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] - result = invoke( - indexer_scheduler, - False, + result = cli_runner.invoke( + indexer_cli_group, [ + "-C", + swh_config, "journal-client", - "--stop-after-objects", - "1", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", + "--stop-after-objects", + len(visit_statuses), + "--origin-metadata-task-type", + "index-origin-metadata", ], + catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks - tasks = indexer_scheduler.search_tasks() - assert len(tasks) == 1 - _assert_tasks_for_origins(tasks, [0]) + tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") + + # This can be split into multiple tasks but no more than the origin-visit-statuses + # written in the journal + assert len(tasks) <= len(visit_statuses_full) + + actual_origins = [] + for task in tasks: + actual_task = dict(task) + assert actual_task["type"] == "index-origin-metadata" + scheduled_origins = actual_task["arguments"]["args"][0] + actual_origins.extend(scheduled_origins) + + assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} -def test_journal_client_without_brokers( - storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer +def test_cli_journal_client_without_brokers( + cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): - invoke( - indexer_scheduler, False, ["journal-client",], + cli_runner.invoke( + indexer_cli_group, + ["-C", swh_config, "journal-client",], + catch_exceptions=False, ) diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py index 10e7155..6edaa91 100644 --- a/swh/indexer/tests/test_indexer.py +++ b/swh/indexer/tests/test_indexer.py @@ -1,106 +1,152 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterable, List, Optional from unittest.mock import Mock import pytest from swh.indexer.indexer import ( ContentIndexer, 
ContentPartitionIndexer, OriginIndexer, RevisionIndexer, ) from swh.indexer.storage import PagedResult, Sha1 +from swh.model.model import Content from .utils import BASE_TEST_CONFIG class _TestException(Exception): pass class CrashingIndexerMixin: USE_TOOLS = False def index( self, id: Any, data: Optional[Any] = None, **kwargs ) -> List[Dict[str, Any]]: raise _TestException() def persist_index_computations(self, results) -> Dict[str, int]: return {} def indexed_contents_in_partition( - self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None - ) -> PagedResult[Sha1]: + self, partition_id: int, nb_partitions: int + ) -> Iterable[Sha1]: raise _TestException() class CrashingContentIndexer(CrashingIndexerMixin, ContentIndexer): pass class CrashingContentPartitionIndexer(CrashingIndexerMixin, ContentPartitionIndexer): pass class CrashingRevisionIndexer(CrashingIndexerMixin, RevisionIndexer): pass class CrashingOriginIndexer(CrashingIndexerMixin, OriginIndexer): pass +class TrivialContentPartitionIndexer(ContentPartitionIndexer[str]): + USE_TOOLS = False + + def index(self, id: bytes, data: Optional[bytes], **kwargs) -> List[str]: + return ["indexed " + id.decode()] + + def indexed_contents_in_partition( + self, partition_id: int, nb_partitions: int + ) -> Iterable[Sha1]: + return iter([b"excluded hash", b"other excluded hash"]) + + def persist_index_computations(self, results: List[str]) -> Dict[str, int]: + self._results.append(results) # type: ignore + return {"nb_added": len(results)} + + def test_content_indexer_catch_exceptions(): indexer = CrashingContentIndexer(config=BASE_TEST_CONFIG) indexer.objstorage = Mock() indexer.objstorage.get.return_value = b"content" assert indexer.run([b"foo"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run([b"foo"]) def test_revision_indexer_catch_exceptions(): indexer = CrashingRevisionIndexer(config=BASE_TEST_CONFIG) indexer.storage = Mock() indexer.storage.revision_get.return_value = ["rev"] assert indexer.run([b"foo"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run([b"foo"]) def test_origin_indexer_catch_exceptions(): indexer = CrashingOriginIndexer(config=BASE_TEST_CONFIG) assert indexer.run(["http://example.org"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run(["http://example.org"]) def test_content_partition_indexer_catch_exceptions(): indexer = CrashingContentPartitionIndexer( config={**BASE_TEST_CONFIG, "write_batch_size": 42} ) assert indexer.run(0, 42) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run(0, 42) + + +def test_content_partition_indexer(): + # TODO: simplify the mocking in this test + indexer = TrivialContentPartitionIndexer( + config={**BASE_TEST_CONFIG, "write_batch_size": 10,} # doesn't matter + ) + indexer.catch_exceptions = False + indexer._results = [] + indexer.storage = Mock() + indexer.storage.content_get_partition = lambda *args, **kwargs: PagedResult( + results=[ + Content(sha1=c, sha1_git=c, sha256=c, blake2s256=c, length=42) + for c in [ + b"hash1", + b"excluded hash", + b"hash2", + b"other excluded hash", + b"hash3", + ] + ], + next_page_token=None, + ) + indexer.objstorage = Mock() + indexer.objstorage.get = lambda id: b"foo" + nb_partitions = 1 + partition_id = 0 + indexer.run(partition_id, nb_partitions) + assert indexer._results == [["indexed hash1", "indexed hash2", 
"indexed hash3"]] diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py index 21e5e0b..b3e4140 100644 --- a/swh/indexer/tests/test_journal_client.py +++ b/swh/indexer/tests/test_journal_client.py @@ -1,132 +1,116 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Dict, List +from unittest.mock import patch -from unittest.mock import Mock, patch +import pytest from swh.indexer.journal_client import process_journal_objects +from swh.scheduler.interface import SchedulerInterface -def test_one_origin_visit_status(): - mock_scheduler = Mock() - messages = { - "origin_visit_status": [{"status": "full", "origin": "file:///dev/zero",},] - } - process_journal_objects( - messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, - ) - assert mock_scheduler.create_tasks.called is True - call_args = mock_scheduler.create_tasks.call_args - (args, kwargs) = call_args - assert kwargs == {} - del args[0][0]["next_run"] - assert args == ( - [ - { - "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),}, - "policy": "oneshot", - "type": "task-name", - "retries_left": 1, - }, - ], - ) +def search_tasks(indexer_scheduler: SchedulerInterface, task_type) -> List[Dict]: + tasks = indexer_scheduler.search_tasks(task_type=task_type) + keys_not_to_compare = ["next_run", "current_interval", "id", "priority", "status"] -def test_origin_visit_legacy(): - mock_scheduler = Mock() - messages = { - "origin_visit_status": [ - {"status": "full", "origin": {"url": "file:///dev/zero",}}, - ] - } + result_tasks = [] + for task in tasks: + task = dict(task) + + for key in keys_not_to_compare: + del task[key] + + result_tasks.append(task) + + return result_tasks + + +@pytest.mark.parametrize( + "origin", + [ + "file:///dev/zero", # current format + {"url": "file:///dev/zero",}, # legacy format + ], +) +def test_journal_client_origin_visit_status(origin, indexer_scheduler): + messages = {"origin_visit_status": [{"status": "full", "origin": origin},]} process_journal_objects( - messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, - ) - assert mock_scheduler.create_tasks.called is True - call_args = mock_scheduler.create_tasks.call_args - (args, kwargs) = call_args - assert kwargs == {} - del args[0][0]["next_run"] - assert args == ( - [ - { - "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),}, - "policy": "oneshot", - "type": "task-name", - "retries_left": 1, - }, - ], + messages, + scheduler=indexer_scheduler, + task_names={"origin_metadata": "index-origin-metadata"}, ) + actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata") + assert actual_tasks == [ + { + "arguments": {"kwargs": {}, "args": [["file:///dev/zero"]],}, + "policy": "oneshot", + "type": "index-origin-metadata", + "retries_left": 1, + } + ] -def test_one_origin_visit_batch(): - mock_scheduler = Mock() + +def test_journal_client_one_origin_visit_batch(indexer_scheduler): messages = { "origin_visit_status": [ {"status": "full", "origin": "file:///dev/zero",}, {"status": "full", "origin": "file:///tmp/foobar",}, ] } process_journal_objects( - messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, + messages, + scheduler=indexer_scheduler, + task_names={"origin_metadata": 
"index-origin-metadata"}, ) - assert mock_scheduler.create_tasks.called is True - call_args = mock_scheduler.create_tasks.call_args - (args, kwargs) = call_args - assert kwargs == {} - del args[0][0]["next_run"] - assert args == ( - [ - { - "arguments": { - "kwargs": {}, - "args": (["file:///dev/zero", "file:///tmp/foobar"],), - }, - "policy": "oneshot", - "type": "task-name", - "retries_left": 1, + + actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata") + assert actual_tasks == [ + { + "arguments": { + "kwargs": {}, + "args": [["file:///dev/zero", "file:///tmp/foobar"]], }, - ], - ) + "policy": "oneshot", + "type": "index-origin-metadata", + "retries_left": 1, + } + ] @patch("swh.indexer.journal_client.MAX_ORIGINS_PER_TASK", 2) -def test_origin_visit_batches(): - mock_scheduler = Mock() +def test_journal_client_origin_visit_batches(indexer_scheduler): messages = { "origin_visit_status": [ {"status": "full", "origin": "file:///dev/zero",}, {"status": "full", "origin": "file:///tmp/foobar",}, {"status": "full", "origin": "file:///tmp/spamegg",}, ] } process_journal_objects( - messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, + messages, + scheduler=indexer_scheduler, + task_names={"origin_metadata": "index-origin-metadata"}, ) - assert mock_scheduler.create_tasks.called is True - call_args = mock_scheduler.create_tasks.call_args - (args, kwargs) = call_args - assert kwargs == {} - del args[0][0]["next_run"] - del args[0][1]["next_run"] - assert args == ( - [ - { - "arguments": { - "kwargs": {}, - "args": (["file:///dev/zero", "file:///tmp/foobar"],), - }, - "policy": "oneshot", - "type": "task-name", - "retries_left": 1, + actual_tasks = search_tasks(indexer_scheduler, task_type="index-origin-metadata") + assert actual_tasks == [ + { + "arguments": { + "kwargs": {}, + "args": [["file:///dev/zero", "file:///tmp/foobar"],], }, - { - "arguments": {"kwargs": {}, "args": (["file:///tmp/spamegg"],),}, - "policy": "oneshot", - "type": "task-name", - "retries_left": 1, - }, - ], - ) + "policy": "oneshot", + "type": "index-origin-metadata", + "retries_left": 1, + }, + { + "arguments": {"kwargs": {}, "args": [["file:///tmp/spamegg"]],}, + "policy": "oneshot", + "type": "index-origin-metadata", + "retries_left": 1, + }, + ] diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py index 3ad457c..67b5b05 100644 --- a/swh/indexer/tests/test_origin_head.py +++ b/swh/indexer/tests/test_origin_head.py @@ -1,190 +1,176 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy from datetime import datetime, timezone import unittest import pytest from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.tests.utils import fill_storage from swh.model.model import ( Origin, OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.utils import now @pytest.fixture def swh_indexer_config(swh_indexer_config): config = copy.deepcopy(swh_indexer_config) config.update( { "tools": { "name": "origin-metadata", "version": "0.0.1", "configuration": {}, }, "tasks": { "revision_intrinsic_metadata": None, "origin_intrinsic_metadata": None, }, } ) return config class OriginHeadTestIndexer(OriginHeadIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. 
""" def persist_index_computations(self, results): self.results = results +SAMPLE_SNAPSHOT = Snapshot( + branches={ + b"foo": None, + b"HEAD": SnapshotBranch(target_type=TargetType.ALIAS, target=b"foo",), + }, +) + + class OriginHead(unittest.TestCase): @pytest.fixture(autouse=True) def init(self, swh_config): super().setUp() self.indexer = OriginHeadTestIndexer() self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) def test_git(self): origin_url = "https://github.com/SoftwareHeritage/swh-storage" self.indexer.run([origin_url]) rev_id = b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}\xac\xefrm" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_git_partial_snapshot(self): """Checks partial snapshots are ignored.""" origin_url = "https://github.com/SoftwareHeritage/swh-core" self.indexer.storage.origin_add([Origin(url=origin_url)]) visit = self.indexer.storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="git", ) ] )[0] - self.indexer.storage.snapshot_add( - [ - Snapshot( - branches={ - b"foo": None, - b"HEAD": SnapshotBranch( - target_type=TargetType.ALIAS, target=b"foo", - ), - }, - ), - ] - ) + self.indexer.storage.snapshot_add([SAMPLE_SNAPSHOT]) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="partial", - snapshot=b"foo", + snapshot=SAMPLE_SNAPSHOT.id, ) self.indexer.storage.origin_visit_status_add([visit_status]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_vcs_missing_snapshot(self): origin_url = "https://github.com/SoftwareHeritage/swh-indexer" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_pypi_missing_branch(self): origin_url = "https://pypi.org/project/abcdef/" self.indexer.storage.origin_add([Origin(url=origin_url,)]) visit = self.indexer.storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="pypi", ) ] )[0] - self.indexer.storage.snapshot_add( - [ - Snapshot( - branches={ - b"foo": None, - b"HEAD": SnapshotBranch( - target_type=TargetType.ALIAS, target=b"foo", - ), - }, - ) - ] - ) + self.indexer.storage.snapshot_add([SAMPLE_SNAPSHOT]) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="full", - snapshot=b"foo", + snapshot=SAMPLE_SNAPSHOT.id, ) self.indexer.storage.origin_visit_status_add([visit_status]) self.indexer.run(["https://pypi.org/project/abcdef/"]) self.assertEqual(self.indexer.results, []) def test_ftp(self): origin_url = "rsync://ftp.gnu.org/gnu/3dldf" self.indexer.run([origin_url]) rev_id = b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee\xcc\x1a\xb4`\x8c\x8by" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_ftp_missing_snapshot(self): origin_url = "rsync://ftp.gnu.org/gnu/foobar" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_deposit(self): origin_url = "https://forge.softwareheritage.org/source/jesuisgpl/" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) rev_id = b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{\xa6\xe9\x99\xb1\x9e]q\xeb" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_deposit_missing_snapshot(self): origin_url = 
"https://forge.softwareheritage.org/source/foobar" self.indexer.storage.origin_add([Origin(url=origin_url,)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_pypi(self): origin_url = "https://pypi.org/project/limnoria/" self.indexer.run([origin_url]) rev_id = b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url}], ) def test_svn(self): origin_url = "http://0-512-md.googlecode.com/svn/" self.indexer.run([origin_url]) rev_id = b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8\xc9\xad#.\x1bw=\x18" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py index 63ce2aa..a555e9c 100644 --- a/swh/indexer/tests/test_origin_metadata.py +++ b/swh/indexer/tests/test_origin_metadata.py @@ -1,220 +1,255 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy from unittest.mock import patch +import pytest + from swh.indexer.metadata import OriginMetadataIndexer from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.model.model import Origin from swh.storage.interface import StorageInterface -from .test_metadata import REVISION_METADATA_CONFIG +from .test_metadata import TRANSLATOR_TOOL from .utils import REVISION, YARN_PARSER_METADATA +@pytest.fixture +def swh_indexer_config(swh_indexer_config): + """Override the default configuration to override the tools entry + + """ + cfg = copy.deepcopy(swh_indexer_config) + cfg["tools"] = TRANSLATOR_TOOL + return cfg + + def test_origin_metadata_indexer( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) origin = "https://github.com/librariesio/yarn-parser" indexer.run([origin]) - tool = { - "name": "swh-metadata-translator", - "version": "0.0.2", - "configuration": {"context": "NpmMapping", "type": "local"}, - } + tool = swh_indexer_config["tools"] rev_id = REVISION.id rev_metadata = RevisionIntrinsicMetadataRow( id=rev_id, tool=tool, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) origin_metadata = OriginIntrinsicMetadataRow( id=origin, tool=tool, from_revision=rev_id, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) - rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) + rev_results = list(idx_storage.revision_intrinsic_metadata_get([rev_id])) for rev_result in rev_results: assert rev_result.tool del rev_result.tool["id"] assert rev_results == [rev_metadata] - orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) + orig_results = list(idx_storage.origin_intrinsic_metadata_get([origin])) for orig_result in orig_results: assert orig_result.tool del orig_result.tool["id"] assert orig_results == [origin_metadata] def test_origin_metadata_indexer_duplicate_origin( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + 
swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) indexer.storage = storage indexer.idx_storage = idx_storage indexer.run(["https://github.com/librariesio/yarn-parser"]) indexer.run(["https://github.com/librariesio/yarn-parser"] * 2) origin = "https://github.com/librariesio/yarn-parser" rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert len(rev_results) == 1 orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) assert len(orig_results) == 1 def test_origin_metadata_indexer_missing_head( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: storage.origin_add([Origin(url="https://example.com")]) - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) indexer.run(["https://example.com"]) origin = "https://example.com" results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) assert results == [] def test_origin_metadata_indexer_partial_missing_head( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: origin1 = "https://example.com" origin2 = "https://github.com/librariesio/yarn-parser" storage.origin_add([Origin(url=origin1)]) - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) indexer.run([origin1, origin2]) rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert rev_results == [ RevisionIntrinsicMetadataRow( id=rev_id, metadata=YARN_PARSER_METADATA, mappings=["npm"], tool=rev_results[0].tool, ) ] orig_results = list( indexer.idx_storage.origin_intrinsic_metadata_get([origin1, origin2]) ) for orig_result in orig_results: assert orig_results == [ OriginIntrinsicMetadataRow( id=origin2, from_revision=rev_id, metadata=YARN_PARSER_METADATA, mappings=["npm"], tool=orig_results[0].tool, ) ] def test_origin_metadata_indexer_duplicate_revision( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) indexer.storage = storage indexer.idx_storage = idx_storage indexer.catch_exceptions = False origin1 = "https://github.com/librariesio/yarn-parser" origin2 = "https://github.com/librariesio/yarn-parser.git" indexer.run([origin1, origin2]) rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert len(rev_results) == 1 orig_results = list( indexer.idx_storage.origin_intrinsic_metadata_get([origin1, origin2]) ) assert len(orig_results) == 2 def test_origin_metadata_indexer_no_metadata_file( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = 
OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) origin = "https://github.com/librariesio/yarn-parser" with patch("swh.indexer.metadata_dictionary.npm.NpmMapping.filename", b"foo.json"): indexer.run([origin]) rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert rev_results == [] orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) assert orig_results == [] def test_origin_metadata_indexer_no_metadata( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) origin = "https://github.com/librariesio/yarn-parser" with patch( "swh.indexer.metadata.RevisionMetadataIndexer" ".translate_revision_intrinsic_metadata", return_value=(["npm"], {"@context": "foo"}), ): indexer.run([origin]) rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert rev_results == [] orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) assert orig_results == [] def test_origin_metadata_indexer_error( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) origin = "https://github.com/librariesio/yarn-parser" with patch( "swh.indexer.metadata.RevisionMetadataIndexer" ".translate_revision_intrinsic_metadata", return_value=None, ): indexer.run([origin]) rev_id = REVISION.id rev_results = list(indexer.idx_storage.revision_intrinsic_metadata_get([rev_id])) assert rev_results == [] orig_results = list(indexer.idx_storage.origin_intrinsic_metadata_get([origin])) assert orig_results == [] def test_origin_metadata_indexer_unknown_origin( - idx_storage: IndexerStorageInterface, storage: StorageInterface, obj_storage + swh_indexer_config, + idx_storage: IndexerStorageInterface, + storage: StorageInterface, + obj_storage, ) -> None: - indexer = OriginMetadataIndexer(config=REVISION_METADATA_CONFIG) + indexer = OriginMetadataIndexer(config=swh_indexer_config) result = indexer.index_list(["https://unknown.org/foo"]) assert not result
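
The per-module swh_indexer_config overrides above (in test_origin_head.py and test_origin_metadata.py) all follow the same shape: shadow the shared fixture, deep-copy it, and adjust one entry. A generic sketch of the pattern for new test modules; the tools entry below is hypothetical, for illustration only:

import copy

import pytest


@pytest.fixture
def swh_indexer_config(swh_indexer_config):
    """Shadow the session-wide configuration for this module only."""
    cfg = copy.deepcopy(swh_indexer_config)
    # hypothetical tool entry, not one used by the real test suite
    cfg["tools"] = {"name": "example-tool", "version": "0.0.1", "configuration": {}}
    return cfg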