diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 885efa8..e576dab 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,710 +1,711 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from contextlib import contextmanager import logging import os import shutil import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union, ) import warnings import sentry_sdk from typing_extensions import TypedDict from swh.core import utils from swh.core.config import load_from_envvar, merge_configs from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil from swh.model.model import Directory, Origin, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage from swh.storage.interface import StorageInterface class ObjectsDict(TypedDict, total=False): - """Typed objects.""" + """Typed objects whose keys are names of Kafka topics and values are list of values + of messages in that topic.""" content: List[Dict] directory: List[Dict] origin: List[Dict] origin_visit_status: List[Dict] raw_extrinsic_metadata: List[Dict] @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: filename: one of sha1's many filenames data: the sha1's content to write in temporary file working_directory: the directory into which the file is written Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, "wb") as f: f.write(data) yield content_path shutil.rmtree(temp_dir) DEFAULT_CONFIG = { INDEXER_CFG_KEY: {"cls": "memory"}, "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, } TId = TypeVar("TId") """type of the ids of index()ed objects.""" TData = TypeVar("TData") """type of the objects passed to index().""" TResult = TypeVar("TResult") """return type of index()""" class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :meth:`~BaseIndexer.run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`DirectoryIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :meth:`~BaseIndexer.filter`: filter out data already indexed (in storage). :meth:`~BaseIndexer.index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :meth:`~BaseIndexer.persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :meth:`~BaseIndexer.prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :meth:`~BaseIndexer.check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :meth:`~BaseIndexer.register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ results: List[TResult] USE_TOOLS = True catch_exceptions = True """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" scheduler: Any storage: StorageInterface objstorage: Any idx_storage: IndexerStorageInterface def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run.""" super().__init__() if config is not None: self.config = config elif SWH_CONFIG: self.config = SWH_CONFIG.copy() else: self.config = load_from_envvar() self.config = merge_configs(DEFAULT_CONFIG, self.config) self.prepare() self.check() self.log.debug("%s: config=%s", self, self.config) def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ config_storage = self.config.get("storage") if config_storage: self.storage = get_storage(**config_storage) self.objstorage = get_objstorage(**self.config["objstorage"]) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.log = logging.getLogger("swh.indexer") if self.USE_TOOLS: self.tools = list(self.register_tools(self.config.get("tools", []))) self.results = [] @property def tool(self) -> Dict: return self.tools[0] def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if self.USE_TOOLS and not self.tools: raise ValueError("Tools %s is unknown, cannot continue" % self.tools) def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api.""" return {"tool_%s" % key: value for key, value in tool.items()} def register_tools( self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. Raises: ValueError: if not a list nor a dict. """ if isinstance(tools, list): tools = list(map(self._prepare_tool, tools)) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError("Configuration tool(s) must be a dict or list!") if tools: return self.idx_storage.indexer_configuration_add(tools) else: return [] def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: """Index computation for the id and associated raw data. Args: id: identifier or Dict object data: id's data from storage or objstorage depending on object type Returns: dict: a dict that makes sense for the :meth:`.persist_index_computations` method. """ raise NotImplementedError() def filter(self, ids: List[TId]) -> Iterator[TId]: """Filter missing ids for that particular indexer. Args: ids: list of ids Yields: iterator of missing ids """ yield from ids @abc.abstractmethod def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. Returns: a summary dict of what has been inserted in the storage """ return {} def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Read swh message objects (content, origin, ...) from the journal to: - retrieve the associated objects from the storage backend (e.g. storage, objstorage...) - execute the associated indexing computations - store the results in the indexer storage """ raise NotImplementedError() class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on the journal (method `process_journal_objects`) or on a list of ids directly (method `run`). To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Read content objects from the journal, retrieve their raw content and compute content indexing (e.g. mimetype, fossology license, ...). Note that once this is deployed, this supersedes the main ContentIndexer.run method call and the class ContentPartitionIndexer. """ summary: Dict[str, Any] = {"status": "uneventful"} try: results = [] contents = objects.get("content", []) # FIXME: with swh.objstorage > v2.0: self.objstorage.get_batch(contents) content_data = self.objstorage.get_batch(c["sha1"] for c in contents) for item, raw_content in zip(contents, content_data): id_ = item["sha1"] if not raw_content: self.log.warning( "Content %s not found in objstorage", hashutil.hash_to_hex(id_) ) continue results.extend(self.index(id_, data=raw_content)) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] results = [] summary: Dict = {"status": "uneventful"} try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning( "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1) ) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when reading contents metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" return summary class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content partition indexer. This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition. To work on a list of ids, use the :class:`ContentIndexer` instead. Note: :class:`ContentPartitionIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ @abc.abstractmethod def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: """Retrieve indexed contents within range [start, end]. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination """ pass def _list_contents_to_index( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1] ) -> Iterable[Sha1]: """Compute from storage the new contents to index in the partition_id . The already indexed contents are skipped. Args: partition_id: Index of the partition to fetch data from nb_partitions: Total number of partition indexed: Set of content already indexed. Yields: Sha1 id (bytes) of contents to index """ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int): raise TypeError( f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}." ) next_page_token = None while True: result = self.storage.content_get_partition( partition_id, nb_partitions, page_token=next_page_token ) contents = result.results for c in contents: _id = hashutil.hash_to_bytes(c.sha1) if _id in indexed: continue yield _id next_page_token = result.next_page_token if next_page_token is None: break def _index_contents( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any ) -> Iterator[TResult]: """Index the contents within the partition_id. Args: start: Starting bound from range identifier end: End range identifier indexed: Set of content already indexed. Yields: indexing result as dict to persist in the indexer backend """ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning(f"Content {sha1.hex()} not found in objstorage") continue yield from self.index(sha1, raw_content, **kwargs) def _index_with_skipping_already_done( self, partition_id: int, nb_partitions: int ) -> Iterator[TResult]: """Index not already indexed contents within the partition partition_id Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into Yields: indexing result as dict to persist in the indexer backend """ already_indexed_contents = set( self.indexed_contents_in_partition(partition_id, nb_partitions) ) return self._index_contents( partition_id, nb_partitions, already_indexed_contents ) def run( self, partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs, ) -> Dict: """Given a partition of content ids, index the contents within. Either the indexer is incremental (filter out existing computed data) or it computes everything from scratch. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: dict with the indexing task status """ summary: Dict[str, Any] = {"status": "uneventful"} count = 0 try: if skip_existing: gen = self._index_with_skipping_already_done( partition_id, nb_partitions ) else: gen = self._index_contents(partition_id, nb_partitions, indexed=set([])) count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] if count > 0: summary["status"] = "eventful" except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when computing metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" if count > 0 and count_object_added_key: summary[count_object_added_key] = count return summary class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - store the results Args: origin_urls: list of origin urls. **kwargs: passed to the `index` method """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] origins = [{"url": url} for url in origin_urls] return self.process_journal_objects({"origin": origins}) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``.""" origins = [ Origin(url=status["origin"]) for status in objects.get("origin_visit_status", []) if status["status"] == "full" ] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])] summary: Dict[str, Any] = {"status": "uneventful"} try: results = self.index_list( origins, check_origin_known=False, # no need to check they exist, as we just received either an origin or # visit status; which cannot be created by swh-storage unless the origin # already exists ) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: results = [] for origin in origins: try: results.extend(self.index(origin.url, **kwargs)) except Exception: self.log.exception("Problem when processing origin %s", origin.url) sentry_sdk.capture_exception() raise return results class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Directory indexing using the run method Note: the :class:`DirectoryIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve directories from storage - execute the indexing computations - store the results Args: ids: sha1_git's identifier list """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] directory_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] return self._process_directories([(dir_id, None) for dir_id in directory_ids]) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``.""" return self._process_directories( [ (dir_["id"], Directory.from_dict(dir_)) for dir_ in objects.get("directory", []) ] ) def _process_directories( self, directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]], ) -> Dict: summary: Dict[str, Any] = {"status": "uneventful"} results = [] # TODO: fetch raw_manifest when useful? for (dir_id, dir_) in directories: try: results.extend(self.index(dir_id, dir_)) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing directory") sentry_sdk.capture_exception() summary["status"] = "failed" summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) self.results = results return summary diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py index bcf0af1..29a8de7 100644 --- a/swh/indexer/tests/conftest.py +++ b/swh/indexer/tests/conftest.py @@ -1,130 +1,132 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from functools import partial import os from typing import List, Tuple from unittest.mock import patch import pytest from pytest_postgresql import factories import yaml from swh.core.db.pytest_plugin import initialize_database_for_module from swh.indexer.storage import IndexerStorage, get_indexer_storage from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .utils import fill_obj_storage, fill_storage TASK_NAMES: List[Tuple[str, str]] = [ # (scheduler-task-type, task-class-test-name) ("index-directory-metadata", "directory_intrinsic_metadata"), ("index-origin-metadata", "origin_intrinsic_metadata"), ] idx_postgresql_proc = factories.postgresql_proc( load=[ partial( initialize_database_for_module, modname="indexer", version=IndexerStorage.current_version, ) ], ) idx_storage_postgresql = factories.postgresql("idx_postgresql_proc") @pytest.fixture def indexer_scheduler(swh_scheduler): # Insert the expected task types within the scheduler for task_name, task_class_name in TASK_NAMES: swh_scheduler.create_task_type( { "type": task_name, "description": f"The {task_class_name} indexer testing task", "backend_name": f"swh.indexer.tests.tasks.{task_class_name}", "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), "num_retries": 3, } ) return swh_scheduler @pytest.fixture def idx_storage_backend_config(idx_storage_postgresql): """Basic pg storage configuration with no journal collaborator for the indexer storage (to avoid pulling optional dependency on clients of this fixture) """ return { "cls": "local", "db": idx_storage_postgresql.dsn, } @pytest.fixture def swh_indexer_config( - swh_storage_backend_config, idx_storage_backend_config, swh_scheduler_config + swh_storage_backend_config, + idx_storage_backend_config, + swh_scheduler_config, ): return { "storage": swh_storage_backend_config, "objstorage": {"cls": "memory"}, "indexer_storage": idx_storage_backend_config, "scheduler": {"cls": "local", **swh_scheduler_config}, "tools": { "name": "file", "version": "1:5.30-1+deb9u1", "configuration": {"type": "library", "debian-package": "python3-magic"}, }, "compute_checksums": ["blake2b512"], # for rehash indexer } @pytest.fixture def idx_storage(swh_indexer_config): """An instance of in-memory indexer storage that gets injected into all indexers classes. """ idx_storage_config = swh_indexer_config["indexer_storage"] return get_indexer_storage(**idx_storage_config) @pytest.fixture def storage(swh_indexer_config): """An instance of in-memory storage that gets injected into all indexers classes. """ storage = get_storage(**swh_indexer_config["storage"]) fill_storage(storage) return storage @pytest.fixture def obj_storage(swh_indexer_config): """An instance of in-memory objstorage that gets injected into all indexers classes. """ objstorage = get_objstorage(**swh_indexer_config["objstorage"]) fill_obj_storage(objstorage) with patch("swh.indexer.indexer.get_objstorage", return_value=objstorage): yield objstorage @pytest.fixture def swh_config(swh_indexer_config, monkeypatch, tmp_path): conffile = os.path.join(str(tmp_path), "indexer.yml") with open(conffile, "w") as f: f.write(yaml.dump(swh_indexer_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) return conffile diff --git a/swh/indexer/tests/metadata_dictionary/test_npm.py b/swh/indexer/tests/metadata_dictionary/test_npm.py index 2f7d7cf..781e995 100644 --- a/swh/indexer/tests/metadata_dictionary/test_npm.py +++ b/swh/indexer/tests/metadata_dictionary/test_npm.py @@ -1,322 +1,318 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from hypothesis import HealthCheck, given, settings import pytest from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.storage.model import ContentMetadataRow -from swh.model.hashutil import hash_to_bytes from ..test_metadata import TRANSLATOR_TOOL, ContentMetadataTestIndexer from ..utils import ( BASE_TEST_CONFIG, - fill_obj_storage, - fill_storage, + MAPPING_DESCRIPTION_CONTENT_SHA1, json_document_strategy, ) def test_compute_metadata_none(): """ testing content empty content is empty should return None """ content = b"" # None if no metadata was found or an error occurred declared_metadata = None result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result def test_compute_metadata_npm(): """ testing only computation of metadata with hard_mapping_npm """ content = b""" { "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" }, "author": { "email": "moranegg@example.com", "name": "Morane G" } } """ declared_metadata = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "codeRepository": "git+https://github.com/moranegg/metadata_test", "author": [ { "type": "Person", "name": "Morane G", "email": "moranegg@example.com", } ], } result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result def test_compute_metadata_invalid_description_npm(): """ testing only computation of metadata with hard_mapping_npm """ content = b""" { "name": "test_metadata", "version": "0.0.2", "description": 1234 } """ declared_metadata = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "name": "test_metadata", "version": "0.0.2", } result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result -def test_index_content_metadata_npm(): +def test_index_content_metadata_npm(storage, obj_storage): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ sha1s = [ - hash_to_bytes("26a9f72a7c87cc9205725cfd879f514ff4f3d8d5"), - hash_to_bytes("d4c647f0fc257591cc9ba1722484229780d1c607"), - hash_to_bytes("02fb2c89e14f7fab46701478c83779c7beb7b069"), + MAPPING_DESCRIPTION_CONTENT_SHA1["json:test-metadata-package.json"], + MAPPING_DESCRIPTION_CONTENT_SHA1["json:npm-package.json"], + MAPPING_DESCRIPTION_CONTENT_SHA1["python:code"], ] + # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping config = BASE_TEST_CONFIG.copy() config["tools"] = [TRANSLATOR_TOOL] metadata_indexer = ContentMetadataTestIndexer(config=config) - fill_obj_storage(metadata_indexer.objstorage) - fill_storage(metadata_indexer.storage) - - metadata_indexer.run(sha1s) + metadata_indexer.run(sha1s, log_suffix="unknown content") results = list(metadata_indexer.idx_storage.content_metadata_get(sha1s)) expected_results = [ ContentMetadataRow( - id=hash_to_bytes("26a9f72a7c87cc9205725cfd879f514ff4f3d8d5"), + id=sha1s[0], tool=TRANSLATOR_TOOL, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "codeRepository": "git+https://github.com/moranegg/metadata_test", "description": "Simple package.json test for indexer", "name": "test_metadata", "version": "0.0.1", }, ), ContentMetadataRow( - id=hash_to_bytes("d4c647f0fc257591cc9ba1722484229780d1c607"), + id=sha1s[1], tool=TRANSLATOR_TOOL, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "issueTracker": "https://github.com/npm/npm/issues", "author": [ { "type": "Person", "name": "Isaac Z. Schlueter", "email": "i@izs.me", "url": "http://blog.izs.me", } ], "codeRepository": "git+https://github.com/npm/npm", "description": "a package manager for JavaScript", "license": "https://spdx.org/licenses/Artistic-2.0", "version": "5.0.3", "name": "npm", "keywords": [ "install", "modules", "package manager", "package.json", ], "url": "https://docs.npmjs.com/", }, ), ] for result in results: del result.tool["id"] # The assertion below returns False sometimes because of nested lists assert expected_results == results def test_npm_bugs_normalization(): # valid dictionary package_json = b"""{ "name": "foo", "bugs": { "url": "https://github.com/owner/project/issues", "email": "foo@example.com" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "issueTracker": "https://github.com/owner/project/issues", "type": "SoftwareSourceCode", } # "invalid" dictionary package_json = b"""{ "name": "foo", "bugs": { "email": "foo@example.com" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "type": "SoftwareSourceCode", } # string package_json = b"""{ "name": "foo", "bugs": "https://github.com/owner/project/issues" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "issueTracker": "https://github.com/owner/project/issues", "type": "SoftwareSourceCode", } def test_npm_repository_normalization(): # normal package_json = b"""{ "name": "foo", "repository": { "type" : "git", "url" : "https://github.com/npm/cli.git" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://github.com/npm/cli.git", "type": "SoftwareSourceCode", } # missing url package_json = b"""{ "name": "foo", "repository": { "type" : "git" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "type": "SoftwareSourceCode", } # github shortcut package_json = b"""{ "name": "foo", "repository": "github:npm/cli" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) expected_result = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://github.com/npm/cli.git", "type": "SoftwareSourceCode", } assert result == expected_result # github shortshortcut package_json = b"""{ "name": "foo", "repository": "npm/cli" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == expected_result # gitlab shortcut package_json = b"""{ "name": "foo", "repository": "gitlab:user/repo" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://gitlab.com/user/repo.git", "type": "SoftwareSourceCode", } @settings(suppress_health_check=[HealthCheck.too_slow]) @given(json_document_strategy(keys=list(MAPPINGS["NpmMapping"].mapping))) # type: ignore def test_npm_adversarial(doc): raw = json.dumps(doc).encode() MAPPINGS["NpmMapping"]().translate(raw) @pytest.mark.parametrize( "filename", [b"package.json", b"Package.json", b"PACKAGE.json", b"PACKAGE.JSON"] ) def test_detect_metadata_package_json(filename): df = [ { "sha1_git": b"abc", "name": b"index.js", "target": b"abc", "length": 897, "status": "visible", "type": "file", "perms": 33188, "dir_id": b"dir_a", "sha1": b"bcd", }, { "sha1_git": b"aab", "name": filename, "target": b"aab", "length": 712, "status": "visible", "type": "file", "perms": 33188, "dir_id": b"dir_a", "sha1": b"cde", }, ] results = detect_metadata(df) expected_results = {"NpmMapping": [b"cde"]} assert expected_results == results diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 93c626a..fb78e88 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,733 +1,819 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch import attr from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( + ContentMimetypeRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes -from swh.model.model import Origin, OriginVisitStatus +from swh.model.model import Content, Origin, OriginVisitStatus from .test_metadata import REMD -from .utils import DIRECTORY2, REVISION +from .utils import DIRECTORY2, RAW_CONTENTS, REVISION def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] directory_metadata = [ DirectoryIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.directory_intrinsic_metadata_add(directory_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "composer", "gemspec", "github", "maven", "npm", "pkg-info", "pubspec", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) @pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"]) def test_cli_journal_client_index__origin_intrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( "swh.indexer.metadata.get_head_swhid", return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.DirectoryMetadataIndexer.index", return_value=[ DirectoryIntrinsicMetadataRow( id=DIRECTORY2.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_directory=DIRECTORY2.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results @pytest.mark.parametrize("indexer_name", ["extrinsic-metadata", "*"]) def test_cli_journal_client_index__origin_extrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) origin = Origin("http://example.org/repo.git") storage.origin_add([origin]) raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid()) raw_extrinsic_metadata = attr.evolve( raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash() ) journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata]) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", 1, ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_extrinsic_metadata_get([origin.url]) expected_results = [ OriginExtrinsicMetadataRow( id=origin.url, from_remd_id=raw_extrinsic_metadata.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["github"], metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "https://forgefed.org/ns#Repository", "name": "test software", }, ) ] assert sorted(results, key=lambda r: r.id) == expected_results + + +def test_cli_journal_client_index__content_mimetype( + cli_runner, + swh_config, + kafka_prefix: str, + kafka_server, + consumer: Consumer, + idx_storage, + obj_storage, + storage, + mocker, + swh_indexer_config, +): + """Test the 'swh indexer journal-client' cli tool.""" + journal_writer = get_journal_writer( + "kafka", + brokers=[kafka_server], + prefix=kafka_prefix, + client_id="test producer", + value_sanitizer=lambda object_type, value: value, + flush_timeout=3, # fail early if something is going wrong + ) + + contents = [] + expected_results = [] + content_ids = [] + for content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items(): + content = Content.from_data(raw_content) + assert content_id == content.sha1 + + contents.append(content) + content_ids.append(content_id) + + # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3) + # returns different results. This allows to deal with such a case when executing + # tests on different environments machines (e.g. ci tox, ci debian, dev machine, + # ...) + all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes] + + expected_results.extend( + [ + ContentMimetypeRow( + id=content.sha1, + tool={"id": 1, **swh_indexer_config["tools"]}, + mimetype=mimetype, + encoding=encoding, + ) + for mimetype in all_mimetypes + ] + ) + + assert len(contents) == len(RAW_CONTENTS) + + storage.content_add(contents) + journal_writer.write_additions("content", contents) + + result = cli_runner.invoke( + indexer_cli_group, + [ + "-C", + swh_config, + "journal-client", + "content-mimetype", + "--broker", + kafka_server, + "--prefix", + kafka_prefix, + "--group-id", + "test-consumer", + "--stop-after-objects", + len(contents), + ], + catch_exceptions=False, + ) + + # Check the output + expected_output = "Done.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + results = idx_storage.content_mimetype_get(content_ids) + assert len(results) > 0 + for result in results: + assert result in expected_results diff --git a/swh/indexer/tests/test_ctags.py b/swh/indexer/tests/test_ctags.py index 720d8c4..02626ea 100644 --- a/swh/indexer/tests/test_ctags.py +++ b/swh/indexer/tests/test_ctags.py @@ -1,172 +1,171 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from unittest.mock import patch import pytest import swh.indexer.ctags from swh.indexer.ctags import CtagsIndexer, run_ctags from swh.indexer.storage.model import ContentCtagsRow from swh.indexer.tests.utils import ( BASE_TEST_CONFIG, OBJ_STORAGE_DATA, + RAW_CONTENT_IDS, SHA1_TO_CTAGS, CommonContentIndexerTest, fill_obj_storage, fill_storage, filter_dict, ) from swh.model.hashutil import hash_to_bytes class BasicTest(unittest.TestCase): @patch("swh.indexer.ctags.subprocess") def test_run_ctags(self, mock_subprocess): """Computing licenses from a raw content should return results""" output0 = """ {"name":"defun","kind":"function","line":1,"language":"scheme"} {"name":"name","kind":"symbol","line":5,"language":"else"}""" output1 = """ {"name":"let","kind":"var","line":10,"language":"something"}""" expected_result0 = [ {"name": "defun", "kind": "function", "line": 1, "lang": "scheme"}, {"name": "name", "kind": "symbol", "line": 5, "lang": "else"}, ] expected_result1 = [ {"name": "let", "kind": "var", "line": 10, "lang": "something"} ] for path, lang, intermediary_result, expected_result in [ (b"some/path", "lisp", output0, expected_result0), (b"some/path/2", "markdown", output1, expected_result1), ]: mock_subprocess.check_output.return_value = intermediary_result actual_result = list(run_ctags(path, lang=lang)) self.assertEqual(actual_result, expected_result) class InjectCtagsIndexer: """Override ctags computations.""" def compute_ctags(self, path, lang): """Inject fake ctags given path (sha1 identifier).""" return {"lang": lang, **SHA1_TO_CTAGS.get(path)} CONFIG = { **BASE_TEST_CONFIG, "tools": { "name": "universal-ctags", "version": "~git7859817b", "configuration": { "command_line": """ctags --fields=+lnz --sort=no """ """ --links=no """, "max_content_size": 1000, }, }, "languages": { "python": "python", "haskell": "haskell", "bar": "bar", }, "workdir": "/tmp", } class TestCtagsIndexer(CommonContentIndexerTest, unittest.TestCase): """Ctags indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ def get_indexer_results(self, ids): yield from self.idx_storage.content_ctags_get(ids) def setUp(self): super().setUp() self.indexer = CtagsIndexer(config=CONFIG) self.indexer.catch_exceptions = False self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) # Prepare test input - self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" - self.id1 = "d4c647f0fc257591cc9ba1722484229780d1c607" - self.id2 = "688a5ef812c53907562fe379d4b3851e69c7cb15" + self.id0, self.id1, self.id2 = RAW_CONTENT_IDS tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} self.expected_results = [ *[ ContentCtagsRow( - id=hash_to_bytes(self.id0), + id=self.id0, tool=tool, **kwargs, ) for kwargs in SHA1_TO_CTAGS[self.id0] ], *[ ContentCtagsRow( - id=hash_to_bytes(self.id1), + id=self.id1, tool=tool, **kwargs, ) for kwargs in SHA1_TO_CTAGS[self.id1] ], *[ ContentCtagsRow( - id=hash_to_bytes(self.id2), + id=self.id2, tool=tool, **kwargs, ) for kwargs in SHA1_TO_CTAGS[self.id2] ], ] self._set_mocks() def _set_mocks(self): def find_ctags_for_content(raw_content): for (sha1, ctags) in SHA1_TO_CTAGS.items(): - if OBJ_STORAGE_DATA[sha1] == raw_content: + if OBJ_STORAGE_DATA[hash_to_bytes(sha1)] == raw_content: return ctags else: raise ValueError( ("%r not found in objstorage, can't mock its ctags.") % raw_content ) def fake_language(raw_content, *args, **kwargs): ctags = find_ctags_for_content(raw_content) return {"lang": ctags[0]["lang"]} self._real_compute_language = swh.indexer.ctags.compute_language swh.indexer.ctags.compute_language = fake_language def fake_check_output(cmd, *args, **kwargs): id_ = cmd[-1].split("/")[-1] return "\n".join( json.dumps({"language": ctag["lang"], **ctag}) - for ctag in SHA1_TO_CTAGS[id_] + for ctag in SHA1_TO_CTAGS[hash_to_bytes(id_)] ) self._real_check_output = swh.indexer.ctags.subprocess.check_output swh.indexer.ctags.subprocess.check_output = fake_check_output def tearDown(self): swh.indexer.ctags.compute_language = self._real_compute_language swh.indexer.ctags.subprocess.check_output = self._real_check_output super().tearDown() def test_ctags_w_no_tool(): with pytest.raises(ValueError): CtagsIndexer(config=filter_dict(CONFIG, "tools")) diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py index ed81b27..a38167b 100644 --- a/swh/indexer/tests/test_fossology_license.py +++ b/swh/indexer/tests/test_fossology_license.py @@ -1,163 +1,159 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import unittest from unittest.mock import patch import pytest from swh.indexer import fossology_license from swh.indexer.fossology_license import ( FossologyLicenseIndexer, FossologyLicensePartitionIndexer, compute_license, ) from swh.indexer.storage.model import ContentLicenseRow from swh.indexer.tests.utils import ( BASE_TEST_CONFIG, + RAW_CONTENT_IDS, SHA1_TO_LICENSES, CommonContentIndexerPartitionTest, CommonContentIndexerTest, fill_obj_storage, fill_storage, filter_dict, ) from swh.model.hashutil import hash_to_bytes class BasicTest(unittest.TestCase): @patch("swh.indexer.fossology_license.subprocess") def test_compute_license(self, mock_subprocess): """Computing licenses from a raw content should return results""" for path, intermediary_result, output in [ (b"some/path", None, []), (b"some/path/2", [], []), (b"other/path", " contains license(s) GPL,AGPL", ["GPL", "AGPL"]), ]: mock_subprocess.check_output.return_value = intermediary_result actual_result = compute_license(path) self.assertEqual( actual_result, { "licenses": output, "path": path, }, ) def mock_compute_license(path): """path is the content identifier""" if isinstance(id, bytes): path = path.decode("utf-8") # path is something like /tmp/tmpXXX/ so we keep only the sha1 part - path = path.split("/")[-1] - return {"licenses": SHA1_TO_LICENSES.get(path, [])} + id_ = path.split("/")[-1] + return {"licenses": SHA1_TO_LICENSES.get(hash_to_bytes(id_), [])} CONFIG = { **BASE_TEST_CONFIG, "workdir": "/tmp", "tools": { "name": "nomos", "version": "3.1.0rc2-31-ga2cbb8c", "configuration": { "command_line": "nomossa ", }, }, } # type: Dict[str, Any] RANGE_CONFIG = dict(list(CONFIG.items()) + [("write_batch_size", 100)]) class TestFossologyLicenseIndexer(CommonContentIndexerTest, unittest.TestCase): """Language indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ def get_indexer_results(self, ids): yield from self.idx_storage.content_fossology_license_get(ids) def setUp(self): super().setUp() # replace actual license computation with a mock self.orig_compute_license = fossology_license.compute_license fossology_license.compute_license = mock_compute_license self.indexer = FossologyLicenseIndexer(CONFIG) self.indexer.catch_exceptions = False self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) - self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" - self.id1 = "688a5ef812c53907562fe379d4b3851e69c7cb15" - self.id2 = "da39a3ee5e6b4b0d3255bfef95601890afd80709" # empty content + self.id0, self.id1, self.id2 = RAW_CONTENT_IDS tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} + # then self.expected_results = [ *[ - ContentLicenseRow( - id=hash_to_bytes(self.id0), tool=tool, license=license - ) + ContentLicenseRow(id=self.id0, tool=tool, license=license) for license in SHA1_TO_LICENSES[self.id0] ], *[ - ContentLicenseRow( - id=hash_to_bytes(self.id1), tool=tool, license=license - ) + ContentLicenseRow(id=self.id1, tool=tool, license=license) for license in SHA1_TO_LICENSES[self.id1] ], *[], # self.id2 ] def tearDown(self): super().tearDown() fossology_license.compute_license = self.orig_compute_license class TestFossologyLicensePartitionIndexer( CommonContentIndexerPartitionTest, unittest.TestCase ): """Range Fossology License Indexer tests. - new data within range are indexed - no data outside a range are indexed - with filtering existing indexed data prior to compute new index - without filtering existing indexed data prior to compute new index """ def setUp(self): super().setUp() # replace actual license computation with a mock self.orig_compute_license = fossology_license.compute_license fossology_license.compute_license = mock_compute_license self.indexer = FossologyLicensePartitionIndexer(config=RANGE_CONFIG) self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) def tearDown(self): super().tearDown() fossology_license.compute_license = self.orig_compute_license def test_fossology_w_no_tool(): with pytest.raises(ValueError): FossologyLicenseIndexer(config=filter_dict(CONFIG, "tools")) def test_fossology_range_w_no_tool(): with pytest.raises(ValueError): FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools")) diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 027d693..20c49c0 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,270 +1,283 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from unittest.mock import call import attr from swh.indexer.metadata import ( ContentMetadataIndexer, DirectoryMetadataIndexer, ExtrinsicMetadataIndexer, ) from swh.indexer.storage.model import ( ContentMetadataRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, ) from swh.indexer.tests.utils import DIRECTORY2 from swh.model.model import ( Directory, DirectoryEntry, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, RawExtrinsicMetadata, ) from swh.model.swhids import ExtendedObjectType, ExtendedSWHID from .utils import ( BASE_TEST_CONFIG, + MAPPING_DESCRIPTION_CONTENT_SHA1, + MAPPING_DESCRIPTION_CONTENT_SHA1GIT, YARN_PARSER_METADATA, fill_obj_storage, fill_storage, ) TRANSLATOR_TOOL = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {"type": "local", "context": "NpmMapping"}, } class ContentMetadataTestIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def parse_config_file(self, *args, **kwargs): assert False, "should not be called; the dir indexer configures it." DIRECTORY_METADATA_CONFIG = { **BASE_TEST_CONFIG, "tools": TRANSLATOR_TOOL, } REMD = RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.ORIGIN, object_id=b"\x01" * 20, ), discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), authority=MetadataAuthority( type=MetadataAuthorityType.FORGE, url="https://example.org/", ), fetcher=MetadataFetcher( name="example-fetcher", version="1.0.0", ), format="application/vnd.github.v3+json", metadata=b'{"full_name": "test software"}', ) class TestMetadata: """ Tests metadata_mock_tool tool for Metadata detection """ def test_directory_metadata_indexer(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None dir_ = DIRECTORY2 + assert ( + dir_.entries[0].target + == MAPPING_DESCRIPTION_CONTENT_SHA1GIT["json:yarn-parser-package.json"] + ) + metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( - id=DIRECTORY2.entries[0].target, + id=MAPPING_DESCRIPTION_CONTENT_SHA1[ + "json:yarn-parser-package.json" + ], indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([dir_.id]) results = list( - metadata_indexer.idx_storage.directory_intrinsic_metadata_get( - [DIRECTORY2.id] - ) + metadata_indexer.idx_storage.directory_intrinsic_metadata_get([dir_.id]) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=dir_.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results def test_directory_metadata_indexer_single_root_dir(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) # Add a parent directory, that is the only directory at the root # of the directory dir_ = DIRECTORY2 + assert ( + dir_.entries[0].target + == MAPPING_DESCRIPTION_CONTENT_SHA1GIT["json:yarn-parser-package.json"] + ) new_dir = Directory( entries=( DirectoryEntry( name=b"foobar-1.0.0", type="dir", target=dir_.id, perms=16384, ), ), ) assert new_dir.id is not None metadata_indexer.storage.directory_add([new_dir]) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( - id=DIRECTORY2.entries[0].target, + id=MAPPING_DESCRIPTION_CONTENT_SHA1[ + "json:yarn-parser-package.json" + ], indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([new_dir.id]) results = list( metadata_indexer.idx_storage.directory_intrinsic_metadata_get([new_dir.id]) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=new_dir.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results def test_extrinsic_metadata_indexer_unknown_format(self, mocker): """Should be ignored when unknown format""" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") remd = attr.evolve(REMD, format="unknown format") results = metadata_indexer.index(remd.id, data=remd) assert metadata_indexer.storage.method_calls == [] assert results == [] def test_extrinsic_metadata_indexer_github(self, mocker): """Nominal case, calling the mapping and storing the result""" origin = "https://example.org/jdoe/myrepo" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.catch_exceptions = False metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None assert metadata_indexer.process_journal_objects( {"raw_extrinsic_metadata": [REMD.to_dict()]} ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} assert metadata_indexer.storage.method_calls == [ call.origin_get_by_sha1([b"\x01" * 20]) ] results = list( metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) ) assert results == [ OriginExtrinsicMetadataRow( id="https://example.org/jdoe/myrepo", tool={"id": tool["id"], **TRANSLATOR_TOOL}, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "https://forgefed.org/ns#Repository", "name": "test software", }, from_remd_id=REMD.id, mappings=["github"], ) ] def test_extrinsic_metadata_indexer_nonforge_authority(self, mocker): """Early abort on non-forge authorities""" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") remd = attr.evolve( REMD, authority=attr.evolve(REMD.authority, type=MetadataAuthorityType.REGISTRY), ) results = metadata_indexer.index(remd.id, data=remd) assert metadata_indexer.storage.method_calls == [] assert results == [] def test_extrinsic_metadata_indexer_thirdparty_authority(self, mocker): """Should be ignored when authority URL does not match the origin""" origin = "https://different-domain.example.org/jdoe/myrepo" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.catch_exceptions = False metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None results = metadata_indexer.index(REMD.id, data=REMD) assert metadata_indexer.storage.method_calls == [ call.origin_get_by_sha1([b"\x01" * 20]) ] assert results == [] diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py index 73d8d41..cc2f9cb 100644 --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -1,140 +1,133 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import unittest import pytest from swh.indexer.mimetype import ( MimetypeIndexer, MimetypePartitionIndexer, compute_mimetype_encoding, ) from swh.indexer.storage.model import ContentMimetypeRow from swh.indexer.tests.utils import ( BASE_TEST_CONFIG, + RAW_CONTENT_IDS, + RAW_CONTENTS, CommonContentIndexerPartitionTest, CommonContentIndexerTest, fill_obj_storage, fill_storage, filter_dict, ) -from swh.model.hashutil import hash_to_bytes @pytest.mark.parametrize( - "raw_text,mimetype,encoding", - [ - ("du français".encode(), "text/plain", "utf-8"), - (b"def __init__(self):", ("text/x-python", "text/x-script.python"), "us-ascii"), - (b"\xff\xfe\x00\x00\x00\x00\xff\xfe\xff\xff", "application/octet-stream", ""), - ], + "raw_text,mimetypes,encoding", + RAW_CONTENTS.values(), ) -def test_compute_mimetype_encoding(raw_text, mimetype, encoding): +def test_compute_mimetype_encoding(raw_text, mimetypes, encoding): """Compute mimetype encoding should return results""" actual_result = compute_mimetype_encoding(raw_text) - if isinstance(mimetype, tuple): - # New magic version can return different results, this deals with such a case - expected_result = {"mimetype": mimetype[0], "encoding": encoding} - # as a fallback - fallback_expected_result = {"mimetype": mimetype[1], "encoding": encoding} - else: - expected_result = {"mimetype": mimetype, "encoding": encoding} - fallback_expected_result = expected_result - try: - assert actual_result == expected_result - except AssertionError: - assert actual_result == fallback_expected_result + # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3) + # returns different results. This allows to deal with such a case when executing + # tests on different environments machines (e.g. ci tox, ci debian, dev machine, + # ...) + all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes] + + assert actual_result in [ + {"mimetype": mimetype, "encoding": encoding} for mimetype in all_mimetypes + ] CONFIG = { **BASE_TEST_CONFIG, "tools": { "name": "file", "version": "1:5.30-1+deb9u1", "configuration": {"type": "library", "debian-package": "python3-magic"}, }, } # type: Dict[str, Any] class TestMimetypeIndexer(CommonContentIndexerTest, unittest.TestCase): """Mimetype indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ def get_indexer_results(self, ids): yield from self.idx_storage.content_mimetype_get(ids) def setUp(self): self.indexer = MimetypeIndexer(config=CONFIG) self.indexer.catch_exceptions = False self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) - self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" - self.id1 = "688a5ef812c53907562fe379d4b3851e69c7cb15" - self.id2 = "da39a3ee5e6b4b0d3255bfef95601890afd80709" + self.id0, self.id1, self.id2 = RAW_CONTENT_IDS tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} - self.expected_results = [ - ContentMimetypeRow( - id=hash_to_bytes(self.id0), - tool=tool, - mimetype="text/plain", - encoding="us-ascii", - ), - ContentMimetypeRow( - id=hash_to_bytes(self.id1), - tool=tool, - mimetype="text/plain", - encoding="us-ascii", - ), - ContentMimetypeRow( - id=hash_to_bytes(self.id2), - tool=tool, - mimetype="application/x-empty", - encoding="binary", - ), - ] + results = [] + for raw_content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items(): + # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: + # 1:5.39-3) returns different results. This allows to deal with such a case + # when executing tests on different environments machines (e.g. ci tox, ci + # debian, dev machine, ...) + all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes] + + results.extend( + [ + ContentMimetypeRow( + id=raw_content_id, + tool=tool, + mimetype=mimetype, + encoding=encoding, + ) + for mimetype in all_mimetypes + ] + ) + + self.expected_results = results RANGE_CONFIG = dict(list(CONFIG.items()) + [("write_batch_size", 100)]) class TestMimetypePartitionIndexer( CommonContentIndexerPartitionTest, unittest.TestCase ): """Range Mimetype Indexer tests. - new data within range are indexed - no data outside a range are indexed - with filtering existing indexed data prior to compute new index - without filtering existing indexed data prior to compute new index """ def setUp(self): super().setUp() self.indexer = MimetypePartitionIndexer(config=RANGE_CONFIG) self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) def test_mimetype_w_no_tool(): with pytest.raises(ValueError): MimetypeIndexer(config=filter_dict(CONFIG, "tools")) def test_mimetype_range_w_no_tool(): with pytest.raises(ValueError): MimetypePartitionIndexer(config=filter_dict(CONFIG, "tools")) diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py index 5171bae..1e80dc2 100644 --- a/swh/indexer/tests/utils.py +++ b/swh/indexer/tests/utils.py @@ -1,783 +1,793 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import functools -from typing import Any, Dict +from typing import Any, Dict, List, Tuple import unittest from hypothesis import strategies from swh.core.api.classes import stream_results from swh.indexer.storage import INDEXER_CFG_KEY -from swh.model import hashutil from swh.model.hashutil import hash_to_bytes from swh.model.model import ( Content, Directory, DirectoryEntry, ObjectType, Origin, OriginVisit, OriginVisitStatus, Person, Release, Revision, RevisionType, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.storage.utils import now BASE_TEST_CONFIG: Dict[str, Dict[str, Any]] = { "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, INDEXER_CFG_KEY: {"cls": "memory"}, } - ORIGIN_VISITS = [ {"type": "git", "origin": "https://github.com/SoftwareHeritage/swh-storage"}, {"type": "ftp", "origin": "rsync://ftp.gnu.org/gnu/3dldf"}, { "type": "deposit", "origin": "https://forge.softwareheritage.org/source/jesuisgpl/", }, { "type": "pypi", "origin": "https://old-pypi.example.org/project/limnoria/", }, # with rev head {"type": "pypi", "origin": "https://pypi.org/project/limnoria/"}, # with rel head {"type": "svn", "origin": "http://0-512-md.googlecode.com/svn/"}, {"type": "git", "origin": "https://github.com/librariesio/yarn-parser"}, {"type": "git", "origin": "https://github.com/librariesio/yarn-parser.git"}, {"type": "git", "origin": "https://npm.example.org/yarn-parser"}, ] ORIGINS = [Origin(url=visit["origin"]) for visit in ORIGIN_VISITS] +OBJ_STORAGE_RAW_CONTENT: Dict[str, bytes] = { + "text:some": b"this is some text", + "text:another": b"another text", + "text:yet": b"yet another text", + "python:code": b""" + import unittest + import logging + from swh.indexer.mimetype import MimetypeIndexer + from swh.indexer.tests.test_utils import MockObjStorage + + class MockStorage(): + def content_mimetype_add(self, mimetypes): + self.state = mimetypes + + def indexer_configuration_add(self, tools): + return [{ + 'id': 10, + }] + """, + "c:struct": b""" + #ifndef __AVL__ + #define __AVL__ + + typedef struct _avl_tree avl_tree; + + typedef struct _data_t { + int content; + } data_t; + """, + "lisp:assertion": b""" + (should 'pygments (recognize 'lisp 'easily)) + + """, + "json:test-metadata-package.json": b""" + { + "name": "test_metadata", + "version": "0.0.1", + "description": "Simple package.json test for indexer", + "repository": { + "type": "git", + "url": "https://github.com/moranegg/metadata_test" + } + } + """, + "json:npm-package.json": b""" + { + "version": "5.0.3", + "name": "npm", + "description": "a package manager for JavaScript", + "keywords": [ + "install", + "modules", + "package manager", + "package.json" + ], + "preferGlobal": true, + "config": { + "publishtest": false + }, + "homepage": "https://docs.npmjs.com/", + "author": "Isaac Z. Schlueter (http://blog.izs.me)", + "repository": { + "type": "git", + "url": "https://github.com/npm/npm" + }, + "bugs": { + "url": "https://github.com/npm/npm/issues" + }, + "dependencies": { + "JSONStream": "~1.3.1", + "abbrev": "~1.1.0", + "ansi-regex": "~2.1.1", + "ansicolors": "~0.3.2", + "ansistyles": "~0.1.3" + }, + "devDependencies": { + "tacks": "~1.2.6", + "tap": "~10.3.2" + }, + "license": "Artistic-2.0" + } + + """, + "text:carriage-return": b""" + """, + "text:empty": b"", + # was 626364 / b'bcd' + "text:unimportant": b"unimportant content for bcd", + # was 636465 / b'cde' now yarn-parser package.json + "json:yarn-parser-package.json": b""" + { + "name": "yarn-parser", + "version": "1.0.0", + "description": "Tiny web service for parsing yarn.lock files", + "main": "index.js", + "scripts": { + "start": "node index.js", + "test": "mocha" + }, + "engines": { + "node": "9.8.0" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/librariesio/yarn-parser.git" + }, + "keywords": [ + "yarn", + "parse", + "lock", + "dependencies" + ], + "author": "Andrew Nesbitt", + "license": "AGPL-3.0", + "bugs": { + "url": "https://github.com/librariesio/yarn-parser/issues" + }, + "homepage": "https://github.com/librariesio/yarn-parser#readme", + "dependencies": { + "@yarnpkg/lockfile": "^1.0.0", + "body-parser": "^1.15.2", + "express": "^4.14.0" + }, + "devDependencies": { + "chai": "^4.1.2", + "mocha": "^5.2.0", + "request": "^2.87.0", + "test": "^0.6.0" + } + } + +""", +} + +MAPPING_DESCRIPTION_CONTENT_SHA1GIT: Dict[str, bytes] = {} +MAPPING_DESCRIPTION_CONTENT_SHA1: Dict[str, bytes] = {} +OBJ_STORAGE_DATA: Dict[bytes, bytes] = {} + +for key_description, data in OBJ_STORAGE_RAW_CONTENT.items(): + content = Content.from_data(data) + MAPPING_DESCRIPTION_CONTENT_SHA1GIT[key_description] = content.sha1_git + MAPPING_DESCRIPTION_CONTENT_SHA1[key_description] = content.sha1 + OBJ_STORAGE_DATA[content.sha1] = data + + +RAW_CONTENT_METADATA = [ + ( + "du français".encode(), + "text/plain", + "utf-8", + ), + ( + b"def __init__(self):", + ("text/x-python", "text/x-script.python"), + "us-ascii", + ), + ( + b"\xff\xfe\x00\x00\x00\x00\xff\xfe\xff\xff", + "application/octet-stream", + "", + ), +] + +RAW_CONTENTS: Dict[bytes, Tuple] = {} +RAW_CONTENT_IDS: List[bytes] = [] + +for index, raw_content_d in enumerate(RAW_CONTENT_METADATA): + raw_content = raw_content_d[0] + content = Content.from_data(raw_content) + RAW_CONTENTS[content.sha1] = raw_content_d + RAW_CONTENT_IDS.append(content.sha1) + # and write it to objstorage data so it's flushed in the objstorage + OBJ_STORAGE_DATA[content.sha1] = raw_content + + +SHA1_TO_LICENSES: Dict[bytes, List[str]] = { + RAW_CONTENT_IDS[0]: ["GPL"], + RAW_CONTENT_IDS[1]: ["AGPL"], + RAW_CONTENT_IDS[2]: [], +} + + +SHA1_TO_CTAGS: Dict[bytes, List[Dict[str, Any]]] = { + RAW_CONTENT_IDS[0]: [ + { + "name": "foo", + "kind": "str", + "line": 10, + "lang": "bar", + } + ], + RAW_CONTENT_IDS[1]: [ + { + "name": "symbol", + "kind": "float", + "line": 99, + "lang": "python", + } + ], + RAW_CONTENT_IDS[2]: [ + { + "name": "let", + "kind": "int", + "line": 100, + "lang": "haskell", + } + ], +} + DIRECTORY = Directory( - id=hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), entries=( DirectoryEntry( name=b"index.js", type="file", - target=hash_to_bytes("01c9379dfc33803963d07c1ccc748d3fe4c96bb5"), + target=MAPPING_DESCRIPTION_CONTENT_SHA1GIT["text:some"], perms=0o100644, ), DirectoryEntry( name=b"package.json", type="file", - target=hash_to_bytes("26a9f72a7c87cc9205725cfd879f514ff4f3d8d5"), + target=MAPPING_DESCRIPTION_CONTENT_SHA1GIT[ + "json:test-metadata-package.json" + ], perms=0o100644, ), DirectoryEntry( name=b".github", type="dir", target=Directory(entries=()).id, perms=0o040000, ), ), ) DIRECTORY2 = Directory( - id=b"\xf8zz\xa1\x12`<1$\xfav\xf9\x01\xfd5\x85F`\xf2\xb6", entries=( DirectoryEntry( name=b"package.json", type="file", - target=hash_to_bytes("f5305243b3ce7ef8dc864ebc73794da304025beb"), + target=MAPPING_DESCRIPTION_CONTENT_SHA1GIT["json:yarn-parser-package.json"], perms=0o100644, ), ), ) _utc_plus_2 = datetime.timezone(datetime.timedelta(minutes=120)) REVISION = Revision( - id=hash_to_bytes("c6201cb1b9b9df9a7542f9665c3b5dfab85e9775"), message=b"Improve search functionality", author=Person( name=b"Andrew Nesbitt", fullname=b"Andrew Nesbitt ", email=b"andrewnez@gmail.com", ), committer=Person( name=b"Andrew Nesbitt", fullname=b"Andrew Nesbitt ", email=b"andrewnez@gmail.com", ), committer_date=TimestampWithTimezone.from_datetime( datetime.datetime(2013, 10, 4, 12, 50, 49, tzinfo=_utc_plus_2) ), type=RevisionType.GIT, synthetic=False, date=TimestampWithTimezone.from_datetime( datetime.datetime(2017, 2, 20, 16, 14, 16, tzinfo=_utc_plus_2) ), directory=DIRECTORY2.id, parents=(), ) REVISIONS = [REVISION] RELEASE = Release( name=b"v0.0.0", message=None, author=Person( name=b"Andrew Nesbitt", fullname=b"Andrew Nesbitt ", email=b"andrewnez@gmail.com", ), synthetic=False, date=TimestampWithTimezone.from_datetime( datetime.datetime(2017, 2, 20, 16, 14, 16, tzinfo=_utc_plus_2) ), target_type=ObjectType.DIRECTORY, target=DIRECTORY2.id, ) RELEASES = [RELEASE] SNAPSHOTS = [ # https://github.com/SoftwareHeritage/swh-storage Snapshot( - id=hash_to_bytes("a50fde72265343b7d28cecf6db20d98a81d21965"), branches={ b"refs/heads/add-revision-origin-cache": SnapshotBranch( target=b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0s\xe7/\xe9l\x1e', target_type=TargetType.REVISION, ), b"refs/head/master": SnapshotBranch( target=b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}\xac\xefrm", target_type=TargetType.REVISION, ), b"HEAD": SnapshotBranch( target=b"refs/head/master", target_type=TargetType.ALIAS ), b"refs/tags/v0.0.103": SnapshotBranch( target=b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+\x0f\xdd', target_type=TargetType.RELEASE, ), }, ), # rsync://ftp.gnu.org/gnu/3dldf Snapshot( - id=hash_to_bytes("2c67f69a416bca4e1f3fcd848c588fab88ad0642"), branches={ b"3DLDF-1.1.4.tar.gz": SnapshotBranch( target=b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc"G\x99\x11', target_type=TargetType.REVISION, ), b"3DLDF-2.0.2.tar.gz": SnapshotBranch( target=b"\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e=\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V", # noqa target_type=TargetType.REVISION, ), b"3DLDF-2.0.3-examples.tar.gz": SnapshotBranch( target=b"!H\x19\xc0\xee\x82-\x12F1\xbd\x97\xfe\xadZ\x80\x80\xc1\x83\xff", # noqa target_type=TargetType.REVISION, ), b"3DLDF-2.0.3.tar.gz": SnapshotBranch( target=b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee\xcc\x1a\xb4`\x8c\x8by", # noqa target_type=TargetType.REVISION, ), b"3DLDF-2.0.tar.gz": SnapshotBranch( target=b"F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G\xd3\xd1m", target_type=TargetType.REVISION, ), }, ), # https://forge.softwareheritage.org/source/jesuisgpl/", Snapshot( - id=hash_to_bytes("68c0d26104d47e278dd6be07ed61fafb561d0d20"), branches={ b"master": SnapshotBranch( target=b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{\xa6\xe9\x99\xb1\x9e]q\xeb", # noqa target_type=TargetType.REVISION, ) }, ), # https://old-pypi.example.org/project/limnoria/ Snapshot( - id=hash_to_bytes("f255245269e15fc99d284affd79f766668de0b67"), branches={ b"HEAD": SnapshotBranch( target=b"releases/2018.09.09", target_type=TargetType.ALIAS ), b"releases/2018.09.01": SnapshotBranch( target=b"<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d\xbb\xdfF\xfdw\xcf", target_type=TargetType.REVISION, ), b"releases/2018.09.09": SnapshotBranch( target=b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t", # noqa target_type=TargetType.REVISION, ), }, ), # https://pypi.org/project/limnoria/ Snapshot( branches={ b"HEAD": SnapshotBranch( target=b"releases/2018.09.09", target_type=TargetType.ALIAS ), b"releases/2018.09.01": SnapshotBranch( target=b"<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d\xbb\xdfF\xfdw\xcf", target_type=TargetType.RELEASE, ), b"releases/2018.09.09": SnapshotBranch( target=b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t", # noqa target_type=TargetType.RELEASE, ), }, ), # http://0-512-md.googlecode.com/svn/ Snapshot( - id=hash_to_bytes("a1a28c0ab387a8f9e0618cb705eab81fc448f473"), branches={ b"master": SnapshotBranch( target=b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8\xc9\xad#.\x1bw=\x18", target_type=TargetType.REVISION, ) }, ), # https://github.com/librariesio/yarn-parser Snapshot( - id=hash_to_bytes("bb4fd3a836930ce629d912864319637040ff3040"), branches={ b"HEAD": SnapshotBranch( target=REVISION.id, target_type=TargetType.REVISION, ) }, ), # https://github.com/librariesio/yarn-parser.git Snapshot( - id=hash_to_bytes("bb4fd3a836930ce629d912864319637040ff3040"), branches={ b"HEAD": SnapshotBranch( target=REVISION.id, target_type=TargetType.REVISION, ) }, ), # https://npm.example.org/yarn-parser Snapshot( branches={ b"HEAD": SnapshotBranch( target=RELEASE.id, target_type=TargetType.RELEASE, ) }, ), ] assert len(SNAPSHOTS) == len(ORIGIN_VISITS) -SHA1_TO_LICENSES = { - "01c9379dfc33803963d07c1ccc748d3fe4c96bb5": ["GPL"], - "02fb2c89e14f7fab46701478c83779c7beb7b069": ["Apache2.0"], - "103bc087db1d26afc3a0283f38663d081e9b01e6": ["MIT"], - "688a5ef812c53907562fe379d4b3851e69c7cb15": ["AGPL"], - "da39a3ee5e6b4b0d3255bfef95601890afd80709": [], -} - - -SHA1_TO_CTAGS = { - "01c9379dfc33803963d07c1ccc748d3fe4c96bb5": [ - { - "name": "foo", - "kind": "str", - "line": 10, - "lang": "bar", - } - ], - "d4c647f0fc257591cc9ba1722484229780d1c607": [ - { - "name": "let", - "kind": "int", - "line": 100, - "lang": "haskell", - } - ], - "688a5ef812c53907562fe379d4b3851e69c7cb15": [ - { - "name": "symbol", - "kind": "float", - "line": 99, - "lang": "python", - } - ], -} - - -OBJ_STORAGE_DATA = { - "01c9379dfc33803963d07c1ccc748d3fe4c96bb5": b"this is some text", - "688a5ef812c53907562fe379d4b3851e69c7cb15": b"another text", - "8986af901dd2043044ce8f0d8fc039153641cf17": b"yet another text", - "02fb2c89e14f7fab46701478c83779c7beb7b069": b""" - import unittest - import logging - from swh.indexer.mimetype import MimetypeIndexer - from swh.indexer.tests.test_utils import MockObjStorage - - class MockStorage(): - def content_mimetype_add(self, mimetypes): - self.state = mimetypes - - def indexer_configuration_add(self, tools): - return [{ - 'id': 10, - }] - """, - "103bc087db1d26afc3a0283f38663d081e9b01e6": b""" - #ifndef __AVL__ - #define __AVL__ - - typedef struct _avl_tree avl_tree; - - typedef struct _data_t { - int content; - } data_t; - """, - "93666f74f1cf635c8c8ac118879da6ec5623c410": b""" - (should 'pygments (recognize 'lisp 'easily)) - - """, - "26a9f72a7c87cc9205725cfd879f514ff4f3d8d5": b""" - { - "name": "test_metadata", - "version": "0.0.1", - "description": "Simple package.json test for indexer", - "repository": { - "type": "git", - "url": "https://github.com/moranegg/metadata_test" - } - } - """, - "d4c647f0fc257591cc9ba1722484229780d1c607": b""" - { - "version": "5.0.3", - "name": "npm", - "description": "a package manager for JavaScript", - "keywords": [ - "install", - "modules", - "package manager", - "package.json" - ], - "preferGlobal": true, - "config": { - "publishtest": false - }, - "homepage": "https://docs.npmjs.com/", - "author": "Isaac Z. Schlueter (http://blog.izs.me)", - "repository": { - "type": "git", - "url": "https://github.com/npm/npm" - }, - "bugs": { - "url": "https://github.com/npm/npm/issues" - }, - "dependencies": { - "JSONStream": "~1.3.1", - "abbrev": "~1.1.0", - "ansi-regex": "~2.1.1", - "ansicolors": "~0.3.2", - "ansistyles": "~0.1.3" - }, - "devDependencies": { - "tacks": "~1.2.6", - "tap": "~10.3.2" - }, - "license": "Artistic-2.0" - } - - """, - "a7ab314d8a11d2c93e3dcf528ca294e7b431c449": b""" - """, - "da39a3ee5e6b4b0d3255bfef95601890afd80709": b"", - # was 626364 / b'bcd' - "e3e40fee6ff8a52f06c3b428bfe7c0ed2ef56e92": b"unimportant content for bcd", - # was 636465 / b'cde' now yarn-parser package.json - "f5305243b3ce7ef8dc864ebc73794da304025beb": b""" - { - "name": "yarn-parser", - "version": "1.0.0", - "description": "Tiny web service for parsing yarn.lock files", - "main": "index.js", - "scripts": { - "start": "node index.js", - "test": "mocha" - }, - "engines": { - "node": "9.8.0" - }, - "repository": { - "type": "git", - "url": "git+https://github.com/librariesio/yarn-parser.git" - }, - "keywords": [ - "yarn", - "parse", - "lock", - "dependencies" - ], - "author": "Andrew Nesbitt", - "license": "AGPL-3.0", - "bugs": { - "url": "https://github.com/librariesio/yarn-parser/issues" - }, - "homepage": "https://github.com/librariesio/yarn-parser#readme", - "dependencies": { - "@yarnpkg/lockfile": "^1.0.0", - "body-parser": "^1.15.2", - "express": "^4.14.0" - }, - "devDependencies": { - "chai": "^4.1.2", - "mocha": "^5.2.0", - "request": "^2.87.0", - "test": "^0.6.0" - } - } - -""", -} - - YARN_PARSER_METADATA = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "url": "https://github.com/librariesio/yarn-parser#readme", "codeRepository": "git+git+https://github.com/librariesio/yarn-parser.git", "author": [{"type": "Person", "name": "Andrew Nesbitt"}], "license": "https://spdx.org/licenses/AGPL-3.0", "version": "1.0.0", "description": "Tiny web service for parsing yarn.lock files", "issueTracker": "https://github.com/librariesio/yarn-parser/issues", "name": "yarn-parser", "keywords": ["yarn", "parse", "lock", "dependencies"], "type": "SoftwareSourceCode", } json_dict_keys = strategies.one_of( strategies.characters(), strategies.just("type"), strategies.just("url"), strategies.just("name"), strategies.just("email"), strategies.just("@id"), strategies.just("@context"), strategies.just("repository"), strategies.just("license"), strategies.just("repositories"), strategies.just("licenses"), ) """Hypothesis strategy that generates strings, with an emphasis on those that are often used as dictionary keys in metadata files.""" generic_json_document = strategies.recursive( strategies.none() | strategies.booleans() | strategies.floats() | strategies.characters(), lambda children: ( strategies.lists(children, min_size=1) | strategies.dictionaries(json_dict_keys, children, min_size=1) ), ) """Hypothesis strategy that generates possible values for values of JSON metadata files.""" def json_document_strategy(keys=None): """Generates an hypothesis strategy that generates metadata files for a JSON-based format that uses the given keys.""" if keys is None: keys = strategies.characters() else: keys = strategies.one_of(map(strategies.just, keys)) return strategies.dictionaries(keys, generic_json_document, min_size=1) def _tree_to_xml(root, xmlns, data): def encode(s): "Skips unpaired surrogates generated by json_document_strategy" return s.encode("utf8", "replace") def to_xml(data, indent=b" "): if data is None: return b"" elif isinstance(data, (bool, str, int, float)): return indent + encode(str(data)) elif isinstance(data, list): return b"\n".join(to_xml(v, indent=indent) for v in data) elif isinstance(data, dict): lines = [] for (key, value) in data.items(): lines.append(indent + encode("<{}>".format(key))) lines.append(to_xml(value, indent=indent + b" ")) lines.append(indent + encode("".format(key))) return b"\n".join(lines) else: raise TypeError(data) return b"\n".join( [ '<{} xmlns="{}">'.format(root, xmlns).encode(), to_xml(data), "".format(root).encode(), ] ) class TreeToXmlTest(unittest.TestCase): def test_leaves(self): self.assertEqual( _tree_to_xml("root", "http://example.com", None), b'\n\n', ) self.assertEqual( _tree_to_xml("root", "http://example.com", True), b'\n True\n', ) self.assertEqual( _tree_to_xml("root", "http://example.com", "abc"), b'\n abc\n', ) self.assertEqual( _tree_to_xml("root", "http://example.com", 42), b'\n 42\n', ) self.assertEqual( _tree_to_xml("root", "http://example.com", 3.14), b'\n 3.14\n', ) def test_dict(self): self.assertIn( _tree_to_xml("root", "http://example.com", {"foo": "bar", "baz": "qux"}), [ b'\n' b" \n bar\n \n" b" \n qux\n \n" b"", b'\n' b" \n qux\n \n" b" \n bar\n \n" b"", ], ) def test_list(self): self.assertEqual( _tree_to_xml( "root", "http://example.com", [ {"foo": "bar"}, {"foo": "baz"}, ], ), b'\n' b" \n bar\n \n" b" \n baz\n \n" b"", ) def xml_document_strategy(keys, root, xmlns): """Generates an hypothesis strategy that generates metadata files for an XML format that uses the given keys.""" return strategies.builds( functools.partial(_tree_to_xml, root, xmlns), json_document_strategy(keys) ) def filter_dict(d, keys): "return a copy of the dict with keys deleted" if not isinstance(keys, (list, tuple)): keys = (keys,) return dict((k, v) for (k, v) in d.items() if k not in keys) def fill_obj_storage(obj_storage): """Add some content in an object storage.""" - for (obj_id, content) in OBJ_STORAGE_DATA.items(): - obj_storage.add(content, obj_id=hash_to_bytes(obj_id)) + for obj_id, content in OBJ_STORAGE_DATA.items(): + obj_storage.add(content, obj_id) def fill_storage(storage): - storage.origin_add(ORIGINS) + """Fill in storage with consistent test dataset.""" + storage.content_add([Content.from_data(data) for data in OBJ_STORAGE_DATA.values()]) storage.directory_add([DIRECTORY, DIRECTORY2]) storage.revision_add(REVISIONS) storage.release_add(RELEASES) storage.snapshot_add(SNAPSHOTS) + storage.origin_add(ORIGINS) for visit, snapshot in zip(ORIGIN_VISITS, SNAPSHOTS): assert snapshot.id is not None visit = storage.origin_visit_add( [OriginVisit(origin=visit["origin"], date=now(), type=visit["type"])] )[0] visit_status = OriginVisitStatus( origin=visit.origin, visit=visit.visit, date=now(), status="full", snapshot=snapshot.id, ) storage.origin_visit_status_add([visit_status]) - contents = [] - for (obj_id, content) in OBJ_STORAGE_DATA.items(): - content_hashes = hashutil.MultiHash.from_data(content).digest() - contents.append( - Content( - data=content, - length=len(content), - status="visible", - sha1=hash_to_bytes(obj_id), - sha1_git=hash_to_bytes(obj_id), - sha256=content_hashes["sha256"], - blake2s256=content_hashes["blake2s256"], - ) - ) - storage.content_add(contents) - class CommonContentIndexerTest(metaclass=abc.ABCMeta): def get_indexer_results(self, ids): """Override this for indexers that don't have a mock storage.""" return self.indexer.idx_storage.state def assert_results_ok(self, sha1s, expected_results=None): - sha1s = [ - sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s - ] + sha1s = [hash_to_bytes(sha1) for sha1 in sha1s] actual_results = list(self.get_indexer_results(sha1s)) if expected_results is None: expected_results = self.expected_results - self.assertEqual(expected_results, actual_results) + # expected results may contain slightly duplicated results + assert 0 < len(actual_results) <= len(expected_results) + for result in actual_results: + assert result in expected_results def test_index(self): """Known sha1 have their data indexed""" sha1s = [self.id0, self.id1, self.id2] # when self.indexer.run(sha1s) self.assert_results_ok(sha1s) # 2nd pass self.indexer.run(sha1s) self.assert_results_ok(sha1s) def test_index_one_unknown_sha1(self): - """Unknown sha1 are not indexed""" + """Unknown sha1s are not indexed""" sha1s = [ self.id1, "799a5ef812c53907562fe379d4b3851e69c7cb15", # unknown - "800a5ef812c53907562fe379d4b3851e69c7cb15", + "800a5ef812c53907562fe379d4b3851e69c7cb15", # unknown ] # unknown # when self.indexer.run(sha1s) # then - expected_results = [ - res - for res in self.expected_results - if hashutil.hash_to_hex(res.id) in sha1s - ] + expected_results = [res for res in self.expected_results if res.id in sha1s] self.assert_results_ok(sha1s, expected_results) class CommonContentIndexerPartitionTest: """Allows to factorize tests on range indexer.""" def setUp(self): self.contents = sorted(OBJ_STORAGE_DATA) def assert_results_ok(self, partition_id, nb_partitions, actual_results): expected_ids = [ c.sha1 for c in stream_results( self.indexer.storage.content_get_partition, partition_id=partition_id, nb_partitions=nb_partitions, ) ] actual_results = list(actual_results) for indexed_data in actual_results: _id = indexed_data.id assert _id in expected_ids _tool_id = indexed_data.indexer_configuration_id assert _tool_id == self.indexer.tool["id"] def test__index_contents(self): """Indexing contents without existing data results in indexed data""" partition_id = 0 nb_partitions = 4 actual_results = list( self.indexer._index_contents(partition_id, nb_partitions, indexed={}) ) self.assert_results_ok(partition_id, nb_partitions, actual_results) def test__index_contents_with_indexed_data(self): """Indexing contents with existing data results in less indexed data""" partition_id = 3 nb_partitions = 4 # first pass actual_results = list( self.indexer._index_contents(partition_id, nb_partitions, indexed={}), ) self.assert_results_ok(partition_id, nb_partitions, actual_results) indexed_ids = {res.id for res in actual_results} actual_results = list( self.indexer._index_contents( partition_id, nb_partitions, indexed=indexed_ids ) ) # already indexed, so nothing new assert actual_results == [] def test_generate_content_get(self): """Optimal indexing should result in indexed data""" partition_id = 0 nb_partitions = 1 actual_results = self.indexer.run( partition_id, nb_partitions, skip_existing=False ) assert actual_results["status"] == "eventful", actual_results def test_generate_content_get_no_result(self): """No result indexed returns False""" actual_results = self.indexer.run(1, 2**512, incremental=False) assert actual_results == {"status": "uneventful"}