diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2021 The Software Heritage developers +# Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -24,13 +24,14 @@ import warnings import sentry_sdk +from typing_extensions import TypedDict from swh.core import utils from swh.core.config import load_from_envvar, merge_configs from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil -from swh.model.model import Revision, Sha1Git +from swh.model.model import Origin, Revision, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG @@ -38,6 +39,12 @@ from swh.storage.interface import StorageInterface +class ObjectsDict(TypedDict, total=False): + revision: List[Dict] + origin: List[Dict] + origin_visit_status: List[Dict] + + @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. @@ -526,9 +533,21 @@ DeprecationWarning, ) del kwargs["policy_update"] + + origins = [{"url": url} for url in origin_urls] + + return self.process_journal_objects({"origin": origins}) + + def process_journal_objects(self, objects: ObjectsDict) -> Dict: + """Worker function for ``JournalClient``. 
Expects ``objects`` to have a single + key, ``"origin"``.""" + assert set(objects) == {"origin"} + + origins = [Origin(url=origin["url"]) for origin in objects.get("origin", [])] + summary: Dict[str, Any] = {"status": "uneventful"} try: - results = self.index_list(origin_urls, **kwargs) + results = self.index_list(origins) except Exception: if not self.catch_exceptions: raise @@ -544,13 +563,13 @@ summary.update(summary_persist) return summary - def index_list(self, origin_urls: List[str], **kwargs) -> List[TResult]: + def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: results = [] - for origin_url in origin_urls: + for origin in origins: try: - results.extend(self.index(origin_url, **kwargs)) + results.extend(self.index(origin.url, **kwargs)) except Exception: - self.log.exception("Problem when processing origin %s", origin_url) + self.log.exception("Problem when processing origin %s", origin.url) sentry_sdk.capture_exception() raise return results @@ -584,12 +603,11 @@ DeprecationWarning, ) del kwargs["policy_update"] - summary: Dict[str, Any] = {"status": "uneventful"} - results = [] revision_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] + revisions = [] for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): if not rev: # TODO: call self.index() with rev=None? @@ -597,15 +615,27 @@ "Revision %s not found in storage", hashutil.hash_to_hex(rev_id) ) continue + revisions.append(rev.to_dict()) + + return self.process_journal_objects({"revision": revisions}) + + def process_journal_objects(self, objects: ObjectsDict) -> Dict: + """Worker function for ``JournalClient``. 
Expects ``objects`` to have a single + key, ``"revision"``.""" + assert set(objects) == {"revision"} + + summary: Dict[str, Any] = {"status": "uneventful"} + results = [] + + for rev in objects["revision"]: try: - results.extend(self.index(rev_id, rev)) + results.extend(self.index(rev["id"], Revision.from_dict(rev))) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing revision") sentry_sdk.capture_exception() summary["status"] = "failed" - return summary summary_persist = self.persist_index_computations(results) if summary_persist: diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -32,7 +32,7 @@ RevisionIntrinsicMetadataRow, ) from swh.model import hashutil -from swh.model.model import Revision, Sha1Git +from swh.model.model import Origin, Revision, Sha1Git REVISION_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 @@ -325,18 +325,21 @@ self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) def index_list( - self, origin_urls: List[str], **kwargs + self, origins: List[Origin], **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: head_rev_ids = [] origins_with_head = [] - origins = list( + + # Filter out origins not in the storage + known_origins = list( call_with_batches( self.storage.origin_get, - origin_urls, + [origin.url for origin in origins], ORIGIN_GET_BATCH_SIZE, ) ) - for origin in origins: + + for origin in known_origins: if origin is None: continue head_results = self.origin_head_indexer.index(origin.url) @@ -368,6 +371,7 @@ 
indexer_configuration_id=rev_metadata.indexer_configuration_id, ) results.append((orig_metadata, rev_metadata)) + return results def persist_index_computations( diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py --- a/swh/indexer/tests/test_indexer.py +++ b/swh/indexer/tests/test_indexer.py @@ -17,7 +17,7 @@ from swh.indexer.storage import PagedResult, Sha1 from swh.model.model import Content -from .utils import BASE_TEST_CONFIG +from .utils import BASE_TEST_CONFIG, REVISION class _TestException(Exception): @@ -89,7 +89,7 @@ def test_revision_indexer_catch_exceptions(): indexer = CrashingRevisionIndexer(config=BASE_TEST_CONFIG) indexer.storage = Mock() - indexer.storage.revision_get.return_value = ["rev"] + indexer.storage.revision_get.return_value = [REVISION] assert indexer.run([b"foo"]) == {"status": "failed"} diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py --- a/swh/indexer/tests/test_origin_metadata.py +++ b/swh/indexer/tests/test_origin_metadata.py @@ -252,5 +252,5 @@ ) -> None: indexer = OriginMetadataIndexer(config=swh_indexer_config) - result = indexer.index_list(["https://unknown.org/foo"]) + result = indexer.index_list([Origin("https://unknown.org/foo")]) assert not result