diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,6 +1,6 @@
-swh.core[db,http] >= 0.0.87
+swh.core[db,http] >= 0.2.2
 swh.model >= 0.0.15
 swh.objstorage >= 0.0.43
 swh.scheduler >= 0.0.47
-swh.storage >= 0.8.0
+swh.storage >= 0.12.0
 swh.journal >= 0.1.0
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -6,11 +6,12 @@
 import logging
 import subprocess
 
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
 
 from swh.model import hashutil
 
-from .indexer import ContentIndexer, ContentRangeIndexer, write_to_temp
+from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp
+from swh.indexer.storage.interface import PagedResult, Sha1
 
 logger = logging.getLogger(__name__)
 
@@ -56,7 +57,7 @@
     """Mixin fossology license indexer.
 
     See :class:`FossologyLicenseIndexer` and
-    :class:`FossologyLicenseRangeIndexer`
+    :class:`FossologyLicensePartitionIndexer`
 
     """
 
@@ -82,7 +83,7 @@
         self.working_directory = self.config["workdir"]
 
     def index(
-        self, id: bytes, data: Optional[bytes] = None, **kwargs
+        self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index sha1s' content and store result.
 
@@ -153,33 +154,39 @@
         )
 
 
-class FossologyLicenseRangeIndexer(MixinFossologyLicenseIndexer, ContentRangeIndexer):
-    """FossologyLicense Range Indexer working on range of content identifiers.
+class FossologyLicensePartitionIndexer(
+    MixinFossologyLicenseIndexer, ContentPartitionIndexer
+):
+    """FossologyLicense Partition Indexer, working on partitions of content identifiers.
 
     - filters out the non textual content
     - (optionally) filters out content already indexed (cf
-      :meth:`.indexed_contents_in_range`)
+      :meth:`.indexed_contents_in_partition`)
    - reads content from objstorage per the content's id (sha1)
    - computes {mimetype, encoding} from that content
    - stores result in storage

    """

-    def indexed_contents_in_range(self, start, end):
-        """Retrieve indexed content id within range [start, end].
+    def indexed_contents_in_partition(
+        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+    ) -> PagedResult[Sha1]:
+        """Retrieve indexed content ids within the partition partition_id.

        Args:
-            start (bytes): Starting bound from range identifier
-            end (bytes): End range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
+            page_token: opaque token used for pagination

        Returns:
-            dict: a dict with keys:
-
-            - **ids** [bytes]: iterable of content ids within the range.
-            - **next** (Optional[bytes]): The next range of sha1 starts at
-              this sha1 if any
+            PagedResult of Sha1. If next_page_token is None, there is no more data
+            to fetch

        """
-        return self.idx_storage.content_fossology_license_get_range(
-            start, end, self.tool["id"]
+        return self.idx_storage.content_fossology_license_get_partition(
+            self.tool["id"], partition_id, nb_partitions, page_token=page_token
        )
+
+
+# alias for retrocompatibility
+FossologyLicenseRangeIndexer = FossologyLicensePartitionIndexer
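
Note: for reviewers unfamiliar with the new entry point, a minimal sketch of how the renamed indexer would be driven follows. The configuration dict and the partition count are illustrative assumptions, not values taken from this patch.

    from swh.indexer.fossology_license import FossologyLicensePartitionIndexer

    # Hypothetical configuration; the keys mirror the test RANGE_CONFIG used
    # below, but the values here are placeholders, not part of this patch.
    config = {
        "tools": {"name": "nomos", "version": "3.1.0", "configuration": {}},
        "workdir": "/tmp/swh/indexer.fossology.license",
        "write_batch_size": 100,
    }

    indexer = FossologyLicensePartitionIndexer(config=config)
    # Walk the whole sha1 space in 16 slices; run() pages through each
    # partition internally and persists results as it goes.
    for partition_id in range(16):
        summary = indexer.run(partition_id, nb_partitions=16, skip_existing=True)
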
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -18,7 +18,7 @@
 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
-from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
+from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY, PagedResult, Sha1
 from swh.model import hashutil
 from swh.core import utils
 
@@ -233,12 +233,12 @@
         return []
 
     def index(
-        self, id: bytes, data: Optional[bytes] = None, **kwargs
+        self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index computation for the id and associated raw data.
 
         Args:
-            id: identifier
+            id: identifier or Dict object
             data: id's data from storage or objstorage depending on
                   object type
 
@@ -282,7 +282,7 @@
 class ContentIndexer(BaseIndexer):
     """A content indexer working on a list of ids directly.
 
-    To work on indexer range, use the :class:`ContentRangeIndexer`
+    To work on an indexer partition, use the :class:`ContentPartitionIndexer`
     instead.
 
     Note: :class:`ContentIndexer` is not an instantiable object. To
@@ -343,64 +343,76 @@
         return summary
 
 
-class ContentRangeIndexer(BaseIndexer):
+class ContentPartitionIndexer(BaseIndexer):
     """A content range indexer.
 
-    This expects as input a range of ids to index.
+    This expects as input a partition_id and a nb_partitions. This will then index the
+    contents within that partition.
 
     To work on a list of ids, use the :class:`ContentIndexer` instead.
 
-    Note: :class:`ContentRangeIndexer` is not an instantiable
+    Note: :class:`ContentPartitionIndexer` is not an instantiable
     object. To use it, one should inherit from this class and override
     the methods mentioned in the :class:`BaseIndexer` class.
 
     """
 
     @abc.abstractmethod
-    def indexed_contents_in_range(self, start: bytes, end: bytes) -> Any:
-        """Retrieve indexed contents within range [start, end].
+    def indexed_contents_in_partition(
+        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+    ) -> PagedResult[Sha1]:
+        """Retrieve indexed contents within the partition partition_id.
 
         Args:
-            start: Starting bound from range identifier
-            end: End range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
+            page_token: opaque token used for pagination
 
-        Yields:
-            bytes: Content identifier present in the range ``[start, end]``
+        Returns:
+            PagedResult of Sha1. If next_page_token is None, there is no more data
+            to fetch
 
         """
         pass
 
     def _list_contents_to_index(
-        self, start: bytes, end: bytes, indexed: Set[bytes]
-    ) -> Iterator[bytes]:
-        """Compute from storage the new contents to index in the range [start,
-        end]. The already indexed contents are skipped.
+        self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
+    ) -> Iterator[Sha1]:
+        """Compute from storage the new contents to index in the partition. The
+        already indexed contents are skipped.
 
         Args:
-            start: Starting bound from range identifier
-            end: End range identifier
+            partition_id: Index of the partition to fetch data from
+            nb_partitions: Total number of partitions
             indexed: Set of content already indexed.
 
         Yields:
-            bytes: Identifier of contents to index.
+            Sha1 id (bytes) of contents to index
 
         """
-        if not isinstance(start, bytes) or not isinstance(end, bytes):
-            raise TypeError("identifiers must be bytes, not %r and %r." % (start, end))
-        while start:
-            result = self.storage.content_get_range(start, end)
-            contents = result["contents"]
+        if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
+            raise TypeError(
+                f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
+            )
+        next_page_token = None
+        while True:
+            result = self.storage.content_get_partition(
+                partition_id, nb_partitions, page_token=next_page_token
+            )
+            contents = result.results
             for c in contents:
-                _id = hashutil.hash_to_bytes(c["sha1"])
+                _id = hashutil.hash_to_bytes(c.sha1)
                 if _id in indexed:
                     continue
                 yield _id
-            start = result["next"]
+            next_page_token = result.next_page_token
+            if next_page_token is None:
+                break
 
     def _index_contents(
-        self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any
+        self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
     ) -> Iterator[Dict]:
-        """Index the contents from within range [start, end]
+        """Index the contents within the partition partition_id.
 
         Args:
             start: Starting bound from range identifier
@@ -408,16 +420,14 @@
             indexed: Set of content already indexed.
 
         Yields:
-            dict: Data indexed to persist using the indexer storage
+            indexing result as dict to persist in the indexer backend
 
         """
-        for sha1 in self._list_contents_to_index(start, end, indexed):
+        for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
             try:
                 raw_content = self.objstorage.get(sha1)
             except ObjNotFoundError:
-                self.log.warning(
-                    "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1)
-                )
+                self.log.warning(f"Content {sha1.hex()} not found in objstorage")
                 continue
             res = self.index(sha1, raw_content, **kwargs)
             if res:
@@ -429,62 +439,64 @@
             yield res
 
     def _index_with_skipping_already_done(
-        self, start: bytes, end: bytes
+        self, partition_id: int, nb_partitions: int
     ) -> Iterator[Dict]:
-        """Index not already indexed contents in range [start, end].
+        """Index the not already indexed contents within the partition partition_id.
 
         Args:
-            start: Starting range identifier
-            end: Ending range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
 
         Yields:
-            dict: Content identifier present in the range
-            ``[start, end]`` which are not already indexed.
+            indexing result as dict to persist in the indexer backend
 
         """
-        while start:
-            indexed_page = self.indexed_contents_in_range(start, end)
-            contents = indexed_page["ids"]
-            _end = contents[-1] if contents else end
-            yield from self._index_contents(start, _end, contents)
-            start = indexed_page["next"]
+        next_page_token = None
+        contents = set()
+        while True:
+            indexed_page = self.indexed_contents_in_partition(
+                partition_id, nb_partitions, page_token=next_page_token
+            )
+            for sha1 in indexed_page.results:
+                contents.add(sha1)
+            yield from self._index_contents(partition_id, nb_partitions, contents)
+            next_page_token = indexed_page.next_page_token
+            if next_page_token is None:
+                break
 
     def run(
         self,
-        start: Union[bytes, str],
-        end: Union[bytes, str],
+        partition_id: int,
+        nb_partitions: int,
         skip_existing: bool = True,
-        **kwargs
+        **kwargs,
     ) -> Dict:
-        """Given a range of content ids, compute the indexing computations on
-        the contents within. Either the indexer is incremental
-        (filter out existing computed data) or not (compute
-        everything from scratch).
+        """Given a partition of content ids, index the contents within.
+
+        Either the indexer is incremental (filter out existing computed data) or it
+        computes everything from scratch.
 
         Args:
-            start: Starting range identifier
-            end: Ending range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
             skip_existing: Skip existing indexed data
-                (default) or not
+                           (default) or not
             **kwargs: passed to the `index` method
 
         Returns:
-            A dict with the task's status
+            dict with the indexing task status
 
         """
         status = "uneventful"
         summary: Dict[str, Any] = {}
         count = 0
         try:
-            range_start = (
-                hashutil.hash_to_bytes(start) if isinstance(start, str) else start
-            )
-            range_end = hashutil.hash_to_bytes(end) if isinstance(end, str) else end
-
             if skip_existing:
-                gen = self._index_with_skipping_already_done(range_start, range_end)
+                gen = self._index_with_skipping_already_done(
+                    partition_id, nb_partitions
+                )
             else:
-                gen = self._index_contents(range_start, range_end, indexed=set([]))
+                gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
 
             count_object_added_key: Optional[str] = None
 
@@ -509,6 +521,10 @@
         return summary
 
 
+# alias for retrocompatibility
+ContentRangeIndexer = ContentPartitionIndexer
+
+
 class OriginIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Origin indexing using the run method
@@ -590,11 +606,11 @@
         summary: Dict[str, Any] = {}
         status = "uneventful"
         results = []
-        revs = self.storage.revision_get(
-            hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
-        )
-
-        for rev in revs:
+        revision_ids = [
+            hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
+        ]
+        for rev in self.storage.revision_get(revision_ids):
             if not rev:
                 self.log.warning(
                     "Revisions %s not found in storage"
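
Note: the page_token loop that replaces the old `while start:` walk follows the usual PagedResult protocol (a `results` list plus an optional `next_page_token`). A self-contained sketch of that protocol, with a stubbed fetch function standing in for `storage.content_get_partition`:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Page:
        results: List[bytes]
        next_page_token: Optional[str]

    def fetch(page_token: Optional[str]) -> Page:
        # Stand-in for storage.content_get_partition(partition_id, ...):
        # two pages of fake sha1s linked by an opaque token.
        pages = {None: Page([b"\x01", b"\x02"], "t1"), "t1": Page([b"\x03"], None)}
        return pages[page_token]

    def iter_all():
        token = None
        while True:  # same shape as _list_contents_to_index above
            page = fetch(token)
            yield from page.results
            token = page.next_page_token
            if token is None:
                break

    assert list(iter_all()) == [b"\x01", b"\x02", b"\x03"]
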
"from_buffer"): raise ImportError( @@ -40,7 +43,7 @@ class MixinMimetypeIndexer: """Mixin mimetype indexer. - See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer` + See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer` """ @@ -61,7 +64,7 @@ CONFIG_BASE_FILENAME = "indexer/mimetype" # type: Optional[str] def index( - self, id: bytes, data: Optional[bytes] = None, **kwargs + self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs ) -> Dict[str, Any]: """Index sha1s' content and store result. @@ -79,6 +82,7 @@ """ assert data is not None properties = compute_mimetype_encoding(data) + assert isinstance(id, bytes) properties.update( {"id": id, "indexer_configuration_id": self.tool["id"],} ) @@ -124,34 +128,38 @@ ) -class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer): +class MimetypePartitionIndexer(MixinMimetypeIndexer, ContentPartitionIndexer): """Mimetype Range Indexer working on range of content identifiers. It: - (optionally) filters out content already indexed (cf - :meth:`.indexed_contents_in_range`) + :meth:`.indexed_contents_in_partition`) - reads content from objstorage per the content's id (sha1) - computes {mimetype, encoding} from that content - stores result in storage """ - def indexed_contents_in_range( - self, start: bytes, end: bytes - ) -> Dict[str, Optional[bytes]]: - """Retrieve indexed content id within range [start, end]. + def indexed_contents_in_partition( + self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, + ) -> PagedResult[Sha1]: + """Retrieve indexed content ids within partition_id. Args: - start: Starting bound from range identifier - end: End range identifier + partition_id: Index of the partition to fetch + nb_partitions: Total number of partitions to split into + page_token: opaque token used for pagination Returns: - dict: a dict with keys: - - - ids: iterable of content ids within the range. - - next: The next range of sha1 starts at - this sha1 if any + PagedResult of Sha1. 
If next_page_token is None, there is no more data + to fetch """ - return self.idx_storage.content_mimetype_get_range(start, end, self.tool["id"]) + return self.idx_storage.content_mimetype_get_partition( + self.tool["id"], partition_id, nb_partitions, page_token=page_token + ) + + +# alias for retrocompatibility +MimetypeRangeIndexer = MimetypePartitionIndexer diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -173,11 +173,11 @@ groups: Dict[str, List[Any]] = defaultdict(list) for content, keys_to_update in data: - keys = ",".join(keys_to_update) - groups[keys].append(content) + keys_str = ",".join(keys_to_update) + groups[keys_str].append(content) for keys_to_update, contents in groups.items(): - keys = keys_to_update.split(",") + keys: List[str] = keys_to_update.split(",") try: self.storage.content_update(contents, keys=keys) count += len(contents) diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -9,11 +9,16 @@ import psycopg2.pool from collections import defaultdict, Counter -from typing import Dict, List +from typing import Dict, List, Optional +from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.model.model import SHA1_SIZE from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError +from swh.storage.utils import get_partition_bounds_bytes + +from .interface import PagedResult, Sha1 from . import converters from .db import Db from .exc import IndexerStorageArgumentException, DuplicateId @@ -135,30 +140,60 @@ for obj in db.content_mimetype_missing_from_list(mimetypes, cur): yield obj[0] - def _content_get_range( + @timed + @db_transaction() + def get_partition( self, - content_type, - start, - end, - indexer_configuration_id, - limit=1000, + indexer_type: str, + indexer_configuration_id: int, + partition_id: int, + nb_partitions: int, + page_token: Optional[str] = None, + limit: int = 1000, with_textual_data=False, db=None, cur=None, - ): + ) -> PagedResult[Sha1]: + """Retrieve ids of content with `indexer_type` within within partition partition_id + bound by limit. + + Args: + **indexer_type**: Type of data content to index (mimetype, language, etc...) + **indexer_configuration_id**: The tool used to index data + **partition_id**: index of the partition to fetch + **nb_partitions**: total number of partitions to split into + **page_token**: opaque token used for pagination + **limit**: Limit result (default to 1000) + **with_textual_data** (bool): Deal with only textual content (True) or all + content (all contents by defaults, False) + + Raises: + IndexerStorageArgumentException for; + - limit to None + - wrong indexer_type provided + + Returns: + PagedResult of Sha1. If next_page_token is None, there is no more data to + fetch + + """ if limit is None: raise IndexerStorageArgumentException("limit should not be None") - if content_type not in db.content_indexer_names: - err = "Wrong type. Should be one of [%s]" % ( - ",".join(db.content_indexer_names) - ) + if indexer_type not in db.content_indexer_names: + err = f"Wrong type. 
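
Note: the storage changes below lean on `swh.storage.utils.get_partition_bounds_bytes` to turn `(partition_id, nb_partitions)` into a `[start, end)` slice of the sha1 key space. Roughly, and ignoring the rounding details of the real helper, the computation looks like this (a local sketch, not the actual implementation):

    SHA1_SIZE = 20

    def partition_bounds(partition_id: int, nb_partitions: int, n_bytes: int = SHA1_SIZE):
        # Split the 2**(8*n_bytes) key space into nb_partitions contiguous slices.
        space = 2 ** (8 * n_bytes)
        start = partition_id * space // nb_partitions
        end = (partition_id + 1) * space // nb_partitions
        start_bytes = start.to_bytes(n_bytes, "big")
        if partition_id == nb_partitions - 1:
            # The last partition is open-ended; the code below substitutes
            # b"\xff" * SHA1_SIZE when end is None.
            end_bytes = None
        else:
            end_bytes = end.to_bytes(n_bytes, "big")
        return start_bytes, end_bytes

    lo, hi = partition_bounds(0, 4)
    assert lo == b"\x00" * 20 and hi == b"\x40" + b"\x00" * 19
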
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -9,11 +9,16 @@
 import psycopg2.pool
 
 from collections import defaultdict, Counter
-from typing import Dict, List
+from typing import Dict, List, Optional
 
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.model import SHA1_SIZE
 from swh.storage.common import db_transaction_generator, db_transaction
 from swh.storage.exc import StorageDBError
+from swh.storage.utils import get_partition_bounds_bytes
+
+from .interface import PagedResult, Sha1
 
 from . import converters
 from .db import Db
 from .exc import IndexerStorageArgumentException, DuplicateId
@@ -135,30 +140,60 @@
             for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
                 yield obj[0]
 
-    def _content_get_range(
+    @timed
+    @db_transaction()
+    def get_partition(
         self,
-        content_type,
-        start,
-        end,
-        indexer_configuration_id,
-        limit=1000,
+        indexer_type: str,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
         with_textual_data=False,
         db=None,
         cur=None,
-    ):
+    ) -> PagedResult[Sha1]:
+        """Retrieve ids of content with `indexer_type` within the partition
+        partition_id, bound by limit.
+
+        Args:
+            **indexer_type**: Type of data content to index (mimetype, language, etc...)
+            **indexer_configuration_id**: The tool used to index data
+            **partition_id**: index of the partition to fetch
+            **nb_partitions**: total number of partitions to split into
+            **page_token**: opaque token used for pagination
+            **limit**: Limit result (default to 1000)
+            **with_textual_data** (bool): Deal with only textual content (True) or all
+                content (all contents by default, False)
+
+        Raises:
+            IndexerStorageArgumentException for:
+            - limit to None
+            - wrong indexer_type provided
+
+        Returns:
+            PagedResult of Sha1. If next_page_token is None, there is no more data to
+            fetch
+
+        """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
-        if content_type not in db.content_indexer_names:
-            err = "Wrong type. Should be one of [%s]" % (
-                ",".join(db.content_indexer_names)
-            )
+        if indexer_type not in db.content_indexer_names:
+            err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
             raise IndexerStorageArgumentException(err)
 
-        ids = []
-        next_id = None
-        for counter, obj in enumerate(
-            db.content_get_range(
-                content_type,
+        start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
+        if page_token is not None:
+            start = hash_to_bytes(page_token)
+        if end is None:
+            end = b"\xff" * SHA1_SIZE
+
+        next_page_token: Optional[str] = None
+        ids = [
+            row[0]
+            for row in db.content_get_range(
+                indexer_type,
                 start,
                 end,
                 indexer_configuration_id,
@@ -166,26 +201,33 @@
                 with_textual_data=with_textual_data,
                 cur=cur,
             )
-        ):
-            _id = obj[0]
-            if counter >= limit:
-                next_id = _id
-                break
+        ]
 
-            ids.append(_id)
+        if len(ids) >= limit:
+            next_page_token = hash_to_hex(ids[-1])
+            ids = ids[:limit]
 
-        return {"ids": ids, "next": next_id}
+        assert len(ids) <= limit
+        return PagedResult(results=ids, next_page_token=next_page_token)
 
     @timed
     @db_transaction()
-    def content_mimetype_get_range(
-        self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
-    ):
-        return self._content_get_range(
+    def content_mimetype_get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+        db=None,
+        cur=None,
+    ) -> PagedResult[Sha1]:
+        return self.get_partition(
             "mimetype",
-            start,
-            end,
             indexer_configuration_id,
+            partition_id,
+            nb_partitions,
+            page_token=page_token,
             limit=limit,
             db=db,
             cur=cur,
@@ -349,14 +391,22 @@
 
     @timed
     @db_transaction()
-    def content_fossology_license_get_range(
-        self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
-    ):
-        return self._content_get_range(
+    def content_fossology_license_get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+        db=None,
+        cur=None,
+    ) -> PagedResult[Sha1]:
+        return self.get_partition(
             "fossology_license",
-            start,
-            end,
             indexer_configuration_id,
+            partition_id,
+            nb_partitions,
+            page_token=page_token,
             limit=limit,
             with_textual_data=True,
             db=db,
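
Note: for callers migrating off the removed range API, the translation is mechanical. A sketch, assuming an already-configured `idx_storage` client:

    # Before (removed): walk an explicit sha1 range.
    # result = idx_storage.content_mimetype_get_range(start, end, tool_id, limit=1000)
    # ids, next_start = result["ids"], result["next"]

    # After: name a partition and follow page tokens until exhaustion.
    def all_mimetype_ids(idx_storage, tool_id, partition_id, nb_partitions):
        page_token = None
        while True:
            page = idx_storage.content_mimetype_get_partition(
                tool_id, partition_id, nb_partitions, page_token=page_token
            )
            yield from page.results
            page_token = page.next_page_token
            if page_token is None:
                return
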
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -3,18 +3,25 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import bisect
-from collections import defaultdict, Counter
 import itertools
 import json
 import operator
 import math
 import re
-from typing import Any, Dict, List
+
+from collections import defaultdict, Counter
+from typing import Any, Dict, List, Optional
+
+from swh.model.model import SHA1_SIZE
+from swh.model.hashutil import hash_to_hex, hash_to_bytes
+from swh.storage.utils import get_partition_bounds_bytes
+from swh.storage.in_memory import SortedList
 
 from . import MAPPING_NAMES, check_id_duplicates
 from .exc import IndexerStorageArgumentException
+from .interface import PagedResult, Sha1
+
 
 SHA1_DIGEST_SIZE = 160
 
@@ -38,7 +45,7 @@
 
     def __init__(self, tools):
         self._tools = tools
-        self._sorted_ids = []
+        self._sorted_ids = SortedList[bytes, bytes]()
         self._data = {}  # map (id_, tool_id) -> metadata_dict
         self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]
 
@@ -88,41 +95,58 @@
     def get_all(self):
         yield from self.get(self._sorted_ids)
 
-    def get_range(self, start, end, indexer_configuration_id, limit):
-        """Retrieve data within range [start, end] bound by limit.
+    def get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+    ) -> PagedResult[Sha1]:
+        """Retrieve ids of content within the partition partition_id, bound by limit.
 
         Args:
-            **start** (bytes): Starting identifier range (expected smaller
-                than end)
-            **end** (bytes): Ending identifier range (expected larger
-                than start)
-            **indexer_configuration_id** (int): The tool used to index data
-            **limit** (int): Limit result
+            **indexer_configuration_id**: The tool used to index data
+            **partition_id**: index of the partition to fetch
+            **nb_partitions**: total number of partitions to split into
+            **page_token**: opaque token used for pagination
+            **limit**: Limit result (default to 1000)
 
         Raises:
-            IndexerStorageArgumentException for limit to None
+            IndexerStorageArgumentException for limit to None
 
         Returns:
-            a dict with keys:
-            - **ids** [bytes]: iterable of content ids within the range.
-            - **next** (Optional[bytes]): The next range of sha1 starts at
-              this sha1 if any
+            PagedResult of Sha1. If next_page_token is None, there is no more data to
+            fetch
 
         """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
-        from_index = bisect.bisect_left(self._sorted_ids, start)
-        to_index = bisect.bisect_right(self._sorted_ids, end, lo=from_index)
-        if to_index - from_index >= limit:
-            return {
-                "ids": self._sorted_ids[from_index : from_index + limit],
-                "next": self._sorted_ids[from_index + limit],
-            }
-        else:
-            return {
-                "ids": self._sorted_ids[from_index:to_index],
-                "next": None,
-            }
+        (start, end) = get_partition_bounds_bytes(
+            partition_id, nb_partitions, SHA1_SIZE
+        )
+
+        if page_token:
+            start = hash_to_bytes(page_token)
+        if end is None:
+            end = b"\xff" * SHA1_SIZE
+
+        next_page_token: Optional[str] = None
+        ids: List[Sha1] = []
+        sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
+        for counter, sha1 in enumerate(sha1s):
+            if sha1 > end:
+                break
+            if counter >= limit:
+                next_page_token = hash_to_hex(sha1)
+                break
+            ids.append(sha1)
+
+        assert len(ids) <= limit
+        return PagedResult(results=ids, next_page_token=next_page_token)
 
     def add(self, data: List[Dict], conflict_update: bool) -> int:
         """Add data not present in storage.
@@ -155,7 +182,7 @@
                 self._tools_per_id[id_].add(tool_id)
                 count += 1
                 if id_ not in self._sorted_ids:
-                    bisect.insort(self._sorted_ids, id_)
+                    self._sorted_ids.add(id_)
         return count
 
     def add_merge(
@@ -190,7 +217,7 @@
                 conflict_update=True,
             )
             if id_ not in self._sorted_ids:
-                bisect.insort(self._sorted_ids, id_)
+                self._sorted_ids.add(id_)
         return added
 
     def delete(self, entries: List[Dict]) -> int:
@@ -228,10 +255,17 @@
     def content_mimetype_missing(self, mimetypes):
         yield from self._mimetypes.missing(mimetypes)
 
-    def content_mimetype_get_range(
-        self, start, end, indexer_configuration_id, limit=1000
-    ):
-        return self._mimetypes.get_range(start, end, indexer_configuration_id, limit)
+    def content_mimetype_get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+    ) -> PagedResult[Sha1]:
+        return self._mimetypes.get_partition(
+            indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+        )
 
     def content_mimetype_add(
         self, mimetypes: List[Dict], conflict_update: bool = False
@@ -306,10 +340,17 @@
         added = self._licenses.add_merge(licenses, conflict_update, "licenses")
         return {"fossology_license_add:add": added}
 
-    def content_fossology_license_get_range(
-        self, start, end, indexer_configuration_id, limit=1000
-    ):
-        return self._licenses.get_range(start, end, indexer_configuration_id, limit)
+    def content_fossology_license_get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+    ) -> PagedResult[Sha1]:
+        return self._licenses.get_partition(
+            indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+        )
 
     def content_metadata_missing(self, metadata):
         yield from self._content_metadata.missing(metadata)
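
Note: the in-memory backend now delegates ordering to `swh.storage.in_memory.SortedList`. If that class is unfamiliar, the `add()`/`iter_from()` surface used above can be approximated with bisect; this is an illustration, not the real implementation:

    import bisect

    class MiniSortedList:
        """Approximation of the add()/iter_from() surface used above."""

        def __init__(self):
            self._items = []

        def add(self, item: bytes) -> None:
            bisect.insort(self._items, item)

        def iter_from(self, start: bytes):
            # Yield items >= start, in sorted order.
            yield from self._items[bisect.bisect_left(self._items, start):]

    s = MiniSortedList()
    for item in (b"\x05", b"\x01", b"\x03"):
        s.add(item)
    assert list(s.iter_from(b"\x02")) == [b"\x03", b"\x05"]
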
diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py
--- a/swh/indexer/storage/interface.py
+++ b/swh/indexer/storage/interface.py
@@ -3,9 +3,17 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Dict, List
+from typing import Dict, List, Optional, TypeVar
 
 from swh.core.api import remote_api_endpoint
+from swh.core.api.classes import PagedResult as CorePagedResult
+
+
+TResult = TypeVar("TResult")
+PagedResult = CorePagedResult[TResult, str]
+
+
+Sha1 = bytes
 
 
 class IndexerStorageInterface:
@@ -31,67 +39,32 @@
         """
         ...
 
-    def _content_get_range(
+    @remote_api_endpoint("content_mimetype/range")
+    def content_mimetype_get_partition(
         self,
-        content_type,
-        start,
-        end,
-        indexer_configuration_id,
-        limit=1000,
-        with_textual_data=False,
-    ):
-        """Retrieve ids of type content_type within range [start, end] bound
-        by limit.
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+    ) -> PagedResult[Sha1]:
+        """Retrieve mimetypes within the partition partition_id, bound by limit.
 
         Args:
-            **content_type** (str): content's type (mimetype, language, etc...)
-            **start** (bytes): Starting identifier range (expected smaller
-                than end)
-            **end** (bytes): Ending identifier range (expected larger
-                than start)
-            **indexer_configuration_id** (int): The tool used to index data
-            **limit** (int): Limit result (default to 1000)
-            **with_textual_data** (bool): Deal with only textual
-                                          content (True) or all
-                                          content (all contents by
-                                          defaults, False)
+            **indexer_configuration_id**: The tool used to index data
+            **partition_id**: index of the partition to fetch
+            **nb_partitions**: total number of partitions to split into
+            **page_token**: opaque token used for pagination
+            **limit**: Limit result (default to 1000)
 
         Raises:
-            ValueError for;
+            IndexerStorageArgumentException for:
             - limit to None
-            - wrong content_type provided
-
-        Returns:
-            a dict with keys:
-            - **ids** [bytes]: iterable of content ids within the range.
-            - **next** (Optional[bytes]): The next range of sha1 starts at
-              this sha1 if any
-
-        """
-        ...
-
-    @remote_api_endpoint("content_mimetype/range")
-    def content_mimetype_get_range(
-        self, start, end, indexer_configuration_id, limit=1000
-    ):
-        """Retrieve mimetypes within range [start, end] bound by limit.
-
-        Args:
-            **start** (bytes): Starting identifier range (expected smaller
-                than end)
-            **end** (bytes): Ending identifier range (expected larger
-                than start)
-            **indexer_configuration_id** (int): The tool used to index data
-            **limit** (int): Limit result (default to 1000)
-
-        Raises:
-            ValueError for limit to None
+            - wrong indexer_type provided
 
         Returns:
-            a dict with keys:
-            - **ids** [bytes]: iterable of content ids within the range.
-            - **next** (Optional[bytes]): The next range of sha1 starts at
-              this sha1 if any
+            PagedResult of Sha1. If next_page_token is None, there is no more data
+            to fetch
 
         """
         ...
 
@@ -307,27 +280,30 @@
         ...
 
     @remote_api_endpoint("content/fossology_license/range")
-    def content_fossology_license_get_range(
-        self, start, end, indexer_configuration_id, limit=1000
-    ):
-        """Retrieve licenses within range [start, end] bound by limit.
+    def content_fossology_license_get_partition(
+        self,
+        indexer_configuration_id: int,
+        partition_id: int,
+        nb_partitions: int,
+        page_token: Optional[str] = None,
+        limit: int = 1000,
+    ) -> PagedResult[Sha1]:
+        """Retrieve licenses within the partition partition_id, bound by limit.
 
         Args:
-            **start** (bytes): Starting identifier range (expected smaller
-                than end)
-            **end** (bytes): Ending identifier range (expected larger
-                than start)
-            **indexer_configuration_id** (int): The tool used to index data
-            **limit** (int): Limit result (default to 1000)
+            **indexer_configuration_id**: The tool used to index data
+            **partition_id**: index of the partition to fetch
+            **nb_partitions**: total number of partitions to split into
+            **page_token**: opaque token used for pagination
+            **limit**: Limit result (default to 1000)
 
         Raises:
-            ValueError for limit to None
+            IndexerStorageArgumentException for:
+            - limit to None
+            - wrong indexer_type provided
 
         Returns:
-            a dict with keys:
-            - **ids** [bytes]: iterable of content ids within the range.
-            - **next** (Optional[bytes]): The next range of sha1 starts at
-              this sha1 if any
+            PagedResult of Sha1. If next_page_token is None, there is no more data
+            to fetch
 
         """
         ...
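
Note: the `PagedResult = CorePagedResult[TResult, str]` alias pins the page-token type to `str` while leaving the result type open. Concretely, a `PagedResult[Sha1]` value carries the two fields the indexer code relies on; a sketch using a local stand-in for `swh.core.api.classes.PagedResult` (the real class may differ in detail):

    from dataclasses import dataclass, field
    from typing import Generic, List, Optional, TypeVar

    TResult = TypeVar("TResult")
    TToken = TypeVar("TToken")

    @dataclass
    class CorePagedResult(Generic[TResult, TToken]):
        # Local stand-in mirroring the two fields used throughout this patch.
        results: List[TResult] = field(default_factory=list)
        next_page_token: Optional[TToken] = None

    Sha1 = bytes

    # A PagedResult[Sha1] is this, with the token type pinned to str:
    page = CorePagedResult[Sha1, str](results=[b"\x00" * 20], next_page_token=None)
    assert page.next_page_token is None  # last page
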
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -6,9 +6,9 @@
 
 from celery import current_app as app
 
-from .mimetype import MimetypeIndexer, MimetypeRangeIndexer
+from .mimetype import MimetypeIndexer, MimetypePartitionIndexer
 from .ctags import CtagsIndexer
-from .fossology_license import FossologyLicenseIndexer, FossologyLicenseRangeIndexer
+from .fossology_license import FossologyLicenseIndexer, FossologyLicensePartitionIndexer
 from .rehash import RecomputeChecksums
 from .metadata import OriginMetadataIndexer
 
@@ -40,9 +40,9 @@
 
 @app.task(name=__name__ + ".ContentRangeMimetype")
 def range_mimetype(*args, **kwargs):
-    return MimetypeRangeIndexer().run(*args, **kwargs)
+    return MimetypePartitionIndexer().run(*args, **kwargs)
 
 
 @app.task(name=__name__ + ".ContentRangeFossologyLicense")
 def range_license(*args, **kwargs):
-    return FossologyLicenseRangeIndexer().run(*args, **kwargs)
+    return FossologyLicensePartitionIndexer().run(*args, **kwargs)
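
Note: the task names are unchanged ("ContentRange..."); only their implementation classes moved, so existing schedulers keep working. Scheduling one now passes partition arguments; an illustrative invocation, assuming a configured celery app (the kwargs are examples, not values from this patch):

    from celery import current_app as app

    # The kwargs feed MimetypePartitionIndexer.run(partition_id, nb_partitions,
    # ...); note the task name itself still says "Range".
    res = app.send_task(
        "swh.indexer.tasks.ContentRangeMimetype",
        kwargs={"partition_id": 0, "nb_partitions": 16, "skip_existing": True},
    )
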
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import inspect
+import math
 import threading
 
 from typing import Dict
@@ -28,7 +29,7 @@
         mimetypes.append(
             {
                 "id": c["id"],
-                "mimetype": "text/plain",
+                "mimetype": "text/plain",  # for filtering on textual data to work
                 "encoding": "utf-8",
                 "indexer_configuration_id": c["indexer_configuration_id"],
             }
@@ -363,85 +364,126 @@
             {"mimetype": "text/html", "encoding": "us-ascii",},
         ]
 
-    def test_generate_content_mimetype_get_range_limit_none(self, swh_indexer_storage):
-        """mimetype_get_range call with wrong limit input should fail"""
+    def test_generate_content_mimetype_get_partition_failure(self, swh_indexer_storage):
+        """get_partition call with wrong limit input should fail"""
         storage = swh_indexer_storage
-        with pytest.raises(IndexerStorageArgumentException) as e:
-            storage.content_mimetype_get_range(
-                start=None, end=None, indexer_configuration_id=None, limit=None
+        indexer_configuration_id = None
+        with pytest.raises(
+            IndexerStorageArgumentException, match="limit should not be None"
+        ):
+            storage.content_mimetype_get_partition(
+                indexer_configuration_id, 0, 3, limit=None
             )
 
-        assert e.value.args == ("limit should not be None",)
-
-    def test_generate_content_mimetype_get_range_no_limit(
+    def test_generate_content_mimetype_get_partition_no_limit(
         self, swh_indexer_storage_with_data
     ):
-        """mimetype_get_range returns mimetypes within range provided"""
+        """get_partition should return results"""
         storage, data = swh_indexer_storage_with_data
         mimetypes = data.mimetypes
 
-        # All ids from the db
-        content_ids = sorted([c["id"] for c in mimetypes])
-
-        start = content_ids[0]
-        end = content_ids[-1]
+        expected_ids = set([c["id"] for c in mimetypes])
+        indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
 
-        # retrieve mimetypes
-        tool_id = mimetypes[0]["indexer_configuration_id"]
-        actual_result = storage.content_mimetype_get_range(
-            start, end, indexer_configuration_id=tool_id
-        )
+        assert len(mimetypes) == 16
+        nb_partitions = 16
 
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
+        actual_ids = []
+        for partition_id in range(nb_partitions):
+            actual_result = storage.content_mimetype_get_partition(
+                indexer_configuration_id, partition_id, nb_partitions
+            )
+            assert actual_result.next_page_token is None
+            actual_ids.extend(actual_result.results)
 
-        assert len(mimetypes) == len(actual_ids)
-        assert actual_next is None
-        assert content_ids == actual_ids
+        assert len(actual_ids) == len(expected_ids)
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
-    def test_generate_content_mimetype_get_range_limit(
+    def test_generate_content_mimetype_get_partition_full(
         self, swh_indexer_storage_with_data
     ):
-        """mimetype_get_range paginates results if limit exceeded"""
-        storage, data = swh_indexer_storage_with_data
+        """get_partition for a single partition should return available ids
 
-        indexer_configuration_id = data.tools["file"]["id"]
-
-        # input the list of sha1s we want from storage
-        content_ids = sorted([c["id"] for c in data.mimetypes])
-        mimetypes = list(storage.content_mimetype_get(content_ids))
-        assert len(mimetypes) == len(data.mimetypes)
+        """
+        storage, data = swh_indexer_storage_with_data
+        mimetypes = data.mimetypes
+        expected_ids = set([c["id"] for c in mimetypes])
+        indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
 
-        start = content_ids[0]
-        end = content_ids[-1]
-        # retrieve mimetypes limited to 10 results
-        actual_result = storage.content_mimetype_get_range(
-            start, end, indexer_configuration_id=indexer_configuration_id, limit=10
+        actual_result = storage.content_mimetype_get_partition(
+            indexer_configuration_id, 0, 1
         )
+        assert actual_result.next_page_token is None
+        actual_ids = actual_result.results
+        assert len(actual_ids) == len(expected_ids)
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
-        assert actual_result
-        assert set(actual_result.keys()) == {"ids", "next"}
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
+    def test_generate_content_mimetype_get_partition_empty(
+        self, swh_indexer_storage_with_data
+    ):
+        """get_partition when at least one of the partitions is empty"""
+        storage, data = swh_indexer_storage_with_data
+        mimetypes = data.mimetypes
+        expected_ids = set([c["id"] for c in mimetypes])
+        indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+        # nb_partitions = smallest power of 2 such that at least one of
+        # the partitions is empty
+        nb_mimetypes = len(mimetypes)
+        nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
+
+        seen_ids = []
+
+        for partition_id in range(nb_partitions):
+            actual_result = storage.content_mimetype_get_partition(
+                indexer_configuration_id,
+                partition_id,
+                nb_partitions,
+                limit=nb_mimetypes + 1,
+            )
 
-        assert len(actual_ids) == 10
-        assert actual_next is not None
-        assert actual_next == content_ids[10]
+            for actual_id in actual_result.results:
+                seen_ids.append(actual_id)
 
-        expected_mimetypes = content_ids[:10]
-        assert expected_mimetypes == actual_ids
+            # Limit is higher than the max number of results
+            assert actual_result.next_page_token is None
 
-        # retrieve next part
-        actual_result = storage.content_mimetype_get_range(
-            start=end, end=end, indexer_configuration_id=indexer_configuration_id
-        )
-        assert set(actual_result.keys()) == {"ids", "next"}
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
+        assert set(seen_ids) == expected_ids
 
-        assert actual_next is None
-        expected_mimetypes = [content_ids[-1]]
-        assert expected_mimetypes == actual_ids
+    def test_generate_content_mimetype_get_partition_with_pagination(
+        self, swh_indexer_storage_with_data
+    ):
+        """get_partition should return ids provided with pagination
+
+        """
+        storage, data = swh_indexer_storage_with_data
+        mimetypes = data.mimetypes
+        expected_ids = set([c["id"] for c in mimetypes])
+        indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+        nb_partitions = 4
+
+        actual_ids = []
+        for partition_id in range(nb_partitions):
+            next_page_token = None
+            while True:
+                actual_result = storage.content_mimetype_get_partition(
+                    indexer_configuration_id,
+                    partition_id,
+                    nb_partitions,
+                    limit=2,
+                    page_token=next_page_token,
+                )
+                actual_ids.extend(actual_result.results)
+                next_page_token = actual_result.next_page_token
+                if next_page_token is None:
+                    break
+
+        assert len(set(actual_ids)) == len(set(expected_ids))
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
 
 class TestIndexerStorageContentLanguage(StorageETypeTester):
@@ -907,135 +949,161 @@
         # license did not change as the v2 was dropped.
         assert actual_licenses == [expected_license]
 
-    def test_generate_content_fossology_license_get_range_limit_none(
+    def test_generate_content_fossology_license_get_partition_failure(
         self, swh_indexer_storage_with_data
     ):
+        """get_partition call with wrong limit input should fail"""
         storage, data = swh_indexer_storage_with_data
-        """license_get_range call with wrong limit input should fail"""
-        with pytest.raises(IndexerStorageArgumentException) as e:
-            storage.content_fossology_license_get_range(
-                start=None, end=None, indexer_configuration_id=None, limit=None
+        indexer_configuration_id = None
+        with pytest.raises(
+            IndexerStorageArgumentException, match="limit should not be None"
+        ):
+            storage.content_fossology_license_get_partition(
+                indexer_configuration_id, 0, 3, limit=None,
             )
 
-        assert e.value.args == ("limit should not be None",)
-
-    def test_generate_content_fossology_license_get_range_no_limit(
+    def test_generate_content_fossology_license_get_partition_no_limit(
         self, swh_indexer_storage_with_data
     ):
-        """license_get_range returns licenses within range provided"""
+        """get_partition should return results"""
         storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
         fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from(fossology_licenses)
+        indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
 
         storage.content_mimetype_add(mimetypes, conflict_update=True)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
-        content_ids = sorted([c["id"] for c in fossology_licenses])
+        expected_ids = set([c["id"] for c in fossology_licenses])
 
-        start = content_ids[0]
-        end = content_ids[-1]
+        assert len(fossology_licenses) == 10
+        assert len(mimetypes) == 10
+        nb_partitions = 4
 
-        # retrieve fossology_licenses
-        tool_id = fossology_licenses[0]["indexer_configuration_id"]
-        actual_result = storage.content_fossology_license_get_range(
-            start, end, indexer_configuration_id=tool_id
-        )
+        actual_ids = []
+        for partition_id in range(nb_partitions):
 
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
+            actual_result = storage.content_fossology_license_get_partition(
+                indexer_configuration_id, partition_id, nb_partitions
+            )
+            assert actual_result.next_page_token is None
+            actual_ids.extend(actual_result.results)
 
-        assert len(fossology_licenses) == len(actual_ids)
-        assert actual_next is None
-        assert content_ids == actual_ids
+        assert len(set(actual_ids)) == len(expected_ids)
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
-    def test_generate_content_fossology_license_get_range_no_limit_with_filter(
+    def test_generate_content_fossology_license_get_partition_full(
         self, swh_indexer_storage_with_data
     ):
-        """This filters non textual, then returns results within range"""
-        storage, data = swh_indexer_storage_with_data
-        fossology_licenses = data.fossology_licenses
-        mimetypes = data.mimetypes
+        """get_partition for a single partition should return available ids
 
+        """
+        storage, data = swh_indexer_storage_with_data
         # craft some consistent mimetypes
-        _mimetypes = prepare_mimetypes_from(fossology_licenses)
-        # add binary mimetypes which will get filtered out in results
-        for m in mimetypes:
-            _mimetypes.append(
-                {"mimetype": "binary", **m,}
-            )
+        fossology_licenses = data.fossology_licenses
+        mimetypes = prepare_mimetypes_from(fossology_licenses)
+        indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
 
-        storage.content_mimetype_add(_mimetypes, conflict_update=True)
+        storage.content_mimetype_add(mimetypes, conflict_update=True)
         # add fossology_licenses to storage
         storage.content_fossology_license_add(fossology_licenses)
 
         # All ids from the db
-        content_ids = sorted([c["id"] for c in fossology_licenses])
+        expected_ids = set([c["id"] for c in fossology_licenses])
 
-        start = content_ids[0]
-        end = content_ids[-1]
-
-        # retrieve fossology_licenses
-        tool_id = fossology_licenses[0]["indexer_configuration_id"]
-        actual_result = storage.content_fossology_license_get_range(
-            start, end, indexer_configuration_id=tool_id
+        actual_result = storage.content_fossology_license_get_partition(
+            indexer_configuration_id, 0, 1
         )
+        assert actual_result.next_page_token is None
+        actual_ids = actual_result.results
+        assert len(set(actual_ids)) == len(expected_ids)
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
-
-        assert len(fossology_licenses) == len(actual_ids)
-        assert actual_next is None
-        assert content_ids == actual_ids
-
-    def test_generate_fossology_license_get_range_limit(
+    def test_generate_content_fossology_license_get_partition_empty(
         self, swh_indexer_storage_with_data
     ):
-        """fossology_license_get_range paginates results if limit exceeded"""
+        """get_partition when at least one of the partitions is empty"""
         storage, data = swh_indexer_storage_with_data
-        fossology_licenses = data.fossology_licenses
-
         # craft some consistent mimetypes
+        fossology_licenses = data.fossology_licenses
         mimetypes = prepare_mimetypes_from(fossology_licenses)
+        indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
 
-        # add fossology_licenses to storage
         storage.content_mimetype_add(mimetypes, conflict_update=True)
+        # add fossology_licenses to storage
        storage.content_fossology_license_add(fossology_licenses)
 
-        # input the list of sha1s we want from storage
-        content_ids = sorted([c["id"] for c in fossology_licenses])
-        start = content_ids[0]
-        end = content_ids[-1]
+        # All ids from the db
+        expected_ids = set([c["id"] for c in fossology_licenses])
 
-        # retrieve fossology_licenses limited to 3 results
-        limited_results = len(fossology_licenses) - 1
-        tool_id = fossology_licenses[0]["indexer_configuration_id"]
-        actual_result = storage.content_fossology_license_get_range(
-            start, end, indexer_configuration_id=tool_id, limit=limited_results
-        )
+        # nb_partitions = smallest power of 2 such that at least one of
+        # the partitions is empty
+        nb_licenses = len(fossology_licenses)
+        nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
 
-        actual_ids = actual_result["ids"]
-        actual_next = actual_result["next"]
+        seen_ids = []
 
-        assert limited_results == len(actual_ids)
-        assert actual_next is not None
-        assert actual_next == content_ids[-1]
+        for partition_id in range(nb_partitions):
+            actual_result = storage.content_fossology_license_get_partition(
+                indexer_configuration_id,
+                partition_id,
+                nb_partitions,
+                limit=nb_licenses + 1,
+            )
 
-        expected_fossology_licenses = content_ids[:-1]
-        assert expected_fossology_licenses == actual_ids
+            for actual_id in actual_result.results:
+                seen_ids.append(actual_id)
 
-        # retrieve next part
-        actual_results2 = storage.content_fossology_license_get_range(
-            start=end, end=end, indexer_configuration_id=tool_id
-        )
-        actual_ids2 = actual_results2["ids"]
-        actual_next2 = actual_results2["next"]
+            # Limit is higher than the max number of results
+            assert actual_result.next_page_token is None
 
-        assert actual_next2 is None
-        expected_fossology_licenses2 = [content_ids[-1]]
-        assert expected_fossology_licenses2 == actual_ids2
+        assert set(seen_ids) == expected_ids
+
+    def test_generate_content_fossology_license_get_partition_with_pagination(
+        self, swh_indexer_storage_with_data
+    ):
+        """get_partition should return ids provided with pagination
+
+        """
+        storage, data = swh_indexer_storage_with_data
+        # craft some consistent mimetypes
+        fossology_licenses = data.fossology_licenses
+        mimetypes = prepare_mimetypes_from(fossology_licenses)
+        indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
+
+        storage.content_mimetype_add(mimetypes, conflict_update=True)
+        # add fossology_licenses to storage
+        storage.content_fossology_license_add(fossology_licenses)
+
+        # All ids from the db
+        expected_ids = [c["id"] for c in fossology_licenses]
+
+        nb_partitions = 4
+
+        actual_ids = []
+        for partition_id in range(nb_partitions):
+            next_page_token = None
+            while True:
+                actual_result = storage.content_fossology_license_get_partition(
+                    indexer_configuration_id,
+                    partition_id,
+                    nb_partitions,
+                    limit=2,
+                    page_token=next_page_token,
+                )
+                actual_ids.extend(actual_result.results)
+                next_page_token = actual_result.next_page_token
+                if next_page_token is None:
+                    break
+
+        assert len(set(actual_ids)) == len(set(expected_ids))
+        for actual_id in actual_ids:
+            assert actual_id in expected_ids
 
 
 class TestIndexerStorageOriginIntrinsicMetadata:
diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py
--- a/swh/indexer/tests/test_fossology_license.py
+++ b/swh/indexer/tests/test_fossology_license.py
@@ -12,14 +12,14 @@
 from swh.indexer import fossology_license
 from swh.indexer.fossology_license import (
     FossologyLicenseIndexer,
-    FossologyLicenseRangeIndexer,
+    FossologyLicensePartitionIndexer,
     compute_license,
 )
 
 from swh.indexer.tests.utils import (
     SHA1_TO_LICENSES,
     CommonContentIndexerTest,
-    CommonContentIndexerRangeTest,
+    CommonContentIndexerPartitionTest,
     BASE_TEST_CONFIG,
     fill_storage,
     fill_obj_storage,
@@ -109,8 +109,8 @@
         fossology_license.compute_license = self.orig_compute_license
 
 
-class TestFossologyLicenseRangeIndexer(
-    CommonContentIndexerRangeTest, unittest.TestCase
+class TestFossologyLicensePartitionIndexer(
+    CommonContentIndexerPartitionTest, unittest.TestCase
 ):
     """Range Fossology License Indexer tests.
 
@@ -128,7 +128,7 @@
         self.orig_compute_license = fossology_license.compute_license
         fossology_license.compute_license = mock_compute_license
 
-        self.indexer = FossologyLicenseRangeIndexer(config=RANGE_CONFIG)
+        self.indexer = FossologyLicensePartitionIndexer(config=RANGE_CONFIG)
         self.indexer.catch_exceptions = False
         fill_storage(self.indexer.storage)
         fill_obj_storage(self.indexer.objstorage)
@@ -167,4 +167,4 @@
 
 def test_fossology_range_w_no_tool():
     with pytest.raises(ValueError):
-        FossologyLicenseRangeIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
+        FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py
--- a/swh/indexer/tests/test_mimetype.py
+++ b/swh/indexer/tests/test_mimetype.py
@@ -10,13 +10,13 @@
 
 from swh.indexer.mimetype import (
     MimetypeIndexer,
-    MimetypeRangeIndexer,
+    MimetypePartitionIndexer,
     compute_mimetype_encoding,
 )
 
 from swh.indexer.tests.utils import (
     CommonContentIndexerTest,
-    CommonContentIndexerRangeTest,
+    CommonContentIndexerPartitionTest,
     BASE_TEST_CONFIG,
     fill_storage,
     fill_obj_storage,
@@ -96,7 +96,9 @@
 RANGE_CONFIG = dict(list(CONFIG.items()) + [("write_batch_size", 100)])
 
 
-class TestMimetypeRangeIndexer(CommonContentIndexerRangeTest, unittest.TestCase):
+class TestMimetypePartitionIndexer(
+    CommonContentIndexerPartitionTest, unittest.TestCase
+):
     """Range Mimetype Indexer tests.
 
     - new data within range are indexed
@@ -108,7 +110,7 @@
 
     def setUp(self):
         super().setUp()
-        self.indexer = MimetypeRangeIndexer(config=RANGE_CONFIG)
+        self.indexer = MimetypePartitionIndexer(config=RANGE_CONFIG)
         self.indexer.catch_exceptions = False
         fill_storage(self.indexer.storage)
         fill_obj_storage(self.indexer.objstorage)
@@ -147,4 +149,4 @@
 
 def test_mimetype_range_w_no_tool():
     with pytest.raises(ValueError):
-        MimetypeRangeIndexer(config=filter_dict(CONFIG, "tools"))
+        MimetypePartitionIndexer(config=filter_dict(CONFIG, "tools"))
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -677,7 +677,7 @@
         self.assert_results_ok(sha1s, expected_results)
 
 
-class CommonContentIndexerRangeTest:
+class CommonContentIndexerPartitionTest:
     """Allows to factorize tests on range indexer.
 
     """