Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
# Copyright (C) 2016-2020 The Software Heritage developers | # Copyright (C) 2016-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import logging | import logging | ||||
import os | import os | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict, Iterator, List, Optional, Set, Union | from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union | ||||
from swh.core import utils | from swh.core import utils | ||||
from swh.core.config import load_from_envvar, merge_configs | from swh.core.config import load_from_envvar, merge_configs | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage | from swh.indexer.storage import INDEXER_CFG_KEY, PagedResult, Sha1, get_indexer_storage | ||||
from swh.indexer.storage.model import BaseRow | from swh.indexer.storage.model import BaseRow | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Revision | from swh.model.model import Revision | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
Show All 31 Lines | |||||
# Default configuration: the indexer storage, storage and objstorage
# sections each get their own {"cls": "memory"} entry (a distinct dict per
# key, so mutating one section never affects another).
DEFAULT_CONFIG = {
    section: {"cls": "memory"}
    for section in (INDEXER_CFG_KEY, "storage", "objstorage")
}
class BaseIndexer(metaclass=abc.ABCMeta): | # TODO: should be bound=Optional[BaseRow] when all endpoints move away from dicts | ||||
TResult = TypeVar("TResult", bound=Union[None, Dict, BaseRow]) | |||||
class BaseIndexer(Generic[TResult], metaclass=abc.ABCMeta): | |||||
"""Base class for indexers to inherit from. | """Base class for indexers to inherit from. | ||||
The main entry point is the :func:`run` function which is in | The main entry point is the :func:`run` function which is in | ||||
charge of triggering the computations on the batch dict/ids | charge of triggering the computations on the batch dict/ids | ||||
received. | received. | ||||
Indexers can: | Indexers can: | ||||
Show All 36 Lines | :meth:`~BaseIndexer.check`: | ||||
`super().check()` instruction. | `super().check()` instruction. | ||||
:meth:`~BaseIndexer.register_tools`: | :meth:`~BaseIndexer.register_tools`: | ||||
This should return a dict of the tool(s) to use when indexing or | This should return a dict of the tool(s) to use when indexing or | ||||
filtering. | filtering. | ||||
""" | """ | ||||
results: List[Union[Dict, BaseRow]] | results: List[TResult] | ||||
USE_TOOLS = True | USE_TOOLS = True | ||||
catch_exceptions = True | catch_exceptions = True | ||||
"""Prevents exceptions in `index()` from raising too high. Set to False | """Prevents exceptions in `index()` from raising too high. Set to False | ||||
in tests to properly catch all exceptions.""" | in tests to properly catch all exceptions.""" | ||||
scheduler: Any | scheduler: Any | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | ) -> List[Dict[str, Any]]: | ||||
if tools: | if tools: | ||||
return self.idx_storage.indexer_configuration_add(tools) | return self.idx_storage.indexer_configuration_add(tools) | ||||
else: | else: | ||||
return [] | return [] | ||||
def index( | def index( | ||||
self, id: Union[bytes, Dict, Revision], data: Optional[bytes] = None, **kwargs | self, id: Union[bytes, Dict, Revision], data: Optional[bytes] = None, **kwargs | ||||
) -> Union[Dict[str, Any], BaseRow]: | ) -> TResult: | ||||
"""Index computation for the id and associated raw data. | """Index computation for the id and associated raw data. | ||||
Args: | Args: | ||||
id: identifier or Dict object | id: identifier or Dict object | ||||
data: id's data from storage or objstorage depending on | data: id's data from storage or objstorage depending on | ||||
object type | object type | ||||
Returns: | Returns: | ||||
Show All 28 Lines | def persist_index_computations(self, results, policy_update) -> Dict[str, int]: | ||||
Returns: | Returns: | ||||
a summary dict of what has been inserted in the storage | a summary dict of what has been inserted in the storage | ||||
""" | """ | ||||
return {} | return {} | ||||
class ContentIndexer(BaseIndexer): | class ContentIndexer(BaseIndexer[TResult], Generic[TResult]): | ||||
"""A content indexer working on a list of ids directly. | """A content indexer working on a list of ids directly. | ||||
To work on indexer partition, use the :class:`ContentPartitionIndexer` | To work on indexer partition, use the :class:`ContentPartitionIndexer` | ||||
instead. | instead. | ||||
Note: :class:`ContentIndexer` is not an instantiable object. To | Note: :class:`ContentIndexer` is not an instantiable object. To | ||||
use it, one should inherit from this class and override the | use it, one should inherit from this class and override the | ||||
methods mentioned in the :class:`BaseIndexer` class. | methods mentioned in the :class:`BaseIndexer` class. | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | ) -> Dict: | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
class ContentPartitionIndexer(BaseIndexer): | class ContentPartitionIndexer(BaseIndexer[TResult], Generic[TResult]): | ||||
"""A content partition indexer. | """A content partition indexer. | ||||
This expects as input a partition_id and a nb_partitions. This will then index the | This expects as input a partition_id and a nb_partitions. This will then index the | ||||
contents within that partition. | contents within that partition. | ||||
To work on a list of ids, use the :class:`ContentIndexer` instead. | To work on a list of ids, use the :class:`ContentIndexer` instead. | ||||
Note: :class:`ContentPartitionIndexer` is not an instantiable | Note: :class:`ContentPartitionIndexer` is not an instantiable | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | ) -> Iterator[Sha1]: | ||||
continue | continue | ||||
yield _id | yield _id | ||||
next_page_token = result.next_page_token | next_page_token = result.next_page_token | ||||
if next_page_token is None: | if next_page_token is None: | ||||
break | break | ||||
def _index_contents( | def _index_contents( | ||||
self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any | self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any | ||||
) -> Iterator[Union[BaseRow, Dict]]: | ) -> Iterator[TResult]: | ||||
"""Index the contents within the partition_id. | """Index the contents within the partition_id. | ||||
Args: | Args: | ||||
start: Starting bound from range identifier | start: Starting bound from range identifier | ||||
end: End range identifier | end: End range identifier | ||||
indexed: Set of content already indexed. | indexed: Set of content already indexed. | ||||
Yields: | Yields: | ||||
Show All 13 Lines | ) -> Iterator[TResult]: | ||||
raise TypeError( | raise TypeError( | ||||
"%r.index should return ids as bytes, not %r" | "%r.index should return ids as bytes, not %r" | ||||
% (self.__class__.__name__, res["id"]) | % (self.__class__.__name__, res["id"]) | ||||
) | ) | ||||
yield res | yield res | ||||
def _index_with_skipping_already_done( | def _index_with_skipping_already_done( | ||||
self, partition_id: int, nb_partitions: int | self, partition_id: int, nb_partitions: int | ||||
) -> Iterator[Union[BaseRow, Dict]]: | ) -> Iterator[TResult]: | ||||
"""Index not already indexed contents within the partition partition_id | """Index not already indexed contents within the partition partition_id | ||||
Args: | Args: | ||||
partition_id: Index of the partition to fetch | partition_id: Index of the partition to fetch | ||||
nb_partitions: Total number of partitions to split into | nb_partitions: Total number of partitions to split into | ||||
Yields: | Yields: | ||||
indexing result as dict to persist in the indexer backend | indexing result as dict to persist in the indexer backend | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | ) -> Dict: | ||||
self.log.exception("Problem when computing metadata.") | self.log.exception("Problem when computing metadata.") | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
if count > 0 and count_object_added_key: | if count > 0 and count_object_added_key: | ||||
summary[count_object_added_key] = count | summary[count_object_added_key] = count | ||||
return summary | return summary | ||||
class OriginIndexer(BaseIndexer): | class OriginIndexer(BaseIndexer[TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Origin indexing using the run method | implements Origin indexing using the run method | ||||
Note: the :class:`OriginIndexer` is not an instantiable object. | Note: the :class:`OriginIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
Show All 28 Lines | ) -> Dict: | ||||
self.results = results | self.results = results | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
return summary | return summary | ||||
def index_list(self, origins: List[Any], **kwargs: Any) -> List[TResult]:
    """Index every origin of the list and gather the non-empty results.

    Args:
        origins: objects handed, one at a time, to :meth:`index`
        **kwargs: extra keyword arguments forwarded unchanged to
            :meth:`index`

    Returns:
        the truthy values returned by :meth:`index`, in the same order as
        the corresponding origins

    Raises:
        Exception: any exception raised while processing an origin is
            logged, then re-raised as is

    """
    collected: List[TResult] = []
    for origin in origins:
        try:
            outcome = self.index(origin, **kwargs)
            if not outcome:
                # Nothing was computed for this origin: skip it
                continue
            collected.append(outcome)
        except Exception:
            # Record which origin failed before letting the error propagate
            self.log.exception("Problem when processing origin %s", origin)
            raise
    return collected
class RevisionIndexer(BaseIndexer): | class RevisionIndexer(BaseIndexer[TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Revision indexing using the run method | implements Revision indexing using the run method | ||||
Note: the :class:`RevisionIndexer` is not an instantiable object. | Note: the :class:`RevisionIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines |