D3718.id13108.diff
D3718: "text"-indexers: Migrate to partition index instead of range
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,6 +1,6 @@
-swh.core[db,http] >= 0.0.87
+swh.core[db,http] >= 0.2.2
swh.model >= 0.0.15
swh.objstorage >= 0.0.43
swh.scheduler >= 0.0.47
-swh.storage >= 0.8.0
+swh.storage >= 0.12.0
swh.journal >= 0.1.0
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -6,11 +6,12 @@
import logging
import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
from swh.model import hashutil
-from .indexer import ContentIndexer, ContentRangeIndexer, write_to_temp
+from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp
+from swh.indexer.storage.interface import PagedResult, Sha1
logger = logging.getLogger(__name__)
@@ -56,7 +57,7 @@
"""Mixin fossology license indexer.
See :class:`FossologyLicenseIndexer` and
- :class:`FossologyLicenseRangeIndexer`
+ :class:`FossologyLicensePartitionIndexer`
"""
@@ -82,7 +83,7 @@
self.working_directory = self.config["workdir"]
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index sha1s' content and store result.
@@ -153,33 +154,39 @@
)
-class FossologyLicenseRangeIndexer(MixinFossologyLicenseIndexer, ContentRangeIndexer):
- """FossologyLicense Range Indexer working on range of content identifiers.
+class FossologyLicensePartitionIndexer(
+ MixinFossologyLicenseIndexer, ContentPartitionIndexer
+):
+ """FossologyLicense Range Indexer working on range/partition of content identifiers.
- filters out the non textual content
- (optionally) filters out content already indexed (cf
- :meth:`.indexed_contents_in_range`)
+ :meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
- def indexed_contents_in_range(self, start, end):
- """Retrieve indexed content id within range [start, end].
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+ ) -> PagedResult[Sha1]:
+ """Retrieve indexed content id within the partition id
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
Returns:
- dict: a dict with keys:
-
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
- return self.idx_storage.content_fossology_license_get_range(
- start, end, self.tool["id"]
+ return self.idx_storage.content_fossology_license_get_partition(
+ self.tool["id"], partition_id, nb_partitions, page_token=page_token
)
+
+
+# alias for retrocompatibility
+FossologyLicenseRangeIndexer = FossologyLicensePartitionIndexer
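For context, callers of the new partition API drain pages with an opaque page_token instead of chasing a "next" sha1. A minimal sketch against the method added above (idx_storage and tool_id stand for an already-configured indexer storage and tool id, both assumed here):

    from typing import Iterator, Optional

    def iter_license_ids(idx_storage, tool_id: int, partition_id: int,
                         nb_partitions: int) -> Iterator[bytes]:
        # Drain one partition of fossology-license ids, page by page.
        page_token: Optional[str] = None
        while True:
            page = idx_storage.content_fossology_license_get_partition(
                tool_id, partition_id, nb_partitions, page_token=page_token
            )
            yield from page.results            # each result is a Sha1 (bytes)
            page_token = page.next_page_token  # opaque hex token, or None
            if page_token is None:             # no more pages in this partition
                break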
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -18,7 +18,7 @@
from swh.core.config import SWHConfig
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
-from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
+from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY, PagedResult, Sha1
from swh.model import hashutil
from swh.core import utils
@@ -233,12 +233,12 @@
return []
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index computation for the id and associated raw data.
Args:
- id: identifier
+ id: identifier or Dict object
data: id's data from storage or objstorage depending on
object type
@@ -282,7 +282,7 @@
class ContentIndexer(BaseIndexer):
"""A content indexer working on a list of ids directly.
- To work on indexer range, use the :class:`ContentRangeIndexer`
+ To work on indexer partition, use the :class:`ContentPartitionIndexer`
instead.
Note: :class:`ContentIndexer` is not an instantiable object. To
@@ -343,64 +343,76 @@
return summary
-class ContentRangeIndexer(BaseIndexer):
+class ContentPartitionIndexer(BaseIndexer):
"""A content range indexer.
- This expects as input a range of ids to index.
+ This expects as input a partition_id and a nb_partitions. This will then index the
+ contents within that partition.
To work on a list of ids, use the :class:`ContentIndexer` instead.
- Note: :class:`ContentRangeIndexer` is not an instantiable
+ Note: :class:`ContentPartitionIndexer` is not an instantiable
object. To use it, one should inherit from this class and override
the methods mentioned in the :class:`BaseIndexer` class.
"""
@abc.abstractmethod
- def indexed_contents_in_range(self, start: bytes, end: bytes) -> Any:
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+ ) -> PagedResult[Sha1]:
"""Retrieve indexed contents within range [start, end].
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
- Yields:
- bytes: Content identifier present in the range ``[start, end]``
+ Returns:
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
pass
def _list_contents_to_index(
- self, start: bytes, end: bytes, indexed: Set[bytes]
- ) -> Iterator[bytes]:
- """Compute from storage the new contents to index in the range [start,
- end]. The already indexed contents are skipped.
+ self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
+ ) -> Iterator[Sha1]:
+ """Compute from storage the new contents to index in the partition_id . The already
+ indexed contents are skipped.
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch data from
+ nb_partitions: Total number of partitions
indexed: Set of content already indexed.
Yields:
- bytes: Identifier of contents to index.
+ Sha1 id (bytes) of contents to index
"""
- if not isinstance(start, bytes) or not isinstance(end, bytes):
- raise TypeError("identifiers must be bytes, not %r and %r." % (start, end))
- while start:
- result = self.storage.content_get_range(start, end)
- contents = result["contents"]
+ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
+ raise TypeError(
+ f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
+ )
+ next_page_token = None
+ while True:
+ result = self.storage.content_get_partition(
+ partition_id, nb_partitions, page_token=next_page_token
+ )
+ contents = result.results
for c in contents:
- _id = hashutil.hash_to_bytes(c["sha1"])
+ _id = hashutil.hash_to_bytes(c.sha1)
if _id in indexed:
continue
yield _id
- start = result["next"]
+ next_page_token = result.next_page_token
+ if next_page_token is None:
+ break
def _index_contents(
- self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any
+ self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
) -> Iterator[Dict]:
- """Index the contents from within range [start, end]
+ """Index the contents within the partition_id.
Args:
start: Starting bound from range identifier
@@ -408,16 +420,14 @@
indexed: Set of content already indexed.
Yields:
- dict: Data indexed to persist using the indexer storage
+ indexing result as dict to persist in the indexer backend
"""
- for sha1 in self._list_contents_to_index(start, end, indexed):
+ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
- self.log.warning(
- "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1)
- )
+ self.log.warning(f"Content {sha1.hex()} not found in objstorage")
continue
res = self.index(sha1, raw_content, **kwargs)
if res:
@@ -429,62 +439,64 @@
yield res
def _index_with_skipping_already_done(
- self, start: bytes, end: bytes
+ self, partition_id: int, nb_partitions: int
) -> Iterator[Dict]:
- """Index not already indexed contents in range [start, end].
+ """Index not already indexed contents within the partition partition_id
Args:
- start: Starting range identifier
- end: Ending range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
Yields:
- dict: Content identifier present in the range
- ``[start, end]`` which are not already indexed.
+ indexing result as dict to persist in the indexer backend
"""
- while start:
- indexed_page = self.indexed_contents_in_range(start, end)
- contents = indexed_page["ids"]
- _end = contents[-1] if contents else end
- yield from self._index_contents(start, _end, contents)
- start = indexed_page["next"]
+ next_page_token = None
+ contents = set()
+ while True:
+ indexed_page = self.indexed_contents_in_partition(
+ partition_id, nb_partitions, page_token=next_page_token
+ )
+ for sha1 in indexed_page.results:
+ contents.add(sha1)
+ yield from self._index_contents(partition_id, nb_partitions, contents)
+ next_page_token = indexed_page.next_page_token
+ if next_page_token is None:
+ break
def run(
self,
- start: Union[bytes, str],
- end: Union[bytes, str],
+ partition_id: int,
+ nb_partitions: int,
skip_existing: bool = True,
- **kwargs
+ **kwargs,
) -> Dict:
- """Given a range of content ids, compute the indexing computations on
- the contents within. Either the indexer is incremental
- (filter out existing computed data) or not (compute
- everything from scratch).
+ """Given a partition of content ids, index the contents within.
+
+ Either the indexer is incremental (filter out existing computed data) or it
+ computes everything from scratch.
Args:
- start: Starting range identifier
- end: Ending range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
skip_existing: Skip existing indexed data
- (default) or not
+ (default) or not
**kwargs: passed to the `index` method
Returns:
- A dict with the task's status
+ dict with the indexing task status
"""
status = "uneventful"
summary: Dict[str, Any] = {}
count = 0
try:
- range_start = (
- hashutil.hash_to_bytes(start) if isinstance(start, str) else start
- )
- range_end = hashutil.hash_to_bytes(end) if isinstance(end, str) else end
-
if skip_existing:
- gen = self._index_with_skipping_already_done(range_start, range_end)
+ gen = self._index_with_skipping_already_done(
+ partition_id, nb_partitions
+ )
else:
- gen = self._index_contents(range_start, range_end, indexed=set([]))
+ gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
count_object_added_key: Optional[str] = None
@@ -509,6 +521,10 @@
return summary
+# alias for retrocompatibility
+ContentRangeIndexer = ContentPartitionIndexer
+
+
class OriginIndexer(BaseIndexer):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Origin indexing using the run method
@@ -590,11 +606,11 @@
summary: Dict[str, Any] = {}
status = "uneventful"
results = []
- revs = self.storage.revision_get(
- hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
- )
- for rev in revs:
+ revision_ids = [
+ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
+ ]
+ for rev in self.storage.revision_get(revision_ids):
if not rev:
self.log.warning(
"Revisions %s not found in storage"
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Optional, Dict, Any, List
import magic
-from .indexer import ContentIndexer, ContentRangeIndexer
+from typing import Any, Optional, Dict, List, Union
+
+from swh.indexer.storage.interface import PagedResult, Sha1
+
+from .indexer import ContentIndexer, ContentPartitionIndexer
if not hasattr(magic.Magic, "from_buffer"):
raise ImportError(
@@ -40,7 +43,7 @@
class MixinMimetypeIndexer:
"""Mixin mimetype indexer.
- See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer`
+ See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer`
"""
@@ -61,7 +64,7 @@
CONFIG_BASE_FILENAME = "indexer/mimetype" # type: Optional[str]
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index sha1s' content and store result.
@@ -79,6 +82,7 @@
"""
assert data is not None
properties = compute_mimetype_encoding(data)
+ assert isinstance(id, bytes)
properties.update(
{"id": id, "indexer_configuration_id": self.tool["id"],}
)
@@ -124,34 +128,38 @@
)
-class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer):
+class MimetypePartitionIndexer(MixinMimetypeIndexer, ContentPartitionIndexer):
"""Mimetype Range Indexer working on range of content identifiers.
It:
- (optionally) filters out content already indexed (cf
- :meth:`.indexed_contents_in_range`)
+ :meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
- def indexed_contents_in_range(
- self, start: bytes, end: bytes
- ) -> Dict[str, Optional[bytes]]:
- """Retrieve indexed content id within range [start, end].
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None,
+ ) -> PagedResult[Sha1]:
+ """Retrieve indexed content ids within partition_id.
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
Returns:
- dict: a dict with keys:
-
- - ids: iterable of content ids within the range.
- - next: The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
- return self.idx_storage.content_mimetype_get_range(start, end, self.tool["id"])
+ return self.idx_storage.content_mimetype_get_partition(
+ self.tool["id"], partition_id, nb_partitions, page_token=page_token
+ )
+
+
+# alias for retrocompatibility
+MimetypeRangeIndexer = MimetypePartitionIndexer
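Since the old name is kept as an alias, existing imports keep working unchanged; a quick sketch (instantiation assumes a valid default configuration is available):

    from swh.indexer.mimetype import MimetypePartitionIndexer, MimetypeRangeIndexer

    # the retro-compatibility alias points at the very same class object
    assert MimetypeRangeIndexer is MimetypePartitionIndexer
    summary = MimetypePartitionIndexer().run(partition_id=0, nb_partitions=4)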
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
--- a/swh/indexer/rehash.py
+++ b/swh/indexer/rehash.py
@@ -173,11 +173,11 @@
groups: Dict[str, List[Any]] = defaultdict(list)
for content, keys_to_update in data:
- keys = ",".join(keys_to_update)
- groups[keys].append(content)
+ keys_str = ",".join(keys_to_update)
+ groups[keys_str].append(content)
for keys_to_update, contents in groups.items():
- keys = keys_to_update.split(",")
+ keys: List[str] = keys_to_update.split(",")
try:
self.storage.content_update(contents, keys=keys)
count += len(contents)
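The rehash hunk only renames the joined-key variable so the str and List[str] uses stop sharing one name; the grouping itself is unchanged. A small sketch of that grouping with illustrative data (names mirror the hunk):

    from collections import defaultdict
    from typing import Any, Dict, List

    # illustrative input: (content row, list of checksum keys to update)
    data = [({"sha1": b"\x01"}, ["sha1_git"]), ({"sha1": b"\x02"}, ["sha1_git"])]

    groups: Dict[str, List[Any]] = defaultdict(list)
    for content, keys_to_update in data:
        groups[",".join(keys_to_update)].append(content)

    for keys_str, contents in groups.items():
        keys: List[str] = keys_str.split(",")
        print(keys, len(contents))  # one storage.content_update call per group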
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -9,11 +9,16 @@
import psycopg2.pool
from collections import defaultdict, Counter
-from typing import Dict, List
+from typing import Dict, List, Optional
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.model import SHA1_SIZE
from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError
+from swh.storage.utils import get_partition_bounds_bytes
+
+from .interface import PagedResult, Sha1
from . import converters
from .db import Db
from .exc import IndexerStorageArgumentException, DuplicateId
@@ -135,30 +140,60 @@
for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
yield obj[0]
- def _content_get_range(
+ @timed
+ @db_transaction()
+ def get_partition(
self,
- content_type,
- start,
- end,
- indexer_configuration_id,
- limit=1000,
+ indexer_type: str,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
with_textual_data=False,
db=None,
cur=None,
- ):
+ ) -> PagedResult[Sha1]:
+ """Retrieve ids of content with `indexer_type` within within partition partition_id
+ bound by limit.
+
+ Args:
+ **indexer_type**: Type of data content to index (mimetype, language, etc...)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
+ **with_textual_data** (bool): Deal with only textual content (True) or all
+ content (all contents by defaults, False)
+
+ Raises:
+ IndexerStorageArgumentException for:
+ - limit set to None
+ - wrong indexer_type provided
+
+ Returns:
+ PagedResult of Sha1. If next_page_token is None, there is no more data to
+ fetch
+
+ """
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
- if content_type not in db.content_indexer_names:
- err = "Wrong type. Should be one of [%s]" % (
- ",".join(db.content_indexer_names)
- )
+ if indexer_type not in db.content_indexer_names:
+ err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
raise IndexerStorageArgumentException(err)
- ids = []
- next_id = None
- for counter, obj in enumerate(
- db.content_get_range(
- content_type,
+ start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
+ if page_token is not None:
+ start = hash_to_bytes(page_token)
+ if end is None:
+ end = b"\xff" * SHA1_SIZE
+
+ next_page_token: Optional[str] = None
+ ids = [
+ row[0]
+ for row in db.content_get_range(
+ indexer_type,
start,
end,
indexer_configuration_id,
@@ -166,26 +201,33 @@
with_textual_data=with_textual_data,
cur=cur,
)
- ):
- _id = obj[0]
- if counter >= limit:
- next_id = _id
- break
+ ]
- ids.append(_id)
+ if len(ids) >= limit:
+ next_page_token = hash_to_hex(ids[-1])
+ ids = ids[:limit]
- return {"ids": ids, "next": next_id}
+ assert len(ids) <= limit
+ return PagedResult(results=ids, next_page_token=next_page_token)
@timed
@db_transaction()
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
- ):
- return self._content_get_range(
+ def content_mimetype_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ db=None,
+ cur=None,
+ ) -> PagedResult[Sha1]:
+ return self.get_partition(
"mimetype",
- start,
- end,
indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ page_token=page_token,
limit=limit,
db=db,
cur=cur,
@@ -349,14 +391,22 @@
@timed
@db_transaction()
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
- ):
- return self._content_get_range(
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ db=None,
+ cur=None,
+ ) -> PagedResult[Sha1]:
+ return self.get_partition(
"fossology_license",
- start,
- end,
indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ page_token=page_token,
limit=limit,
with_textual_data=True,
db=db,
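The SQL-backed get_partition derives byte-range bounds from the partition index and lets page_token move the lower bound. A sketch of that arithmetic; the exact bound value shown is my reading of an even split of the sha1 keyspace:

    from swh.model.hashutil import hash_to_bytes
    from swh.model.model import SHA1_SIZE
    from swh.storage.utils import get_partition_bounds_bytes

    start, end = get_partition_bounds_bytes(2, 4, SHA1_SIZE)
    # partition 2 of 4 should start at 0x8000...00; the last partition
    # returns end=None, which the code above widens to 0xff * SHA1_SIZE
    page_token = None  # e.g. the hex sha1 from a previous PagedResult
    if page_token is not None:
        start = hash_to_bytes(page_token)  # resume mid-partition
    if end is None:
        end = b"\xff" * SHA1_SIZE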
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -3,18 +3,25 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import bisect
-from collections import defaultdict, Counter
import itertools
import json
import operator
import math
import re
-from typing import Any, Dict, List
+
+from collections import defaultdict, Counter
+from typing import Any, Dict, List, Optional
+
+from swh.model.model import SHA1_SIZE
+from swh.model.hashutil import hash_to_hex, hash_to_bytes
+from swh.storage.utils import get_partition_bounds_bytes
+from swh.storage.in_memory import SortedList
from . import MAPPING_NAMES, check_id_duplicates
from .exc import IndexerStorageArgumentException
+from .interface import PagedResult, Sha1
+
SHA1_DIGEST_SIZE = 160
@@ -38,7 +45,7 @@
def __init__(self, tools):
self._tools = tools
- self._sorted_ids = []
+ self._sorted_ids = SortedList[bytes, bytes]()
self._data = {} # map (id_, tool_id) -> metadata_dict
self._tools_per_id = defaultdict(set) # map id_ -> Set[tool_id]
@@ -88,41 +95,61 @@
def get_all(self):
yield from self.get(self._sorted_ids)
- def get_range(self, start, end, indexer_configuration_id, limit):
- """Retrieve data within range [start, end] bound by limit.
+ def get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve ids of content with `indexer_type` within partition partition_id
+ bound by limit.
Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
Raises:
- IndexerStorageArgumentException for limit to None
+ IndexerStorageArgumentException for:
+ - limit set to None
Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data to
+ fetch
"""
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
- from_index = bisect.bisect_left(self._sorted_ids, start)
- to_index = bisect.bisect_right(self._sorted_ids, end, lo=from_index)
- if to_index - from_index >= limit:
- return {
- "ids": self._sorted_ids[from_index : from_index + limit],
- "next": self._sorted_ids[from_index + limit],
- }
- else:
- return {
- "ids": self._sorted_ids[from_index:to_index],
- "next": None,
- }
+ (start, end) = get_partition_bounds_bytes(
+ partition_id, nb_partitions, SHA1_SIZE
+ )
+
+ if page_token:
+ start = hash_to_bytes(page_token)
+ if end is None:
+ end = b"\xff" * SHA1_SIZE
+
+ next_page_token: Optional[str] = None
+ ids: List[Sha1] = []
+ sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
+ for counter, sha1 in enumerate(sha1s):
+ if sha1 > end:
+ break
+ if counter >= limit:
+ next_page_token = hash_to_hex(sha1)
+ break
+ ids.append(sha1)
+
+ assert len(ids) <= limit
+ return PagedResult(results=ids, next_page_token=next_page_token)
def add(self, data: List[Dict], conflict_update: bool) -> int:
"""Add data not present in storage.
@@ -155,7 +182,7 @@
self._tools_per_id[id_].add(tool_id)
count += 1
if id_ not in self._sorted_ids:
- bisect.insort(self._sorted_ids, id_)
+ self._sorted_ids.add(id_)
return count
def add_merge(
@@ -190,7 +217,7 @@
conflict_update=True,
)
if id_ not in self._sorted_ids:
- bisect.insort(self._sorted_ids, id_)
+ self._sorted_ids.add(id_)
return added
def delete(self, entries: List[Dict]) -> int:
@@ -228,10 +255,17 @@
def content_mimetype_missing(self, mimetypes):
yield from self._mimetypes.missing(mimetypes)
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- return self._mimetypes.get_range(start, end, indexer_configuration_id, limit)
+ def content_mimetype_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ return self._mimetypes.get_partition(
+ indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+ )
def content_mimetype_add(
self, mimetypes: List[Dict], conflict_update: bool = False
@@ -306,10 +340,17 @@
added = self._licenses.add_merge(licenses, conflict_update, "licenses")
return {"fossology_license_add:add": added}
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- return self._licenses.get_range(start, end, indexer_configuration_id, limit)
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ return self._licenses.get_partition(
+ indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+ )
def content_metadata_missing(self, metadata):
yield from self._content_metadata.missing(metadata)
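The in-memory backend swaps bisect over a plain list for swh.storage's SortedList, so a partition scan can start mid-keyspace. A small sketch of the two operations the hunks above rely on (inclusivity of iter_from is assumed from how get_partition resumes on a page_token):

    from swh.storage.in_memory import SortedList

    sorted_ids = SortedList[bytes, bytes]()
    for sha1 in (b"\x01" * 20, b"\x80" * 20, b"\xf0" * 20):
        sorted_ids.add(sha1)  # replaces bisect.insort
    # iterate ids from a start bound onward, as get_partition does
    page = list(sorted_ids.iter_from(b"\x80" * 20))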
diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py
--- a/swh/indexer/storage/interface.py
+++ b/swh/indexer/storage/interface.py
@@ -3,9 +3,17 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict, List
+from typing import Dict, List, Optional, TypeVar
from swh.core.api import remote_api_endpoint
+from swh.core.api.classes import PagedResult as CorePagedResult
+
+
+TResult = TypeVar("TResult")
+PagedResult = CorePagedResult[TResult, str]
+
+
+Sha1 = bytes
class IndexerStorageInterface:
@@ -31,67 +39,32 @@
"""
...
- def _content_get_range(
+ @remote_api_endpoint("content_mimetype/range")
+ def content_mimetype_get_partition(
self,
- content_type,
- start,
- end,
- indexer_configuration_id,
- limit=1000,
- with_textual_data=False,
- ):
- """Retrieve ids of type content_type within range [start, end] bound
- by limit.
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve mimetypes within partition partition_id bound by limit.
Args:
- **content_type** (str): content's type (mimetype, language, etc...)
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
- **with_textual_data** (bool): Deal with only textual
- content (True) or all
- content (all contents by
- defaults, False)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
Raises:
- ValueError for;
+ IndexerStorageArgumentException for:
- limit to None
- - wrong content_type provided
-
- Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
-
- """
- ...
-
- @remote_api_endpoint("content_mimetype/range")
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- """Retrieve mimetypes within range [start, end] bound by limit.
-
- Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
-
- Raises:
- ValueError for limit to None
+ - wrong indexer_type provided
Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
...
@@ -307,27 +280,30 @@
...
@remote_api_endpoint("content/fossology_license/range")
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- """Retrieve licenses within range [start, end] bound by limit.
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve licenses within the partition partition_id bound by limit.
Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
Raises:
- ValueError for limit to None
+ IndexerStorageArgumentException for:
+ - limit set to None
+ - wrong indexer_type provided
- Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ Returns:
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
...
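Both endpoints now return swh.core's generic PagedResult; its companion helper stream_results (used by the tests further down) hides the pagination loop entirely. A sketch, with idx_storage and tool_id assumed to be a configured indexer storage and tool id:

    from swh.core.api.classes import stream_results

    for sha1 in stream_results(
        idx_storage.content_mimetype_get_partition,
        indexer_configuration_id=tool_id,
        partition_id=0,
        nb_partitions=4,
    ):
        print(sha1.hex())  # each item is a Sha1 (bytes)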
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -6,9 +6,9 @@
from celery import current_app as app
-from .mimetype import MimetypeIndexer, MimetypeRangeIndexer
+from .mimetype import MimetypeIndexer, MimetypePartitionIndexer
from .ctags import CtagsIndexer
-from .fossology_license import FossologyLicenseIndexer, FossologyLicenseRangeIndexer
+from .fossology_license import FossologyLicenseIndexer, FossologyLicensePartitionIndexer
from .rehash import RecomputeChecksums
from .metadata import OriginMetadataIndexer
@@ -40,9 +40,9 @@
@app.task(name=__name__ + ".ContentRangeMimetype")
def range_mimetype(*args, **kwargs):
- return MimetypeRangeIndexer().run(*args, **kwargs)
+ return MimetypePartitionIndexer().run(*args, **kwargs)
@app.task(name=__name__ + ".ContentRangeFossologyLicense")
def range_license(*args, **kwargs):
- return FossologyLicenseRangeIndexer().run(*args, **kwargs)
+ return FossologyLicensePartitionIndexer().run(*args, **kwargs)
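The celery task names are unchanged (ContentRangeMimetype, ContentRangeFossologyLicense), so existing schedulers keep resolving them; only the meaning of the positional arguments changes. A sketch, assuming a configured celery app and broker:

    from swh.indexer.tasks import range_mimetype

    # arguments are now (partition_id, nb_partitions) instead of (start, end)
    range_mimetype.delay(0, 16)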
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import inspect
+import math
import threading
from typing import Dict
@@ -28,7 +29,7 @@
mimetypes.append(
{
"id": c["id"],
- "mimetype": "text/plain",
+ "mimetype": "text/plain", # for filtering on textual data to work
"encoding": "utf-8",
"indexer_configuration_id": c["indexer_configuration_id"],
}
@@ -363,85 +364,126 @@
{"mimetype": "text/html", "encoding": "us-ascii",},
]
- def test_generate_content_mimetype_get_range_limit_none(self, swh_indexer_storage):
- """mimetype_get_range call with wrong limit input should fail"""
+ def test_generate_content_mimetype_get_partition_failure(self, swh_indexer_storage):
+ """get_partition call with wrong limit input should fail"""
storage = swh_indexer_storage
- with pytest.raises(IndexerStorageArgumentException) as e:
- storage.content_mimetype_get_range(
- start=None, end=None, indexer_configuration_id=None, limit=None
+ indexer_configuration_id = None
+ with pytest.raises(
+ IndexerStorageArgumentException, match="limit should not be None"
+ ):
+ storage.content_mimetype_get_partition(
+ indexer_configuration_id, 0, 3, limit=None
)
- assert e.value.args == ("limit should not be None",)
-
- def test_generate_content_mimetype_get_range_no_limit(
+ def test_generate_content_mimetype_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
- """mimetype_get_range returns mimetypes within range provided"""
+ """get_partition should return result"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
- # All ids from the db
- content_ids = sorted([c["id"] for c in mimetypes])
-
- start = content_ids[0]
- end = content_ids[-1]
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
- # retrieve mimetypes
- tool_id = mimetypes[0]["indexer_configuration_id"]
- actual_result = storage.content_mimetype_get_range(
- start, end, indexer_configuration_id=tool_id
- )
+ assert len(mimetypes) == 16
+ nb_partitions = 16
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id, partition_id, nb_partitions
+ )
+ assert actual_result.next_page_token is None
+ actual_ids.extend(actual_result.results)
- assert len(mimetypes) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
+ assert len(actual_ids) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- def test_generate_content_mimetype_get_range_limit(
+ def test_generate_content_mimetype_get_partition_full(
self, swh_indexer_storage_with_data
):
- """mimetype_get_range paginates results if limit exceeded"""
- storage, data = swh_indexer_storage_with_data
+ """get_partition for a single partition should return available ids
- indexer_configuration_id = data.tools["file"]["id"]
-
- # input the list of sha1s we want from storage
- content_ids = sorted([c["id"] for c in data.mimetypes])
- mimetypes = list(storage.content_mimetype_get(content_ids))
- assert len(mimetypes) == len(data.mimetypes)
+ """
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
- start = content_ids[0]
- end = content_ids[-1]
- # retrieve mimetypes limited to 10 results
- actual_result = storage.content_mimetype_get_range(
- start, end, indexer_configuration_id=indexer_configuration_id, limit=10
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id, 0, 1
)
+ assert actual_result.next_page_token is None
+ actual_ids = actual_result.results
+ assert len(actual_ids) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- assert actual_result
- assert set(actual_result.keys()) == {"ids", "next"}
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ def test_generate_content_mimetype_get_partition_empty(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition when at least one of the partitions is empty"""
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+ # nb_partitions = smallest power of 2 such that at least one of
+ # the partitions is empty
+ nb_mimetypes = len(mimetypes)
+ nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
+
+ seen_ids = []
+
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=nb_mimetypes + 1,
+ )
- assert len(actual_ids) == 10
- assert actual_next is not None
- assert actual_next == content_ids[10]
+ for actual_id in actual_result.results:
+ seen_ids.append(actual_id)
- expected_mimetypes = content_ids[:10]
- assert expected_mimetypes == actual_ids
+ # Limit is higher than the max number of results
+ assert actual_result.next_page_token is None
- # retrieve next part
- actual_result = storage.content_mimetype_get_range(
- start=end, end=end, indexer_configuration_id=indexer_configuration_id
- )
- assert set(actual_result.keys()) == {"ids", "next"}
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ assert set(seen_ids) == expected_ids
- assert actual_next is None
- expected_mimetypes = [content_ids[-1]]
- assert expected_mimetypes == actual_ids
+ def test_generate_content_mimetype_get_partition_with_pagination(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition should return ids provided with pagination
+
+ """
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+ nb_partitions = 4
+
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ next_page_token = None
+ while True:
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=2,
+ page_token=next_page_token,
+ )
+ actual_ids.extend(actual_result.results)
+ next_page_token = actual_result.next_page_token
+ if next_page_token is None:
+ break
+
+ assert len(set(actual_ids)) == len(set(expected_ids))
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
class TestIndexerStorageContentLanguage(StorageETypeTester):
@@ -907,135 +949,161 @@
# license did not change as the v2 was dropped.
assert actual_licenses == [expected_license]
- def test_generate_content_fossology_license_get_range_limit_none(
+ def test_generate_content_fossology_license_get_partition_failure(
self, swh_indexer_storage_with_data
):
+ """get_partition call with wrong limit input should fail"""
storage, data = swh_indexer_storage_with_data
- """license_get_range call with wrong limit input should fail"""
- with pytest.raises(IndexerStorageArgumentException) as e:
- storage.content_fossology_license_get_range(
- start=None, end=None, indexer_configuration_id=None, limit=None
+ indexer_configuration_id = None
+ with pytest.raises(
+ IndexerStorageArgumentException, match="limit should not be None"
+ ):
+ storage.content_fossology_license_get_partition(
+ indexer_configuration_id, 0, 3, limit=None,
)
- assert e.value.args == ("limit should not be None",)
-
- def test_generate_content_fossology_license_get_range_no_limit(
+ def test_generate_content_fossology_license_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
- """license_get_range returns licenses within range provided"""
+ """get_partition should return results"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
- content_ids = sorted([c["id"] for c in fossology_licenses])
+ expected_ids = set([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
+ assert len(fossology_licenses) == 10
+ assert len(mimetypes) == 10
+ nb_partitions = 4
- # retrieve fossology_licenses
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id
- )
+ actual_ids = []
+ for partition_id in range(nb_partitions):
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id, partition_id, nb_partitions
+ )
+ assert actual_result.next_page_token is None
+ actual_ids.extend(actual_result.results)
- assert len(fossology_licenses) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
+ assert len(set(actual_ids)) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- def test_generate_content_fossology_license_get_range_no_limit_with_filter(
+ def test_generate_content_fossology_license_get_partition_full(
self, swh_indexer_storage_with_data
):
- """This filters non textual, then returns results within range"""
- storage, data = swh_indexer_storage_with_data
- fossology_licenses = data.fossology_licenses
- mimetypes = data.mimetypes
+ """get_partition for a single partition should return available ids
+ """
+ storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
- _mimetypes = prepare_mimetypes_from(fossology_licenses)
- # add binary mimetypes which will get filtered out in results
- for m in mimetypes:
- _mimetypes.append(
- {"mimetype": "binary", **m,}
- )
+ fossology_licenses = data.fossology_licenses
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
- storage.content_mimetype_add(_mimetypes, conflict_update=True)
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
- content_ids = sorted([c["id"] for c in fossology_licenses])
+ expected_ids = set([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
-
- # retrieve fossology_licenses
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id, 0, 1
)
+ assert actual_result.next_page_token is None
+ actual_ids = actual_result.results
+ assert len(set(actual_ids)) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
-
- assert len(fossology_licenses) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
-
- def test_generate_fossology_license_get_range_limit(
+ def test_generate_content_fossology_license_get_partition_empty(
self, swh_indexer_storage_with_data
):
- """fossology_license_get_range paginates results if limit exceeded"""
+ """get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
- fossology_licenses = data.fossology_licenses
-
# craft some consistent mimetypes
+ fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
- # add fossology_licenses to storage
storage.content_mimetype_add(mimetypes, conflict_update=True)
+ # add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
- # input the list of sha1s we want from storage
- content_ids = sorted([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
+ # All ids from the db
+ expected_ids = set([c["id"] for c in fossology_licenses])
- # retrieve fossology_licenses limited to 3 results
- limited_results = len(fossology_licenses) - 1
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id, limit=limited_results
- )
+ # nb_partitions = smallest power of 2 such that at least one of
+ # the partitions is empty
+ nb_licenses = len(fossology_licenses)
+ nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ seen_ids = []
- assert limited_results == len(actual_ids)
- assert actual_next is not None
- assert actual_next == content_ids[-1]
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=nb_licenses + 1,
+ )
- expected_fossology_licenses = content_ids[:-1]
- assert expected_fossology_licenses == actual_ids
+ for actual_id in actual_result.results:
+ seen_ids.append(actual_id)
- # retrieve next part
- actual_results2 = storage.content_fossology_license_get_range(
- start=end, end=end, indexer_configuration_id=tool_id
- )
- actual_ids2 = actual_results2["ids"]
- actual_next2 = actual_results2["next"]
+ # Limit is higher than the max number of results
+ assert actual_result.next_page_token is None
- assert actual_next2 is None
- expected_fossology_licenses2 = [content_ids[-1]]
- assert expected_fossology_licenses2 == actual_ids2
+ assert set(seen_ids) == expected_ids
+
+ def test_generate_content_fossology_license_get_partition_with_pagination(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition should return ids provided with paginationv
+
+ """
+ storage, data = swh_indexer_storage_with_data
+ # craft some consistent mimetypes
+ fossology_licenses = data.fossology_licenses
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
+
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
+ # add fossology_licenses to storage
+ storage.content_fossology_license_add(fossology_licenses)
+
+ # All ids from the db
+ expected_ids = [c["id"] for c in fossology_licenses]
+
+ nb_partitions = 4
+
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ next_page_token = None
+ while True:
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=2,
+ page_token=next_page_token,
+ )
+ actual_ids.extend(actual_result.results)
+ next_page_token = actual_result.next_page_token
+ if next_page_token is None:
+ break
+
+ assert len(set(actual_ids)) == len(set(expected_ids))
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
class TestIndexerStorageOriginIntrinsicMetadata:
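The _empty tests above pick the smallest power of two that leaves at least one partition empty; a quick check of that expression on the two dataset sizes used in this diff:

    import math

    for nb_items in (10, 16):
        nb_partitions = 1 << math.floor(math.log2(nb_items) + 1)
        assert nb_partitions > nb_items  # 10 -> 16 partitions, 16 -> 32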
diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py
--- a/swh/indexer/tests/test_fossology_license.py
+++ b/swh/indexer/tests/test_fossology_license.py
@@ -12,14 +12,14 @@
from swh.indexer import fossology_license
from swh.indexer.fossology_license import (
FossologyLicenseIndexer,
- FossologyLicenseRangeIndexer,
+ FossologyLicensePartitionIndexer,
compute_license,
)
from swh.indexer.tests.utils import (
SHA1_TO_LICENSES,
CommonContentIndexerTest,
- CommonContentIndexerRangeTest,
+ CommonContentIndexerPartitionTest,
BASE_TEST_CONFIG,
fill_storage,
fill_obj_storage,
@@ -109,8 +109,8 @@
fossology_license.compute_license = self.orig_compute_license
-class TestFossologyLicenseRangeIndexer(
- CommonContentIndexerRangeTest, unittest.TestCase
+class TestFossologyLicensePartitionIndexer(
+ CommonContentIndexerPartitionTest, unittest.TestCase
):
"""Range Fossology License Indexer tests.
@@ -128,33 +128,11 @@
self.orig_compute_license = fossology_license.compute_license
fossology_license.compute_license = mock_compute_license
- self.indexer = FossologyLicenseRangeIndexer(config=RANGE_CONFIG)
+ self.indexer = FossologyLicensePartitionIndexer(config=RANGE_CONFIG)
self.indexer.catch_exceptions = False
fill_storage(self.indexer.storage)
fill_obj_storage(self.indexer.objstorage)
- self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5"
- self.id1 = "02fb2c89e14f7fab46701478c83779c7beb7b069"
- self.id2 = "103bc087db1d26afc3a0283f38663d081e9b01e6"
- tool_id = self.indexer.tool["id"]
- self.expected_results = {
- self.id0: {
- "id": self.id0,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id0],
- },
- self.id1: {
- "id": self.id1,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id1],
- },
- self.id2: {
- "id": self.id2,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id2],
- },
- }
-
def tearDown(self):
super().tearDown()
fossology_license.compute_license = self.orig_compute_license
@@ -167,4 +145,4 @@
def test_fossology_range_w_no_tool():
with pytest.raises(ValueError):
- FossologyLicenseRangeIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
+ FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py
--- a/swh/indexer/tests/test_mimetype.py
+++ b/swh/indexer/tests/test_mimetype.py
@@ -10,13 +10,13 @@
from swh.indexer.mimetype import (
MimetypeIndexer,
- MimetypeRangeIndexer,
+ MimetypePartitionIndexer,
compute_mimetype_encoding,
)
from swh.indexer.tests.utils import (
CommonContentIndexerTest,
- CommonContentIndexerRangeTest,
+ CommonContentIndexerPartitionTest,
BASE_TEST_CONFIG,
fill_storage,
fill_obj_storage,
@@ -96,7 +96,9 @@
RANGE_CONFIG = dict(list(CONFIG.items()) + [("write_batch_size", 100)])
-class TestMimetypeRangeIndexer(CommonContentIndexerRangeTest, unittest.TestCase):
+class TestMimetypePartitionIndexer(
+ CommonContentIndexerPartitionTest, unittest.TestCase
+):
"""Range Mimetype Indexer tests.
- new data within range are indexed
@@ -108,37 +110,11 @@
def setUp(self):
super().setUp()
- self.indexer = MimetypeRangeIndexer(config=RANGE_CONFIG)
+ self.indexer = MimetypePartitionIndexer(config=RANGE_CONFIG)
self.indexer.catch_exceptions = False
fill_storage(self.indexer.storage)
fill_obj_storage(self.indexer.objstorage)
- self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5"
- self.id1 = "02fb2c89e14f7fab46701478c83779c7beb7b069"
- self.id2 = "103bc087db1d26afc3a0283f38663d081e9b01e6"
- tool_id = self.indexer.tool["id"]
-
- self.expected_results = {
- self.id0: {
- "encoding": "us-ascii",
- "id": self.id0,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/plain",
- },
- self.id1: {
- "encoding": "us-ascii",
- "id": self.id1,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/x-python",
- },
- self.id2: {
- "encoding": "us-ascii",
- "id": self.id2,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/plain",
- },
- }
-
def test_mimetype_w_no_tool():
with pytest.raises(ValueError):
@@ -147,4 +123,4 @@
def test_mimetype_range_w_no_tool():
with pytest.raises(ValueError):
- MimetypeRangeIndexer(config=filter_dict(CONFIG, "tools"))
+ MimetypePartitionIndexer(config=filter_dict(CONFIG, "tools"))
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -10,8 +10,9 @@
from hypothesis import strategies
+from swh.core.api.classes import stream_results
from swh.model import hashutil
-from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
Content,
Directory,
@@ -22,13 +23,14 @@
Person,
Revision,
RevisionType,
+ SHA1_SIZE,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
TimestampWithTimezone,
)
-from swh.storage.utils import now
+from swh.storage.utils import now, get_partition_bounds_bytes
from swh.indexer.storage import INDEXER_CFG_KEY
@@ -677,7 +679,7 @@
self.assert_results_ok(sha1s, expected_results)
-class CommonContentIndexerRangeTest:
+class CommonContentIndexerPartitionTest:
"""Allows to factorize tests on range indexer.
"""
@@ -685,90 +687,84 @@
def setUp(self):
self.contents = sorted(OBJ_STORAGE_DATA)
- def assert_results_ok(self, start, end, actual_results, expected_results=None):
- if expected_results is None:
- expected_results = self.expected_results
+ def assert_results_ok(self, partition_id, nb_partitions, actual_results):
+ expected_ids = [
+ c.sha1
+ for c in stream_results(
+ self.indexer.storage.content_get_partition,
+ partition_id=partition_id,
+ nb_partitions=nb_partitions,
+ )
+ ]
+
+ start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
actual_results = list(actual_results)
for indexed_data in actual_results:
_id = indexed_data["id"]
assert isinstance(_id, bytes)
- indexed_data = indexed_data.copy()
- indexed_data["id"] = hash_to_hex(indexed_data["id"])
- self.assertEqual(indexed_data, expected_results[hash_to_hex(_id)])
- self.assertTrue(start <= _id <= end)
+ assert _id in expected_ids
+
+ assert start <= _id
+ if end:
+ assert _id <= end
+
_tool_id = indexed_data["indexer_configuration_id"]
- self.assertEqual(_tool_id, self.indexer.tool["id"])
+ assert _tool_id == self.indexer.tool["id"]
def test__index_contents(self):
"""Indexing contents without existing data results in indexed data
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- # given
- actual_results = list(self.indexer._index_contents(start, end, indexed={}))
+ partition_id = 0
+ nb_partitions = 4
+
+ actual_results = list(
+ self.indexer._index_contents(partition_id, nb_partitions, indexed={})
+ )
- self.assert_results_ok(start, end, actual_results)
+ self.assert_results_ok(partition_id, nb_partitions, actual_results)
def test__index_contents_with_indexed_data(self):
"""Indexing contents with existing data results in less indexed data
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- data_indexed = [self.id0, self.id2]
+ partition_id = 3
+ nb_partitions = 4
- # given
- actual_results = self.indexer._index_contents(
- start, end, indexed=set(map(hash_to_bytes, data_indexed))
+ # first pass
+ actual_results = list(
+ self.indexer._index_contents(partition_id, nb_partitions, indexed={})
)
- # craft the expected results
- expected_results = self.expected_results.copy()
- for already_indexed_key in data_indexed:
- expected_results.pop(already_indexed_key)
+ self.assert_results_ok(partition_id, nb_partitions, actual_results)
- self.assert_results_ok(start, end, actual_results, expected_results)
-
- def test_generate_content_get(self):
- """Optimal indexing should result in indexed data
+ indexed_ids = set(res["id"] for res in actual_results)
- """
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
-
- # given
- actual_results = self.indexer.run(start, end)
+ actual_results = list(
+ self.indexer._index_contents(
+ partition_id, nb_partitions, indexed=indexed_ids
+ )
+ )
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ # already indexed, so nothing new
+ assert actual_results == []
- def test_generate_content_get_input_as_bytes(self):
+ def test_generate_content_get(self):
"""Optimal indexing should result in indexed data
- Input are in bytes here.
-
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
+ partition_id = 0
+ nb_partitions = 4
- # given
- actual_results = self.indexer.run(start, end, skip_existing=False)
- # no already indexed data so same result as prior test
+ actual_results = self.indexer.run(
+ partition_id, nb_partitions, skip_existing=False
+ )
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ assert actual_results == {"status": "uneventful"} # why?
def test_generate_content_get_no_result(self):
"""No result indexed returns False"""
- _start, _end = [
- "0000000000000000000000000000000000000000",
- "0000000000000000000000000000000000000001",
- ]
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- # given
- actual_results = self.indexer.run(start, end, incremental=False)
+ actual_results = self.indexer.run(0, 0, incremental=False)
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ assert actual_results == {"status": "uneventful"}
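Overall, the call-site migration this diff asks of users is mechanical: a range of sha1 bounds becomes a partition index plus a partition count. A side-by-side sketch (a valid indexer configuration is assumed):

    from swh.indexer.fossology_license import FossologyLicensePartitionIndexer

    indexer = FossologyLicensePartitionIndexer()  # was FossologyLicenseRangeIndexer
    # before: indexer.run(start_sha1_hex, end_sha1_hex)
    summary = indexer.run(partition_id=0, nb_partitions=16)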