diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py
index eef0973..5c2aac6 100644
--- a/swh/indexer/ctags.py
+++ b/swh/indexer/ctags.py
@@ -1,157 +1,147 @@
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import json
 import subprocess
-from typing import Dict, List
+from typing import Any, Dict, List
 
+from swh.core.config import merge_configs
 from swh.model import hashutil
 
 from .indexer import ContentIndexer, write_to_temp
 
 # Options used to compute tags
 __FLAGS = [
     "--fields=+lnz",  # +l: language
     # +n: line number of tag definition
     # +z: include the symbol's kind (function, variable, ...)
     "--sort=no",  # do not sort the output
     "--links=no",  # do not follow symlinks
     "--output-format=json",  # outputs in json
 ]
 
 
 def compute_language(content, log=None):
     raise NotImplementedError(
         "Language detection was unreliable, so it is currently disabled. "
         "See https://forge.softwareheritage.org/D1455"
     )
 
 
 def run_ctags(path, lang=None, ctags_command="ctags"):
     """Run ctags on file path with optional language.
 
     Args:
         path: path to the file
         lang: language for that path (optional)
 
     Yields:
         dict: ctags' output
 
     """
     optional = []
     if lang:
         optional = ["--language-force=%s" % lang]
 
     cmd = [ctags_command] + __FLAGS + optional + [path]
     output = subprocess.check_output(cmd, universal_newlines=True)
 
     for symbol in output.split("\n"):
         if not symbol:
             continue
         js_symbol = json.loads(symbol)
         yield {
             "name": js_symbol["name"],
             "kind": js_symbol["kind"],
             "line": js_symbol["line"],
             "lang": js_symbol["language"],
         }
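For orientation, a minimal sketch of how run_ctags can be driven directly (an illustrative sketch only; it assumes a universal-ctags binary with JSON output support on PATH, and the file path is hypothetical):

    # Hypothetical usage of run_ctags as kept by this patch.
    from swh.indexer.ctags import run_ctags

    for symbol in run_ctags("/tmp/example.py", lang="Python"):
        # Each yielded dict carries the "name", "kind", "line" and "lang"
        # keys parsed from one JSON object per line of ctags output.
        print(symbol["name"], symbol["kind"], symbol["line"])
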
 
 
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "workdir": "/tmp/swh/indexer.ctags",
+    "tools": {
+        "name": "universal-ctags",
+        "version": "~git7859817b",
+        "configuration": {
+            "command_line": """ctags --fields=+lnz --sort=no --links=no """
+            """--output-format=json """
+        },
+    },
+    "languages": {},
+}
+
+
 class CtagsIndexer(ContentIndexer):
-    CONFIG_BASE_FILENAME = "indexer/ctags"
-
-    ADDITIONAL_CONFIG = {
-        "workdir": ("str", "/tmp/swh/indexer.ctags"),
-        "tools": (
-            "dict",
-            {
-                "name": "universal-ctags",
-                "version": "~git7859817b",
-                "configuration": {
-                    "command_line": """ctags --fields=+lnz --sort=no --links=no """
-                    """--output-format=json """
-                },
-            },
-        ),
-        "languages": (
-            "dict",
-            {
-                "ada": "Ada",
-                "adl": None,
-                "agda": None,
-                # ...
-            },
-        ),
-    }
-
-    def prepare(self):
-        super().prepare()
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config = merge_configs(DEFAULT_CONFIG, self.config)
         self.working_directory = self.config["workdir"]
         self.language_map = self.config["languages"]
 
     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.
 
         """
         yield from self.idx_storage.content_ctags_missing(
             ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
         )
 
     def index(self, id, data):
         """Index sha1s' content and store result.
 
         Args:
             id (bytes): content's identifier
             data (bytes): raw content in bytes
 
         Returns:
             dict: a dict representing a content_ctags with keys:
 
             - **id** (bytes): content's identifier (sha1)
             - **ctags** ([dict]): ctags list of symbols
 
         """
         lang = compute_language(data, log=self.log)["lang"]
         if not lang:
             return None
 
         ctags_lang = self.language_map.get(lang)
         if not ctags_lang:
             return None
 
         ctags = {
             "id": id,
         }
         filename = hashutil.hash_to_hex(id)
         with write_to_temp(
             filename=filename, data=data, working_directory=self.working_directory
         ) as content_path:
             result = run_ctags(content_path, lang=ctags_lang)
             ctags.update(
                 {"ctags": list(result), "indexer_configuration_id": self.tool["id"],}
             )
 
         return ctags
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         """Persist the results in storage.
 
         Args:
             results: list of content_ctags, dict with the following keys:
 
               - id (bytes): content's identifier (sha1)
               - ctags ([dict]): ctags list of symbols
 
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         return self.idx_storage.content_ctags_add(
             results, conflict_update=(policy_update == "update-dups")
         )
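The DEFAULT_CONFIG/merge_configs pattern introduced here replaces the old typed ADDITIONAL_CONFIG tuples: swh.core.config.merge_configs recursively merges two plain dicts, with keys from the second argument taking precedence. A small sketch of the intended semantics (the values are illustrative):

    from swh.core.config import merge_configs

    defaults = {"workdir": "/tmp/swh/indexer.ctags", "languages": {}}
    user_config = {"workdir": "/srv/indexer/ctags"}

    config = merge_configs(defaults, user_config)
    # Defaults are kept unless the user configuration overrides them:
    assert config["workdir"] == "/srv/indexer/ctags"
    assert config["languages"] == {}
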
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
index 910be2c..219552c 100644
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -1,188 +1,187 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import subprocess
 from typing import Any, Dict, List, Optional, Union
 
+from swh.core.config import merge_configs
 from swh.indexer.storage.interface import PagedResult, Sha1
 from swh.model import hashutil
 from swh.model.model import Revision
 
 from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp
 
 logger = logging.getLogger(__name__)
 
 
 def compute_license(path):
     """Determine license from file at path.
 
     Args:
         path: filepath to determine the license
 
     Returns:
         dict: A dict with the following keys:
 
         - licenses ([str]): associated detected licenses to path
         - path (bytes): content filepath
 
     """
     try:
         properties = subprocess.check_output(["nomossa", path], universal_newlines=True)
         if properties:
             res = properties.rstrip().split(" contains license(s) ")
             licenses = res[1].split(",")
         else:
             licenses = []
 
         return {
             "licenses": licenses,
             "path": path,
         }
     except subprocess.CalledProcessError:
         from os import path as __path
 
         logger.exception(
             "Problem during license detection for sha1 %s" % __path.basename(path)
         )
         return {
             "licenses": [],
             "path": path,
         }
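The nomossa output parsing above can be exercised without the binary installed; a sketch of the same split logic on a sample output line (the sample string is illustrative of the stdout format compute_license assumes):

    # Illustrative sample of nomossa's stdout as assumed by compute_license:
    properties = "/tmp/content123 contains license(s) GPL-2.0,LGPL-2.1"
    res = properties.rstrip().split(" contains license(s) ")
    licenses = res[1].split(",")
    assert licenses == ["GPL-2.0", "LGPL-2.1"]
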
 
 
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "workdir": "/tmp/swh/indexer.fossology.license",
+    "tools": {
+        "name": "nomos",
+        "version": "3.1.0rc2-31-ga2cbb8c",
+        "configuration": {"command_line": "nomossa <filepath>",},
+    },
+    "write_batch_size": 1000,
+}
+
+
 class MixinFossologyLicenseIndexer:
     """Mixin fossology license indexer.
 
     See :class:`FossologyLicenseIndexer` and
     :class:`FossologyLicensePartitionIndexer`
 
     """
 
-    ADDITIONAL_CONFIG = {
-        "workdir": ("str", "/tmp/swh/indexer.fossology.license"),
-        "tools": (
-            "dict",
-            {
-                "name": "nomos",
-                "version": "3.1.0rc2-31-ga2cbb8c",
-                "configuration": {"command_line": "nomossa <filepath>",},
-            },
-        ),
-        "write_batch_size": ("int", 1000),
-    }
-
-    CONFIG_BASE_FILENAME = "indexer/fossology_license"  # type: Optional[str]
     tool: Any
     idx_storage: Any
 
-    def prepare(self):
-        super().prepare()
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config = merge_configs(DEFAULT_CONFIG, self.config)
         self.working_directory = self.config["workdir"]
 
     def index(
         self, id: Union[bytes, Dict, Revision], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index sha1s' content and store result.
 
         Args:
             id (bytes): content's identifier
             raw_content (bytes): associated raw content to content id
 
         Returns:
             dict: A dict, representing a content_license, with keys:
 
             - id (bytes): content's identifier (sha1)
             - license (bytes): license in bytes
             - path (bytes): path
             - indexer_configuration_id (int): tool used to compute the output
 
         """
         assert isinstance(id, bytes)
         assert data is not None
         with write_to_temp(
             filename=hashutil.hash_to_hex(id),  # use the id as pathname
             data=data,
             working_directory=self.working_directory,
         ) as content_path:
             properties = compute_license(path=content_path)
             properties.update(
                 {"id": id, "indexer_configuration_id": self.tool["id"],}
             )
         return properties
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         """Persist the results in storage.
 
         Args:
             results: list of content_license dict with the following keys:
 
               - id (bytes): content's identifier (sha1)
               - license (bytes): license in bytes
               - path (bytes): path
 
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         return self.idx_storage.content_fossology_license_add(
             results, conflict_update=(policy_update == "update-dups")
         )
 
 
 class FossologyLicenseIndexer(MixinFossologyLicenseIndexer, ContentIndexer):
     """Indexer in charge of:
 
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
     - computing {license, path} from that content
     - store result in storage
 
     """
 
     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.
 
         """
         yield from self.idx_storage.content_fossology_license_missing(
             ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
         )
 
 
 class FossologyLicensePartitionIndexer(
     MixinFossologyLicenseIndexer, ContentPartitionIndexer
 ):
     """FossologyLicense Range Indexer working on range/partition of content
     identifiers.
 
     - filters out the non-textual content
     - (optionally) filters out content already indexed (cf
       :meth:`.indexed_contents_in_partition`)
     - reads content from objstorage per the content's id (sha1)
     - computes {license, path} from that content
     - stores result in storage
 
     """
 
     def indexed_contents_in_partition(
         self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
     ) -> PagedResult[Sha1]:
         """Retrieve indexed content id within the partition id
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
             page_token: opaque token used for pagination
 
         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data
             to fetch
 
         """
         return self.idx_storage.content_fossology_license_get_partition(
             self.tool["id"], partition_id, nb_partitions, page_token=page_token
         )
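A sketch of how such a partition indexer is typically driven (heavily hedged: the configuration values are illustrative in-memory backends, and a real deployment would point at actual storage services; run() is defined on ContentPartitionIndexer in swh/indexer/indexer.py below):

    # Hypothetical invocation of the partition indexer defined above.
    from swh.indexer.fossology_license import FossologyLicensePartitionIndexer

    config = {
        "storage": {"cls": "memory"},
        "objstorage": {"cls": "memory", "args": {}},
        "indexer_storage": {"cls": "memory"},
    }
    indexer = FossologyLicensePartitionIndexer(config=config)
    # Index the first of 16 partitions, skipping already-indexed contents:
    summary = indexer.run(partition_id=0, nb_partitions=16, skip_existing=True)
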
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 22fad12..9f24e14 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,632 +1,610 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 from contextlib import contextmanager
 import logging
 import os
 import shutil
 import tempfile
-from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Set, Union
 
 from swh.core import utils
-from swh.core.config import SWHConfig
+from swh.core.config import load_from_envvar, merge_configs
 from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage
 from swh.model import hashutil
 from swh.model.model import Revision
 from swh.objstorage.exc import ObjNotFoundError
 from swh.objstorage.factory import get_objstorage
 from swh.scheduler import CONFIG as SWH_CONFIG
 from swh.storage import get_storage
 
 
 @contextmanager
 def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]:
     """Write the sha1's content in a temporary file.
 
     Args:
         filename: one of sha1's many filenames
         data: the sha1's content to write in temporary file
         working_directory: the directory into which the file is written
 
     Returns:
         The path to the temporary file created. That file is filled in
         with the raw content's data.
 
     """
     os.makedirs(working_directory, exist_ok=True)
     temp_dir = tempfile.mkdtemp(dir=working_directory)
     content_path = os.path.join(temp_dir, filename)
 
     with open(content_path, "wb") as f:
         f.write(data)
 
     yield content_path
     shutil.rmtree(temp_dir)
 
 
+DEFAULT_CONFIG = {
+    INDEXER_CFG_KEY: {"cls": "memory"},
+    "storage": {"cls": "memory"},
+    "objstorage": {"cls": "memory"},
+}
+
+
-class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
+class BaseIndexer(metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.
 
     The main entry point is the :func:`run` function which is in
     charge of triggering the computations on the batch dict/ids
     received.
 
     Indexers can:
 
     - filter out ids whose data has already been indexed.
     - retrieve ids data from storage or objstorage
     - index this data depending on the object and store the result in
       storage.
 
     To implement a new object type indexer, inherit from the
     BaseIndexer and implement indexing:
 
     :meth:`~BaseIndexer.run`:
       object_ids are different depending on object. For example: sha1 for
       content, sha1_git for revision, directory, release, and id for origin
 
     To implement a new concrete indexer, inherit from the object level
     classes: :class:`ContentIndexer`, :class:`RevisionIndexer`,
     :class:`OriginIndexer`.
 
     Then you need to implement the following functions:
 
     :meth:`~BaseIndexer.filter`:
       filter out data already indexed (in storage).
 
     :meth:`~BaseIndexer.index`:
       compute index on id with data (retrieved from the storage or the
       objstorage by the id key) and return the resulting index computation.
 
     :meth:`~BaseIndexer.persist_index_computations`:
       persist the results of multiple index computations in the storage.
 
     The new indexer implementation can also override the following functions:
 
     :meth:`~BaseIndexer.prepare`:
       Configuration preparation for the indexer. When overriding, it must
       call `super().prepare()`.
 
     :meth:`~BaseIndexer.check`:
       Configuration check for the indexer. When overriding, it must call
       `super().check()`.
 
     :meth:`~BaseIndexer.register_tools`:
       This should return a dict of the tool(s) to use when indexing or
       filtering.
 
     """
 
     results: List[Dict]
 
-    CONFIG = "indexer/base"
-
-    DEFAULT_CONFIG = {
-        INDEXER_CFG_KEY: (
-            "dict",
-            {"cls": "remote", "args": {"url": "http://localhost:5007/"}},
-        ),
-        "storage": (
-            "dict",
-            {"cls": "remote", "args": {"url": "http://localhost:5002/",}},
-        ),
-        "objstorage": (
-            "dict",
-            {"cls": "remote", "args": {"url": "http://localhost:5003/",}},
-        ),
-    }
-
-    ADDITIONAL_CONFIG = {}  # type: Dict[str, Tuple[str, Any]]
-
     USE_TOOLS = True
 
     catch_exceptions = True
     """Prevents exceptions in `index()` from raising too high. Set to False
     in tests to properly catch all exceptions."""
 
     scheduler: Any
 
     def __init__(self, config=None, **kw) -> None:
         """Prepare and check that the indexer is ready to run.
 
         """
         super().__init__()
         if config is not None:
             self.config = config
         elif SWH_CONFIG:
             self.config = SWH_CONFIG.copy()
         else:
-            config_keys = (
-                "base_filename",
-                "config_filename",
-                "additional_configs",
-                "global_config",
-            )
-            config_args = {k: v for k, v in kw.items() if k in config_keys}
-            if self.ADDITIONAL_CONFIG:
-                config_args.setdefault("additional_configs", []).append(
-                    self.ADDITIONAL_CONFIG
-                )
-            self.config = self.parse_config_file(**config_args)
+            self.config = load_from_envvar()
+        self.config = merge_configs(DEFAULT_CONFIG, self.config)
         self.prepare()
         self.check()
         self.log.debug("%s: config=%s", self, self.config)
 
     def prepare(self) -> None:
         """Prepare the indexer's needed runtime configuration.
         Without this step, the indexer cannot possibly run.
 
         """
         config_storage = self.config.get("storage")
         if config_storage:
             self.storage = get_storage(**config_storage)
 
         objstorage = self.config["objstorage"]
         self.objstorage = get_objstorage(objstorage["cls"], objstorage["args"])
 
         idx_storage = self.config[INDEXER_CFG_KEY]
         self.idx_storage = get_indexer_storage(**idx_storage)
 
         _log = logging.getLogger("requests.packages.urllib3.connectionpool")
         _log.setLevel(logging.WARN)
         self.log = logging.getLogger("swh.indexer")
 
         if self.USE_TOOLS:
             self.tools = list(self.register_tools(self.config.get("tools", [])))
         self.results = []
 
     @property
     def tool(self) -> Dict:
         return self.tools[0]
 
     def check(self) -> None:
         """Check the indexer's configuration is ok before proceeding.
         If ok, does nothing. If not raise error.
 
         """
         if self.USE_TOOLS and not self.tools:
             raise ValueError("Tools %s are unknown, cannot continue" % self.tools)
 
     def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]:
         """Prepare the tool dict to be compliant with the storage api.
 
         """
         return {"tool_%s" % key: value for key, value in tool.items()}
 
     def register_tools(
         self, tools: Union[Dict[str, Any], List[Dict[str, Any]]]
     ) -> List[Dict[str, Any]]:
         """Register tools in the storage.
 
         Add a sensible default which can be overridden if not
         sufficient. (For now, all indexers use only one tool)
 
         Expects the self.config['tools'] property to be set with
         one or more tools.
 
         Args:
             tools: Either a dict or a list of dict.
 
         Returns:
             list: List of dicts with additional id key.
 
         Raises:
             ValueError: if not a list nor a dict.
 
         """
         if isinstance(tools, list):
             tools = list(map(self._prepare_tool, tools))
         elif isinstance(tools, dict):
             tools = [self._prepare_tool(tools)]
         else:
             raise ValueError("Configuration tool(s) must be a dict or list!")
 
         if tools:
             return self.idx_storage.indexer_configuration_add(tools)
         else:
             return []
 
     def index(
         self, id: Union[bytes, Dict, Revision], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index computation for the id and associated raw data.
 
         Args:
             id: identifier or Dict object
             data: id's data from storage or objstorage depending on
               object type
 
         Returns:
             dict: a dict that makes sense for the
             :meth:`.persist_index_computations` method.
 
         """
         raise NotImplementedError()
 
     def filter(self, ids: List[bytes]) -> Iterator[bytes]:
         """Filter missing ids for that particular indexer.
 
         Args:
             ids: list of ids
 
         Yields:
             iterator of missing ids
 
         """
         yield from ids
 
     @abc.abstractmethod
     def persist_index_computations(self, results, policy_update) -> Dict[str, int]:
         """Persist the computation resulting from the index.
 
         Args:
             results ([result]): List of results. One result is the
               result of the index function.
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         Returns:
             a summary dict of what has been inserted in the storage
 
         """
         return {}
 
 
 class ContentIndexer(BaseIndexer):
     """A content indexer working on a list of ids directly.
 
     To work on a partition of contents, use the
     :class:`ContentPartitionIndexer` instead.
 
     Note: :class:`ContentIndexer` is not an instantiable object. To use it,
     one should inherit from this class and override the methods mentioned
     in the :class:`BaseIndexer` class.
 
     """
 
     def run(
         self, ids: Union[List[bytes], bytes, str], policy_update: str, **kwargs
     ) -> Dict:
         """Given a list of ids:
 
         - retrieve the content from the storage
         - execute the indexing computations
         - store the results (according to policy_update)
 
         Args:
             ids (Iterable[Union[bytes, str]]): sha1's identifier list
             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
             **kwargs: passed to the `index` method
 
         Returns:
             A summary Dict of the task's status
 
         """
         status = "uneventful"
         sha1s = [
             hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
         ]
         results = []
         summary: Dict = {}
         try:
             for sha1 in sha1s:
                 try:
                     raw_content = self.objstorage.get(sha1)
                 except ObjNotFoundError:
                     self.log.warning(
                         "Content %s not found in objstorage"
                         % hashutil.hash_to_hex(sha1)
                     )
                     continue
                 res = self.index(sha1, raw_content, **kwargs)
                 if res:  # If no results, skip it
                     results.append(res)
                     status = "eventful"
             summary = self.persist_index_computations(results, policy_update)
             self.results = results
         except Exception:
             if not self.catch_exceptions:
                 raise
             self.log.exception("Problem when reading contents metadata.")
             status = "failed"
         finally:
             summary["status"] = status
             return summary
 
 
 class ContentPartitionIndexer(BaseIndexer):
     """A content partition indexer.
 
     This expects as input a partition_id and a nb_partitions. This will then
     index the contents within that partition.
 
     To work on a list of ids, use the :class:`ContentIndexer` instead.
 
     Note: :class:`ContentPartitionIndexer` is not an instantiable
     object. To use it, one should inherit from this class and override
     the methods mentioned in the :class:`BaseIndexer` class.
 
     """
 
     @abc.abstractmethod
     def indexed_contents_in_partition(
         self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
     ) -> PagedResult[Sha1]:
         """Retrieve indexed contents within the partition.
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
             page_token: opaque token used for pagination
 
         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data
             to fetch
 
         """
         pass
 
     def _list_contents_to_index(
         self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
     ) -> Iterator[Sha1]:
         """Compute from storage the new contents to index in the partition.
         The already indexed contents are skipped.
 
         Args:
             partition_id: Index of the partition to fetch data from
             nb_partitions: Total number of partitions
             indexed: Set of contents already indexed.
 
         Yields:
             Sha1 id (bytes) of contents to index
 
         """
         if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
             raise TypeError(
                 f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
             )
         next_page_token = None
         while True:
             result = self.storage.content_get_partition(
                 partition_id, nb_partitions, page_token=next_page_token
             )
             contents = result.results
             for c in contents:
                 _id = hashutil.hash_to_bytes(c.sha1)
                 if _id in indexed:
                     continue
                 yield _id
             next_page_token = result.next_page_token
             if next_page_token is None:
                 break
 
     def _index_contents(
         self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
     ) -> Iterator[Dict]:
         """Index the contents within the partition_id.
 
         Args:
             partition_id: Index of the partition to fetch data from
             nb_partitions: Total number of partitions
             indexed: Set of contents already indexed.
 
         Yields:
             indexing result as dict to persist in the indexer backend
 
         """
         for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
             try:
                 raw_content = self.objstorage.get(sha1)
             except ObjNotFoundError:
                 self.log.warning(f"Content {sha1.hex()} not found in objstorage")
                 continue
             res = self.index(sha1, raw_content, **kwargs)
             if res:
                 if not isinstance(res["id"], bytes):
                     raise TypeError(
                         "%r.index should return ids as bytes, not %r"
                         % (self.__class__.__name__, res["id"])
                     )
                 yield res
 
     def _index_with_skipping_already_done(
         self, partition_id: int, nb_partitions: int
     ) -> Iterator[Dict]:
         """Index not already indexed contents within the partition partition_id
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
 
         Yields:
             indexing result as dict to persist in the indexer backend
 
         """
         next_page_token = None
         contents = set()
         while True:
             indexed_page = self.indexed_contents_in_partition(
                 partition_id, nb_partitions, page_token=next_page_token
             )
             for sha1 in indexed_page.results:
                 contents.add(sha1)
             yield from self._index_contents(partition_id, nb_partitions, contents)
             next_page_token = indexed_page.next_page_token
             if next_page_token is None:
                 break
 
     def run(
         self,
         partition_id: int,
         nb_partitions: int,
         skip_existing: bool = True,
         **kwargs,
     ) -> Dict:
         """Given a partition of content ids, index the contents within.
 
         Either the indexer is incremental (filter out existing computed
         data) or it computes everything from scratch.
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
             skip_existing: Skip existing indexed data (default) or not
             **kwargs: passed to the `index` method
 
         Returns:
             dict with the indexing task status
 
         """
         status = "uneventful"
         summary: Dict[str, Any] = {}
         count = 0
         try:
             if skip_existing:
                 gen = self._index_with_skipping_already_done(
                     partition_id, nb_partitions
                 )
             else:
                 gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
 
             count_object_added_key: Optional[str] = None
 
             for contents in utils.grouper(gen, n=self.config["write_batch_size"]):
                 res = self.persist_index_computations(
                     contents, policy_update="update-dups"
                 )
                 if not count_object_added_key:
                     count_object_added_key = list(res.keys())[0]
                 count += res[count_object_added_key]
                 if count > 0:
                     status = "eventful"
         except Exception:
             if not self.catch_exceptions:
                 raise
             self.log.exception("Problem when computing metadata.")
             status = "failed"
         finally:
             summary["status"] = status
             if count > 0 and count_object_added_key:
                 summary[count_object_added_key] = count
             return summary
 
 
 class OriginIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Origin indexing using the run method
 
     Note: the :class:`OriginIndexer` is not an instantiable object.
     To use it in another context one should inherit from this class
     and override the methods mentioned in the :class:`BaseIndexer`
     class.
 
     """
 
     def run(
         self, origin_urls: List[str], policy_update: str = "update-dups", **kwargs
     ) -> Dict:
         """Given a list of origin urls:
 
         - retrieve origins from storage
         - execute the indexing computations
         - store the results (according to policy_update)
 
         Args:
             origin_urls: list of origin urls.
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates (default) or ignore them
             **kwargs: passed to the `index` method
 
         """
         summary: Dict[str, Any] = {}
         status = "uneventful"
         results = self.index_list(origin_urls, **kwargs)
         summary_persist = self.persist_index_computations(results, policy_update)
         self.results = results
         if summary_persist:
             for value in summary_persist.values():
                 if value > 0:
                     status = "eventful"
             summary.update(summary_persist)
         summary["status"] = status
         return summary
 
     def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]:
         results = []
         for origin in origins:
             try:
                 res = self.index(origin, **kwargs)
                 if res:  # If no results, skip it
                     results.append(res)
             except Exception:
                 if not self.catch_exceptions:
                     raise
                 self.log.exception("Problem when processing origin %s", origin)
         return results
 
 
 class RevisionIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Revision indexing using the run method
 
     Note: the :class:`RevisionIndexer` is not an instantiable object.
     To use it in another context one should inherit from this class
     and override the methods mentioned in the :class:`BaseIndexer`
     class.
 
     """
 
     def run(self, ids: List[Union[str, bytes]], policy_update: str) -> Dict:
         """Given a list of sha1_gits:
 
         - retrieve revisions from storage
         - execute the indexing computations
         - store the results (according to policy_update)
 
         Args:
             ids: sha1_git's identifier list
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         summary: Dict[str, Any] = {}
         status = "uneventful"
         results = []
 
         revision_ids = [
             hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
         ]
         for rev in self.storage.revision_get(revision_ids):
             if not rev:
                 self.log.warning(
                     "Revisions %s not found in storage"
                     % list(map(hashutil.hash_to_hex, ids))
                 )
                 continue
             try:
                 res = self.index(rev)
                 if res:  # If no results, skip it
                     results.append(res)
                     status = "eventful"
             except Exception:
                 if not self.catch_exceptions:
                     raise
                 self.log.exception("Problem when processing revision")
                 status = "failed"
         summary_persist = self.persist_index_computations(results, policy_update)
         if summary_persist:
             for value in summary_persist.values():
                 if value > 0:
                     status = "eventful"
             summary.update(summary_persist)
         self.results = results
         summary["status"] = status
         return summary
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index 4260a3f..5863918 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,376 +1,381 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from copy import deepcopy
 from typing import Any, Callable, Dict, Iterator, List, Tuple
 
+from swh.core.config import merge_configs
 from swh.core.utils import grouper
 from swh.indexer.codemeta import merge_documents
 from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer
 from swh.indexer.metadata_detector import detect_metadata
 from swh.indexer.metadata_dictionary import MAPPINGS
 from swh.indexer.origin_head import OriginHeadIndexer
 from swh.indexer.storage import INDEXER_CFG_KEY
 from swh.model import hashutil
 
 REVISION_GET_BATCH_SIZE = 10
 ORIGIN_GET_BATCH_SIZE = 10
 
 
 def call_with_batches(
     f: Callable[[List[Dict[str, Any]]], Dict["str", Any]],
     args: List[Dict[str, str]],
     batch_size: int,
 ) -> Iterator[str]:
     """Calls a function with batches of args, and concatenates the results.
 
     """
     groups = grouper(args, batch_size)
     for group in groups:
         yield from f(list(group))
 
 
 class ContentMetadataIndexer(ContentIndexer):
     """Content-level indexer
 
     This indexer is in charge of:
 
     - filtering out content already indexed in content_metadata
     - reading content from objstorage with the content's id sha1
     - computing metadata by given context
     - using the metadata_dictionary as the 'swh-metadata-translator' tool
     - store result in content_metadata table
 
     """
 
     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.
 
         """
         yield from self.idx_storage.content_metadata_missing(
             ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
         )
 
     def index(self, id, data, log_suffix="unknown revision"):
         """Index sha1s' content and store result.
 
         Args:
             id (bytes): content's identifier
             data (bytes): raw content in bytes
 
         Returns:
             dict: dictionary representing a content_metadata. If the
             translation wasn't successful, the 'metadata' key will be
             returned as None
 
         """
         result = {
             "id": id,
             "indexer_configuration_id": self.tool["id"],
             "metadata": None,
         }
         try:
             mapping_name = self.tool["tool_configuration"]["context"]
             log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id)
             result["metadata"] = MAPPINGS[mapping_name](log_suffix).translate(data)
         except Exception:
             self.log.exception(
                 "Problem during metadata translation "
                 "for content %s" % hashutil.hash_to_hex(id)
             )
         if result["metadata"] is None:
             return None
         return result
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         """Persist the results in storage.
 
         Args:
             results: list of content_metadata, dict with the following keys:
 
               - id (bytes): content's identifier (sha1)
               - metadata (jsonb): detected metadata
 
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         return self.idx_storage.content_metadata_add(
             results, conflict_update=(policy_update == "update-dups")
         )
 
 
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "tools": {
+        "name": "swh-metadata-detector",
+        "version": "0.0.2",
+        "configuration": {},
+    },
+}
+
+
 class RevisionMetadataIndexer(RevisionIndexer):
     """Revision-level indexer
 
     This indexer is in charge of:
 
     - filtering revisions already indexed in revision_intrinsic_metadata table
       with defined computation tool
     - retrieve all entry_files in root directory
     - use metadata_detector for file_names containing metadata
     - compute metadata translation if necessary and possible (depends on tool)
     - send sha1s to content indexing if possible
     - store the results for revision
 
     """
 
-    ADDITIONAL_CONFIG = {
-        "tools": (
-            "dict",
-            {"name": "swh-metadata-detector", "version": "0.0.2", "configuration": {},},
-        ),
-    }
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config = merge_configs(DEFAULT_CONFIG, self.config)
 
     def filter(self, sha1_gits):
         """Filter out known sha1s and return only missing ones.
 
         """
         yield from self.idx_storage.revision_intrinsic_metadata_missing(
             (
                 {"id": sha1_git, "indexer_configuration_id": self.tool["id"],}
                 for sha1_git in sha1_gits
             )
         )
 
     def index(self, rev):
         """Index rev by processing it and organizing result.
 
         Use metadata_detector to iterate on filenames:
 
         - if one filename detected -> sends file to content indexer
         - if multiple files detected -> translation needed at revision level
 
         Args:
             rev: revision model object from storage
 
         Returns:
             dict: dictionary representing a revision_intrinsic_metadata, with
             keys:
 
             - id (bytes): rev's identifier (sha1_git)
             - indexer_configuration_id (int): tool used
             - metadata: dict of retrieved metadata
 
         """
         result = {
             "id": rev.id,
             "indexer_configuration_id": self.tool["id"],
             "mappings": None,
             "metadata": None,
         }
         try:
             root_dir = rev.directory
             dir_ls = list(self.storage.directory_ls(root_dir, recursive=False))
             if [entry["type"] for entry in dir_ls] == ["dir"]:
                 # If the root is just a single directory, recurse into it,
                 # e.g. PyPI packages, GNU tarballs
                 subdir = dir_ls[0]["target"]
                 dir_ls = self.storage.directory_ls(subdir, recursive=False)
             files = [entry for entry in dir_ls if entry["type"] == "file"]
             detected_files = detect_metadata(files)
             (mappings, metadata) = self.translate_revision_intrinsic_metadata(
                 detected_files, log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id),
             )
             result["mappings"] = mappings
             result["metadata"] = metadata
         except Exception as e:
             self.log.exception("Problem when indexing rev: %r", e)
         return result
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         """Persist the results in storage.
 
         Args:
             results: list of revision_intrinsic_metadata dicts with the
               following keys:
 
               - id (bytes): revision's identifier (sha1_git)
               - mappings ([str]): list of mappings used to translate metadata
               - metadata: dict of translated metadata
 
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         # TODO: add functions in storage to keep data in
         # revision_intrinsic_metadata
         return self.idx_storage.revision_intrinsic_metadata_add(
             results, conflict_update=(policy_update == "update-dups")
         )
 
     def translate_revision_intrinsic_metadata(
         self, detected_files: Dict[str, List[Any]], log_suffix: str
     ) -> Tuple[List[Any], List[Any]]:
         """
         Determine plan of action to translate metadata when containing
         one or multiple detected files:
 
         Args:
             detected_files: dictionary mapping context names (e.g.,
               "npm", "authors") to list of sha1
 
         Returns:
             (List[str], dict): list of mappings used and dict with
             translated metadata according to the CodeMeta vocabulary
 
         """
         used_mappings = [MAPPINGS[context].name for context in detected_files]
         metadata = []
         tool = {
             "name": "swh-metadata-translator",
             "version": "0.0.2",
             "configuration": {},
         }
         # TODO: iterate on each context, on each file
         # -> get raw_contents
         # -> translate each content
         config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]}
         config["tools"] = [tool]
         for context in detected_files.keys():
             cfg = deepcopy(config)
             cfg["tools"][0]["configuration"]["context"] = context
             c_metadata_indexer = ContentMetadataIndexer(config=cfg)
             # sha1s that are in content_metadata table
             sha1s_in_storage = []
             metadata_generator = self.idx_storage.content_metadata_get(
                 detected_files[context]
             )
             for c in metadata_generator:
                 # extracting metadata
                 sha1 = c["id"]
                 sha1s_in_storage.append(sha1)
                 local_metadata = c["metadata"]
                 # local metadata is aggregated
                 if local_metadata:
                     metadata.append(local_metadata)
 
             sha1s_filtered = [
                 item for item in detected_files[context] if item not in sha1s_in_storage
             ]
 
             if sha1s_filtered:
                 # content indexing
                 try:
                     c_metadata_indexer.run(
                         sha1s_filtered, policy_update="ignore-dups", log_suffix=log_suffix,
                     )
                     # on the fly possibility:
                     for result in c_metadata_indexer.results:
                         local_metadata = result["metadata"]
                         metadata.append(local_metadata)
 
                 except Exception:
                     self.log.exception("Exception while indexing metadata on contents")
 
         metadata = merge_documents(metadata)
         return (used_mappings, metadata)
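The per-context plumbing above works by carrying the mapping context inside the tool configuration; ContentMetadataIndexer.index() later reads it back through self.tool["tool_configuration"]["context"]. The dict built for each detected context looks like this (a sketch; "npm" is just an example context name):

    # Illustrative shape of the per-context tool entry assembled in
    # translate_revision_intrinsic_metadata before instantiating the
    # ContentMetadataIndexer:
    tool = {
        "name": "swh-metadata-translator",
        "version": "0.0.2",
        # After registration, the indexer storage exposes this dict back
        # as "tool_configuration" on self.tool:
        "configuration": {"context": "npm"},
    }
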
 
 
 class OriginMetadataIndexer(OriginIndexer):
-    ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG
-
     USE_TOOLS = False
 
     def __init__(self, config=None, **kwargs) -> None:
         super().__init__(config=config, **kwargs)
         self.origin_head_indexer = OriginHeadIndexer(config=config)
         self.revision_metadata_indexer = RevisionMetadataIndexer(config=config)
 
     def index_list(self, origin_urls, **kwargs):
         head_rev_ids = []
         origins_with_head = []
         origins = list(
             call_with_batches(
                 self.storage.origin_get, origin_urls, ORIGIN_GET_BATCH_SIZE,
             )
         )
         for origin in origins:
             if origin is None:
                 continue
             head_result = self.origin_head_indexer.index(origin.url)
             if head_result:
                 origins_with_head.append(origin)
                 head_rev_ids.append(head_result["revision_id"])
 
         head_revs = list(
             call_with_batches(
                 self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE
             )
         )
         assert len(head_revs) == len(head_rev_ids)
 
         results = []
         for (origin, rev) in zip(origins_with_head, head_revs):
             if not rev:
                 self.log.warning("Missing head revision of origin %r", origin.url)
                 continue
 
             rev_metadata = self.revision_metadata_indexer.index(rev)
             orig_metadata = {
                 "from_revision": rev_metadata["id"],
                 "id": origin.url,
                 "metadata": rev_metadata["metadata"],
                 "mappings": rev_metadata["mappings"],
                 "indexer_configuration_id": rev_metadata["indexer_configuration_id"],
             }
             results.append((orig_metadata, rev_metadata))
         return results
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         conflict_update = policy_update == "update-dups"
 
         # Deduplicate revisions
         rev_metadata: List[Any] = []
         orig_metadata: List[Any] = []
         revs_to_delete: List[Any] = []
         origs_to_delete: List[Any] = []
         summary: Dict = {}
         for (orig_item, rev_item) in results:
             assert rev_item["metadata"] == orig_item["metadata"]
             if not rev_item["metadata"] or rev_item["metadata"].keys() <= {"@context"}:
                 # If we didn't find any metadata, don't store a DB record
                 # (and delete existing ones, if any)
                 if rev_item not in revs_to_delete:
                     revs_to_delete.append(rev_item)
                 if orig_item not in origs_to_delete:
                     origs_to_delete.append(orig_item)
             else:
                 if rev_item not in rev_metadata:
                     rev_metadata.append(rev_item)
                 if orig_item not in orig_metadata:
                     orig_metadata.append(orig_item)
 
         if rev_metadata:
             summary_rev = self.idx_storage.revision_intrinsic_metadata_add(
                 rev_metadata, conflict_update=conflict_update
             )
             summary.update(summary_rev)
         if orig_metadata:
             summary_ori = self.idx_storage.origin_intrinsic_metadata_add(
                 orig_metadata, conflict_update=conflict_update
             )
             summary.update(summary_ori)
 
         # revs_to_delete should always be empty unless we changed a mapping
         # to detect fewer files or less content.
         # However, origs_to_delete may be non-empty whenever an upstream
         # deletes a metadata file.
         if origs_to_delete:
             summary_ori = self.idx_storage.origin_intrinsic_metadata_delete(
                 origs_to_delete
             )
             summary.update(summary_ori)
         if revs_to_delete:
             summary_rev = self.idx_storage.revision_intrinsic_metadata_delete(
                 revs_to_delete
             )
             summary.update(summary_rev)
 
         return summary
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
index 80fbf0b..d2cc437 100644
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,162 +1,164 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, List, Optional, Union
 
 import magic
 
+from swh.core.config import merge_configs
 from swh.indexer.storage.interface import PagedResult, Sha1
 from swh.model.model import Revision
 
 from .indexer import ContentIndexer, ContentPartitionIndexer
 
 if not hasattr(magic.Magic, "from_buffer"):
     raise ImportError(
         'Expected "import magic" to import python-magic, but file_magic '
         "was imported instead."
     )
 
 
 def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, bytes]:
     """Determine mimetype and encoding from the raw content.
 
     Args:
         raw_content: content's raw data
 
     Returns:
         dict: mimetype and encoding key and corresponding values.
 
     """
     m = magic.Magic(mime=True, mime_encoding=True)
     res = m.from_buffer(raw_content)
     try:
         mimetype, encoding = res.split("; charset=")
     except ValueError:
         mimetype, encoding = res, ""
     return {
         "mimetype": mimetype,
         "encoding": encoding,
     }
 
 
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "tools": {
+        "name": "file",
+        "version": "1:5.30-1+deb9u1",
+        "configuration": {"type": "library", "debian-package": "python3-magic"},
+    },
+    "write_batch_size": 1000,
+}
+
+
 class MixinMimetypeIndexer:
     """Mixin mimetype indexer.
 
     See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer`
 
     """
 
     tool: Any
     idx_storage: Any
 
-    ADDITIONAL_CONFIG = {
-        "tools": (
-            "dict",
-            {
-                "name": "file",
-                "version": "1:5.30-1+deb9u1",
-                "configuration": {"type": "library", "debian-package": "python3-magic"},
-            },
-        ),
-        "write_batch_size": ("int", 1000),
-    }
-
-    CONFIG_BASE_FILENAME = "indexer/mimetype"  # type: Optional[str]
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config = merge_configs(DEFAULT_CONFIG, self.config)
 
     def index(
         self, id: Union[bytes, Dict, Revision], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index sha1s' content and store result.
 
         Args:
             id: content's identifier
             data: raw content in bytes
 
         Returns:
             dict: content's mimetype; dict keys being
 
             - id: content's identifier (sha1)
             - mimetype: mimetype in bytes
             - encoding: encoding in bytes
 
         """
         assert data is not None
         properties = compute_mimetype_encoding(data)
         assert isinstance(id, bytes)
         properties.update(
             {"id": id, "indexer_configuration_id": self.tool["id"],}
         )
         return properties
 
     def persist_index_computations(
         self, results: List[Dict], policy_update: str
     ) -> Dict[str, int]:
         """Persist the results in storage.
 
         Args:
             results: list of content's mimetype dicts (see :meth:`.index`)
             policy_update: either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
 
         """
         return self.idx_storage.content_mimetype_add(
             results, conflict_update=(policy_update == "update-dups")
         )
 
 
 class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer):
     """Mimetype Indexer working on list of content identifiers.
 
     It:
 
     - (optionally) filters out content already indexed (cf. :meth:`.filter`)
     - reads content from objstorage per the content's id (sha1)
     - computes {mimetype, encoding} from that content
     - stores result in storage
 
     """
 
     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.
 
         """
         yield from self.idx_storage.content_mimetype_missing(
             ({"id": sha1, "indexer_configuration_id": self.tool["id"],} for sha1 in ids)
         )
 
 
 class MimetypePartitionIndexer(MixinMimetypeIndexer, ContentPartitionIndexer):
     """Mimetype Range Indexer working on range of content identifiers.
 
     It:
 
     - (optionally) filters out content already indexed (cf
       :meth:`.indexed_contents_in_partition`)
     - reads content from objstorage per the content's id (sha1)
     - computes {mimetype, encoding} from that content
     - stores result in storage
 
     """
 
     def indexed_contents_in_partition(
         self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None,
     ) -> PagedResult[Sha1]:
         """Retrieve indexed content ids within partition_id.
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
             page_token: opaque token used for pagination
 
         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data
             to fetch
 
         """
         return self.idx_storage.content_mimetype_get_partition(
             self.tool["id"], partition_id, nb_partitions, page_token=page_token
         )
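compute_mimetype_encoding can be checked in isolation; a small sketch (the sample bytes are illustrative, and the exact mimetype reported depends on the installed libmagic):

    from swh.indexer.mimetype import compute_mimetype_encoding

    res = compute_mimetype_encoding(b"def main():\n    pass\n")
    # python-magic returns e.g. "text/x-python; charset=us-ascii", which
    # the function splits into the two keys below:
    print(res["mimetype"], res["encoding"])
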
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
index 9b0151c..0db0f84 100644
--- a/swh/indexer/rehash.py
+++ b/swh/indexer/rehash.py
@@ -1,191 +1,175 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import itertools
 import logging
 from typing import Any, Dict, Generator, List, Optional, Tuple
 
 from swh.core import utils
-from swh.core.config import SWHConfig
+from swh.core.config import load_from_envvar
 from swh.model import hashutil
 from swh.model.model import Content
 from swh.objstorage.exc import ObjNotFoundError
 from swh.objstorage.factory import get_objstorage
 from swh.storage import get_storage
 
-
-class RecomputeChecksums(SWHConfig):
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "storage": {"cls": "memory"},
+    "objstorage": {"cls": "memory"},
+    # the set of checksums that should be computed.
+    # Examples: 'sha1_git', 'blake2b512', 'blake2s256'
+    "compute_checksums": [],
+    # whether checksums that already exist in the DB should be
+    # recomputed/updated or left untouched
+    "recompute_checksums": False,
+    # Number of contents to retrieve blobs at the same time
+    "batch_size_retrieve_content": 10,
+    # Number of contents to update at the same time
+    "batch_size_update": 100,
+}
+
+
+class RecomputeChecksums:
     """Class in charge of (re)computing content's hashes.
 
     Hashes to compute are defined across 2 configuration options:
 
     compute_checksums ([str])
       list of hash algorithms that
       py:func:`swh.model.hashutil.MultiHash.from_data` function should
       be able to deal with. For variable-length checksums, a desired
       checksum length should also be provided, using the format
       <algorithm>:<length>, e.g. blake2:512.
 
    recompute_checksums (bool)
      a boolean to notify that we also want to recompute potential existing
      hashes specified in compute_checksums. Defaults to False.
 
    """
 
-    DEFAULT_CONFIG = {
-        # The storage to read from or update metadata to
-        "storage": (
-            "dict",
-            {"cls": "remote", "args": {"url": "http://localhost:5002/"},},
-        ),
-        # The objstorage to read contents' data from
-        "objstorage": (
-            "dict",
-            {
-                "cls": "pathslicing",
-                "args": {
-                    "root": "/srv/softwareheritage/objects",
-                    "slicing": "0:2/2:4/4:6",
-                },
-            },
-        ),
-        # the set of checksums that should be computed.
-        # Examples: 'sha1_git', 'blake2b512', 'blake2s256'
-        "compute_checksums": ("list[str]", []),
-        # whether checksums that already exist in the DB should be
-        # recomputed/updated or left untouched
-        "recompute_checksums": ("bool", False),
-        # Number of contents to retrieve blobs at the same time
-        "batch_size_retrieve_content": ("int", 10),
-        # Number of contents to update at the same time
-        "batch_size_update": ("int", 100),
-    }
-
-    CONFIG_BASE_FILENAME = "indexer/rehash"
-
     def __init__(self) -> None:
-        self.config = self.parse_config_file()
+        self.config = load_from_envvar(DEFAULT_CONFIG)
         self.storage = get_storage(**self.config["storage"])
         self.objstorage = get_objstorage(**self.config["objstorage"])
         self.compute_checksums = self.config["compute_checksums"]
         self.recompute_checksums = self.config["recompute_checksums"]
         self.batch_size_retrieve_content = self.config["batch_size_retrieve_content"]
         self.batch_size_update = self.config["batch_size_update"]
         self.log = logging.getLogger("swh.indexer.rehash")
 
         if not self.compute_checksums:
             raise ValueError("Checksums list should not be empty.")
 
     def _read_content_ids(
         self, contents: List[Dict[str, Any]]
     ) -> Generator[bytes, Any, None]:
         """Read the content identifiers from the contents.
 
         """
         for c in contents:
             h = c["sha1"]
             if isinstance(h, str):
                 h = hashutil.hash_to_bytes(h)
 
             yield h
 
     def get_new_contents_metadata(
         self, all_contents: List[Dict[str, Any]]
     ) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]:
         """Retrieve raw contents and compute new checksums on the
            contents. Unknown or corrupted contents are skipped.
 
         Args:
             all_contents: List of contents as dictionary with
               the necessary primary keys
 
         Yields:
             tuple: tuple of (content to update, list of checksums computed)
 
         """
         content_ids = self._read_content_ids(all_contents)
         for contents in utils.grouper(content_ids, self.batch_size_retrieve_content):
             contents_iter = itertools.tee(contents, 2)
             try:
                 sha1s = [s for s in contents_iter[0]]
                 content_metadata: List[Optional[Content]] = self.storage.content_get(
                     sha1s
                 )
             except Exception:
                 self.log.exception("Problem when reading contents metadata.")
                 continue
 
             for sha1, content_model in zip(sha1s, content_metadata):
                 if not content_model:
                     continue
                 content: Dict = content_model.to_dict()
                 # Recompute checksums provided in compute_checksums options
                 if self.recompute_checksums:
                     checksums_to_compute = list(self.compute_checksums)
                 else:
                     # Compute checksums provided in compute_checksums
                     # options not already defined for that content
                     checksums_to_compute = [
                         h for h in self.compute_checksums if not content.get(h)
                     ]
 
                 if not checksums_to_compute:  # Nothing to recompute
                     continue
 
                 try:
                     raw_content = self.objstorage.get(sha1)
                 except ObjNotFoundError:
                     self.log.warning("Content %s not found in objstorage!", sha1)
                     continue
 
                 content_hashes = hashutil.MultiHash.from_data(
                     raw_content, hash_names=checksums_to_compute
                 ).digest()
                 content.update(content_hashes)
                 yield content, checksums_to_compute
 
     def run(self, contents: List[Dict[str, Any]]) -> Dict:
         """Given a list of content:
 
         - (re)compute a given set of checksums on contents available
           in our object storage
         - update those contents with the new metadata
 
         Args:
             contents: contents as dictionary with necessary keys.
               Keys present in such a dictionary should be the ones
               defined in the 'primary_key' option.
 
         Returns:
             A summary dict with keys 'status' (the task's status) and
             'count' (the number of updated contents).
 
         """
         status = "uneventful"
         count = 0
         for data in utils.grouper(
             self.get_new_contents_metadata(contents), self.batch_size_update
         ):
             groups: Dict[str, List[Any]] = defaultdict(list)
             for content, keys_to_update in data:
                 keys_str = ",".join(keys_to_update)
                 groups[keys_str].append(content)
 
             for keys_to_update, contents in groups.items():
                 keys: List[str] = keys_to_update.split(",")
                 try:
                     self.storage.content_update(contents, keys=keys)
                     count += len(contents)
                     status = "eventful"
                 except Exception:
                     self.log.exception("Problem during update.")
                     continue
 
         return {
             "status": status,
             "count": count,
         }
""" - def parse_config_file(self, *args, **kwargs): - return ORIGIN_HEAD_CONFIG - def persist_index_computations(self, results, policy_update): self.results = results class OriginHead(unittest.TestCase): - def setUp(self): + @pytest.fixture(autouse=True) + def init(self, swh_config): + super().setUp() self.indexer = OriginHeadTestIndexer() self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) def test_git(self): origin_url = "https://github.com/SoftwareHeritage/swh-storage" self.indexer.run([origin_url]) rev_id = b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}\xac\xefrm" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_git_partial_snapshot(self): """Checks partial snapshots are ignored.""" origin_url = "https://github.com/SoftwareHeritage/swh-core" self.indexer.storage.origin_add([Origin(url=origin_url)]) visit = self.indexer.storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="git", ) ] )[0] self.indexer.storage.snapshot_add( [ Snapshot( branches={ b"foo": None, b"HEAD": SnapshotBranch( target_type=TargetType.ALIAS, target=b"foo", ), }, ), ] ) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="partial", snapshot=b"foo", ) self.indexer.storage.origin_visit_status_add([visit_status]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_vcs_missing_snapshot(self): origin_url = "https://github.com/SoftwareHeritage/swh-indexer" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_pypi_missing_branch(self): origin_url = "https://pypi.org/project/abcdef/" self.indexer.storage.origin_add([Origin(url=origin_url,)]) visit = self.indexer.storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="pypi", ) ] )[0] self.indexer.storage.snapshot_add( [ Snapshot( branches={ b"foo": None, b"HEAD": SnapshotBranch( target_type=TargetType.ALIAS, target=b"foo", ), }, ) ] ) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="full", snapshot=b"foo", ) self.indexer.storage.origin_visit_status_add([visit_status]) self.indexer.run(["https://pypi.org/project/abcdef/"]) self.assertEqual(self.indexer.results, []) def test_ftp(self): origin_url = "rsync://ftp.gnu.org/gnu/3dldf" self.indexer.run([origin_url]) rev_id = b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee\xcc\x1a\xb4`\x8c\x8by" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_ftp_missing_snapshot(self): origin_url = "rsync://ftp.gnu.org/gnu/foobar" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_deposit(self): origin_url = "https://forge.softwareheritage.org/source/jesuisgpl/" self.indexer.storage.origin_add([Origin(url=origin_url)]) self.indexer.run([origin_url]) rev_id = b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{\xa6\xe9\x99\xb1\x9e]q\xeb" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], ) def test_deposit_missing_snapshot(self): origin_url = "https://forge.softwareheritage.org/source/foobar" self.indexer.storage.origin_add([Origin(url=origin_url,)]) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_pypi(self): origin_url = "https://pypi.org/project/limnoria/" 
self.indexer.run([origin_url]) rev_id = b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url}], ) def test_svn(self): origin_url = "http://0-512-md.googlecode.com/svn/" self.indexer.run([origin_url]) rev_id = b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8\xc9\xad#.\x1bw=\x18" self.assertEqual( self.indexer.results, [{"revision_id": rev_id, "origin_url": origin_url,}], )
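For context on the test changes: the swh_config fixture requested by init() above presumably writes the effective configuration to a temporary file and points SWH_CONFIG_FILENAME at it, which is what load_from_envvar() reads when the indexer is instantiated without an explicit config. Other test modules can tune the indexer configuration by overriding the swh_indexer_config fixture, following the same pattern as above; a hypothetical sketch:

    # Hypothetical test module following the fixture-override pattern used
    # in test_origin_head.py; the tool values are illustrative.
    import copy

    import pytest

    @pytest.fixture
    def swh_indexer_config(swh_indexer_config):
        config = copy.deepcopy(swh_indexer_config)
        config["tools"] = {"name": "my-tool", "version": "0.0.1", "configuration": {}}
        return config
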