diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index 2c909b9..f5c8889 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,397 +1,397 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Callable, Dict, Iterator, List, Optional # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group @swh_cli_group.group( name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.pass_context def indexer_cli_group(ctx, config_file): """Software Heritage Indexer tools. The Indexer is used to mine the content of the archive and extract derived information from archive source code artifacts. """ from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = {"cls": "remote", "url": url} elif config_key not in config: raise click.ClickException("Missing configuration for {}".format(config_key)) return getter(**config[config_key]) @indexer_cli_group.group("mapping") def mapping(): """Manage Software Heritage Indexer mappings.""" pass @mapping.command("list") def mapping_list(): """Prints the list of known mappings.""" from swh.indexer import metadata_dictionary mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command("list-terms") @click.option( "--exclude-mapping", multiple=True, help="Exclude the given mapping from the output" ) @click.option( "--concise", is_flag=True, default=False, help="Don't print the list of mappings supporting each term.", ) def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" from swh.indexer import metadata_dictionary properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo("{}:".format(property_name)) click.echo("\t" + ", ".join(sorted(supported_mappings))) @mapping.command("translate") @click.argument("mapping-name") @click.argument("file", type=click.File("rb")) def mapping_translate(mapping_name, file): """Translates file from mapping-name to codemeta format.""" import json from swh.indexer import metadata_dictionary mapping_cls = [ cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name ] if not mapping_cls: raise click.ClickException("Unknown mapping {}".format(mapping_name)) assert len(mapping_cls) == 1 mapping_cls = mapping_cls[0] mapping = mapping_cls() codemeta_doc = mapping.translate(file.read()) click.echo(json.dumps(codemeta_doc, indent=4)) @indexer_cli_group.group("schedule") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API" ) @click.option( "--storage-url", "-g", default=None, help="URL of the (graph) storage API" ) @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate Software Heritage Indexer tasks. Via SWH Scheduler's API.""" from swh.indexer.storage import get_indexer_storage from swh.scheduler import get_scheduler from swh.storage import get_storage ctx.obj["indexer_storage"] = _get_api( get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url ) ctx.obj["storage"] = _get_api( get_storage, ctx.obj["config"], "storage", storage_url ) ctx.obj["scheduler"] = _get_api( get_scheduler, ctx.obj["config"], "scheduler", scheduler_url ) if dry_run: ctx.obj["scheduler"] = None def list_origins_by_producer(idx_storage, mappings, tool_ids) -> Iterator[str]: next_page_token = "" limit = 10000 while next_page_token is not None: result = idx_storage.origin_intrinsic_metadata_search_by_producer( page_token=next_page_token, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None, ) next_page_token = result.next_page_token yield from result.results @schedule.command("reindex_origin_metadata") @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--tool-id", "-t", "tool_ids", type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.", ) @click.option( "--mapping", "-m", "mappings", multiple=True, help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')", ) @click.option( "--task-type", default="index-origin-metadata", show_default=True, help="Name of the task type to schedule.", ) @click.pass_context def schedule_origin_metadata_reindex( ctx, origin_batch_size, tool_ids, mappings, task_type ): """Schedules indexing tasks for origins that were already indexed.""" from swh.scheduler.cli_utils import schedule_origin_batches idx_storage = ctx.obj["indexer_storage"] scheduler = ctx.obj["scheduler"] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) kwargs = {"retries_left": 1} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) @indexer_cli_group.command("journal-client") @click.argument( "indexer", type=click.Choice( [ - "origin-intrinsic-metadata", - "extrinsic-metadata", - "content-mimetype", - "content-fossology-license", + "origin_intrinsic_metadata", + "extrinsic_metadata", + "content_mimetype", + "content_fossology_license", "*", ] ), required=False # TODO: remove required=False after we stop using it ) @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--origin-metadata-task-type", default="index-origin-metadata", help="Name of the task running the origin metadata indexer.", ) @click.option( "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." ) @click.option( "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." ) @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client( ctx, indexer: Optional[str], scheduler_url: str, origin_metadata_task_type: str, brokers: List[str], prefix: str, group_id: str, stop_after_objects: Optional[int], ): """ Listens for new objects from the SWH Journal, and either: * runs the indexer with the name passed as argument, if any * schedules tasks to run relevant indexers (currently, only - origin-intrinsic-metadata) on these new objects otherwise. + origin_intrinsic_metadata) on these new objects otherwise. Passing '*' as indexer name runs all indexers. """ import functools import warnings from swh.indexer.indexer import BaseIndexer, ObjectsDict from swh.indexer.journal_client import process_journal_objects from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler cfg = ctx.obj["config"] journal_cfg = cfg.get("journal", {}) scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) brokers = brokers or journal_cfg.get("brokers") if not brokers: raise ValueError("The brokers configuration is mandatory.") prefix = prefix or journal_cfg.get("prefix") group_id = group_id or journal_cfg.get("group_id") origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( "origin_metadata_task_type" ) stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") object_types = set() worker_fns: List[Callable[[ObjectsDict], Dict]] = [] if indexer is None: warnings.warn( "'swh indexer journal-client' with no argument creates scheduler tasks " "to index, rather than index directly.", DeprecationWarning, ) object_types.add("origin_visit_status") worker_fns.append( functools.partial( process_journal_objects, scheduler=scheduler, task_names={ "origin_metadata": origin_metadata_task_type, }, ) ) idx: Optional[BaseIndexer] = None - if indexer in ("origin-intrinsic-metadata", "*"): + if indexer in ("origin_intrinsic_metadata", "*"): from swh.indexer.metadata import OriginMetadataIndexer object_types.add("origin_visit_status") idx = OriginMetadataIndexer() idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) - if indexer in ("extrinsic-metadata", "*"): + if indexer in ("extrinsic_metadata", "*"): from swh.indexer.metadata import ExtrinsicMetadataIndexer object_types.add("raw_extrinsic_metadata") idx = ExtrinsicMetadataIndexer() idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) - if indexer in ("content-mimetype", "*"): + if indexer in ("content_mimetype", "*"): from swh.indexer.mimetype import MimetypeIndexer object_types.add("content") idx = MimetypeIndexer() idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) - if indexer in ("content-fossology-license", "*"): + if indexer in ("content_fossology_license", "*"): from swh.indexer.fossology_license import FossologyLicenseIndexer object_types.add("content") idx = FossologyLicenseIndexer() idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) if not worker_fns: raise click.ClickException(f"Unknown indexer: {indexer}") client = get_journal_client( cls="kafka", brokers=brokers, prefix=prefix, group_id=group_id, object_types=list(object_types), stop_after_objects=stop_after_objects, ) def worker_fn(objects: ObjectsDict): for fn in worker_fns: fn(objects) try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @indexer_cli_group.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="", help="Host to run the server") @click.option("--port", default=5007, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" from swh.indexer.storage.api.server import app, load_and_check_config api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return indexer_cli_group(auto_envvar_prefix="SWH_INDEXER") if __name__ == "__main__": main() diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py index 8f492a5..6c4ef58 100644 --- a/swh/indexer/codemeta.py +++ b/swh/indexer/codemeta.py @@ -1,220 +1,220 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import csv import itertools import json import os.path import re from typing import Any, List from pyld import jsonld import swh.indexer _DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), "data") CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, "codemeta", "crosswalk.csv") CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta.jsonld") with open(CODEMETA_CONTEXT_PATH) as fd: CODEMETA_CONTEXT = json.load(fd) _EMPTY_PROCESSED_CONTEXT: Any = {"mappings": {}} _PROCESSED_CODEMETA_CONTEXT = jsonld.JsonLdProcessor().process_context( _EMPTY_PROCESSED_CONTEXT, CODEMETA_CONTEXT, None ) CODEMETA_CONTEXT_URL = "https://doi.org/10.5063/schema/codemeta-2.0" CODEMETA_ALTERNATE_CONTEXT_URLS = { ("https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld") } CODEMETA_URI = "https://codemeta.github.io/terms/" SCHEMA_URI = "http://schema.org/" FORGEFED_URI = "https://forgefed.org/ns#" ACTIVITYSTREAMS_URI = "https://www.w3.org/ns/activitystreams#" PROPERTY_BLACKLIST = { # CodeMeta properties that we cannot properly represent. SCHEMA_URI + "softwareRequirements", CODEMETA_URI + "softwareSuggestions", # Duplicate of 'author' SCHEMA_URI + "creator", } _codemeta_field_separator = re.compile(r"\s*[,/]\s*") def make_absolute_uri(local_name): """Parses codemeta.jsonld, and returns the @id of terms it defines. >>> make_absolute_uri("name") 'http://schema.org/name' >>> make_absolute_uri("downloadUrl") 'http://schema.org/downloadUrl' >>> make_absolute_uri("referencePublication") 'https://codemeta.github.io/terms/referencePublication' """ uri = jsonld.JsonLdProcessor.get_context_value( _PROCESSED_CODEMETA_CONTEXT, local_name, "@id" ) assert uri.startswith(("@", CODEMETA_URI, SCHEMA_URI)), (local_name, uri) return uri def _read_crosstable(fd): reader = csv.reader(fd) try: header = next(reader) except StopIteration: raise ValueError("empty file") data_sources = set(header) - {"Parent Type", "Property", "Type", "Description"} codemeta_translation = {data_source: {} for data_source in data_sources} terms = set() for line in reader: # For each canonical name local_name = dict(zip(header, line))["Property"] if not local_name: continue canonical_name = make_absolute_uri(local_name) if canonical_name in PROPERTY_BLACKLIST: continue terms.add(canonical_name) for (col, value) in zip(header, line): # For each cell in the row if col in data_sources: # If that's not the parentType/property/type/description for local_name in _codemeta_field_separator.split(value): # For each of the data source's properties that maps # to this canonical name if local_name.strip(): codemeta_translation[col][local_name.strip()] = canonical_name return (terms, codemeta_translation) with open(CROSSWALK_TABLE_PATH) as fd: (CODEMETA_TERMS, CROSSWALK_TABLE) = _read_crosstable(fd) def _document_loader(url, options=None): """Document loader for pyld. Reads the local codemeta.jsonld file instead of fetching it from the Internet every single time.""" if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS: return { "contextUrl": None, "documentUrl": url, "document": CODEMETA_CONTEXT, } elif url == CODEMETA_URI: raise Exception( "{} is CodeMeta's URI, use {} as context url".format( CODEMETA_URI, CODEMETA_CONTEXT_URL ) ) else: raise Exception(url) def compact(doc, forgefed: bool): """Same as `pyld.jsonld.compact`, but in the context of CodeMeta. Args: forgefed: Whether to add ForgeFed and ActivityStreams as compact URIs. This is typically used for extrinsic metadata documents, which frequently use properties from these namespaces. """ contexts: List[Any] = [CODEMETA_CONTEXT_URL] if forgefed: contexts.append({"as": ACTIVITYSTREAMS_URI, "forge": FORGEFED_URI}) return jsonld.compact(doc, contexts, options={"documentLoader": _document_loader}) def expand(doc): """Same as `pyld.jsonld.expand`, but in the context of CodeMeta.""" return jsonld.expand(doc, options={"documentLoader": _document_loader}) def merge_values(v1, v2): """If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`, returns `{"@list": l1 + l2}`. Otherwise, make them lists (if they are not already) and concatenate them. >>> merge_values('a', 'b') ['a', 'b'] >>> merge_values(['a', 'b'], 'c') ['a', 'b', 'c'] >>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']}) {'@list': ['a', 'b', 'c']} """ if v1 is None: return v2 elif v2 is None: return v1 elif isinstance(v1, dict) and set(v1) == {"@list"}: assert isinstance(v1["@list"], list) if isinstance(v2, dict) and set(v2) == {"@list"}: assert isinstance(v2["@list"], list) return {"@list": v1["@list"] + v2["@list"]} else: raise ValueError("Cannot merge %r and %r" % (v1, v2)) else: if isinstance(v2, dict) and "@list" in v2: raise ValueError("Cannot merge %r and %r" % (v1, v2)) if not isinstance(v1, list): v1 = [v1] if not isinstance(v2, list): v2 = [v2] return v1 + v2 def merge_documents(documents): """Takes a list of metadata dicts, each generated from a different metadata file, and merges them. Removes duplicates, if any.""" documents = list(itertools.chain.from_iterable(map(expand, documents))) merged_document = collections.defaultdict(list) for document in documents: for (key, values) in document.items(): if key == "@id": # @id does not get expanded to a list value = values # Only one @id is allowed, move it to sameAs if "@id" not in merged_document: merged_document["@id"] = value elif value != merged_document["@id"]: if value not in merged_document[SCHEMA_URI + "sameAs"]: merged_document[SCHEMA_URI + "sameAs"].append(value) else: for value in values: if isinstance(value, dict) and set(value) == {"@list"}: # Value is of the form {'@list': [item1, item2]} # instead of the usual [item1, item2]. # We need to merge the inner lists (and mostly # preserve order). merged_value = merged_document.setdefault(key, {"@list": []}) for subvalue in value["@list"]: # merged_value must be of the form # {'@list': [item1, item2]}; as it is the same # type as value, which is an @list. if subvalue not in merged_value["@list"]: merged_value["@list"].append(subvalue) elif value not in merged_document[key]: merged_document[key].append(value) - # XXX: we should set forgefed=True when merging extrinsic-metadata documents. + # XXX: we should set forgefed=True when merging extrinsic_metadata documents. # however, this function is only used to merge multiple files of the same # directory (which is only for intrinsic-metadata), so it is not an issue for now return compact(merged_document, forgefed=False) diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index 601dc6b..2ac4adc 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,270 +1,270 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar from typing_extensions import TypedDict import yaml from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values from swh.indexer.storage.interface import Sha1 class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None] ) def produce_terms( namespace: str, terms: List[str] ) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): f.produced_terms = [] # type: ignore f.produced_terms.extend(namespace + term for term in terms) # type: ignore return f return decorator class BaseMapping: """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`, not to be inherited directly.""" def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") def translate(self, file_content: bytes) -> Optional[Dict]: """Translates metadata, from the content of a file or of a RawExtrinsicMetadata object.""" raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation") class BaseExtrinsicMapping(BaseMapping): - """Base class for extrinsic-metadata mappings to inherit from + """Base class for extrinsic_metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=True) class BaseIntrinsicMapping(BaseMapping): """Base class for intrinsic-metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=False) class SingleFileIntrinsicMapping(BaseIntrinsicMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields = [] # type: List[str] """List of fields that are simple strings, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { term for (key, term) in cls.mapping.items() if key in cls.string_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { term for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms def _translate_dict( self, content_dict: Dict, *, normalize: bool = True ) -> Dict[str, str]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"} for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(translated_metadata, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) elif k in self.string_fields and isinstance(v, str): pass elif k in self.string_fields and isinstance(v, list): v = [x for x in v if isinstance(x, str)] else: continue # set the translation metadata with the normalized value if codemeta_key in translated_metadata: translated_metadata[codemeta_key] = merge_values( translated_metadata[codemeta_key], v ) else: translated_metadata[codemeta_key] = v if normalize: return self.normalize_translation(translated_metadata) else: return translated_metadata class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() } class YamlMapping(DictMapping, SingleFileIntrinsicMapping): """Base class for all mappings that use Yaml data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.scanner.ScannerError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 1504374..bd67a05 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,908 +1,908 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch import attr from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer import fossology_license from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( ContentLicenseRow, ContentMimetypeRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import Content, Origin, OriginVisitStatus from .test_metadata import REMD from .utils import ( DIRECTORY2, RAW_CONTENT_IDS, RAW_CONTENTS, REVISION, SHA1_TO_LICENSES, mock_compute_license, ) def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] directory_metadata = [ DirectoryIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.directory_intrinsic_metadata_add(directory_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "composer", "gemspec", "github", "maven", "npm", "pkg-info", "pubspec", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) -@pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"]) +@pytest.mark.parametrize("indexer_name", ["origin_intrinsic_metadata", "*"]) def test_cli_journal_client_index__origin_intrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( "swh.indexer.metadata.get_head_swhid", return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.DirectoryMetadataIndexer.index", return_value=[ DirectoryIntrinsicMetadataRow( id=DIRECTORY2.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_directory=DIRECTORY2.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results -@pytest.mark.parametrize("indexer_name", ["extrinsic-metadata", "*"]) +@pytest.mark.parametrize("indexer_name", ["extrinsic_metadata", "*"]) def test_cli_journal_client_index__origin_extrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) origin = Origin("http://example.org/repo.git") storage.origin_add([origin]) raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid()) raw_extrinsic_metadata = attr.evolve( raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash() ) journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata]) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", 1, ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_extrinsic_metadata_get([origin.url]) expected_results = [ OriginExtrinsicMetadataRow( id=origin.url, from_remd_id=raw_extrinsic_metadata.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["github"], metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "https://forgefed.org/ns#Repository", "name": "test software", }, ) ] assert sorted(results, key=lambda r: r.id) == expected_results def test_cli_journal_client_index__content_mimetype( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, obj_storage, storage, mocker, swh_indexer_config, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) contents = [] expected_results = [] content_ids = [] for content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items(): content = Content.from_data(raw_content) assert content_id == content.sha1 contents.append(content) content_ids.append(content_id) # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3) # returns different results. This allows to deal with such a case when executing # tests on different environments machines (e.g. ci tox, ci debian, dev machine, # ...) all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes] expected_results.extend( [ ContentMimetypeRow( id=content.sha1, tool={"id": 1, **swh_indexer_config["tools"]}, mimetype=mimetype, encoding=encoding, ) for mimetype in all_mimetypes ] ) assert len(contents) == len(RAW_CONTENTS) journal_writer.write_additions("content", contents) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", - "content-mimetype", + "content_mimetype", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(contents), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.content_mimetype_get(content_ids) assert len(results) == len(contents) for result in results: assert result in expected_results def test_cli_journal_client_index__fossology_license( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, obj_storage, storage, mocker, swh_indexer_config, ): """Test the 'swh indexer journal-client' cli tool.""" # Patch fossology_license.compute_license = mock_compute_license journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) tool = {"id": 1, **swh_indexer_config["tools"]} id0, id1, id2 = RAW_CONTENT_IDS contents = [] content_ids = [] expected_results = [] for content_id, (raw_content, _, _) in RAW_CONTENTS.items(): content = Content.from_data(raw_content) assert content_id == content.sha1 contents.append(content) content_ids.append(content_id) expected_results.extend( [ ContentLicenseRow(id=content_id, tool=tool, license=license) for license in SHA1_TO_LICENSES[content_id] ] ) assert len(contents) == len(RAW_CONTENTS) journal_writer.write_additions("content", contents) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", - "content-fossology-license", + "content_fossology_license", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(contents), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.content_fossology_license_get(content_ids) assert len(results) == len(expected_results) for result in results: assert result in expected_results