diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index a133a40..b20be46 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,404 +1,405 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, ) import sentry_sdk from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS -from swh.indexer.origin_head import OriginHeadIndexer +from swh.indexer.origin_head import get_head_swhid from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.model import hashutil from swh.model.model import Origin, Revision, Sha1Git +from swh.model.swhids import ObjectType REVISION_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown revision", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) metadata = MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. 
Args: results: list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - metadata (jsonb): detected metadata """ return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_intrinsic_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.revision_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Revision], **kwargs ) -> List[RevisionIntrinsicMetadataRow]: """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: id: sha1_git of the revision data: revision model object from storage Returns: dict: dictionary representing a revision_intrinsic_metadata, with keys: - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ rev = data assert isinstance(rev, Revision) try: root_dir = rev.directory dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) if [entry["type"] for entry in dir_ls] == ["dir"]: # If the root is just a single directory, recurse into it # eg. PyPI packages, GNU tarballs subdir = dir_ls[0]["target"] dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) files = [entry for entry in dir_ls if entry["type"] == "file"] detected_files = detect_metadata(files) (mappings, metadata) = self.translate_revision_intrinsic_metadata( detected_files, log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), ) except Exception as e: self.log.exception("Problem when indexing rev: %r", e) sentry_sdk.capture_exception() return [ RevisionIntrinsicMetadataRow( id=rev.id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[RevisionIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. 
Args: results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ # TODO: add functions in storage to keep data in # revision_intrinsic_metadata return self.idx_storage.revision_intrinsic_metadata_add(results) def translate_revision_intrinsic_metadata( self, detected_files: Dict[str, List[Any]], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files: dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ used_mappings = [MAPPINGS[context].name for context in detected_files] metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] for context in detected_files.keys(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = context c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context] ) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files[context] if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) - self.origin_head_indexer = OriginHeadIndexer(config=config) self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) def index_list( self, origins: List[Origin], check_origin_known: bool = True, **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: head_rev_ids = [] origins_with_head = [] # Filter out origins not in the storage if check_origin_known: known_origins = list( call_with_batches( self.storage.origin_get, [origin.url for origin in origins], ORIGIN_GET_BATCH_SIZE, ) ) else: known_origins = list(origins) for origin in known_origins: if origin is None: continue - head_results = self.origin_head_indexer.index(origin.url) - if head_results: - (head_result,) = head_results + head_swhid = get_head_swhid(self.storage, origin.url) + if head_swhid: + # TODO: add support for releases + assert head_swhid.object_type == ObjectType.REVISION, head_swhid origins_with_head.append(origin) - head_rev_ids.append(head_result["revision_id"]) + head_rev_ids.append(head_swhid.object_id) head_revs = list( call_with_batches( self.storage.revision_get, head_rev_ids, 
REVISION_GET_BATCH_SIZE ) ) assert len(head_revs) == len(head_rev_ids) results = [] for (origin, rev) in zip(origins_with_head, head_revs): if not rev: self.log.warning("Missing head revision of origin %r", origin.url) continue for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): # There is at most one rev_metadata orig_metadata = OriginIntrinsicMetadataRow( from_revision=rev_metadata.id, id=origin.url, metadata=rev_metadata.metadata, mappings=rev_metadata.mappings, indexer_configuration_id=rev_metadata.indexer_configuration_id, ) results.append((orig_metadata, rev_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate revisions rev_metadata: List[RevisionIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] summary: Dict = {} for (orig_item, rev_item) in results: assert rev_item.metadata == orig_item.metadata if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if rev_item not in rev_metadata: rev_metadata.append(rev_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) if rev_metadata: summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) summary.update(summary_rev) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py index 1b955dd..6e79e1e 100644 --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -1,159 +1,120 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging import re -from typing import Any, Dict, List, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union -import click - -from swh.indexer.indexer import OriginIndexer from swh.model.model import SnapshotBranch, TargetType +from swh.model.swhids import CoreSWHID, ObjectType from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches -class OriginHeadIndexer(OriginIndexer[Dict]): - """Origin-level indexer. - - This indexer is in charge of looking up the revision that acts as the - "head" of an origin. - - In git, this is usually the commit pointed to by the 'master' branch.""" - - USE_TOOLS = False - - def persist_index_computations(self, results: Any) -> Dict[str, int]: - """Do nothing. 
The indexer's results are not persistent, they - should only be piped to another indexer.""" - return {} - - # Dispatch - - def index(self, id: str, data: None = None, **kwargs) -> List[Dict]: - origin_url = id - visit_status = origin_get_latest_visit_status( - self.storage, origin_url, allowed_statuses=["full"], require_snapshot=True - ) - if not visit_status: - return [] - assert visit_status.snapshot is not None - snapshot = snapshot_get_all_branches(self.storage, visit_status.snapshot) - if snapshot is None: - return [] - method = getattr( - self, "_try_get_%s_head" % visit_status.type, self._try_get_head_generic - ) - - rev_id = method(snapshot.branches) # type: ignore - if rev_id is not None: - return [ - { - "origin_url": origin_url, - "revision_id": rev_id, - } - ] - - # could not find a head revision - return [] - - # Tarballs - - _archive_filename_re = re.compile( - rb"^" - rb"(?P.*)[-_]" - rb"(?P[0-9]+(\.[0-9])*)" - rb"(?P[-+][a-zA-Z0-9.~]+?)?" - rb"(?P(\.[a-zA-Z0-9]+)+)" - rb"$" +def get_head_swhid(storage, origin_url: str) -> Optional[CoreSWHID]: + """Returns the SWHID of the head revision or release of an origin""" + visit_status = origin_get_latest_visit_status( + storage, origin_url, allowed_statuses=["full"], require_snapshot=True ) + if not visit_status: + return None + assert visit_status.snapshot is not None + snapshot = snapshot_get_all_branches(storage, visit_status.snapshot) + if snapshot is None: + return None + + if visit_status.type == "ftp": + return _try_get_ftp_head(dict(snapshot.branches)) + else: + return _try_get_head_generic(dict(snapshot.branches)) + + +_archive_filename_re = re.compile( + rb"^" + rb"(?P.*)[-_]" + rb"(?P[0-9]+(\.[0-9])*)" + rb"(?P[-+][a-zA-Z0-9.~]+?)?" + rb"(?P(\.[a-zA-Z0-9]+)+)" + rb"$" +) - @classmethod - def _parse_version(cls: Any, filename: bytes) -> Tuple[Union[float, int], ...]: - """Extracts the release version from an archive filename, - to get an ordering whose maximum is likely to be the last - version of the software - - >>> OriginHeadIndexer._parse_version(b'foo') - (-inf,) - >>> OriginHeadIndexer._parse_version(b'foo.tar.gz') - (-inf,) - >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz') - (0, 0, 1, 0) - >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') - (0, 0, 1, -1, 'beta2') - >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') - (0, 0, 1, 1, 'foobar') - """ - res = cls._archive_filename_re.match(filename) - if res is None: - return (float("-infinity"),) - version = [int(n) for n in res.group("version").decode().split(".")] - if res.group("preversion") is None: - version.append(0) + +def _parse_version(filename: bytes) -> Tuple[Union[float, int, str], ...]: + """Extracts the release version from an archive filename, + to get an ordering whose maximum is likely to be the last + version of the software + + >>> _parse_version(b'foo') + (-inf,) + >>> _parse_version(b'foo.tar.gz') + (-inf,) + >>> _parse_version(b'gnu-hello-0.0.1.tar.gz') + (0, 0, 1, 0) + >>> _parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') + (0, 0, 1, -1, 'beta2') + >>> _parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') + (0, 0, 1, 1, 'foobar') + """ + res = _archive_filename_re.match(filename) + if res is None: + return (float("-infinity"),) + version: List[Union[float, int, str]] = [ + int(n) for n in res.group("version").decode().split(".") + ] + if res.group("preversion") is None: + version.append(0) + else: + preversion = res.group("preversion").decode() + if preversion.startswith("-"): + version.append(-1) 
+ version.append(preversion[1:]) + elif preversion.startswith("+"): + version.append(1) + version.append(preversion[1:]) else: - preversion = res.group("preversion").decode() - if preversion.startswith("-"): - version.append(-1) - version.append(preversion[1:]) - elif preversion.startswith("+"): - version.append(1) - version.append(preversion[1:]) - else: - assert False, res.group("preversion") - return tuple(version) - - def _try_get_ftp_head(self, branches: Dict[bytes, SnapshotBranch]) -> Any: - archive_names = list(branches) - max_archive_name = max(archive_names, key=self._parse_version) - r = self._try_resolve_target(branches, max_archive_name) - return r - - # Generic - - def _try_get_head_generic(self, branches: Dict[bytes, SnapshotBranch]) -> Any: - # Works on 'deposit', 'pypi', and VCSs. - return self._try_resolve_target(branches, b"HEAD") or self._try_resolve_target( - branches, b"master" - ) - - def _try_resolve_target( - self, branches: Dict[bytes, SnapshotBranch], branch_name: bytes - ) -> Any: - try: - branch = branches[branch_name] - if branch is None: - return None - while branch.target_type == TargetType.ALIAS: - branch = branches[branch.target] - if branch is None: - return None - - if branch.target_type == TargetType.REVISION: - return branch.target - elif branch.target_type == TargetType.CONTENT: - return None # TODO - elif branch.target_type == TargetType.DIRECTORY: - return None # TODO - elif branch.target_type == TargetType.RELEASE: - return None # TODO - else: - assert False, branch - except KeyError: - return None + assert False, res.group("preversion") + return tuple(version) -@click.command() -@click.option( - "--origins", "-i", help='Origins to lookup, in the "type+url" format', multiple=True -) -def main(origins: List[str]) -> None: - rev_metadata_indexer = OriginHeadIndexer() - rev_metadata_indexer.run(origins) +def _try_get_ftp_head( + branches: Dict[bytes, Optional[SnapshotBranch]] +) -> Optional[CoreSWHID]: + archive_names = list(branches) + max_archive_name = max(archive_names, key=_parse_version) + return _try_resolve_target(branches, max_archive_name) -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - main() +def _try_get_head_generic( + branches: Dict[bytes, Optional[SnapshotBranch]] +) -> Optional[CoreSWHID]: + # Works on 'deposit', 'pypi', and VCSs. 
+ return _try_resolve_target(branches, b"HEAD") or _try_resolve_target( + branches, b"master" + ) + + +def _try_resolve_target( + branches: Dict[bytes, Optional[SnapshotBranch]], branch_name: bytes +) -> Optional[CoreSWHID]: + try: + branch = branches[branch_name] + if branch is None: + return None + while branch.target_type == TargetType.ALIAS: + branch = branches[branch.target] + if branch is None: + return None + + if branch.target_type == TargetType.REVISION: + return CoreSWHID(object_type=ObjectType.REVISION, object_id=branch.target) + elif branch.target_type == TargetType.CONTENT: + return None # TODO + elif branch.target_type == TargetType.DIRECTORY: + return None # TODO + elif branch.target_type == TargetType.RELEASE: + return None # TODO + else: + assert False, branch + except KeyError: + return None diff --git a/swh/indexer/tests/tasks.py b/swh/indexer/tests/tasks.py index 0cd55fd..58bcb81 100644 --- a/swh/indexer/tests/tasks.py +++ b/swh/indexer/tests/tasks.py @@ -1,47 +1,45 @@ from celery import current_app as app from swh.indexer.metadata import OriginMetadataIndexer, RevisionMetadataIndexer from .test_metadata import ContentMetadataTestIndexer -from .test_origin_head import OriginHeadTestIndexer from .utils import BASE_TEST_CONFIG class RevisionMetadataTestIndexer(RevisionMetadataIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. """ ContentMetadataIndexer = ContentMetadataTestIndexer def parse_config_file(self, *args, **kwargs): return { **BASE_TEST_CONFIG, "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {"type": "local", "context": "NpmMapping"}, }, } class OriginMetadataTestIndexer(OriginMetadataIndexer): def parse_config_file(self, *args, **kwargs): return {**BASE_TEST_CONFIG, "tools": []} def _prepare_sub_indexers(self): - self.origin_head_indexer = OriginHeadTestIndexer() self.revision_metadata_indexer = RevisionMetadataTestIndexer() @app.task def revision_intrinsic_metadata(*args, **kwargs): indexer = RevisionMetadataTestIndexer() indexer.run(*args, **kwargs) print("REV RESULT=", indexer.results) @app.task def origin_intrinsic_metadata(*args, **kwargs): indexer = OriginMetadataTestIndexer() indexer.run(*args, **kwargs) diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 763c896..20ea218 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,655 +1,655 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import OriginVisitStatus from .utils import REVISION def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = 
idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_revision=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] revision_metadata = [ RevisionIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "gemspec", "maven", "npm", "pkg-info", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( 
cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of 
nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= 
len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) @pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"]) def test_cli_journal_client_index( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( - "swh.indexer.origin_head.OriginHeadIndexer.index", - return_value=[{"revision_id": REVISION.id}], + "swh.indexer.metadata.get_head_swhid", + return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.RevisionMetadataIndexer.index", return_value=[ RevisionIntrinsicMetadataRow( id=REVISION.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_revision=REVISION.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py index 5e87331..21f8637 100644 --- a/swh/indexer/tests/test_origin_head.py +++ b/swh/indexer/tests/test_origin_head.py @@ -1,211 +1,152 @@ -# Copyright (C) 2017-2020 
The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import copy from datetime import datetime, timezone import pytest -from swh.indexer.origin_head import OriginHeadIndexer +from swh.indexer.origin_head import get_head_swhid from swh.indexer.tests.utils import fill_storage from swh.model.model import ( Origin, OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) +from swh.model.swhids import CoreSWHID from swh.storage.utils import now - -@pytest.fixture -def swh_indexer_config(swh_indexer_config): - config = copy.deepcopy(swh_indexer_config) - config.update( - { - "tools": { - "name": "origin-metadata", - "version": "0.0.1", - "configuration": {}, - }, - "tasks": { - "revision_intrinsic_metadata": None, - "origin_intrinsic_metadata": None, - }, - } - ) - return config - - -class OriginHeadTestIndexer(OriginHeadIndexer): - """Specific indexer whose configuration is enough to satisfy the - indexing tests. - """ - - def persist_index_computations(self, results): - self.results = results - - SAMPLE_SNAPSHOT = Snapshot( branches={ b"foo": None, b"HEAD": SnapshotBranch( target_type=TargetType.ALIAS, target=b"foo", ), }, ) @pytest.fixture -def indexer(swh_config): - indexer = OriginHeadTestIndexer() - indexer.catch_exceptions = False - fill_storage(indexer.storage) - return indexer +def storage(swh_storage): + fill_storage(swh_storage) + return swh_storage -def test_git(indexer): +def test_git(storage): origin_url = "https://github.com/SoftwareHeritage/swh-storage" - indexer.run([origin_url]) - rev_id = b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}\xac\xefrm" - assert indexer.results == ( - [ - { - "revision_id": rev_id, - "origin_url": origin_url, - } - ] + assert get_head_swhid(storage, origin_url) == CoreSWHID.from_string( + "swh:1:rev:384b12006403cce45d6253e38f7bd77dacef726d" ) -def test_git_partial_snapshot(indexer): +def test_git_partial_snapshot(storage): """Checks partial snapshots are ignored.""" origin_url = "https://github.com/SoftwareHeritage/swh-core" - indexer.storage.origin_add([Origin(url=origin_url)]) - visit = indexer.storage.origin_visit_add( + storage.origin_add([Origin(url=origin_url)]) + visit = storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="git", ) ] )[0] - indexer.storage.snapshot_add([SAMPLE_SNAPSHOT]) + storage.snapshot_add([SAMPLE_SNAPSHOT]) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="partial", snapshot=SAMPLE_SNAPSHOT.id, ) - indexer.storage.origin_visit_status_add([visit_status]) - indexer.run([origin_url]) - assert indexer.results == [] + storage.origin_visit_status_add([visit_status]) + assert get_head_swhid(storage, origin_url) is None -def test_vcs_missing_snapshot(indexer): +def test_vcs_missing_snapshot(storage): origin_url = "https://github.com/SoftwareHeritage/swh-indexer" - indexer.storage.origin_add([Origin(url=origin_url)]) - indexer.run([origin_url]) - assert indexer.results == [] + storage.origin_add([Origin(url=origin_url)]) + assert get_head_swhid(storage, origin_url) is None -def test_pypi_missing_branch(indexer): +def test_pypi_missing_branch(storage): origin_url = "https://pypi.org/project/abcdef/" - indexer.storage.origin_add( + storage.origin_add( [ Origin( url=origin_url, ) ] ) - visit = 
indexer.storage.origin_visit_add( + visit = storage.origin_visit_add( [ OriginVisit( origin=origin_url, date=datetime(2019, 2, 27, tzinfo=timezone.utc), type="pypi", ) ] )[0] - indexer.storage.snapshot_add([SAMPLE_SNAPSHOT]) + storage.snapshot_add([SAMPLE_SNAPSHOT]) visit_status = OriginVisitStatus( origin=origin_url, visit=visit.visit, date=now(), status="full", snapshot=SAMPLE_SNAPSHOT.id, ) - indexer.storage.origin_visit_status_add([visit_status]) - indexer.run(["https://pypi.org/project/abcdef/"]) - assert indexer.results == [] + storage.origin_visit_status_add([visit_status]) + assert get_head_swhid(storage, origin_url) is None -def test_ftp(indexer): +def test_ftp(storage): origin_url = "rsync://ftp.gnu.org/gnu/3dldf" - indexer.run([origin_url]) - rev_id = b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee\xcc\x1a\xb4`\x8c\x8by" - assert indexer.results == [ - { - "revision_id": rev_id, - "origin_url": origin_url, - } - ] + assert get_head_swhid(storage, origin_url) == CoreSWHID.from_string( + "swh:1:rev:8ea98e2fea7d9f6546f49ffdeecc1ab4608c8b79" + ) -def test_ftp_missing_snapshot(indexer): +def test_ftp_missing_snapshot(storage): origin_url = "rsync://ftp.gnu.org/gnu/foobar" - indexer.storage.origin_add([Origin(url=origin_url)]) - indexer.run([origin_url]) - assert indexer.results == [] + storage.origin_add([Origin(url=origin_url)]) + assert get_head_swhid(storage, origin_url) is None -def test_deposit(indexer): +def test_deposit(storage): origin_url = "https://forge.softwareheritage.org/source/jesuisgpl/" - indexer.storage.origin_add([Origin(url=origin_url)]) - indexer.run([origin_url]) - rev_id = b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{\xa6\xe9\x99\xb1\x9e]q\xeb" - assert indexer.results == [ - { - "revision_id": rev_id, - "origin_url": origin_url, - } - ] - - -def test_deposit_missing_snapshot(indexer): + storage.origin_add([Origin(url=origin_url)]) + assert get_head_swhid(storage, origin_url) == CoreSWHID.from_string( + "swh:1:rev:e76ea49c9ffbb7f73611087ba6e999b19e5d71eb" + ) + + +def test_deposit_missing_snapshot(storage): origin_url = "https://forge.softwareheritage.org/source/foobar" - indexer.storage.origin_add( + storage.origin_add( [ Origin( url=origin_url, ) ] ) - indexer.run([origin_url]) - assert indexer.results == [] + assert get_head_swhid(storage, origin_url) is None -def test_pypi(indexer): +def test_pypi(storage): origin_url = "https://pypi.org/project/limnoria/" - indexer.run([origin_url]) - - rev_id = b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8kA\x10\x9d\xc5\xfa2\xf8t" - assert indexer.results == [{"revision_id": rev_id, "origin_url": origin_url}] + assert get_head_swhid(storage, origin_url) == CoreSWHID.from_string( + "swh:1:rev:83b9b6c705b125d0fe6dd86b41109dc5fa32f874" + ) -def test_svn(indexer): +def test_svn(storage): origin_url = "http://0-512-md.googlecode.com/svn/" - indexer.run([origin_url]) - rev_id = b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8\xc9\xad#.\x1bw=\x18" - assert indexer.results == [ - { - "revision_id": rev_id, - "origin_url": origin_url, - } - ] + assert get_head_swhid(storage, origin_url) == CoreSWHID.from_string( + "swh:1:rev:e43f72e12c88abece79a87b8c9ad232e1b773d18" + )
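
Illustrative note, not part of the patch above: for callers that used the removed OriginHeadIndexer, a minimal migration sketch. The storage argument is assumed to be an already-configured swh.storage instance, and head_revision_id is a hypothetical helper name introduced only for this example; it mirrors what OriginMetadataIndexer.index_list now does with the head SWHID.

from typing import Optional

from swh.indexer.origin_head import get_head_swhid
from swh.model.swhids import ObjectType


def head_revision_id(storage, origin_url: str) -> Optional[bytes]:
    """Return the sha1_git of an origin's head revision, or None.

    Releases are not handled yet (see the TODO in the patch), so anything
    other than a revision SWHID is treated as "no head".
    """
    head_swhid = get_head_swhid(storage, origin_url)
    if head_swhid is None or head_swhid.object_type != ObjectType.REVISION:
        return None
    # CoreSWHID.object_id is the raw bytes identifier, i.e. the same value
    # the old OriginHeadIndexer returned under the "revision_id" key.
    return head_swhid.object_id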
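
The tarball ordering used by _try_get_ftp_head() can be illustrated with the values from the _parse_version() doctests; this is a sketch only, the archive names below are hypothetical, and the private helper is imported purely for illustration.

from swh.indexer.origin_head import _parse_version

branch_names = [
    b"gnu-hello-0.0.1.tar.gz",        # -> (0, 0, 1, 0)
    b"gnu-hello-0.0.1-beta2.tar.gz",  # -> (0, 0, 1, -1, 'beta2'), sorts before the final 0.0.1
    b"gnu-hello-0.0.2.tar.gz",        # -> (0, 0, 2, 0)
    b"README",                        # no version -> (-inf,), never selected
]

# _try_get_ftp_head() resolves the branch whose parsed version is maximal:
assert max(branch_names, key=_parse_version) == b"gnu-hello-0.0.2.tar.gz"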
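
A toy illustration of call_with_batches(), which index_list uses to page origin_get and revision_get calls; the fake getter below is made up for the example and stands in for a storage method.

from swh.indexer.metadata import call_with_batches


def fake_revision_get(ids):
    # Stand-in for storage.revision_get(): one result per requested id.
    return [("rev", i) for i in ids]


results = list(call_with_batches(fake_revision_get, list(range(25)), batch_size=10))
assert len(results) == 25  # served by three underlying calls: 10 + 10 + 5 ids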
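
Tests that previously patched OriginHeadIndexer.index now patch the module-level function instead; a sketch of the pattern used in test_cli.py (the test function name is hypothetical; REVISION and the pytest-mock mocker fixture come from the existing test suite).

def test_with_stubbed_head(mocker):
    from swh.indexer.tests.utils import REVISION

    # Patch get_head_swhid where it is *used* (swh.indexer.metadata), not
    # where it is defined, so OriginMetadataIndexer picks up the stub.
    mocker.patch(
        "swh.indexer.metadata.get_head_swhid",
        return_value=REVISION.swhid(),
    )
    # ... run OriginMetadataIndexer and assert on the produced metadata rows ...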