D3718.id13108.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,6 +1,6 @@
-swh.core[db,http] >= 0.0.87
+swh.core[db,http] >= 0.2.2
swh.model >= 0.0.15
swh.objstorage >= 0.0.43
swh.scheduler >= 0.0.47
-swh.storage >= 0.8.0
+swh.storage >= 0.12.0
swh.journal >= 0.1.0
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -6,11 +6,12 @@
import logging
import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
from swh.model import hashutil
-from .indexer import ContentIndexer, ContentRangeIndexer, write_to_temp
+from .indexer import ContentIndexer, ContentPartitionIndexer, write_to_temp
+from swh.indexer.storage.interface import PagedResult, Sha1
logger = logging.getLogger(__name__)
@@ -56,7 +57,7 @@
"""Mixin fossology license indexer.
See :class:`FossologyLicenseIndexer` and
- :class:`FossologyLicenseRangeIndexer`
+ :class:`FossologyLicensePartitionIndexer`
"""
@@ -82,7 +83,7 @@
self.working_directory = self.config["workdir"]
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index sha1s' content and store result.
@@ -153,33 +154,39 @@
)
-class FossologyLicenseRangeIndexer(MixinFossologyLicenseIndexer, ContentRangeIndexer):
- """FossologyLicense Range Indexer working on range of content identifiers.
+class FossologyLicensePartitionIndexer(
+ MixinFossologyLicenseIndexer, ContentPartitionIndexer
+):
+ """FossologyLicense Range Indexer working on range/partition of content identifiers.
- filters out the non textual content
- (optionally) filters out content already indexed (cf
- :meth:`.indexed_contents_in_range`)
+ :meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
- def indexed_contents_in_range(self, start, end):
- """Retrieve indexed content id within range [start, end].
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+ ) -> PagedResult[Sha1]:
+ """Retrieve indexed content id within the partition id
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
Returns:
- dict: a dict with keys:
-
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
- return self.idx_storage.content_fossology_license_get_range(
- start, end, self.tool["id"]
+ return self.idx_storage.content_fossology_license_get_partition(
+ self.tool["id"], partition_id, nb_partitions, page_token=page_token
)
+
+
+# alias for retrocompatibility
+FossologyLicenseRangeIndexer = FossologyLicensePartitionIndexer
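
A minimal usage sketch of the renamed indexer, driven with the new partition arguments instead of a sha1 range. Only the run() signature is taken from this diff; the configuration is assumed to be loaded from the usual swh configuration file.

from swh.indexer.fossology_license import FossologyLicensePartitionIndexer

# Assumes a working swh configuration (indexer storage, objstorage, workdir, tools).
indexer = FossologyLicensePartitionIndexer()

# Index the first of 16 slices of the sha1 space, skipping already-indexed contents.
summary = indexer.run(partition_id=0, nb_partitions=16, skip_existing=True)

# `summary` is a dict containing at least a "status" key, e.g. {"status": "eventful"}.
print(summary)
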
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -18,7 +18,7 @@
from swh.core.config import SWHConfig
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
-from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
+from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY, PagedResult, Sha1
from swh.model import hashutil
from swh.core import utils
@@ -233,12 +233,12 @@
return []
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index computation for the id and associated raw data.
Args:
- id: identifier
+ id: identifier or Dict object
data: id's data from storage or objstorage depending on
object type
@@ -282,7 +282,7 @@
class ContentIndexer(BaseIndexer):
"""A content indexer working on a list of ids directly.
- To work on indexer range, use the :class:`ContentRangeIndexer`
+ To work on indexer partition, use the :class:`ContentPartitionIndexer`
instead.
Note: :class:`ContentIndexer` is not an instantiable object. To
@@ -343,64 +343,76 @@
return summary
-class ContentRangeIndexer(BaseIndexer):
+class ContentPartitionIndexer(BaseIndexer):
"""A content range indexer.
- This expects as input a range of ids to index.
+ This expects as input a partition_id and a nb_partitions. This will then index the
+ contents within that partition.
To work on a list of ids, use the :class:`ContentIndexer` instead.
- Note: :class:`ContentRangeIndexer` is not an instantiable
+ Note: :class:`ContentPartitionIndexer` is not an instantiable
object. To use it, one should inherit from this class and override
the methods mentioned in the :class:`BaseIndexer` class.
"""
@abc.abstractmethod
- def indexed_contents_in_range(self, start: bytes, end: bytes) -> Any:
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+ ) -> PagedResult[Sha1]:
"""Retrieve indexed contents within range [start, end].
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
- Yields:
- bytes: Content identifier present in the range ``[start, end]``
+ Returns:
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
pass
def _list_contents_to_index(
- self, start: bytes, end: bytes, indexed: Set[bytes]
- ) -> Iterator[bytes]:
- """Compute from storage the new contents to index in the range [start,
- end]. The already indexed contents are skipped.
+ self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
+ ) -> Iterator[Sha1]:
+ """Compute from storage the new contents to index in the partition_id . The already
+ indexed contents are skipped.
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch data from
+ nb_partitions: Total number of partition
indexed: Set of content already indexed.
Yields:
- bytes: Identifier of contents to index.
+ Sha1 id (bytes) of contents to index
"""
- if not isinstance(start, bytes) or not isinstance(end, bytes):
- raise TypeError("identifiers must be bytes, not %r and %r." % (start, end))
- while start:
- result = self.storage.content_get_range(start, end)
- contents = result["contents"]
+ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
+ raise TypeError(
+ f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
+ )
+ next_page_token = None
+ while True:
+ result = self.storage.content_get_partition(
+ partition_id, nb_partitions, page_token=next_page_token
+ )
+ contents = result.results
for c in contents:
- _id = hashutil.hash_to_bytes(c["sha1"])
+ _id = hashutil.hash_to_bytes(c.sha1)
if _id in indexed:
continue
yield _id
- start = result["next"]
+ next_page_token = result.next_page_token
+ if next_page_token is None:
+ break
def _index_contents(
- self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any
+ self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
) -> Iterator[Dict]:
- """Index the contents from within range [start, end]
+ """Index the contents within the partition_id.
Args:
start: Starting bound from range identifier
@@ -408,16 +420,14 @@
indexed: Set of content already indexed.
Yields:
- dict: Data indexed to persist using the indexer storage
+ indexing result as dict to persist in the indexer backend
"""
- for sha1 in self._list_contents_to_index(start, end, indexed):
+ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
try:
raw_content = self.objstorage.get(sha1)
except ObjNotFoundError:
- self.log.warning(
- "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1)
- )
+ self.log.warning(f"Content {sha1.hex()} not found in objstorage")
continue
res = self.index(sha1, raw_content, **kwargs)
if res:
@@ -429,62 +439,64 @@
yield res
def _index_with_skipping_already_done(
- self, start: bytes, end: bytes
+ self, partition_id: int, nb_partitions: int
) -> Iterator[Dict]:
- """Index not already indexed contents in range [start, end].
+ """Index not already indexed contents within the partition partition_id
Args:
- start: Starting range identifier
- end: Ending range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
Yields:
- dict: Content identifier present in the range
- ``[start, end]`` which are not already indexed.
+ indexing result as dict to persist in the indexer backend
"""
- while start:
- indexed_page = self.indexed_contents_in_range(start, end)
- contents = indexed_page["ids"]
- _end = contents[-1] if contents else end
- yield from self._index_contents(start, _end, contents)
- start = indexed_page["next"]
+ next_page_token = None
+ contents = set()
+ while True:
+ indexed_page = self.indexed_contents_in_partition(
+ partition_id, nb_partitions, page_token=next_page_token
+ )
+ for sha1 in indexed_page.results:
+ contents.add(sha1)
+ yield from self._index_contents(partition_id, nb_partitions, contents)
+ next_page_token = indexed_page.next_page_token
+ if next_page_token is None:
+ break
def run(
self,
- start: Union[bytes, str],
- end: Union[bytes, str],
+ partition_id: int,
+ nb_partitions: int,
skip_existing: bool = True,
- **kwargs
+ **kwargs,
) -> Dict:
- """Given a range of content ids, compute the indexing computations on
- the contents within. Either the indexer is incremental
- (filter out existing computed data) or not (compute
- everything from scratch).
+ """Given a partition of content ids, index the contents within.
+
+ Either the indexer is incremental (filter out existing computed data) or it
+ computes everything from scratch.
Args:
- start: Starting range identifier
- end: Ending range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
skip_existing: Skip existing indexed data
- (default) or not
+ (default) or not
**kwargs: passed to the `index` method
Returns:
- A dict with the task's status
+ dict with the indexing task status
"""
status = "uneventful"
summary: Dict[str, Any] = {}
count = 0
try:
- range_start = (
- hashutil.hash_to_bytes(start) if isinstance(start, str) else start
- )
- range_end = hashutil.hash_to_bytes(end) if isinstance(end, str) else end
-
if skip_existing:
- gen = self._index_with_skipping_already_done(range_start, range_end)
+ gen = self._index_with_skipping_already_done(
+ partition_id, nb_partitions
+ )
else:
- gen = self._index_contents(range_start, range_end, indexed=set([]))
+ gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
count_object_added_key: Optional[str] = None
@@ -509,6 +521,10 @@
return summary
+# alias for retrocompatibility
+ContentRangeIndexer = ContentPartitionIndexer
+
+
class OriginIndexer(BaseIndexer):
"""An object type indexer, inherits from the :class:`BaseIndexer` and
implements Origin indexing using the run method
@@ -590,11 +606,11 @@
summary: Dict[str, Any] = {}
status = "uneventful"
results = []
- revs = self.storage.revision_get(
- hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
- )
- for rev in revs:
+ revision_ids = [
+ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
+ ]
+ for rev in self.storage.revision_get(revision_ids):
if not rev:
self.log.warning(
"Revisions %s not found in storage"
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Optional, Dict, Any, List
import magic
-from .indexer import ContentIndexer, ContentRangeIndexer
+from typing import Any, Optional, Dict, List, Union
+
+from swh.indexer.storage.interface import PagedResult, Sha1
+
+from .indexer import ContentIndexer, ContentPartitionIndexer
if not hasattr(magic.Magic, "from_buffer"):
raise ImportError(
@@ -40,7 +43,7 @@
class MixinMimetypeIndexer:
"""Mixin mimetype indexer.
- See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer`
+ See :class:`MimetypeIndexer` and :class:`MimetypePartitionIndexer`
"""
@@ -61,7 +64,7 @@
CONFIG_BASE_FILENAME = "indexer/mimetype" # type: Optional[str]
def index(
- self, id: bytes, data: Optional[bytes] = None, **kwargs
+ self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
) -> Dict[str, Any]:
"""Index sha1s' content and store result.
@@ -79,6 +82,7 @@
"""
assert data is not None
properties = compute_mimetype_encoding(data)
+ assert isinstance(id, bytes)
properties.update(
{"id": id, "indexer_configuration_id": self.tool["id"],}
)
@@ -124,34 +128,38 @@
)
-class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer):
+class MimetypePartitionIndexer(MixinMimetypeIndexer, ContentPartitionIndexer):
"""Mimetype Range Indexer working on range of content identifiers.
It:
- (optionally) filters out content already indexed (cf
- :meth:`.indexed_contents_in_range`)
+ :meth:`.indexed_contents_in_partition`)
- reads content from objstorage per the content's id (sha1)
- computes {mimetype, encoding} from that content
- stores result in storage
"""
- def indexed_contents_in_range(
- self, start: bytes, end: bytes
- ) -> Dict[str, Optional[bytes]]:
- """Retrieve indexed content id within range [start, end].
+ def indexed_contents_in_partition(
+ self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None,
+ ) -> PagedResult[Sha1]:
+ """Retrieve indexed content ids within partition_id.
Args:
- start: Starting bound from range identifier
- end: End range identifier
+ partition_id: Index of the partition to fetch
+ nb_partitions: Total number of partitions to split into
+ page_token: opaque token used for pagination
Returns:
- dict: a dict with keys:
-
- - ids: iterable of content ids within the range.
- - next: The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
- return self.idx_storage.content_mimetype_get_range(start, end, self.tool["id"])
+ return self.idx_storage.content_mimetype_get_partition(
+ self.tool["id"], partition_id, nb_partitions, page_token=page_token
+ )
+
+
+# alias for retrocompatibility
+MimetypeRangeIndexer = MimetypePartitionIndexer
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
--- a/swh/indexer/rehash.py
+++ b/swh/indexer/rehash.py
@@ -173,11 +173,11 @@
groups: Dict[str, List[Any]] = defaultdict(list)
for content, keys_to_update in data:
- keys = ",".join(keys_to_update)
- groups[keys].append(content)
+ keys_str = ",".join(keys_to_update)
+ groups[keys_str].append(content)
for keys_to_update, contents in groups.items():
- keys = keys_to_update.split(",")
+ keys: List[str] = keys_to_update.split(",")
try:
self.storage.content_update(contents, keys=keys)
count += len(contents)
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -9,11 +9,16 @@
import psycopg2.pool
from collections import defaultdict, Counter
-from typing import Dict, List
+from typing import Dict, List, Optional
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.model import SHA1_SIZE
from swh.storage.common import db_transaction_generator, db_transaction
from swh.storage.exc import StorageDBError
+from swh.storage.utils import get_partition_bounds_bytes
+
+from .interface import PagedResult, Sha1
from . import converters
from .db import Db
from .exc import IndexerStorageArgumentException, DuplicateId
@@ -135,30 +140,60 @@
for obj in db.content_mimetype_missing_from_list(mimetypes, cur):
yield obj[0]
- def _content_get_range(
+ @timed
+ @db_transaction()
+ def get_partition(
self,
- content_type,
- start,
- end,
- indexer_configuration_id,
- limit=1000,
+ indexer_type: str,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
with_textual_data=False,
db=None,
cur=None,
- ):
+ ) -> PagedResult[Sha1]:
+ """Retrieve ids of content with `indexer_type` within within partition partition_id
+ bound by limit.
+
+ Args:
+ **indexer_type**: Type of data content to index (mimetype, language, etc...)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
+ **with_textual_data** (bool): Deal with only textual content (True) or all
+ content (all contents by defaults, False)
+
+ Raises:
+ IndexerStorageArgumentException for;
+ - limit to None
+ - wrong indexer_type provided
+
+ Returns:
+ PagedResult of Sha1. If next_page_token is None, there is no more data to
+ fetch
+
+ """
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
- if content_type not in db.content_indexer_names:
- err = "Wrong type. Should be one of [%s]" % (
- ",".join(db.content_indexer_names)
- )
+ if indexer_type not in db.content_indexer_names:
+ err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
raise IndexerStorageArgumentException(err)
- ids = []
- next_id = None
- for counter, obj in enumerate(
- db.content_get_range(
- content_type,
+ start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
+ if page_token is not None:
+ start = hash_to_bytes(page_token)
+ if end is None:
+ end = b"\xff" * SHA1_SIZE
+
+ next_page_token: Optional[str] = None
+ ids = [
+ row[0]
+ for row in db.content_get_range(
+ indexer_type,
start,
end,
indexer_configuration_id,
@@ -166,26 +201,33 @@
with_textual_data=with_textual_data,
cur=cur,
)
- ):
- _id = obj[0]
- if counter >= limit:
- next_id = _id
- break
+ ]
- ids.append(_id)
+ if len(ids) >= limit:
+ next_page_token = hash_to_hex(ids[-1])
+ ids = ids[:limit]
- return {"ids": ids, "next": next_id}
+ assert len(ids) <= limit
+ return PagedResult(results=ids, next_page_token=next_page_token)
@timed
@db_transaction()
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
- ):
- return self._content_get_range(
+ def content_mimetype_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ db=None,
+ cur=None,
+ ) -> PagedResult[Sha1]:
+ return self.get_partition(
"mimetype",
- start,
- end,
indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ page_token=page_token,
limit=limit,
db=db,
cur=cur,
@@ -349,14 +391,22 @@
@timed
@db_transaction()
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000, db=None, cur=None
- ):
- return self._content_get_range(
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ db=None,
+ cur=None,
+ ) -> PagedResult[Sha1]:
+ return self.get_partition(
"fossology_license",
- start,
- end,
indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ page_token=page_token,
limit=limit,
with_textual_data=True,
db=db,
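
Both `get_partition` implementations map `(partition_id, nb_partitions)` onto sha1 bounds with `get_partition_bounds_bytes` and treat a `None` upper bound as the end of the hash space. A small sketch of that mapping, assuming the helper behaves as it is used above:

from swh.model.model import SHA1_SIZE
from swh.storage.utils import get_partition_bounds_bytes

nb_partitions = 4
for partition_id in range(nb_partitions):
    start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
    if end is None:  # last partition: open-ended on the right
        end = b"\xff" * SHA1_SIZE
    print(partition_id, start.hex(), "->", end.hex())
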
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -3,18 +3,25 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import bisect
-from collections import defaultdict, Counter
import itertools
import json
import operator
import math
import re
-from typing import Any, Dict, List
+
+from collections import defaultdict, Counter
+from typing import Any, Dict, List, Optional
+
+from swh.model.model import SHA1_SIZE
+from swh.model.hashutil import hash_to_hex, hash_to_bytes
+from swh.storage.utils import get_partition_bounds_bytes
+from swh.storage.in_memory import SortedList
from . import MAPPING_NAMES, check_id_duplicates
from .exc import IndexerStorageArgumentException
+from .interface import PagedResult, Sha1
+
SHA1_DIGEST_SIZE = 160
@@ -38,7 +45,7 @@
def __init__(self, tools):
self._tools = tools
- self._sorted_ids = []
+ self._sorted_ids = SortedList[bytes, bytes]()
self._data = {} # map (id_, tool_id) -> metadata_dict
self._tools_per_id = defaultdict(set) # map id_ -> Set[tool_id]
@@ -88,41 +95,61 @@
def get_all(self):
yield from self.get(self._sorted_ids)
- def get_range(self, start, end, indexer_configuration_id, limit):
- """Retrieve data within range [start, end] bound by limit.
+ def get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve ids of content with `indexer_type` within partition partition_id
+ bound by limit.
Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result
+ **indexer_type**: Type of data content to index (mimetype, language, etc...)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
+ **with_textual_data** (bool): Deal with only textual content (True) or all
+ content (all contents by defaults, False)
Raises:
- IndexerStorageArgumentException for limit to None
+ IndexerStorageArgumentException for;
+ - limit to None
+ - wrong indexer_type provided
Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data to
+ fetch
"""
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
- from_index = bisect.bisect_left(self._sorted_ids, start)
- to_index = bisect.bisect_right(self._sorted_ids, end, lo=from_index)
- if to_index - from_index >= limit:
- return {
- "ids": self._sorted_ids[from_index : from_index + limit],
- "next": self._sorted_ids[from_index + limit],
- }
- else:
- return {
- "ids": self._sorted_ids[from_index:to_index],
- "next": None,
- }
+ (start, end) = get_partition_bounds_bytes(
+ partition_id, nb_partitions, SHA1_SIZE
+ )
+
+ if page_token:
+ start = hash_to_bytes(page_token)
+ if end is None:
+ end = b"\xff" * SHA1_SIZE
+
+ next_page_token: Optional[str] = None
+ ids: List[Sha1] = []
+ sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
+ for counter, sha1 in enumerate(sha1s):
+ if sha1 > end:
+ break
+ if counter >= limit:
+ next_page_token = hash_to_hex(sha1)
+ break
+ ids.append(sha1)
+
+ assert len(ids) <= limit
+ return PagedResult(results=ids, next_page_token=next_page_token)
def add(self, data: List[Dict], conflict_update: bool) -> int:
"""Add data not present in storage.
@@ -155,7 +182,7 @@
self._tools_per_id[id_].add(tool_id)
count += 1
if id_ not in self._sorted_ids:
- bisect.insort(self._sorted_ids, id_)
+ self._sorted_ids.add(id_)
return count
def add_merge(
@@ -190,7 +217,7 @@
conflict_update=True,
)
if id_ not in self._sorted_ids:
- bisect.insort(self._sorted_ids, id_)
+ self._sorted_ids.add(id_)
return added
def delete(self, entries: List[Dict]) -> int:
@@ -228,10 +255,17 @@
def content_mimetype_missing(self, mimetypes):
yield from self._mimetypes.missing(mimetypes)
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- return self._mimetypes.get_range(start, end, indexer_configuration_id, limit)
+ def content_mimetype_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ return self._mimetypes.get_partition(
+ indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+ )
def content_mimetype_add(
self, mimetypes: List[Dict], conflict_update: bool = False
@@ -306,10 +340,17 @@
added = self._licenses.add_merge(licenses, conflict_update, "licenses")
return {"fossology_license_add:add": added}
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- return self._licenses.get_range(start, end, indexer_configuration_id, limit)
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ return self._licenses.get_partition(
+ indexer_configuration_id, partition_id, nb_partitions, page_token, limit
+ )
def content_metadata_missing(self, metadata):
yield from self._content_metadata.missing(metadata)
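
The in-memory backend keeps ids in a SortedList and resumes from the page_token. The same pagination logic, sketched with a plain sorted Python list standing in for `swh.storage.in_memory.SortedList` (an assumption made only to keep the example self-contained):

import bisect
from typing import List, Optional, Tuple

def get_page(
    sorted_ids: List[bytes], start: bytes, end: bytes, limit: int
) -> Tuple[List[bytes], Optional[str]]:
    """Return at most `limit` ids in [start, end] and a token to resume from."""
    ids: List[bytes] = []
    next_page_token: Optional[str] = None
    from_index = bisect.bisect_left(sorted_ids, start)
    for counter, sha1 in enumerate(sorted_ids[from_index:]):
        if sha1 > end:
            break
        if counter >= limit:
            next_page_token = sha1.hex()  # hex token, as in the backends above
            break
        ids.append(sha1)
    return ids, next_page_token
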
diff --git a/swh/indexer/storage/interface.py b/swh/indexer/storage/interface.py
--- a/swh/indexer/storage/interface.py
+++ b/swh/indexer/storage/interface.py
@@ -3,9 +3,17 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict, List
+from typing import Dict, List, Optional, TypeVar
from swh.core.api import remote_api_endpoint
+from swh.core.api.classes import PagedResult as CorePagedResult
+
+
+TResult = TypeVar("TResult")
+PagedResult = CorePagedResult[TResult, str]
+
+
+Sha1 = bytes
class IndexerStorageInterface:
@@ -31,67 +39,32 @@
"""
...
- def _content_get_range(
+ @remote_api_endpoint("content_mimetype/range")
+ def content_mimetype_get_partition(
self,
- content_type,
- start,
- end,
- indexer_configuration_id,
- limit=1000,
- with_textual_data=False,
- ):
- """Retrieve ids of type content_type within range [start, end] bound
- by limit.
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve mimetypes within partition partition_id bound by limit.
Args:
- **content_type** (str): content's type (mimetype, language, etc...)
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
- **with_textual_data** (bool): Deal with only textual
- content (True) or all
- content (all contents by
- defaults, False)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
Raises:
- ValueError for;
+ IndexerStorageArgumentException for;
- limit to None
- - wrong content_type provided
-
- Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
-
- """
- ...
-
- @remote_api_endpoint("content_mimetype/range")
- def content_mimetype_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- """Retrieve mimetypes within range [start, end] bound by limit.
-
- Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
-
- Raises:
- ValueError for limit to None
+ - wrong indexer_type provided
Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
...
@@ -307,27 +280,30 @@
...
@remote_api_endpoint("content/fossology_license/range")
- def content_fossology_license_get_range(
- self, start, end, indexer_configuration_id, limit=1000
- ):
- """Retrieve licenses within range [start, end] bound by limit.
+ def content_fossology_license_get_partition(
+ self,
+ indexer_configuration_id: int,
+ partition_id: int,
+ nb_partitions: int,
+ page_token: Optional[str] = None,
+ limit: int = 1000,
+ ) -> PagedResult[Sha1]:
+ """Retrieve licenses within the partition partition_id bound by limit.
Args:
- **start** (bytes): Starting identifier range (expected smaller
- than end)
- **end** (bytes): Ending identifier range (expected larger
- than start)
- **indexer_configuration_id** (int): The tool used to index data
- **limit** (int): Limit result (default to 1000)
+ **indexer_configuration_id**: The tool used to index data
+ **partition_id**: index of the partition to fetch
+ **nb_partitions**: total number of partitions to split into
+ **page_token**: opaque token used for pagination
+ **limit**: Limit result (default to 1000)
Raises:
- ValueError for limit to None
+ IndexerStorageArgumentException for;
+ - limit to None
+ - wrong indexer_type provided
- Returns:
- a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
- this sha1 if any
+ Returns: PagedResult of Sha1. If next_page_token is None, there is no more data
+ to fetch
"""
...
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -6,9 +6,9 @@
from celery import current_app as app
-from .mimetype import MimetypeIndexer, MimetypeRangeIndexer
+from .mimetype import MimetypeIndexer, MimetypePartitionIndexer
from .ctags import CtagsIndexer
-from .fossology_license import FossologyLicenseIndexer, FossologyLicenseRangeIndexer
+from .fossology_license import FossologyLicenseIndexer, FossologyLicensePartitionIndexer
from .rehash import RecomputeChecksums
from .metadata import OriginMetadataIndexer
@@ -40,9 +40,9 @@
@app.task(name=__name__ + ".ContentRangeMimetype")
def range_mimetype(*args, **kwargs):
- return MimetypeRangeIndexer().run(*args, **kwargs)
+ return MimetypePartitionIndexer().run(*args, **kwargs)
@app.task(name=__name__ + ".ContentRangeFossologyLicense")
def range_license(*args, **kwargs):
- return FossologyLicenseRangeIndexer().run(*args, **kwargs)
+ return FossologyLicensePartitionIndexer().run(*args, **kwargs)
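
The celery task names are unchanged, so existing schedulers keep working; only the positional arguments now mean (partition_id, nb_partitions). A hedged dispatch example, assuming the broker and queue configuration exist elsewhere:

from celery import current_app

# Index partition 0 of 16 with the mimetype partition indexer.
current_app.send_task(
    "swh.indexer.tasks.ContentRangeMimetype",
    args=(0, 16),                      # partition_id, nb_partitions
    kwargs={"skip_existing": True},
)
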
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import inspect
+import math
import threading
from typing import Dict
@@ -28,7 +29,7 @@
mimetypes.append(
{
"id": c["id"],
- "mimetype": "text/plain",
+ "mimetype": "text/plain", # for filtering on textual data to work
"encoding": "utf-8",
"indexer_configuration_id": c["indexer_configuration_id"],
}
@@ -363,85 +364,126 @@
{"mimetype": "text/html", "encoding": "us-ascii",},
]
- def test_generate_content_mimetype_get_range_limit_none(self, swh_indexer_storage):
- """mimetype_get_range call with wrong limit input should fail"""
+ def test_generate_content_mimetype_get_partition_failure(self, swh_indexer_storage):
+ """get_partition call with wrong limit input should fail"""
storage = swh_indexer_storage
- with pytest.raises(IndexerStorageArgumentException) as e:
- storage.content_mimetype_get_range(
- start=None, end=None, indexer_configuration_id=None, limit=None
+ indexer_configuration_id = None
+ with pytest.raises(
+ IndexerStorageArgumentException, match="limit should not be None"
+ ):
+ storage.content_mimetype_get_partition(
+ indexer_configuration_id, 0, 3, limit=None
)
- assert e.value.args == ("limit should not be None",)
-
- def test_generate_content_mimetype_get_range_no_limit(
+ def test_generate_content_mimetype_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
- """mimetype_get_range returns mimetypes within range provided"""
+ """get_partition should return result"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
- # All ids from the db
- content_ids = sorted([c["id"] for c in mimetypes])
-
- start = content_ids[0]
- end = content_ids[-1]
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
- # retrieve mimetypes
- tool_id = mimetypes[0]["indexer_configuration_id"]
- actual_result = storage.content_mimetype_get_range(
- start, end, indexer_configuration_id=tool_id
- )
+ assert len(mimetypes) == 16
+ nb_partitions = 16
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id, partition_id, nb_partitions
+ )
+ assert actual_result.next_page_token is None
+ actual_ids.extend(actual_result.results)
- assert len(mimetypes) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
+ assert len(actual_ids) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- def test_generate_content_mimetype_get_range_limit(
+ def test_generate_content_mimetype_get_partition_full(
self, swh_indexer_storage_with_data
):
- """mimetype_get_range paginates results if limit exceeded"""
- storage, data = swh_indexer_storage_with_data
+ """get_partition for a single partition should return available ids
- indexer_configuration_id = data.tools["file"]["id"]
-
- # input the list of sha1s we want from storage
- content_ids = sorted([c["id"] for c in data.mimetypes])
- mimetypes = list(storage.content_mimetype_get(content_ids))
- assert len(mimetypes) == len(data.mimetypes)
+ """
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
- start = content_ids[0]
- end = content_ids[-1]
- # retrieve mimetypes limited to 10 results
- actual_result = storage.content_mimetype_get_range(
- start, end, indexer_configuration_id=indexer_configuration_id, limit=10
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id, 0, 1
)
+ assert actual_result.next_page_token is None
+ actual_ids = actual_result.results
+ assert len(actual_ids) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- assert actual_result
- assert set(actual_result.keys()) == {"ids", "next"}
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ def test_generate_content_mimetype_get_partition_empty(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition when at least one of the partitions is empty"""
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+ # nb_partitions = smallest power of 2 such that at least one of
+ # the partitions is empty
+ nb_mimetypes = len(mimetypes)
+ nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
+
+ seen_ids = []
+
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=nb_mimetypes + 1,
+ )
- assert len(actual_ids) == 10
- assert actual_next is not None
- assert actual_next == content_ids[10]
+ for actual_id in actual_result.results:
+ seen_ids.append(actual_id)
- expected_mimetypes = content_ids[:10]
- assert expected_mimetypes == actual_ids
+ # Limit is higher than the max number of results
+ assert actual_result.next_page_token is None
- # retrieve next part
- actual_result = storage.content_mimetype_get_range(
- start=end, end=end, indexer_configuration_id=indexer_configuration_id
- )
- assert set(actual_result.keys()) == {"ids", "next"}
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ assert set(seen_ids) == expected_ids
- assert actual_next is None
- expected_mimetypes = [content_ids[-1]]
- assert expected_mimetypes == actual_ids
+ def test_generate_content_mimetype_get_partition_with_pagination(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition should return ids provided with pagination
+
+ """
+ storage, data = swh_indexer_storage_with_data
+ mimetypes = data.mimetypes
+ expected_ids = set([c["id"] for c in mimetypes])
+ indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
+
+ nb_partitions = 4
+
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ next_page_token = None
+ while True:
+ actual_result = storage.content_mimetype_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=2,
+ page_token=next_page_token,
+ )
+ actual_ids.extend(actual_result.results)
+ next_page_token = actual_result.next_page_token
+ if next_page_token is None:
+ break
+
+ assert len(set(actual_ids)) == len(set(expected_ids))
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
class TestIndexerStorageContentLanguage(StorageETypeTester):
@@ -907,135 +949,161 @@
# license did not change as the v2 was dropped.
assert actual_licenses == [expected_license]
- def test_generate_content_fossology_license_get_range_limit_none(
+ def test_generate_content_fossology_license_get_partition_failure(
self, swh_indexer_storage_with_data
):
+ """get_partition call with wrong limit input should fail"""
storage, data = swh_indexer_storage_with_data
- """license_get_range call with wrong limit input should fail"""
- with pytest.raises(IndexerStorageArgumentException) as e:
- storage.content_fossology_license_get_range(
- start=None, end=None, indexer_configuration_id=None, limit=None
+ indexer_configuration_id = None
+ with pytest.raises(
+ IndexerStorageArgumentException, match="limit should not be None"
+ ):
+ storage.content_fossology_license_get_partition(
+ indexer_configuration_id, 0, 3, limit=None,
)
- assert e.value.args == ("limit should not be None",)
-
- def test_generate_content_fossology_license_get_range_no_limit(
+ def test_generate_content_fossology_license_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
- """license_get_range returns licenses within range provided"""
+ """get_partition should return results"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
- content_ids = sorted([c["id"] for c in fossology_licenses])
+ expected_ids = set([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
+ assert len(fossology_licenses) == 10
+ assert len(mimetypes) == 10
+ nb_partitions = 4
- # retrieve fossology_licenses
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id
- )
+ actual_ids = []
+ for partition_id in range(nb_partitions):
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id, partition_id, nb_partitions
+ )
+ assert actual_result.next_page_token is None
+ actual_ids.extend(actual_result.results)
- assert len(fossology_licenses) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
+ assert len(set(actual_ids)) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- def test_generate_content_fossology_license_get_range_no_limit_with_filter(
+ def test_generate_content_fossology_license_get_partition_full(
self, swh_indexer_storage_with_data
):
- """This filters non textual, then returns results within range"""
- storage, data = swh_indexer_storage_with_data
- fossology_licenses = data.fossology_licenses
- mimetypes = data.mimetypes
+ """get_partition for a single partition should return available ids
+ """
+ storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
- _mimetypes = prepare_mimetypes_from(fossology_licenses)
- # add binary mimetypes which will get filtered out in results
- for m in mimetypes:
- _mimetypes.append(
- {"mimetype": "binary", **m,}
- )
+ fossology_licenses = data.fossology_licenses
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
- storage.content_mimetype_add(_mimetypes, conflict_update=True)
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
- content_ids = sorted([c["id"] for c in fossology_licenses])
+ expected_ids = set([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
-
- # retrieve fossology_licenses
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id, 0, 1
)
+ assert actual_result.next_page_token is None
+ actual_ids = actual_result.results
+ assert len(set(actual_ids)) == len(expected_ids)
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
-
- assert len(fossology_licenses) == len(actual_ids)
- assert actual_next is None
- assert content_ids == actual_ids
-
- def test_generate_fossology_license_get_range_limit(
+ def test_generate_content_fossology_license_get_partition_empty(
self, swh_indexer_storage_with_data
):
- """fossology_license_get_range paginates results if limit exceeded"""
+ """get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
- fossology_licenses = data.fossology_licenses
-
# craft some consistent mimetypes
+ fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
- # add fossology_licenses to storage
storage.content_mimetype_add(mimetypes, conflict_update=True)
+ # add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
- # input the list of sha1s we want from storage
- content_ids = sorted([c["id"] for c in fossology_licenses])
- start = content_ids[0]
- end = content_ids[-1]
+ # All ids from the db
+ expected_ids = set([c["id"] for c in fossology_licenses])
- # retrieve fossology_licenses limited to 3 results
- limited_results = len(fossology_licenses) - 1
- tool_id = fossology_licenses[0]["indexer_configuration_id"]
- actual_result = storage.content_fossology_license_get_range(
- start, end, indexer_configuration_id=tool_id, limit=limited_results
- )
+ # nb_partitions = smallest power of 2 such that at least one of
+ # the partitions is empty
+ nb_licenses = len(fossology_licenses)
+ nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
- actual_ids = actual_result["ids"]
- actual_next = actual_result["next"]
+ seen_ids = []
- assert limited_results == len(actual_ids)
- assert actual_next is not None
- assert actual_next == content_ids[-1]
+ for partition_id in range(nb_partitions):
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=nb_licenses + 1,
+ )
- expected_fossology_licenses = content_ids[:-1]
- assert expected_fossology_licenses == actual_ids
+ for actual_id in actual_result.results:
+ seen_ids.append(actual_id)
- # retrieve next part
- actual_results2 = storage.content_fossology_license_get_range(
- start=end, end=end, indexer_configuration_id=tool_id
- )
- actual_ids2 = actual_results2["ids"]
- actual_next2 = actual_results2["next"]
+ # Limit is higher than the max number of results
+ assert actual_result.next_page_token is None
- assert actual_next2 is None
- expected_fossology_licenses2 = [content_ids[-1]]
- assert expected_fossology_licenses2 == actual_ids2
+ assert set(seen_ids) == expected_ids
+
+ def test_generate_content_fossology_license_get_partition_with_pagination(
+ self, swh_indexer_storage_with_data
+ ):
+ """get_partition should return ids provided with paginationv
+
+ """
+ storage, data = swh_indexer_storage_with_data
+ # craft some consistent mimetypes
+ fossology_licenses = data.fossology_licenses
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
+ indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
+
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
+ # add fossology_licenses to storage
+ storage.content_fossology_license_add(fossology_licenses)
+
+ # All ids from the db
+ expected_ids = [c["id"] for c in fossology_licenses]
+
+ nb_partitions = 4
+
+ actual_ids = []
+ for partition_id in range(nb_partitions):
+ next_page_token = None
+ while True:
+ actual_result = storage.content_fossology_license_get_partition(
+ indexer_configuration_id,
+ partition_id,
+ nb_partitions,
+ limit=2,
+ page_token=next_page_token,
+ )
+ actual_ids.extend(actual_result.results)
+ next_page_token = actual_result.next_page_token
+ if next_page_token is None:
+ break
+
+ assert len(set(actual_ids)) == len(set(expected_ids))
+ for actual_id in actual_ids:
+ assert actual_id in expected_ids
class TestIndexerStorageOriginIntrinsicMetadata:
diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py
--- a/swh/indexer/tests/test_fossology_license.py
+++ b/swh/indexer/tests/test_fossology_license.py
@@ -12,14 +12,14 @@
from swh.indexer import fossology_license
from swh.indexer.fossology_license import (
FossologyLicenseIndexer,
- FossologyLicenseRangeIndexer,
+ FossologyLicensePartitionIndexer,
compute_license,
)
from swh.indexer.tests.utils import (
SHA1_TO_LICENSES,
CommonContentIndexerTest,
- CommonContentIndexerRangeTest,
+ CommonContentIndexerPartitionTest,
BASE_TEST_CONFIG,
fill_storage,
fill_obj_storage,
@@ -109,8 +109,8 @@
fossology_license.compute_license = self.orig_compute_license
-class TestFossologyLicenseRangeIndexer(
- CommonContentIndexerRangeTest, unittest.TestCase
+class TestFossologyLicensePartitionIndexer(
+ CommonContentIndexerPartitionTest, unittest.TestCase
):
"""Range Fossology License Indexer tests.
@@ -128,33 +128,11 @@
self.orig_compute_license = fossology_license.compute_license
fossology_license.compute_license = mock_compute_license
- self.indexer = FossologyLicenseRangeIndexer(config=RANGE_CONFIG)
+ self.indexer = FossologyLicensePartitionIndexer(config=RANGE_CONFIG)
self.indexer.catch_exceptions = False
fill_storage(self.indexer.storage)
fill_obj_storage(self.indexer.objstorage)
- self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5"
- self.id1 = "02fb2c89e14f7fab46701478c83779c7beb7b069"
- self.id2 = "103bc087db1d26afc3a0283f38663d081e9b01e6"
- tool_id = self.indexer.tool["id"]
- self.expected_results = {
- self.id0: {
- "id": self.id0,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id0],
- },
- self.id1: {
- "id": self.id1,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id1],
- },
- self.id2: {
- "id": self.id2,
- "indexer_configuration_id": tool_id,
- "licenses": SHA1_TO_LICENSES[self.id2],
- },
- }
-
def tearDown(self):
super().tearDown()
fossology_license.compute_license = self.orig_compute_license
@@ -167,4 +145,4 @@
def test_fossology_range_w_no_tool():
with pytest.raises(ValueError):
- FossologyLicenseRangeIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
+ FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools"))
diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py
--- a/swh/indexer/tests/test_mimetype.py
+++ b/swh/indexer/tests/test_mimetype.py
@@ -10,13 +10,13 @@
from swh.indexer.mimetype import (
MimetypeIndexer,
- MimetypeRangeIndexer,
+ MimetypePartitionIndexer,
compute_mimetype_encoding,
)
from swh.indexer.tests.utils import (
CommonContentIndexerTest,
- CommonContentIndexerRangeTest,
+ CommonContentIndexerPartitionTest,
BASE_TEST_CONFIG,
fill_storage,
fill_obj_storage,
@@ -96,7 +96,9 @@
RANGE_CONFIG = dict(list(CONFIG.items()) + [("write_batch_size", 100)])
-class TestMimetypeRangeIndexer(CommonContentIndexerRangeTest, unittest.TestCase):
+class TestMimetypePartitionIndexer(
+ CommonContentIndexerPartitionTest, unittest.TestCase
+):
"""Range Mimetype Indexer tests.
- new data within range are indexed
@@ -108,37 +110,11 @@
def setUp(self):
super().setUp()
- self.indexer = MimetypeRangeIndexer(config=RANGE_CONFIG)
+ self.indexer = MimetypePartitionIndexer(config=RANGE_CONFIG)
self.indexer.catch_exceptions = False
fill_storage(self.indexer.storage)
fill_obj_storage(self.indexer.objstorage)
- self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5"
- self.id1 = "02fb2c89e14f7fab46701478c83779c7beb7b069"
- self.id2 = "103bc087db1d26afc3a0283f38663d081e9b01e6"
- tool_id = self.indexer.tool["id"]
-
- self.expected_results = {
- self.id0: {
- "encoding": "us-ascii",
- "id": self.id0,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/plain",
- },
- self.id1: {
- "encoding": "us-ascii",
- "id": self.id1,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/x-python",
- },
- self.id2: {
- "encoding": "us-ascii",
- "id": self.id2,
- "indexer_configuration_id": tool_id,
- "mimetype": "text/plain",
- },
- }
-
def test_mimetype_w_no_tool():
with pytest.raises(ValueError):
@@ -147,4 +123,4 @@
def test_mimetype_range_w_no_tool():
with pytest.raises(ValueError):
- MimetypeRangeIndexer(config=filter_dict(CONFIG, "tools"))
+ MimetypePartitionIndexer(config=filter_dict(CONFIG, "tools"))
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -10,8 +10,9 @@
from hypothesis import strategies
+from swh.core.api.classes import stream_results
from swh.model import hashutil
-from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
Content,
Directory,
@@ -22,13 +23,14 @@
Person,
Revision,
RevisionType,
+ SHA1_SIZE,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
TimestampWithTimezone,
)
-from swh.storage.utils import now
+from swh.storage.utils import now, get_partition_bounds_bytes
from swh.indexer.storage import INDEXER_CFG_KEY
@@ -677,7 +679,7 @@
self.assert_results_ok(sha1s, expected_results)
-class CommonContentIndexerRangeTest:
+class CommonContentIndexerPartitionTest:
"""Allows to factorize tests on range indexer.
"""
@@ -685,90 +687,84 @@
def setUp(self):
self.contents = sorted(OBJ_STORAGE_DATA)
- def assert_results_ok(self, start, end, actual_results, expected_results=None):
- if expected_results is None:
- expected_results = self.expected_results
+ def assert_results_ok(self, partition_id, nb_partitions, actual_results):
+ expected_ids = [
+ c.sha1
+ for c in stream_results(
+ self.indexer.storage.content_get_partition,
+ partition_id=partition_id,
+ nb_partitions=nb_partitions,
+ )
+ ]
+
+ start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
actual_results = list(actual_results)
for indexed_data in actual_results:
_id = indexed_data["id"]
assert isinstance(_id, bytes)
- indexed_data = indexed_data.copy()
- indexed_data["id"] = hash_to_hex(indexed_data["id"])
- self.assertEqual(indexed_data, expected_results[hash_to_hex(_id)])
- self.assertTrue(start <= _id <= end)
+ assert _id in expected_ids
+
+ assert start <= _id
+ if end:
+ assert _id <= end
+
_tool_id = indexed_data["indexer_configuration_id"]
- self.assertEqual(_tool_id, self.indexer.tool["id"])
+ assert _tool_id == self.indexer.tool["id"]
def test__index_contents(self):
"""Indexing contents without existing data results in indexed data
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- # given
- actual_results = list(self.indexer._index_contents(start, end, indexed={}))
+ partition_id = 0
+ nb_partitions = 4
+
+ actual_results = list(
+ self.indexer._index_contents(partition_id, nb_partitions, indexed={})
+ )
- self.assert_results_ok(start, end, actual_results)
+ self.assert_results_ok(partition_id, nb_partitions, actual_results)
def test__index_contents_with_indexed_data(self):
"""Indexing contents with existing data results in less indexed data
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- data_indexed = [self.id0, self.id2]
+ partition_id = 3
+ nb_partitions = 4
- # given
- actual_results = self.indexer._index_contents(
- start, end, indexed=set(map(hash_to_bytes, data_indexed))
+ # first pass
+ actual_results = list(
+ self.indexer._index_contents(partition_id, nb_partitions, indexed={})
)
- # craft the expected results
- expected_results = self.expected_results.copy()
- for already_indexed_key in data_indexed:
- expected_results.pop(already_indexed_key)
+ self.assert_results_ok(partition_id, nb_partitions, actual_results)
- self.assert_results_ok(start, end, actual_results, expected_results)
-
- def test_generate_content_get(self):
- """Optimal indexing should result in indexed data
+ indexed_ids = set(res["id"] for res in actual_results)
- """
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
-
- # given
- actual_results = self.indexer.run(start, end)
+ actual_results = list(
+ self.indexer._index_contents(
+ partition_id, nb_partitions, indexed=indexed_ids
+ )
+ )
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ # already indexed, so nothing new
+ assert actual_results == []
- def test_generate_content_get_input_as_bytes(self):
+ def test_generate_content_get(self):
"""Optimal indexing should result in indexed data
- Input are in bytes here.
-
"""
- _start, _end = [self.contents[0], self.contents[2]] # output hex ids
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
+ partition_id = 0
+ nb_partitions = 4
- # given
- actual_results = self.indexer.run(start, end, skip_existing=False)
- # no already indexed data so same result as prior test
+ actual_results = self.indexer.run(
+ partition_id, nb_partitions, skip_existing=False
+ )
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ assert actual_results == {"status": "uneventful"} # why?
def test_generate_content_get_no_result(self):
"""No result indexed returns False"""
- _start, _end = [
- "0000000000000000000000000000000000000000",
- "0000000000000000000000000000000000000001",
- ]
- start, end = map(hashutil.hash_to_bytes, (_start, _end))
- # given
- actual_results = self.indexer.run(start, end, incremental=False)
+ actual_results = self.indexer.run(0, 0, incremental=False)
- # then
- self.assertEqual(actual_results, {"status": "uneventful"})
+ assert actual_results == {"status": "uneventful"}
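
The test helper above drains paginated endpoints with `swh.core.api.classes.stream_results`, and the same helper can replace hand-written page_token loops in client code. A sketch, assuming an already-configured `idx_storage` and `tool` (both hypothetical names here):

from swh.core.api.classes import stream_results

# Collect every sha1 indexed by `tool` in partition 0 of 4, across all pages.
all_sha1s = list(
    stream_results(
        idx_storage.content_fossology_license_get_partition,
        indexer_configuration_id=tool["id"],
        partition_id=0,
        nb_partitions=4,
    )
)
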
