Changeset View
Standalone View
swh/indexer/indexer.py
[… 12 lines not shown …]
 from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union
 from swh.scheduler import CONFIG as SWH_CONFIG
 from swh.storage import get_storage
 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
-from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
+from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY, PagedResult, Sha1
 from swh.model import hashutil
 from swh.core import utils
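Editor's note on the new imports: `PagedResult` and `Sha1` come from `swh.indexer.storage`, which re-exports swh.core's paged-result container. A minimal sketch of the shape the new code relies on, for readers of this diff only; the real definitions live upstream:

```python
from dataclasses import dataclass, field
from typing import Generic, List, Optional, TypeVar

Sha1 = bytes  # a raw sha1 content identifier

T = TypeVar("T")


@dataclass
class PagedResult(Generic[T]):
    """One page of results, plus an opaque token for the next page.

    next_page_token is None when there is no more data to fetch.
    """

    results: List[T] = field(default_factory=list)
    next_page_token: Optional[str] = None
```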
 @contextmanager
 def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]:
     """Write the sha1's content in a temporary file.
[… 198 lines not shown …]
     ) -> List[Dict[str, Any]]:
             raise ValueError("Configuration tool(s) must be a dict or list!")
         if tools:
             return self.idx_storage.indexer_configuration_add(tools)
         else:
             return []
     def index(
-        self, id: bytes, data: Optional[bytes] = None, **kwargs
+        self, id: Union[bytes, Dict], data: Optional[bytes] = None, **kwargs
     ) -> Dict[str, Any]:
         """Index computation for the id and associated raw data.
         Args:
-            id: identifier
+            id: identifier or Dict object
             data: id's data from storage or objstorage depending on
                 object type
         Returns:
             dict: a dict that makes sense for the
             :meth:`.persist_index_computations` method.
         """
[… 27 lines not shown …]
     def persist_index_computations(self, results, policy_update) -> Dict[str, int]:
         """
         return {}
 class ContentIndexer(BaseIndexer):
     """A content indexer working on a list of ids directly.
-    To work on indexer range, use the :class:`ContentRangeIndexer`
+    To work on indexer partition, use the :class:`ContentPartitionIndexer`
     instead.
> ardumont: on indexer `partition`
     Note: :class:`ContentIndexer` is not an instantiable object. To
     use it, one should inherit from this class and override the
     methods mentioned in the :class:`BaseIndexer` class.
     """
     def run(
[… 43 lines not shown …]
     ) -> Dict:
                 raise
             self.log.exception("Problem when reading contents metadata.")
             status = "failed"
         finally:
             summary["status"] = status
         return summary
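Usage sketch for the list-based flavor, assuming the toy subclass above and the `(ids, policy_update)` run signature this file uses for its indexers:

```python
indexer = ContentLengthIndexer()  # hypothetical subclass from the sketch above
summary = indexer.run(ids=[b"\xaa" * 20], policy_update="update-dups")
print(summary["status"])  # "eventful", "uneventful" or "failed"
```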
-class ContentRangeIndexer(BaseIndexer):
+class ContentPartitionIndexer(BaseIndexer):
-    """A content range indexer.
+    """A content partition indexer.
-    This expects as input a range of ids to index.
+    This expects as input a partition_id and a nb_partitions. It will then index
+    the contents within that partition.
     To work on a list of ids, use the :class:`ContentIndexer` instead.
-    Note: :class:`ContentRangeIndexer` is not an instantiable
+    Note: :class:`ContentPartitionIndexer` is not an instantiable
     object. To use it, one should inherit from this class and override
     the methods mentioned in the :class:`BaseIndexer` class.
     """
     @abc.abstractmethod
-    def indexed_contents_in_range(self, start: bytes, end: bytes) -> Any:
+    def indexed_contents_in_partition(
+        self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
+    ) -> PagedResult[Sha1]:
-        """Retrieve indexed contents within range [start, end].
+        """Retrieve indexed contents within partition partition_id.
         Args:
-            start: Starting bound from range identifier
-            end: End range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
+            page_token: opaque token used for pagination
-        Yields:
-            bytes: Content identifier present in the range ``[start, end]``
+        Returns:
+            PagedResult of Sha1. If next_page_token is None, there is no more
+            data to fetch.
         """
> ardumont (done): to fix... PagedResult[Sha1]
         pass
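A subclass is expected to answer "which sha1s of this partition are already indexed" straight from the indexer storage. A minimal sketch, assuming the mimetype flavor and a matching `content_mimetype_get_partition` endpoint on `idx_storage` (the endpoint name is an assumption of this note, not shown in the diff):

```python
def indexed_contents_in_partition(
    self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None
) -> PagedResult[Sha1]:
    # Delegate pagination to the indexer storage: one page of already-indexed
    # sha1s for this (partition_id, nb_partitions) pair.
    return self.idx_storage.content_mimetype_get_partition(
        partition_id, nb_partitions, page_token=page_token
    )
```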
     def _list_contents_to_index(
-        self, start: bytes, end: bytes, indexed: Set[bytes]
-    ) -> Iterator[bytes]:
+        self, partition_id: int, nb_partitions: int, indexed: Set[Sha1]
+    ) -> Iterator[Sha1]:
-        """Compute from storage the new contents to index in the range [start,
-        end]. The already indexed contents are skipped.
+        """Compute from storage the new contents to index in partition
+        partition_id. The already indexed contents are skipped.
         Args:
-            start: Starting bound from range identifier
-            end: End range identifier
+            partition_id: Index of the partition to fetch data from
+            nb_partitions: Total number of partitions to split into
             indexed: Set of content already indexed.
         Yields:
-            bytes: Identifier of contents to index.
+            Sha1 id (bytes) of contents to index
         """
-        if not isinstance(start, bytes) or not isinstance(end, bytes):
-            raise TypeError("identifiers must be bytes, not %r and %r." % (start, end))
-        while start:
-            result = self.storage.content_get_range(start, end)
-            contents = result["contents"]
+        if not isinstance(partition_id, int) or not isinstance(nb_partitions, int):
+            raise TypeError(
+                f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}."
+            )
+        next_page_token = None
+        while True:
+            result = self.storage.content_get_partition(
+                partition_id, nb_partitions, page_token=next_page_token
+            )
+            contents = result.results
             for c in contents:
-                _id = hashutil.hash_to_bytes(c["sha1"])
+                _id = hashutil.hash_to_bytes(c.sha1)
                 if _id in indexed:
                     continue
                 yield _id
-            start = result["next"]
+            next_page_token = result.next_page_token
+            if next_page_token is None:
+                break
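How a partition maps onto the hash space stays server-side in `content_get_partition`. As a rough illustration of the idea only, not the storage implementation: partition `i` of `n` covers one contiguous slice of the sha1 prefix space:

```python
from typing import Optional, Tuple


def partition_bounds(
    partition_id: int, nb_partitions: int, prefix_bytes: int = 3
) -> Tuple[bytes, Optional[bytes]]:
    """Illustrative only: the [start, end) sha1-prefix slice of one partition."""
    space = 1 << (8 * prefix_bytes)
    start = partition_id * space // nb_partitions
    end = (partition_id + 1) * space // nb_partitions
    end_bytes = end.to_bytes(prefix_bytes, "big") if end < space else None
    return start.to_bytes(prefix_bytes, "big"), end_bytes


# e.g. the first of 16 partitions covers sha1 prefixes 000000 to 100000:
assert partition_bounds(0, 16) == (b"\x00\x00\x00", b"\x10\x00\x00")
```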
     def _index_contents(
-        self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any
+        self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any
     ) -> Iterator[Dict]:
-        """Index the contents from within range [start, end]
+        """Index the contents within partition partition_id.
         Args:
-            start: Starting bound from range identifier
-            end: End range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
             indexed: Set of content already indexed.
         Yields:
-            dict: Data indexed to persist using the indexer storage
+            indexing result as dict to persist in the indexer backend
         """
-        for sha1 in self._list_contents_to_index(start, end, indexed):
+        for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed):
             try:
                 raw_content = self.objstorage.get(sha1)
             except ObjNotFoundError:
-                self.log.warning(
-                    "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1)
-                )
+                self.log.warning(f"Content {sha1.hex()} not found in objstorage")
                 continue
             res = self.index(sha1, raw_content, **kwargs)
             if res:
                 if not isinstance(res["id"], bytes):
                     raise TypeError(
                         "%r.index should return ids as bytes, not %r"
                         % (self.__class__.__name__, res["id"])
                     )
                 yield res
     def _index_with_skipping_already_done(
-        self, start: bytes, end: bytes
+        self, partition_id: int, nb_partitions: int
     ) -> Iterator[Dict]:
-        """Index not already indexed contents in range [start, end].
+        """Index not already indexed contents within partition partition_id.
         Args:
-            start: Starting range identifier
-            end: Ending range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
         Yields:
-            dict: Content identifier present in the range
-                ``[start, end]`` which are not already indexed.
+            indexing result as dict to persist in the indexer backend
> ardumont (done): doc to fix
         """
-        while start:
-            indexed_page = self.indexed_contents_in_range(start, end)
-            contents = indexed_page["ids"]
-            _end = contents[-1] if contents else end
-            yield from self._index_contents(start, _end, contents)
-            start = indexed_page["next"]
+        next_page_token = None
+        contents = set()
+        while True:
+            indexed_page = self.indexed_contents_in_partition(
+                partition_id, nb_partitions, page_token=next_page_token
+            )
+            for sha1 in indexed_page.results:
+                contents.add(sha1)
+            yield from self._index_contents(partition_id, nb_partitions, contents)
+            next_page_token = indexed_page.next_page_token
+            if next_page_token is None:
+                break
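This loop and `_list_contents_to_index` above repeat the same `next_page_token` idiom; a tiny helper could factor it out. A sketch only, not part of this patch:

```python
from typing import Callable, Iterator, Optional


def iter_pages(
    fetch: Callable[[Optional[str]], "PagedResult"]
) -> Iterator["PagedResult"]:
    """Yield successive pages from a paginated endpoint until exhausted."""
    page_token: Optional[str] = None
    while True:
        page = fetch(page_token)
        yield page
        page_token = page.next_page_token
        if page_token is None:
            return
```

Usage would be e.g. `iter_pages(lambda token: self.storage.content_get_partition(partition_id, nb_partitions, page_token=token))`.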
     def run(
         self,
-        start: Union[bytes, str],
-        end: Union[bytes, str],
+        partition_id: int,
+        nb_partitions: int,
         skip_existing: bool = True,
-        **kwargs
+        **kwargs,
     ) -> Dict:
-        """Given a range of content ids, compute the indexing computations on
-        the contents within. Either the indexer is incremental
-        (filter out existing computed data) or not (compute
-        everything from scratch).
+        """Given a partition of content ids, index the contents within.
+        Either the indexer is incremental (filter out existing computed data)
+        or it computes everything from scratch.
> ardumont (done): doc to fix.
         Args:
-            start: Starting range identifier
-            end: Ending range identifier
+            partition_id: Index of the partition to fetch
+            nb_partitions: Total number of partitions to split into
             skip_existing: Skip existing indexed data
                 (default) or not
             **kwargs: passed to the `index` method
         Returns:
-            A dict with the task's status
+            dict with the indexing task status
         """
         status = "uneventful"
         summary: Dict[str, Any] = {}
         count = 0
         try:
-            range_start = (
-                hashutil.hash_to_bytes(start) if isinstance(start, str) else start
-            )
-            range_end = hashutil.hash_to_bytes(end) if isinstance(end, str) else end
             if skip_existing:
-                gen = self._index_with_skipping_already_done(range_start, range_end)
+                gen = self._index_with_skipping_already_done(
+                    partition_id, nb_partitions
+                )
             else:
-                gen = self._index_contents(range_start, range_end, indexed=set([]))
+                gen = self._index_contents(partition_id, nb_partitions, indexed=set([]))
             count_object_added_key: Optional[str] = None
             for contents in utils.grouper(gen, n=self.config["write_batch_size"]):
                 res = self.persist_index_computations(
                     contents, policy_update="update-dups"
                 )
                 if not count_object_added_key:
                     count_object_added_key = list(res.keys())[0]
                 count += res[count_object_added_key]
             if count > 0:
                 status = "eventful"
         except Exception:
             if not self.catch_exceptions:
                 raise
             self.log.exception("Problem when computing metadata.")
             status = "failed"
         finally:
             summary["status"] = status
             if count > 0 and count_object_added_key:
                 summary[count_object_added_key] = count
         return summary
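End-to-end usage sketch of the new entry point (the subclass name is hypothetical; in production these `run` calls are dispatched by scheduler tasks):

```python
indexer = MyPartitionIndexer()  # hypothetical ContentPartitionIndexer subclass
# Index slice 3 of 16 of the content hash space, skipping already-indexed ids:
summary = indexer.run(partition_id=3, nb_partitions=16, skip_existing=True)
assert summary["status"] in ("eventful", "uneventful", "failed")
```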
+# alias for retrocompatibility
+ContentRangeIndexer = ContentPartitionIndexer
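The alias keeps existing imports and task configurations working unchanged:

```python
from swh.indexer.indexer import ContentPartitionIndexer, ContentRangeIndexer

# Both names resolve to the same class object after this patch.
assert ContentRangeIndexer is ContentPartitionIndexer
```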
 class OriginIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Origin indexing using the run method
     Note: the :class:`OriginIndexer` is not an instantiable object.
     To use it in another context one should inherit from this class
     and override the methods mentioned in the :class:`BaseIndexer`
     class.
[… 65 lines not shown …]
     def run(self, ids: Union[str, bytes], policy_update: str) -> Dict:
             ids: sha1_git's identifier list
             policy_update: either 'update-dups' or 'ignore-dups' to
                 respectively update duplicates or ignore them
         """
         summary: Dict[str, Any] = {}
         status = "uneventful"
         results = []
-        revs = self.storage.revision_get(
-            hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
-        )
-        for rev in revs:
+        revision_ids = [
+            hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
+        ]
+        for rev in self.storage.revision_get(revision_ids):
             if not rev:
                 self.log.warning(
                     "Revisions %s not found in storage"
                     % list(map(hashutil.hash_to_hex, ids))
                 )
                 continue
             try:
                 res = self.index(rev)
[… 16 lines not shown …]