diff --git a/swh/indexer/data/pubspec.csv b/swh/indexer/data/pubspec.csv
new file mode 100644
index 0000000..3032feb
--- /dev/null
+++ b/swh/indexer/data/pubspec.csv
@@ -0,0 +1,68 @@
+Property,Pubspec
+codeRepository,repository
+programmingLanguage,
+runtimePlatform,platforms
+targetProduct,
+applicationCategory,
+applicationSubCategory,
+downloadUrl,
+fileSize,
+installUrl,
+memoryRequirements,
+operatingSystem,
+permissions,
+processorRequirements,
+releaseNotes,
+softwareHelp,
+softwareRequirements,
+softwareVersion,version
+storageRequirements,
+supportingData,
+author,author/authors
+citation,
+contributor,
+copyrightHolder,
+copyrightYear,
+dateCreated,
+dateModified,
+datePublished,
+editor,
+encoding,
+fileFormat,
+funder,
+keywords,keywords
+license,license
+producer,
+provider,
+publisher,
+sponsor,
+version,version
+isAccessibleForFree,
+isPartOf,
+hasPart,
+position,
+description,description
+identifier,
+name,name
+sameAs,
+url,homepage
+relatedLink,
+givenName,
+familyName,
+email,author.email/authors.email
+affiliation,
+identifier,
+name,
+address,
+type,
+id,
+softwareSuggestions,
+maintainer,
+contIntegration,
+buildInstructions,
+developmentStatus,
+embargoDate,
+funding,
+issueTracker,issue_tracker
+referencePublication,
+readme,
diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
index dc3ee6e..2d67c15 100644
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,55 +1,56 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import collections
from typing import Dict, Type

import click

-from . import cff, codemeta, composer, github, maven, npm, python, ruby
+from . import cff, codemeta, composer, dart, github, maven, npm, python, ruby
from .base import BaseExtrinsicMapping, BaseIntrinsicMapping, BaseMapping

INTRINSIC_MAPPINGS: Dict[str, Type[BaseIntrinsicMapping]] = {
    "CffMapping": cff.CffMapping,
    "CodemetaMapping": codemeta.CodemetaMapping,
    "GemspecMapping": ruby.GemspecMapping,
    "MavenMapping": maven.MavenMapping,
    "NpmMapping": npm.NpmMapping,
+    "PubMapping": dart.PubspecMapping,
    "PythonPkginfoMapping": python.PythonPkginfoMapping,
    "ComposerMapping": composer.ComposerMapping,
}

EXTRINSIC_MAPPINGS: Dict[str, Type[BaseExtrinsicMapping]] = {
    "GitHubMapping": github.GitHubMapping,
}


MAPPINGS: Dict[str, Type[BaseMapping]] = {**INTRINSIC_MAPPINGS, **EXTRINSIC_MAPPINGS}


def list_terms():
    """Returns a dictionary with all supported CodeMeta terms as keys,
    and the mappings that support each of them as values."""
    d = collections.defaultdict(set)
    for mapping in MAPPINGS.values():
        for term in mapping.supported_terms():
            d[term].add(mapping)
    return d


@click.command()
@click.argument("mapping_name")
@click.argument("file_name")
def main(mapping_name: str, file_name: str):
    from pprint import pprint

    with open(file_name, "rb") as fd:
        file_content = fd.read()
    res = MAPPINGS[mapping_name]().translate(file_content)
    pprint(res)


if __name__ == "__main__":
    main()
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 9a09cd6..601dc6b 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,246 +1,270 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar

from typing_extensions import TypedDict
+import yaml

from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
from swh.indexer.storage.interface import Sha1


class DirectoryLsEntry(TypedDict):
    target: Sha1
    sha1: Sha1
    name: bytes
    type: str


TTranslateCallable = TypeVar(
    "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None]
)


def produce_terms(
    namespace: str, terms: List[str]
) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Returns a decorator that marks the decorated function as adding
    the given terms to the ``translated_metadata`` dict"""

    def decorator(f: TTranslateCallable) -> TTranslateCallable:
        if not hasattr(f, "produced_terms"):
            f.produced_terms = []  # type: ignore
        f.produced_terms.extend(namespace + term for term in terms)  # type: ignore
        return f

    return decorator


class BaseMapping:
    """Base class for :class:`BaseExtrinsicMapping` and
    :class:`BaseIntrinsicMapping`,
    not to be inherited directly."""

    def __init__(self, log_suffix=""):
        self.log_suffix = log_suffix
        self.log = logging.getLogger(
            "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        )

    @property
    def name(self):
        """A name of this mapping, used as an identifier in the
        indexer storage."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")

    def translate(self, file_content: bytes) -> Optional[Dict]:
        """Translates metadata, from the content of a file or of a
        RawExtrinsicMetadata object."""
        raise NotImplementedError(f"{self.__class__.__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")

class BaseExtrinsicMapping(BaseMapping):
    """Base class for extrinsic-metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
        """
        Returns the list of extrinsic metadata formats which can be translated
        by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=True)


class BaseIntrinsicMapping(BaseMapping):
    """Base class for intrinsic-metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """
        Returns the sha1 hashes of files which can be translated by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=False)


class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file
    as input."""

    @property
    def filename(self):
        """The .json file to extract metadata from."""
        raise NotImplementedError(f"{self.__class__.__name__}.filename")

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        for entry in file_entries:
            if entry["name"].lower() == cls.filename:
                return [entry["sha1"]]
        return []


class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    string_fields = []  # type: List[str]
    """List of fields that are simple strings, and don't need any
    normalization."""

    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")

    @staticmethod
    def _normalize_method_name(name: str) -> str:
        return name.replace("-", "_")

    @classmethod
    def supported_terms(cls):
        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            term
            for (key, term) in cls.mapping.items()
            if key in cls.string_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }

        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            term
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }

        return simple_terms | complex_terms

    def _translate_dict(
        self, content_dict: Dict, *, normalize: bool = True
    ) -> Dict[str, str]:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): content dict to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer

        """
        translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}

        for k, v in content_dict.items():
            # First, check if there is a specific translation
            # method for this key
            translation_method = getattr(
                self, "translate_" + self._normalize_method_name(k), None
            )
            if translation_method:
                translation_method(translated_metadata, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table
                codemeta_key = self.mapping[k]

                # if there is a normalization method, use it on the value
                normalization_method = getattr(
                    self, "normalize_" + self._normalize_method_name(k), None
                )
                if normalization_method:
                    v = normalization_method(v)
                elif k in self.string_fields and isinstance(v, str):
                    pass
                elif k in self.string_fields and isinstance(v, list):
                    v = [x for x in v if isinstance(x, str)]
                else:
                    continue

                # set the translation metadata with the normalized value
                if codemeta_key in translated_metadata:
                    translated_metadata[codemeta_key] = merge_values(
                        translated_metadata[codemeta_key], v
                    )
                else:
                    translated_metadata[codemeta_key] = v

        if normalize:
            return self.normalize_translation(translated_metadata)
        else:
            return translated_metadata


class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Translates content by parsing content from a bytestring containing
        json data and translating with the appropriate mapping

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer

        """
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            content_dict = json.loads(raw_content_string)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None

        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)

        return None
+
+
+class SafeLoader(yaml.SafeLoader):
+    yaml_implicit_resolvers = {
+        k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
+        for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
+    }
+
+
+class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
+    """Base class for all mappings that use Yaml data as input."""
+
+    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
+        raw_content_string: str = raw_content.decode()
+        try:
+            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
+        except yaml.scanner.ScannerError:
+            return None
+
+        if isinstance(content_dict, dict):
+            return self._translate_dict(content_dict)
+
+        return None
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
index 48be831..286ec77 100644
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,74 +1,53 @@
from typing import Dict, List, Optional, Union

-import yaml
-
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI

-from .base import DictMapping, SingleFileIntrinsicMapping
-
-
-class SafeLoader(yaml.SafeLoader):
-    yaml_implicit_resolvers = {
-        k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
-        for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
-    }
+from .base import YamlMapping


-class CffMapping(DictMapping, SingleFileIntrinsicMapping):
+class CffMapping(YamlMapping):
    """Dedicated class for Citation (CITATION.cff) mapping and translation"""

    name = "cff"
    filename = b"CITATION.cff"
    mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
    string_fields = ["keywords", "license", "abstract", "version", "doi"]

-    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
-        raw_content_string: str = raw_content.decode()
-        try:
-            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
-        except yaml.scanner.ScannerError:
-            return None
-
-        if isinstance(content_dict, dict):
-            return self._translate_dict(content_dict)
-
-        return None
-
    def normalize_authors(self, d: List[dict]) -> Dict[str, list]:
        result = []
        for author in d:
            author_data: Dict[str, Optional[Union[str, Dict]]] = {
                "@type": SCHEMA_URI + "Person"
            }
            if "orcid" in author and isinstance(author["orcid"], str):
                author_data["@id"] = author["orcid"]
            if "affiliation" in author and isinstance(author["affiliation"], str):
                author_data[SCHEMA_URI + "affiliation"] = {
                    "@type": SCHEMA_URI + "Organization",
                    SCHEMA_URI + "name": author["affiliation"],
                }
            if "family-names" in author and isinstance(author["family-names"], str):
                author_data[SCHEMA_URI + "familyName"] = author["family-names"]
            if "given-names" in author and isinstance(author["given-names"], str):
                author_data[SCHEMA_URI + "givenName"] = author["given-names"]
            result.append(author_data)
        result_final = {"@list": result}
        return result_final

    def normalize_doi(self, s: str) -> Dict[str, str]:
        if isinstance(s, str):
            return {"@id": "https://doi.org/" + s}

    def normalize_license(self, s: str) -> Dict[str, str]:
        if isinstance(s, str):
            return {"@id": "https://spdx.org/licenses/" + s}

    def normalize_repository_code(self, s: str) -> Dict[str, str]:
        if isinstance(s, str):
            return {"@id": s}

    def normalize_date_released(self, s: str) -> Dict[str, str]:
        if isinstance(s, str):
            return {"@value": s, "@type": SCHEMA_URI + "Date"}
diff --git a/swh/indexer/metadata_dictionary/dart.py b/swh/indexer/metadata_dictionary/dart.py
new file mode 100644
index 0000000..26cd7d5
--- /dev/null
+++ b/swh/indexer/metadata_dictionary/dart.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os.path
+import re
+
+from swh.indexer.codemeta import _DATA_DIR, SCHEMA_URI, _read_crosstable
+
+from .base import YamlMapping
+
+PUB_TABLE_PATH = os.path.join(_DATA_DIR, "pubspec.csv")
+
+with open(PUB_TABLE_PATH) as fd:
+    (CODEMETA_TERMS, PUB_TABLE) = _read_crosstable(fd)
+
+
+def name_to_person(name):
+    return {
+        "@type": SCHEMA_URI + "Person",
+        SCHEMA_URI + "name": name,
+    }
+
+
+class PubspecMapping(YamlMapping):
+
+    name = "pubspec"
+    filename = b"pubspec.yaml"
+    mapping = PUB_TABLE["Pubspec"]
+    string_fields = [
+        "repository",
+        "keywords",
+        "description",
+        "name",
+        "homepage",
+        "issue_tracker",
+        "platforms",
+        "license"
+        # license will only be used with the SPDX Identifier
+    ]
+
+    def normalize_license(self, s):
+        if isinstance(s, str):
+            return {"@id": "https://spdx.org/licenses/" + s}
+
+    def normalize_homepage(self, s):
+        if isinstance(s, str):
+            return {"@id": s}
+
+    def normalize_author(self, s):
+        name_email_regex = "(?P<name>.*?)( <(?P<email>.*)>)"
+        author = {"@type": SCHEMA_URI + "Person"}
+        if isinstance(s, str):
+            match = re.search(name_email_regex, s)
+            if match:
+                name = match.group("name")
+                email = match.group("email")
+                author[SCHEMA_URI + "email"] = email
+            else:
+                name = s
+
+            author[SCHEMA_URI + "name"] = name
+
+        return {"@list": [author]}
+
+    def normalize_authors(self, authors_list):
+        authors = {"@list": []}
+
+        if isinstance(authors_list, list):
+            for s in authors_list:
+                author = self.normalize_author(s)["@list"]
+                authors["@list"] += author
+        return authors
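The `name_email_regex` in `normalize_author` above uses named groups to split `Name <email>` author strings. A minimal standalone sketch of that behavior (illustration only, not part of the patch):

```python
import re

# Same pattern as PubspecMapping.normalize_author: an optional
# " <email>" suffix after a lazily-matched name.
name_email_regex = "(?P<name>.*?)( <(?P<email>.*)>)"

match = re.search(name_email_regex, "Atlee Pine <atlee@example.org>")
assert match.group("name") == "Atlee Pine"
assert match.group("email") == "atlee@example.org"

# A bare name has no "<...>" part, so the pattern does not match and
# normalize_author falls back to using the whole string as the name.
assert re.search(name_email_regex, "Ron Bilius Weasley") is None
```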
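For context on the `SafeLoader` that `YamlMapping` (in `base.py` above) relies on: stock `yaml.SafeLoader` implicitly resolves ISO dates to `datetime.date` objects, which are not JSON-serializable; filtering out the timestamp resolver keeps such values as plain strings. A small demonstration, assuming only PyYAML is installed:

```python
import yaml

class SafeLoader(yaml.SafeLoader):
    # Same filtering as swh/indexer/metadata_dictionary/base.py: drop the
    # implicit timestamp resolver so dates load as plain strings.
    yaml_implicit_resolvers = {
        k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
        for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }

doc = "released: 2022-10-07"
print(yaml.load(doc, Loader=yaml.SafeLoader))  # {'released': datetime.date(2022, 10, 7)}
print(yaml.load(doc, Loader=SafeLoader))       # {'released': '2022-10-07'}
```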
diff --git a/swh/indexer/tests/metadata_dictionary/test_dart.py b/swh/indexer/tests/metadata_dictionary/test_dart.py
new file mode 100644
index 0000000..146f7c7
--- /dev/null
+++ b/swh/indexer/tests/metadata_dictionary/test_dart.py
@@ -0,0 +1,157 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.indexer.metadata_dictionary import MAPPINGS
+
+
+def test_compute_metadata_pubspec():
+    raw_content = """
+---
+name: newtify
+description: >-
+  Have you been turned into a newt? Would you like to be?
+  This package can help. It has all of the
+  newt-transmogrification functionality you have been looking
+  for.
+keywords:
+  - polyfill
+  - shim
+  - compatibility
+  - portable
+  - mbstring
+version: 1.2.3
+license: MIT
+homepage: https://example-pet-store.com/newtify
+documentation: https://example-pet-store.com/newtify/docs
+
+environment:
+  sdk: '>=2.10.0 <3.0.0'
+
+dependencies:
+  efts: ^2.0.4
+  transmogrify: ^0.4.0
+
+dev_dependencies:
+  test: '>=1.15.0 <2.0.0'
+    """.encode(
+        "utf-8"
+    )
+
+    result = MAPPINGS["PubMapping"]().translate(raw_content)
+
+    expected = {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+        "name": "newtify",
+        "keywords": [
+            "polyfill",
+            "shim",
+            "compatibility",
+            "portable",
+            "mbstring",
+        ],
+        "description": """Have you been turned into a newt? Would you like to be? \
+This package can help. It has all of the \
+newt-transmogrification functionality you have been looking \
+for.""",
+        "url": "https://example-pet-store.com/newtify",
+        "license": "https://spdx.org/licenses/MIT",
+    }
+
+    assert result == expected
+
+
+def test_normalize_author_pubspec():
+    raw_content = """
+    author: Atlee Pine <atlee@example.org>
+    """.encode(
+        "utf-8"
+    )
+
+    result = MAPPINGS["PubMapping"]().translate(raw_content)
+
+    expected = {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+        "author": [
+            {"type": "Person", "name": "Atlee Pine", "email": "atlee@example.org"},
+        ],
+    }
+
+    assert result == expected
+
+
+def test_normalize_authors_pubspec():
+    raw_content = """
+    authors:
+      - Vicky Merzown <vmz@example.org>
+      - Ron Bilius Weasley
+    """.encode(
+        "utf-8"
+    )
+
+    result = MAPPINGS["PubMapping"]().translate(raw_content)
+
+    expected = {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+        "author": [
+            {"type": "Person", "name": "Vicky Merzown", "email": "vmz@example.org"},
+            {
+                "type": "Person",
+                "name": "Ron Bilius Weasley",
+            },
+        ],
+    }
+
+    assert result == expected
+
+
+def test_normalize_author_authors_pubspec():
+    raw_content = """
+    authors:
+      - Vicky Merzown <vmz@example.org>
+      - Ron Bilius Weasley
+    author: Hermione Granger
+    """.encode(
+        "utf-8"
+    )
+
+    result = MAPPINGS["PubMapping"]().translate(raw_content)
+
+    expected = {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+        "author": [
+            {"type": "Person", "name": "Vicky Merzown", "email": "vmz@example.org"},
+            {
+                "type": "Person",
+                "name": "Ron Bilius Weasley",
+            },
+            {
+                "type": "Person",
+                "name": "Hermione Granger",
+            },
+        ],
+    }
+
+    assert result == expected
+
+
+def test_normalize_empty_authors():
+    raw_content = """
+    authors:
+    """.encode(
+        "utf-8"
+    )
+
+    result = MAPPINGS["PubMapping"]().translate(raw_content)
+
+    expected = {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+    }
+
+    assert result == expected
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 24bde04..cc2a6b2 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,657 +1,658 @@
# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any
later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( DirectoryIntrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import OriginVisitStatus from .utils import DIRECTORY2, REVISION def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] directory_metadata = [ DirectoryIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.directory_intrinsic_metadata_add(directory_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "composer", "gemspec", "github", "maven", "npm", "pkg-info", + "pubspec", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert 
re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, 
indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), 
OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) @pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"]) def test_cli_journal_client_index( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( "swh.indexer.metadata.get_head_swhid", return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.DirectoryMetadataIndexer.index", return_value=[ DirectoryIntrinsicMetadataRow( id=DIRECTORY2.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", 
"--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_directory=DIRECTORY2.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results