diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index aac7788..33d462c 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,320 +1,348 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Iterator +from typing import Callable, Dict, Iterator, List, Optional # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group @swh_cli_group.group( name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.pass_context def indexer_cli_group(ctx, config_file): """Software Heritage Indexer tools. The Indexer is used to mine the content of the archive and extract derived information from archive source code artifacts. """ from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = {"cls": "remote", "url": url} elif config_key not in config: raise click.ClickException("Missing configuration for {}".format(config_key)) return getter(**config[config_key]) @indexer_cli_group.group("mapping") def mapping(): """Manage Software Heritage Indexer mappings.""" pass @mapping.command("list") def mapping_list(): """Prints the list of known mappings.""" from swh.indexer import metadata_dictionary mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command("list-terms") @click.option( "--exclude-mapping", multiple=True, help="Exclude the given mapping from the output" ) @click.option( "--concise", is_flag=True, default=False, help="Don't print the list of mappings supporting each term.", ) def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" from swh.indexer import metadata_dictionary properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo("{}:".format(property_name)) click.echo("\t" + ", ".join(sorted(supported_mappings))) @mapping.command("translate") @click.argument("mapping-name") @click.argument("file", type=click.File("rb")) def mapping_translate(mapping_name, file): """Translates file from mapping-name to codemeta format.""" import json from swh.indexer import metadata_dictionary mapping_cls = [ cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name ] if not mapping_cls: raise click.ClickException("Unknown mapping {}".format(mapping_name)) assert len(mapping_cls) == 1 mapping_cls = mapping_cls[0] mapping = mapping_cls() codemeta_doc = mapping.translate(file.read()) click.echo(json.dumps(codemeta_doc, indent=4)) @indexer_cli_group.group("schedule") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API" ) @click.option( "--storage-url", "-g", default=None, help="URL of the (graph) storage API" ) @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate Software Heritage Indexer tasks. Via SWH Scheduler's API.""" from swh.indexer.storage import get_indexer_storage from swh.scheduler import get_scheduler from swh.storage import get_storage ctx.obj["indexer_storage"] = _get_api( get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url ) ctx.obj["storage"] = _get_api( get_storage, ctx.obj["config"], "storage", storage_url ) ctx.obj["scheduler"] = _get_api( get_scheduler, ctx.obj["config"], "scheduler", scheduler_url ) if dry_run: ctx.obj["scheduler"] = None def list_origins_by_producer(idx_storage, mappings, tool_ids) -> Iterator[str]: next_page_token = "" limit = 10000 while next_page_token is not None: result = idx_storage.origin_intrinsic_metadata_search_by_producer( page_token=next_page_token, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None, ) next_page_token = result.next_page_token yield from result.results @schedule.command("reindex_origin_metadata") @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--tool-id", "-t", "tool_ids", type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.", ) @click.option( "--mapping", "-m", "mappings", multiple=True, help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')", ) @click.option( "--task-type", default="index-origin-metadata", show_default=True, help="Name of the task type to schedule.", ) @click.pass_context def schedule_origin_metadata_reindex( ctx, origin_batch_size, tool_ids, mappings, task_type ): """Schedules indexing tasks for origins that were already indexed.""" from swh.scheduler.cli_utils import schedule_origin_batches idx_storage = ctx.obj["indexer_storage"] scheduler = ctx.obj["scheduler"] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) kwargs = {"retries_left": 1} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) @indexer_cli_group.command("journal-client") +@click.argument( + "indexer", + type=click.Choice(["origin-intrinsic-metadata"]), + required=False + # TODO: remove required=False after we stop using it +) @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--origin-metadata-task-type", default="index-origin-metadata", help="Name of the task running the origin metadata indexer.", ) @click.option( "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." ) @click.option( "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." ) @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client( ctx, - scheduler_url, - origin_metadata_task_type, - brokers, - prefix, - group_id, - stop_after_objects, + indexer: Optional[str], + scheduler_url: str, + origin_metadata_task_type: str, + brokers: List[str], + prefix: str, + group_id: str, + stop_after_objects: Optional[int], ): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin-intrinsic-metadata) on these new objects.""" import functools + import warnings + from swh.indexer.indexer import ObjectsDict from swh.indexer.journal_client import process_journal_objects from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler cfg = ctx.obj["config"] journal_cfg = cfg.get("journal", {}) scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) brokers = brokers or journal_cfg.get("brokers") if not brokers: raise ValueError("The brokers configuration is mandatory.") prefix = prefix or journal_cfg.get("prefix") group_id = group_id or journal_cfg.get("group_id") origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( "origin_metadata_task_type" ) stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") + worker_fn: Callable[[ObjectsDict], Dict] + + if indexer is None: + warnings.warn( + "'swh indexer journal-client' with no argument creates scheduler tasks " + "to index, rather than index directly.", + DeprecationWarning, + ) + object_types = ["origin_visit_status"] + worker_fn = functools.partial( + process_journal_objects, + scheduler=scheduler, + task_names={ + "origin_metadata": origin_metadata_task_type, + }, + ) + elif indexer == "origin-intrinsic-metadata": + from swh.indexer.metadata import OriginMetadataIndexer + + object_types = ["origin_visit_status"] + idx = OriginMetadataIndexer() + idx.catch_exceptions = False # don't commit offsets if indexation failed + worker_fn = idx.process_journal_objects + else: + raise click.ClickException(f"Unknown indexer: {indexer}") + client = get_journal_client( cls="kafka", brokers=brokers, prefix=prefix, group_id=group_id, - object_types=["origin_visit_status"], + object_types=object_types, stop_after_objects=stop_after_objects, ) - worker_fn = functools.partial( - process_journal_objects, - scheduler=scheduler, - task_names={ - "origin_metadata": origin_metadata_task_type, - }, - ) try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @indexer_cli_group.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5007, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" from swh.indexer.storage.api.server import app, load_and_check_config api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return indexer_cli_group(auto_envvar_prefix="SWH_INDEXER") if __name__ == "__main__": main() diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 81d8666..3341a75 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,647 +1,659 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from contextlib import contextmanager import logging import os import shutil import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, TypeVar, Union, ) import warnings import sentry_sdk from typing_extensions import TypedDict from swh.core import utils from swh.core.config import load_from_envvar, merge_configs from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil from swh.model.model import Origin, Revision, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage from swh.storage.interface import StorageInterface class ObjectsDict(TypedDict, total=False): revision: List[Dict] origin: List[Dict] origin_visit_status: List[Dict] @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: filename: one of sha1's many filenames data: the sha1's content to write in temporary file working_directory: the directory into which the file is written Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, "wb") as f: f.write(data) yield content_path shutil.rmtree(temp_dir) DEFAULT_CONFIG = { INDEXER_CFG_KEY: {"cls": "memory"}, "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, } TId = TypeVar("TId") """type of the ids of index()ed objects.""" TData = TypeVar("TData") """type of the objects passed to index().""" TResult = TypeVar("TResult") """return type of index()""" class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :meth:`~BaseIndexer.run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :meth:`~BaseIndexer.filter`: filter out data already indexed (in storage). :meth:`~BaseIndexer.index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :meth:`~BaseIndexer.persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :meth:`~BaseIndexer.prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :meth:`~BaseIndexer.check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :meth:`~BaseIndexer.register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ results: List[TResult] USE_TOOLS = True catch_exceptions = True """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" scheduler: Any storage: StorageInterface objstorage: Any idx_storage: IndexerStorageInterface def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run.""" super().__init__() if config is not None: self.config = config elif SWH_CONFIG: self.config = SWH_CONFIG.copy() else: self.config = load_from_envvar() self.config = merge_configs(DEFAULT_CONFIG, self.config) self.prepare() self.check() self.log.debug("%s: config=%s", self, self.config) def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ config_storage = self.config.get("storage") if config_storage: self.storage = get_storage(**config_storage) self.objstorage = get_objstorage(**self.config["objstorage"]) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.log = logging.getLogger("swh.indexer") if self.USE_TOOLS: self.tools = list(self.register_tools(self.config.get("tools", []))) self.results = [] @property def tool(self) -> Dict: return self.tools[0] def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if self.USE_TOOLS and not self.tools: raise ValueError("Tools %s is unknown, cannot continue" % self.tools) def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api.""" return {"tool_%s" % key: value for key, value in tool.items()} def register_tools( self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. Raises: ValueError: if not a list nor a dict. """ if isinstance(tools, list): tools = list(map(self._prepare_tool, tools)) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError("Configuration tool(s) must be a dict or list!") if tools: return self.idx_storage.indexer_configuration_add(tools) else: return [] def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: """Index computation for the id and associated raw data. Args: id: identifier or Dict object data: id's data from storage or objstorage depending on object type Returns: dict: a dict that makes sense for the :meth:`.persist_index_computations` method. """ raise NotImplementedError() def filter(self, ids: List[TId]) -> Iterator[TId]: """Filter missing ids for that particular indexer. Args: ids: list of ids Yields: iterator of missing ids """ yield from ids @abc.abstractmethod def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. Returns: a summary dict of what has been inserted in the storage """ return {} class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on a list of ids directly. To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] results = [] summary: Dict = {"status": "uneventful"} try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning( "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1) ) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when reading contents metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" return summary class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content partition indexer. This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition. To work on a list of ids, use the :class:`ContentIndexer` instead. Note: :class:`ContentPartitionIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ @abc.abstractmethod def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: """Retrieve indexed contents within range [start, end]. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination """ pass def _list_contents_to_index( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1] ) -> Iterable[Sha1]: """Compute from storage the new contents to index in the partition_id . The already indexed contents are skipped. Args: partition_id: Index of the partition to fetch data from nb_partitions: Total number of partition indexed: Set of content already indexed. Yields: Sha1 id (bytes) of contents to index """ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int): raise TypeError( f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}." ) next_page_token = None while True: result = self.storage.content_get_partition( partition_id, nb_partitions, page_token=next_page_token ) contents = result.results for c in contents: _id = hashutil.hash_to_bytes(c.sha1) if _id in indexed: continue yield _id next_page_token = result.next_page_token if next_page_token is None: break def _index_contents( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any ) -> Iterator[TResult]: """Index the contents within the partition_id. Args: start: Starting bound from range identifier end: End range identifier indexed: Set of content already indexed. Yields: indexing result as dict to persist in the indexer backend """ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning(f"Content {sha1.hex()} not found in objstorage") continue yield from self.index(sha1, raw_content, **kwargs) def _index_with_skipping_already_done( self, partition_id: int, nb_partitions: int ) -> Iterator[TResult]: """Index not already indexed contents within the partition partition_id Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into Yields: indexing result as dict to persist in the indexer backend """ already_indexed_contents = set( self.indexed_contents_in_partition(partition_id, nb_partitions) ) return self._index_contents( partition_id, nb_partitions, already_indexed_contents ) def run( self, partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs, ) -> Dict: """Given a partition of content ids, index the contents within. Either the indexer is incremental (filter out existing computed data) or it computes everything from scratch. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: dict with the indexing task status """ summary: Dict[str, Any] = {"status": "uneventful"} count = 0 try: if skip_existing: gen = self._index_with_skipping_already_done( partition_id, nb_partitions ) else: gen = self._index_contents(partition_id, nb_partitions, indexed=set([])) count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] if count > 0: summary["status"] = "eventful" except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when computing metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" if count > 0 and count_object_added_key: summary[count_object_added_key] = count return summary class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - store the results Args: origin_urls: list of origin urls. **kwargs: passed to the `index` method """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] origins = [{"url": url} for url in origin_urls] return self.process_journal_objects({"origin": origins}) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``. Expects ``objects`` to have a single key, either ``origin`` or ``"origin_visit_status"``.""" - assert set(objects) == {"origin"} + # TODO: add support for subscribing to other topics? Currently, this is + # not implemented because no indexer would use it. + assert set(objects) <= {"origin", "origin_visit_status"} - origins = [Origin(url=origin["url"]) for origin in objects.get("origin", [])] + origins = [ + Origin(url=status["origin"]) + for status in objects.get("origin_visit_status", []) + if status["status"] == "full" + ] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])] summary: Dict[str, Any] = {"status": "uneventful"} try: - results = self.index_list(origins) + results = self.index_list( + origins, + check_origin_known=False, + # no need to check they exist, as we just received either an origin or + # visit status; which cannot be created by swh-storage unless the origin + # already exists + ) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: results = [] for origin in origins: try: results.extend(self.index(origin.url, **kwargs)) except Exception: self.log.exception("Problem when processing origin %s", origin.url) sentry_sdk.capture_exception() raise return results class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results Args: ids: sha1_git's identifier list """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] revision_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] revisions = [] for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): if not rev: # TODO: call self.index() with rev=None? self.log.warning( "Revision %s not found in storage", hashutil.hash_to_hex(rev_id) ) continue revisions.append(rev.to_dict()) return self.process_journal_objects({"revision": revisions}) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``. Expects ``objects`` to have a single key, ``"revision"``.""" assert set(objects) == {"revision"} summary: Dict[str, Any] = {"status": "uneventful"} results = [] for rev in objects["revision"]: try: results.extend(self.index(rev["id"], Revision.from_dict(rev))) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing revision") sentry_sdk.capture_exception() summary["status"] = "failed" summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) self.results = results return summary diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index b3d7717..a133a40 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,401 +1,404 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, ) import sentry_sdk from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.model import hashutil from swh.model.model import Origin, Revision, Sha1Git REVISION_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown revision", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) metadata = MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - metadata (jsonb): detected metadata """ return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_intrinsic_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.revision_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Revision], **kwargs ) -> List[RevisionIntrinsicMetadataRow]: """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: id: sha1_git of the revision data: revision model object from storage Returns: dict: dictionary representing a revision_intrinsic_metadata, with keys: - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ rev = data assert isinstance(rev, Revision) try: root_dir = rev.directory dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) if [entry["type"] for entry in dir_ls] == ["dir"]: # If the root is just a single directory, recurse into it # eg. PyPI packages, GNU tarballs subdir = dir_ls[0]["target"] dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) files = [entry for entry in dir_ls if entry["type"] == "file"] detected_files = detect_metadata(files) (mappings, metadata) = self.translate_revision_intrinsic_metadata( detected_files, log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), ) except Exception as e: self.log.exception("Problem when indexing rev: %r", e) sentry_sdk.capture_exception() return [ RevisionIntrinsicMetadataRow( id=rev.id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[RevisionIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ # TODO: add functions in storage to keep data in # revision_intrinsic_metadata return self.idx_storage.revision_intrinsic_metadata_add(results) def translate_revision_intrinsic_metadata( self, detected_files: Dict[str, List[Any]], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files: dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ used_mappings = [MAPPINGS[context].name for context in detected_files] metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] for context in detected_files.keys(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = context c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context] ) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files[context] if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.origin_head_indexer = OriginHeadIndexer(config=config) self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) def index_list( - self, origins: List[Origin], **kwargs + self, origins: List[Origin], check_origin_known: bool = True, **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: head_rev_ids = [] origins_with_head = [] # Filter out origins not in the storage - known_origins = list( - call_with_batches( - self.storage.origin_get, - [origin.url for origin in origins], - ORIGIN_GET_BATCH_SIZE, + if check_origin_known: + known_origins = list( + call_with_batches( + self.storage.origin_get, + [origin.url for origin in origins], + ORIGIN_GET_BATCH_SIZE, + ) ) - ) + else: + known_origins = list(origins) for origin in known_origins: if origin is None: continue head_results = self.origin_head_indexer.index(origin.url) if head_results: (head_result,) = head_results origins_with_head.append(origin) head_rev_ids.append(head_result["revision_id"]) head_revs = list( call_with_batches( self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE ) ) assert len(head_revs) == len(head_rev_ids) results = [] for (origin, rev) in zip(origins_with_head, head_revs): if not rev: self.log.warning("Missing head revision of origin %r", origin.url) continue for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): # There is at most one rev_metadata orig_metadata = OriginIntrinsicMetadataRow( from_revision=rev_metadata.id, id=origin.url, metadata=rev_metadata.metadata, mappings=rev_metadata.mappings, indexer_configuration_id=rev_metadata.indexer_configuration_id, ) results.append((orig_metadata, rev_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate revisions rev_metadata: List[RevisionIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] summary: Dict = {} for (orig_item, rev_item) in results: assert rev_item.metadata == orig_item.metadata if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if rev_item not in rev_metadata: rev_metadata.append(rev_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) if rev_metadata: summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) summary.update(summary_rev) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index e0628df..fd57816 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,525 +1,653 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import OriginVisitStatus +from .utils import REVISION + def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_revision=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] revision_metadata = [ RevisionIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "gemspec", "maven", "npm", "pkg-info", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) -def test_cli_journal_client( +def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) + + +def test_cli_journal_client_index( + cli_runner, + swh_config, + kafka_prefix: str, + kafka_server, + consumer: Consumer, + idx_storage, + storage, + mocker, + swh_indexer_config, +): + """Test the 'swh indexer journal-client' cli tool.""" + journal_writer = get_journal_writer( + "kafka", + brokers=[kafka_server], + prefix=kafka_prefix, + client_id="test producer", + value_sanitizer=lambda object_type, value: value, + flush_timeout=3, # fail early if something is going wrong + ) + + visit_statuses = [ + OriginVisitStatus( + origin="file:///dev/zero", + visit=1, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///dev/foobar", + visit=2, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///tmp/spamegg", + visit=3, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( + origin="file:///dev/0002", + visit=6, + date=now(), + status="full", + snapshot=None, + ), + OriginVisitStatus( # will be filtered out due to its 'partial' status + origin="file:///dev/0000", + visit=4, + date=now(), + status="partial", + snapshot=None, + ), + OriginVisitStatus( # will be filtered out due to its 'ongoing' status + origin="file:///dev/0001", + visit=5, + date=now(), + status="ongoing", + snapshot=None, + ), + ] + + journal_writer.write_additions("origin_visit_status", visit_statuses) + visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] + storage.revision_add([REVISION]) + + mocker.patch( + "swh.indexer.origin_head.OriginHeadIndexer.index", + return_value=[{"revision_id": REVISION.id}], + ) + + mocker.patch( + "swh.indexer.metadata.RevisionMetadataIndexer.index", + return_value=[ + RevisionIntrinsicMetadataRow( + id=REVISION.id, + indexer_configuration_id=1, + mappings=["cff"], + metadata={"foo": "bar"}, + ) + ], + ) + result = cli_runner.invoke( + indexer_cli_group, + [ + "-C", + swh_config, + "journal-client", + "origin-intrinsic-metadata", + "--broker", + kafka_server, + "--prefix", + kafka_prefix, + "--group-id", + "test-consumer", + "--stop-after-objects", + len(visit_statuses), + ], + catch_exceptions=False, + ) + + # Check the output + expected_output = "Done.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + results = idx_storage.origin_intrinsic_metadata_get( + [status.origin for status in visit_statuses] + ) + expected_results = [ + OriginIntrinsicMetadataRow( + id=status.origin, + from_revision=REVISION.id, + tool={"id": 1, **swh_indexer_config["tools"]}, + mappings=["cff"], + metadata={"foo": "bar"}, + ) + for status in sorted(visit_statuses_full, key=lambda r: r.origin) + ] + assert sorted(results, key=lambda r: r.id) == expected_results diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py index 85d1a62..17f34ed 100644 --- a/swh/indexer/tests/test_indexer.py +++ b/swh/indexer/tests/test_indexer.py @@ -1,155 +1,169 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable, List, Optional from unittest.mock import Mock import pytest from swh.indexer.indexer import ( ContentIndexer, ContentPartitionIndexer, OriginIndexer, RevisionIndexer, ) from swh.indexer.storage import PagedResult, Sha1 from swh.model.model import Content from .utils import BASE_TEST_CONFIG, REVISION class _TestException(Exception): pass class CrashingIndexerMixin: USE_TOOLS = False def index( self, id: Any, data: Optional[Any] = None, **kwargs ) -> List[Dict[str, Any]]: raise _TestException() def persist_index_computations(self, results) -> Dict[str, int]: return {} def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: raise _TestException() class CrashingContentIndexer(CrashingIndexerMixin, ContentIndexer): pass class CrashingContentPartitionIndexer(CrashingIndexerMixin, ContentPartitionIndexer): pass class CrashingRevisionIndexer(CrashingIndexerMixin, RevisionIndexer): pass class CrashingOriginIndexer(CrashingIndexerMixin, OriginIndexer): pass class TrivialContentPartitionIndexer(ContentPartitionIndexer[str]): USE_TOOLS = False def index(self, id: bytes, data: Optional[bytes], **kwargs) -> List[str]: return ["indexed " + id.decode()] def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: return iter([b"excluded hash", b"other excluded hash"]) def persist_index_computations(self, results: List[str]) -> Dict[str, int]: self._results.append(results) # type: ignore return {"nb_added": len(results)} def test_content_indexer_catch_exceptions(): indexer = CrashingContentIndexer(config=BASE_TEST_CONFIG) indexer.objstorage = Mock() indexer.objstorage.get.return_value = b"content" assert indexer.run([b"foo"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run([b"foo"]) def test_revision_indexer_catch_exceptions(): indexer = CrashingRevisionIndexer(config=BASE_TEST_CONFIG) indexer.storage = Mock() indexer.storage.revision_get.return_value = [REVISION] assert indexer.run([b"foo"]) == {"status": "failed"} + assert indexer.process_journal_objects({"revision": [REVISION.to_dict()]}) == { + "status": "failed" + } + indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run([b"foo"]) + with pytest.raises(_TestException): + indexer.process_journal_objects({"revision": [REVISION.to_dict()]}) + def test_origin_indexer_catch_exceptions(): indexer = CrashingOriginIndexer(config=BASE_TEST_CONFIG) assert indexer.run(["http://example.org"]) == {"status": "failed"} + assert indexer.process_journal_objects( + {"origin": [{"url": "http://example.org"}]} + ) == {"status": "failed"} + indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run(["http://example.org"]) + with pytest.raises(_TestException): + indexer.process_journal_objects({"origin": [{"url": "http://example.org"}]}) + def test_content_partition_indexer_catch_exceptions(): indexer = CrashingContentPartitionIndexer( config={**BASE_TEST_CONFIG, "write_batch_size": 42} ) assert indexer.run(0, 42) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): indexer.run(0, 42) def test_content_partition_indexer(): # TODO: simplify the mocking in this test indexer = TrivialContentPartitionIndexer( config={ **BASE_TEST_CONFIG, "write_batch_size": 10, } # doesn't matter ) indexer.catch_exceptions = False indexer._results = [] indexer.storage = Mock() indexer.storage.content_get_partition = lambda *args, **kwargs: PagedResult( results=[ Content(sha1=c, sha1_git=c, sha256=c, blake2s256=c, length=42) for c in [ b"hash1", b"excluded hash", b"hash2", b"other excluded hash", b"hash3", ] ], next_page_token=None, ) indexer.objstorage = Mock() indexer.objstorage.get = lambda id: b"foo" nb_partitions = 1 partition_id = 0 indexer.run(partition_id, nb_partitions) assert indexer._results == [["indexed hash1", "indexed hash2", "indexed hash3"]]