diff --git a/requirements.txt b/requirements.txt index a94d3ff..6116d2c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ python-magic >= 0.4.13 click # frozendict: dependency of pyld # the version 2.1.2 is causing segmentation faults # cf T3815 frozendict < 2.1.2 pyld -xmltodict +sentry-sdk typing-extensions +xmltodict \ No newline at end of file diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py index 76440a2..1873fba 100644 --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -1,192 +1,195 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import subprocess from typing import Any, Dict, Iterable, List, Optional +import sentry_sdk + from swh.core.api.classes import stream_results from swh.core.config import merge_configs from swh.indexer.storage.interface import IndexerStorageInterface, Sha1 from swh.indexer.storage.model import ContentLicenseRow from swh.model import hashutil from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp logger = logging.getLogger(__name__) def compute_license(path) -> Dict: """Determine license from file at path. Args: path: filepath to determine the license Returns: dict: A dict with the following keys: - licenses ([str]): associated detected licenses to path - path (bytes): content filepath """ try: properties = subprocess.check_output(["nomossa", path], universal_newlines=True) if properties: res = properties.rstrip().split(" contains license(s) ") licenses = res[1].split(",") else: licenses = [] return { "licenses": licenses, "path": path, } except subprocess.CalledProcessError: from os import path as __path logger.exception( "Problem during license detection for sha1 %s" % __path.basename(path) ) + sentry_sdk.capture_exception() return { "licenses": [], "path": path, } DEFAULT_CONFIG: Dict[str, Any] = { "workdir": "/tmp/swh/indexer.fossology.license", "tools": { "name": "nomos", "version": "3.1.0rc2-31-ga2cbb8c", "configuration": { "command_line": "nomossa ", }, }, "write_batch_size": 1000, } class MixinFossologyLicenseIndexer: """Mixin fossology license indexer. See :class:`FossologyLicenseIndexer` and :class:`FossologyLicensePartitionIndexer` """ tool: Any idx_storage: IndexerStorageInterface def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) self.working_directory = self.config["workdir"] def index( self, id: Sha1, data: Optional[bytes] = None, **kwargs ) -> List[ContentLicenseRow]: """Index sha1s' content and store result. 
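Editor's note on the nomossa output parsing in compute_license() above: a minimal, standalone sketch of the same split logic. The sample output line used here is made up for illustration, not captured from a real nomossa run.

def parse_nomossa_output(properties: str) -> list:
    # Mirror of the parsing in compute_license(): split on the
    # " contains license(s) " separator, then split the license list on commas.
    if not properties:
        return []
    res = properties.rstrip().split(" contains license(s) ")
    return res[1].split(",")

assert parse_nomossa_output("File foo.c contains license(s) GPL-2.0,MIT\n") == ["GPL-2.0", "MIT"]
assert parse_nomossa_output("") == []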
Args: id (bytes): content's identifier raw_content (bytes): associated raw content to content id Returns: dict: A dict, representing a content_license, with keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path - indexer_configuration_id (int): tool used to compute the output """ assert data is not None with write_to_temp( filename=hashutil.hash_to_hex(id), # use the id as pathname data=data, working_directory=self.working_directory, ) as content_path: properties = compute_license(path=content_path) return [ ContentLicenseRow( id=id, indexer_configuration_id=self.tool["id"], license=license, ) for license in properties["licenses"] ] def persist_index_computations( self, results: List[ContentLicenseRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_license dict with the following keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path """ return self.idx_storage.content_fossology_license_add(results) class FossologyLicenseIndexer( MixinFossologyLicenseIndexer, ContentIndexer[ContentLicenseRow] ): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {license, encoding} from that content - store result in storage """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_fossology_license_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) class FossologyLicensePartitionIndexer( MixinFossologyLicenseIndexer, ContentPartitionIndexer[ContentLicenseRow] ): """FossologyLicense Range Indexer working on range/partition of content identifiers. 
- filters out the non textual content - (optionally) filters out content already indexed (cf :meth:`.indexed_contents_in_partition`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores result in storage """ def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None ) -> Iterable[Sha1]: """Retrieve indexed content id within the partition id Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination """ return stream_results( self.idx_storage.content_fossology_license_get_partition, self.tool["id"], partition_id, nb_partitions, ) diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 58ebce4..066a462 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,611 +1,617 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from contextlib import contextmanager import logging import os import shutil import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, TypeVar, Union, ) import warnings +import sentry_sdk + from swh.core import utils from swh.core.config import load_from_envvar, merge_configs from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil from swh.model.model import Revision, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage from swh.storage.interface import StorageInterface @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: filename: one of sha1's many filenames data: the sha1's content to write in temporary file working_directory: the directory into which the file is written Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, "wb") as f: f.write(data) yield content_path shutil.rmtree(temp_dir) DEFAULT_CONFIG = { INDEXER_CFG_KEY: {"cls": "memory"}, "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, } TId = TypeVar("TId") """type of the ids of index()ed objects.""" TData = TypeVar("TData") """type of the objects passed to index().""" TResult = TypeVar("TResult") """return type of index()""" class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :meth:`~BaseIndexer.run`: object_ids are different depending on object. 
For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :meth:`~BaseIndexer.filter`: filter out data already indexed (in storage). :meth:`~BaseIndexer.index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :meth:`~BaseIndexer.persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :meth:`~BaseIndexer.prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :meth:`~BaseIndexer.check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :meth:`~BaseIndexer.register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ results: List[TResult] USE_TOOLS = True catch_exceptions = True """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" scheduler: Any storage: StorageInterface objstorage: Any idx_storage: IndexerStorageInterface def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run.""" super().__init__() if config is not None: self.config = config elif SWH_CONFIG: self.config = SWH_CONFIG.copy() else: self.config = load_from_envvar() self.config = merge_configs(DEFAULT_CONFIG, self.config) self.prepare() self.check() self.log.debug("%s: config=%s", self, self.config) def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ config_storage = self.config.get("storage") if config_storage: self.storage = get_storage(**config_storage) self.objstorage = get_objstorage(**self.config["objstorage"]) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.log = logging.getLogger("swh.indexer") if self.USE_TOOLS: self.tools = list(self.register_tools(self.config.get("tools", []))) self.results = [] @property def tool(self) -> Dict: return self.tools[0] def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if self.USE_TOOLS and not self.tools: raise ValueError("Tools %s is unknown, cannot continue" % self.tools) def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api.""" return {"tool_%s" % key: value for key, value in tool.items()} def register_tools( self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. Raises: ValueError: if not a list nor a dict. 
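Editor's note on register_tools() above: a small worked example of _prepare_tool(), reusing the nomos tool configuration from fossology_license.DEFAULT_CONFIG. The configured keys simply gain a "tool_" prefix before being passed to indexer_configuration_add().

def _prepare_tool(tool: dict) -> dict:
    # Same dict comprehension as BaseIndexer._prepare_tool().
    return {"tool_%s" % key: value for key, value in tool.items()}

assert _prepare_tool(
    {"name": "nomos", "version": "3.1.0rc2-31-ga2cbb8c",
     "configuration": {"command_line": "nomossa "}}
) == {
    "tool_name": "nomos",
    "tool_version": "3.1.0rc2-31-ga2cbb8c",
    "tool_configuration": {"command_line": "nomossa "},
}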
""" if isinstance(tools, list): tools = list(map(self._prepare_tool, tools)) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError("Configuration tool(s) must be a dict or list!") if tools: return self.idx_storage.indexer_configuration_add(tools) else: return [] def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: """Index computation for the id and associated raw data. Args: id: identifier or Dict object data: id's data from storage or objstorage depending on object type Returns: dict: a dict that makes sense for the :meth:`.persist_index_computations` method. """ raise NotImplementedError() def filter(self, ids: List[TId]) -> Iterator[TId]: """Filter missing ids for that particular indexer. Args: ids: list of ids Yields: iterator of missing ids """ yield from ids @abc.abstractmethod def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. Returns: a summary dict of what has been inserted in the storage """ return {} class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on a list of ids directly. To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] results = [] summary: Dict = {"status": "uneventful"} try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning( "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1) ) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when reading contents metadata.") + sentry_sdk.capture_exception() summary["status"] = "failed" return summary class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content partition indexer. This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition. To work on a list of ids, use the :class:`ContentIndexer` instead. Note: :class:`ContentPartitionIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ @abc.abstractmethod def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: """Retrieve indexed contents within range [start, end]. 
Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination """ pass def _list_contents_to_index( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1] ) -> Iterable[Sha1]: """Compute from storage the new contents to index in the partition_id . The already indexed contents are skipped. Args: partition_id: Index of the partition to fetch data from nb_partitions: Total number of partition indexed: Set of content already indexed. Yields: Sha1 id (bytes) of contents to index """ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int): raise TypeError( f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}." ) next_page_token = None while True: result = self.storage.content_get_partition( partition_id, nb_partitions, page_token=next_page_token ) contents = result.results for c in contents: _id = hashutil.hash_to_bytes(c.sha1) if _id in indexed: continue yield _id next_page_token = result.next_page_token if next_page_token is None: break def _index_contents( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any ) -> Iterator[TResult]: """Index the contents within the partition_id. Args: start: Starting bound from range identifier end: End range identifier indexed: Set of content already indexed. Yields: indexing result as dict to persist in the indexer backend """ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning(f"Content {sha1.hex()} not found in objstorage") continue yield from self.index(sha1, raw_content, **kwargs) def _index_with_skipping_already_done( self, partition_id: int, nb_partitions: int ) -> Iterator[TResult]: """Index not already indexed contents within the partition partition_id Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into Yields: indexing result as dict to persist in the indexer backend """ already_indexed_contents = set( self.indexed_contents_in_partition(partition_id, nb_partitions) ) return self._index_contents( partition_id, nb_partitions, already_indexed_contents ) def run( self, partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs, ) -> Dict: """Given a partition of content ids, index the contents within. Either the indexer is incremental (filter out existing computed data) or it computes everything from scratch. 
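Editor's note: the _list_contents_to_index() loop above relies on token-based pagination. Below is a self-contained sketch of that loop against a hypothetical two-page backend; FakePage and fake_get_partition are stand-ins for the storage API, not part of swh.

from typing import Iterator, Optional

class FakePage:
    def __init__(self, results, next_page_token):
        self.results = results
        self.next_page_token = next_page_token

def fake_get_partition(page_token: Optional[str] = None) -> FakePage:
    # Hypothetical backend returning two pages of sha1s.
    if page_token is None:
        return FakePage([b"\x01", b"\x02"], "page-2")
    return FakePage([b"\x03"], None)

def iter_all(indexed: set) -> Iterator[bytes]:
    next_page_token = None
    while True:
        page = fake_get_partition(page_token=next_page_token)
        for sha1 in page.results:
            if sha1 in indexed:
                continue  # already indexed content is skipped
            yield sha1
        next_page_token = page.next_page_token
        if next_page_token is None:
            break

assert list(iter_all(indexed={b"\x02"})) == [b"\x01", b"\x03"]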
Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: dict with the indexing task status """ summary: Dict[str, Any] = {"status": "uneventful"} count = 0 try: if skip_existing: gen = self._index_with_skipping_already_done( partition_id, nb_partitions ) else: gen = self._index_contents(partition_id, nb_partitions, indexed=set([])) count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] if count > 0: summary["status"] = "eventful" except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when computing metadata.") + sentry_sdk.capture_exception() summary["status"] = "failed" if count > 0 and count_object_added_key: summary[count_object_added_key] = count return summary class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - store the results Args: origin_urls: list of origin urls. **kwargs: passed to the `index` method """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} try: results = self.index_list(origin_urls, **kwargs) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index_list(self, origin_urls: List[str], **kwargs) -> List[TResult]: results = [] for origin_url in origin_urls: try: results.extend(self.index(origin_url, **kwargs)) except Exception: self.log.exception("Problem when processing origin %s", origin_url) + sentry_sdk.capture_exception() raise return results class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
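Editor's note: ContentPartitionIndexer.run() above persists results in batches of write_batch_size and accumulates a count from the first key of each persistence summary. A rough standalone sketch of that accumulation; grouper and persist below are simplified stand-ins, not the swh.core implementations.

from itertools import islice
from typing import Dict, Iterator, List, Optional

def grouper(iterable, n: int) -> Iterator[List]:
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch

def persist(batch: List) -> Dict[str, int]:
    # Stand-in for persist_index_computations(); a real indexer returns
    # something like {"content_fossology_license:add": <count>}.
    return {"rows:add": len(batch)}

count = 0
count_key: Optional[str] = None
for batch in grouper(range(25), n=10):  # write_batch_size is 1000 in the real config
    res = persist(batch)
    if count_key is None:
        count_key = list(res.keys())[0]
    count += res[count_key]
assert count == 25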
""" def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results Args: ids: sha1_git's identifier list """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} results = [] revision_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): if not rev: # TODO: call self.index() with rev=None? self.log.warning( "Revision %s not found in storage", hashutil.hash_to_hex(rev_id) ) continue try: results.extend(self.index(rev_id, rev)) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing revision") + sentry_sdk.capture_exception() summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) self.results = results return summary diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index 9a4e6cb..6fc3705 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,392 +1,397 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, ) +import sentry_sdk + from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.model import hashutil from swh.model.model import Revision, Sha1Git REVISION_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown revision", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store 
result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) metadata = MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) + sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - metadata (jsonb): detected metadata """ return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_intrinsic_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.revision_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Revision], **kwargs ) -> List[RevisionIntrinsicMetadataRow]: """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: id: sha1_git of the revision data: revision model object from storage Returns: dict: dictionary representing a revision_intrinsic_metadata, with keys: - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ rev = data assert isinstance(rev, Revision) try: root_dir = rev.directory dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) if [entry["type"] for entry in dir_ls] == ["dir"]: # If the root is just a single directory, recurse into it # eg. 
PyPI packages, GNU tarballs subdir = dir_ls[0]["target"] dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) files = [entry for entry in dir_ls if entry["type"] == "file"] detected_files = detect_metadata(files) (mappings, metadata) = self.translate_revision_intrinsic_metadata( detected_files, log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), ) except Exception as e: self.log.exception("Problem when indexing rev: %r", e) + sentry_sdk.capture_exception() return [ RevisionIntrinsicMetadataRow( id=rev.id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[RevisionIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ # TODO: add functions in storage to keep data in # revision_intrinsic_metadata return self.idx_storage.revision_intrinsic_metadata_add(results) def translate_revision_intrinsic_metadata( self, detected_files: Dict[str, List[Any]], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files: dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ used_mappings = [MAPPINGS[context].name for context in detected_files] metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] for context in detected_files.keys(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = context c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context] ) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files[context] if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") + sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.origin_head_indexer = OriginHeadIndexer(config=config) self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) def index_list( self, origin_urls: List[str], **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: head_rev_ids = [] origins_with_head = [] origins = list( call_with_batches( self.storage.origin_get, 
origin_urls, ORIGIN_GET_BATCH_SIZE, ) ) for origin in origins: if origin is None: continue head_results = self.origin_head_indexer.index(origin.url) if head_results: (head_result,) = head_results origins_with_head.append(origin) head_rev_ids.append(head_result["revision_id"]) head_revs = list( call_with_batches( self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE ) ) assert len(head_revs) == len(head_rev_ids) results = [] for (origin, rev) in zip(origins_with_head, head_revs): if not rev: self.log.warning("Missing head revision of origin %r", origin.url) continue for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): # There is at most one rev_metadata orig_metadata = OriginIntrinsicMetadataRow( from_revision=rev_metadata.id, id=origin.url, metadata=rev_metadata.metadata, mappings=rev_metadata.mappings, indexer_configuration_id=rev_metadata.indexer_configuration_id, ) results.append((orig_metadata, rev_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate revisions rev_metadata: List[RevisionIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] summary: Dict = {} for (orig_item, rev_item) in results: assert rev_item.metadata == orig_item.metadata if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if rev_item not in rev_metadata: rev_metadata.append(rev_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) if rev_metadata: summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) summary.update(summary_rev) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py index bcebffe..305226e 100644 --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -1,173 +1,177 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import itertools import logging from typing import Any, Dict, Generator, List, Optional, Tuple +import sentry_sdk + from swh.core import utils from swh.core.config import load_from_envvar from swh.model import hashutil from swh.model.model import Content from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.storage import get_storage DEFAULT_CONFIG: Dict[str, Any] = { "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, # the set of checksums that should be computed. # Examples: 'sha1_git', 'blake2b512', 'blake2s256' "compute_checksums": [], # whether checksums that already exist in the DB should be # recomputed/updated or left untouched "recompute_checksums": False, # Number of contents to retrieve blobs at the same time "batch_size_retrieve_content": 10, # Number of contents to update at the same time "batch_size_update": 100, } class RecomputeChecksums: """Class in charge of (re)computing content's hashes. Hashes to compute are defined across 2 configuration options: compute_checksums ([str]) list of hash algorithms that py:func:`swh.model.hashutil.MultiHash.from_data` function should be able to deal with. 
For variable-length checksums, a desired checksum length should also be provided. Their format is : e.g: blake2:512 recompute_checksums (bool) a boolean to notify that we also want to recompute potential existing hashes specified in compute_checksums. Default to False. """ def __init__(self) -> None: self.config = load_from_envvar(DEFAULT_CONFIG) self.storage = get_storage(**self.config["storage"]) self.objstorage = get_objstorage(**self.config["objstorage"]) self.compute_checksums = self.config["compute_checksums"] self.recompute_checksums = self.config["recompute_checksums"] self.batch_size_retrieve_content = self.config["batch_size_retrieve_content"] self.batch_size_update = self.config["batch_size_update"] self.log = logging.getLogger("swh.indexer.rehash") if not self.compute_checksums: raise ValueError("Checksums list should not be empty.") def _read_content_ids( self, contents: List[Dict[str, Any]] ) -> Generator[bytes, Any, None]: """Read the content identifiers from the contents.""" for c in contents: h = c["sha1"] if isinstance(h, str): h = hashutil.hash_to_bytes(h) yield h def get_new_contents_metadata( self, all_contents: List[Dict[str, Any]] ) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]: """Retrieve raw contents and compute new checksums on the contents. Unknown or corrupted contents are skipped. Args: all_contents: List of contents as dictionary with the necessary primary keys Yields: tuple: tuple of (content to update, list of checksums computed) """ content_ids = self._read_content_ids(all_contents) for contents in utils.grouper(content_ids, self.batch_size_retrieve_content): contents_iter = itertools.tee(contents, 2) try: sha1s = [s for s in contents_iter[0]] content_metadata: List[Optional[Content]] = self.storage.content_get( sha1s ) except Exception: self.log.exception("Problem when reading contents metadata.") + sentry_sdk.capture_exception() continue for sha1, content_model in zip(sha1s, content_metadata): if not content_model: continue content: Dict = content_model.to_dict() # Recompute checksums provided in compute_checksums options if self.recompute_checksums: checksums_to_compute = list(self.compute_checksums) else: # Compute checksums provided in compute_checksums # options not already defined for that content checksums_to_compute = [ h for h in self.compute_checksums if not content.get(h) ] if not checksums_to_compute: # Nothing to recompute continue try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning("Content %s not found in objstorage!", sha1) continue content_hashes = hashutil.MultiHash.from_data( raw_content, hash_names=checksums_to_compute ).digest() content.update(content_hashes) yield content, checksums_to_compute def run(self, contents: List[Dict[str, Any]]) -> Dict: """Given a list of content: - (re)compute a given set of checksums on contents available in our object storage - update those contents with the new metadata Args: contents: contents as dictionary with necessary keys. key present in such dictionary should be the ones defined in the 'primary_key' option. Returns: A summary dict with key 'status', task' status and 'count' the number of updated contents. 
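Editor's note: as a usage aside for the recomputation above, MultiHash.from_data() is called exactly as in get_new_contents_metadata(); the hash names below are only examples of algorithms one might list in the compute_checksums setting.

from swh.model import hashutil

raw_content = b"hello world\n"
content_hashes = hashutil.MultiHash.from_data(
    raw_content, hash_names=["sha1_git", "blake2s256"]
).digest()
# content_hashes maps each requested hash name to its digest, ready to be
# merged into the content dict with content.update(content_hashes).
assert set(content_hashes) == {"sha1_git", "blake2s256"}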
""" status = "uneventful" count = 0 for data in utils.grouper( self.get_new_contents_metadata(contents), self.batch_size_update ): groups: Dict[str, List[Any]] = defaultdict(list) for content, keys_to_update in data: keys_str = ",".join(keys_to_update) groups[keys_str].append(content) for keys_to_update, contents in groups.items(): keys: List[str] = keys_to_update.split(",") try: self.storage.content_update(contents, keys=keys) count += len(contents) status = "eventful" except Exception: self.log.exception("Problem during update.") + sentry_sdk.capture_exception() continue return { "status": status, "count": count, }