diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -1,14 +1,15 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
+from swh.core.api.classes import stream_results
 from swh.core.config import merge_configs
-from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1
+from swh.indexer.storage.interface import IndexerStorageInterface, Sha1
 from swh.indexer.storage.model import ContentLicenseRow
 from swh.model import hashutil
 
@@ -167,19 +168,20 @@
 
     def indexed_contents_in_partition(
-        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
-    ) -> PagedResult[Sha1]:
+        self, partition_id: int, nb_partitions: int
+    ) -> Iterable[Sha1]:
         """Retrieve indexed content id within the partition id
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
-            page_token: opaque token used for pagination
 
         Returns:
-            PagedResult of Sha1. If next_page_token is None, there is no more data
-            to fetch
-
+            Iterable over the sha1s of the contents already indexed in the
+            partition, streamed across all the storage result pages.
         """
-        return self.idx_storage.content_fossology_license_get_partition(
-            self.tool["id"], partition_id, nb_partitions, page_token=page_token
+        return stream_results(
+            self.idx_storage.content_fossology_license_get_partition,
+            self.tool["id"],
+            partition_id,
+            nb_partitions,
         )
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,12 +9,23 @@
 import os
 import shutil
 import tempfile
-from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union
+from typing import (
+    Any,
+    Dict,
+    Generic,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    TypeVar,
+    Union,
+)
 import warnings
 
 from swh.core import utils
 from swh.core.config import load_from_envvar, merge_configs
-from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage
+from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage
 from swh.indexer.storage.interface import IndexerStorageInterface
 from swh.model import hashutil
 from swh.model.model import Revision, Sha1Git
@@ -344,8 +355,8 @@
 
     @abc.abstractmethod
     def indexed_contents_in_partition(
-        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
-    ) -> PagedResult[Sha1]:
-        """Retrieve indexed contents within range [start, end].
+        self, partition_id: int, nb_partitions: int
+    ) -> Iterable[Sha1]:
+        """Retrieve indexed content ids within the partition partition_id.
 
         Args:
@@ -353,16 +364,14 @@
             nb_partitions: Total number of partitions to split into
-            page_token: opaque token used for pagination
 
         Returns:
-            PagedResult of Sha1. If next_page_token is None, there is no more data
-            to fetch
-
+            Iterable over the sha1s of the contents already indexed in the
+            partition. Implementations must walk through every storage page.
         """
         pass
 
     def _list_contents_to_index(
         self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
-    ) -> Iterator[Sha1]:
+    ) -> Iterable[Sha1]:
         """Compute from storage the new contents to index in the partition_id
         . The already indexed contents are skipped.
 
@@ -429,19 +438,13 @@
             indexing result as dict to persist in the indexer backend
 
         """
-        next_page_token = None
-        contents = set()
-        while True:
-            indexed_page = self.indexed_contents_in_partition(
-                partition_id, nb_partitions, page_token=next_page_token
-            )
-            for sha1 in indexed_page.results:
-                contents.add(sha1)
-            next_page_token = indexed_page.next_page_token
-            if next_page_token is None:
-                break
+        already_indexed_contents = set(
+            self.indexed_contents_in_partition(partition_id, nb_partitions)
+        )
 
-        return self._index_contents(partition_id, nb_partitions, contents)
+        return self._index_contents(
+            partition_id, nb_partitions, already_indexed_contents
+        )
 
     def run(
         self,
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,14 +1,15 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 import magic
 
+from swh.core.api.classes import stream_results
 from swh.core.config import merge_configs
-from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult, Sha1
+from swh.indexer.storage.interface import IndexerStorageInterface, Sha1
 from swh.indexer.storage.model import ContentMimetypeRow
 
 from .indexer import ContentIndexer, ContentPartitionIndexer
@@ -145,20 +146,21 @@
     """
 
     def indexed_contents_in_partition(
-        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None,
-    ) -> PagedResult[Sha1]:
+        self, partition_id: int, nb_partitions: int,
+    ) -> Iterable[Sha1]:
         """Retrieve indexed content ids within partition_id.
 
         Args:
             partition_id: Index of the partition to fetch
             nb_partitions: Total number of partitions to split into
-            page_token: opaque token used for pagination
 
         Returns:
-            PagedResult of Sha1. If next_page_token is None, there is no more data
-            to fetch
-
+            Iterable over the sha1s of the contents already indexed in the
+            partition, streamed across all the storage result pages.
         """
-        return self.idx_storage.content_mimetype_get_partition(
-            self.tool["id"], partition_id, nb_partitions, page_token=page_token
+        return stream_results(
+            self.idx_storage.content_mimetype_get_partition,
+            self.tool["id"],
+            partition_id,
+            nb_partitions,
         )
diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py
--- a/swh/indexer/tests/test_indexer.py
+++ b/swh/indexer/tests/test_indexer.py
@@ -3,7 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 from unittest.mock import Mock
 
 import pytest
@@ -36,8 +36,8 @@
         return {}
 
     def indexed_contents_in_partition(
-        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
-    ) -> PagedResult[Sha1]:
+        self, partition_id: int, nb_partitions: int
+    ) -> Iterable[Sha1]:
         raise _TestException()
 
 
@@ -64,14 +64,9 @@
         return ["indexed " + id.decode()]
 
     def indexed_contents_in_partition(
-        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
-    ) -> PagedResult[Sha1]:
-        if page_token is None:
-            return PagedResult(results=[b"excluded hash"], next_page_token="not none")
-        elif page_token == "not none":
-            return PagedResult(results=[b"other excluded hash"], next_page_token=None)
-        else:
-            assert False, page_token
+        self, partition_id: int, nb_partitions: int
+    ) -> Iterable[Sha1]:
+        return iter([b"excluded hash", b"other excluded hash"])
 
     def persist_index_computations(self, results: List[str]) -> Dict[str, int]:
         self._results.append(results)  # type: ignore