Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show All 10 Lines | |||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union | from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union | ||||
from swh.core import utils | from swh.core import utils | ||||
from swh.core.config import load_from_envvar, merge_configs | from swh.core.config import load_from_envvar, merge_configs | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage | from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage | ||||
from swh.indexer.storage.interface import IndexerStorageInterface | from swh.indexer.storage.interface import IndexerStorageInterface | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Revision, Sha1Git | |||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.factory import get_objstorage | from swh.objstorage.factory import get_objstorage | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
@contextmanager | @contextmanager | ||||
Show All 25 Lines | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
INDEXER_CFG_KEY: {"cls": "memory"}, | INDEXER_CFG_KEY: {"cls": "memory"}, | ||||
"storage": {"cls": "memory"}, | "storage": {"cls": "memory"}, | ||||
"objstorage": {"cls": "memory"}, | "objstorage": {"cls": "memory"}, | ||||
} | } | ||||
TId = TypeVar("TId") | |||||
"""type of the ids of index()ed objects.""" | |||||
TData = TypeVar("TData") | |||||
"""type of the objects passed to index().""" | |||||
TResult = TypeVar("TResult") | TResult = TypeVar("TResult") | ||||
"""return type of index()""" | |||||
class BaseIndexer(Generic[TResult], metaclass=abc.ABCMeta): | class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): | ||||
"""Base class for indexers to inherit from. | """Base class for indexers to inherit from. | ||||
The main entry point is the :func:`run` function which is in | The main entry point is the :func:`run` function which is in | ||||
charge of triggering the computations on the batch dict/ids | charge of triggering the computations on the batch dict/ids | ||||
received. | received. | ||||
Indexers can: | Indexers can: | ||||
▲ Show 20 Lines • Show All 139 Lines • ▼ Show 20 Lines | ) -> List[Dict[str, Any]]: | ||||
else: | else: | ||||
raise ValueError("Configuration tool(s) must be a dict or list!") | raise ValueError("Configuration tool(s) must be a dict or list!") | ||||
if tools: | if tools: | ||||
return self.idx_storage.indexer_configuration_add(tools) | return self.idx_storage.indexer_configuration_add(tools) | ||||
else: | else: | ||||
return [] | return [] | ||||
def index(self, id, data: Optional[bytes] = None, **kwargs) -> List[TResult]: | def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: | ||||
"""Index computation for the id and associated raw data. | """Index computation for the id and associated raw data. | ||||
Args: | Args: | ||||
id: identifier or Dict object | id: identifier or Dict object | ||||
data: id's data from storage or objstorage depending on | data: id's data from storage or objstorage depending on | ||||
object type | object type | ||||
Returns: | Returns: | ||||
dict: a dict that makes sense for the | dict: a dict that makes sense for the | ||||
:meth:`.persist_index_computations` method. | :meth:`.persist_index_computations` method. | ||||
""" | """ | ||||
raise NotImplementedError() | raise NotImplementedError() | ||||
def filter(self, ids: List[bytes]) -> Iterator[bytes]: | def filter(self, ids: List[TId]) -> Iterator[TId]: | ||||
"""Filter missing ids for that particular indexer. | """Filter missing ids for that particular indexer. | ||||
Args: | Args: | ||||
ids: list of ids | ids: list of ids | ||||
Yields: | Yields: | ||||
iterator of missing ids | iterator of missing ids | ||||
Show All 15 Lines | ) -> Dict[str, int]: | ||||
Returns: | Returns: | ||||
a summary dict of what has been inserted in the storage | a summary dict of what has been inserted in the storage | ||||
""" | """ | ||||
return {} | return {} | ||||
class ContentIndexer(BaseIndexer[TResult], Generic[TResult]): | class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | ||||
"""A content indexer working on a list of ids directly. | """A content indexer working on a list of ids directly. | ||||
To work on indexer partition, use the :class:`ContentPartitionIndexer` | To work on indexer partition, use the :class:`ContentPartitionIndexer` | ||||
instead. | instead. | ||||
Note: :class:`ContentIndexer` is not an instantiable object. To | Note: :class:`ContentIndexer` is not an instantiable object. To | ||||
use it, one should inherit from this class and override the | use it, one should inherit from this class and override the | ||||
methods mentioned in the :class:`BaseIndexer` class. | methods mentioned in the :class:`BaseIndexer` class. | ||||
""" | """ | ||||
def run( | def run(self, ids: List[Sha1], policy_update: str, **kwargs) -> Dict: | ||||
self, ids: Union[List[bytes], bytes, str], policy_update: str, **kwargs | |||||
) -> Dict: | |||||
"""Given a list of ids: | """Given a list of ids: | ||||
- retrieve the content from the storage | - retrieve the content from the storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids (Iterable[Union[bytes, str]]): sha1's identifier list | ids (Iterable[Union[bytes, str]]): sha1's identifier list | ||||
Show All 30 Lines | def run(self, ids: List[Sha1], policy_update: str, **kwargs) -> Dict: | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
class ContentPartitionIndexer(BaseIndexer[TResult], Generic[TResult]): | class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | ||||
"""A content partition indexer. | """A content partition indexer. | ||||
This expects as input a partition_id and a nb_partitions. This will then index the | This expects as input a partition_id and a nb_partitions. This will then index the | ||||
contents within that partition. | contents within that partition. | ||||
To work on a list of ids, use the :class:`ContentIndexer` instead. | To work on a list of ids, use the :class:`ContentIndexer` instead. | ||||
Note: :class:`ContentPartitionIndexer` is not an instantiable | Note: :class:`ContentPartitionIndexer` is not an instantiable | ||||
▲ Show 20 Lines • Show All 152 Lines • ▼ Show 20 Lines | ) -> Dict: | ||||
self.log.exception("Problem when computing metadata.") | self.log.exception("Problem when computing metadata.") | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
if count > 0 and count_object_added_key: | if count > 0 and count_object_added_key: | ||||
summary[count_object_added_key] = count | summary[count_object_added_key] = count | ||||
return summary | return summary | ||||
class OriginIndexer(BaseIndexer[TResult], Generic[TResult]): | class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Origin indexing using the run method | implements Origin indexing using the run method | ||||
Note: the :class:`OriginIndexer` is not an instantiable object. | Note: the :class:`OriginIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
Show All 39 Lines | def index_list(self, origin_urls: List[str], **kwargs) -> List[TResult]: | ||||
try: | try: | ||||
results.extend(self.index(origin_url, **kwargs)) | results.extend(self.index(origin_url, **kwargs)) | ||||
except Exception: | except Exception: | ||||
self.log.exception("Problem when processing origin %s", origin_url) | self.log.exception("Problem when processing origin %s", origin_url) | ||||
raise | raise | ||||
return results | return results | ||||
class RevisionIndexer(BaseIndexer[TResult], Generic[TResult]): | class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Revision indexing using the run method | implements Revision indexing using the run method | ||||
Note: the :class:`RevisionIndexer` is not an instantiable object. | Note: the :class:`RevisionIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
""" | """ | ||||
def run(self, ids: Union[str, bytes], policy_update: str) -> Dict: | def run(self, ids: List[Sha1Git], policy_update: str) -> Dict: | ||||
ardumont: List[Sha1Git] | |||||
"""Given a list of sha1_gits: | """Given a list of sha1_gits: | ||||
- retrieve revisions from storage | - retrieve revisions from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids: sha1_git's identifier list | ids: sha1_git's identifier list | ||||
policy_update: either 'update-dups' or 'ignore-dups' to | policy_update: either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore them | respectively update duplicates or ignore them | ||||
""" | """ | ||||
summary: Dict[str, Any] = {"status": "uneventful"} | summary: Dict[str, Any] = {"status": "uneventful"} | ||||
results = [] | results = [] | ||||
revision_ids = [ | revision_ids = [ | ||||
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | ||||
] | ] | ||||
for rev in self.storage.revision_get(revision_ids): | for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): | ||||
if not rev: | if not rev: | ||||
# TODO: call self.index() with rev=None? | |||||
self.log.warning( | self.log.warning( | ||||
"Revisions %s not found in storage" | "Revision %s not found in storage", hashutil.hash_to_hex(rev_id) | ||||
% list(map(hashutil.hash_to_hex, ids)) | |||||
) | ) | ||||
continue | continue | ||||
try: | try: | ||||
results.extend(self.index(rev)) | results.extend(self.index(rev_id, rev)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when processing revision") | self.log.exception("Problem when processing revision") | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
summary_persist = self.persist_index_computations(results, policy_update) | summary_persist = self.persist_index_computations(results, policy_update) | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
self.results = results | self.results = results | ||||
return summary | return summary |
List[Sha1Git]