Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show All 12 Lines | from typing import ( | ||||
Any, | Any, | ||||
Dict, | Dict, | ||||
Generic, | Generic, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Set, | Set, | ||||
Tuple, | |||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
import warnings | import warnings | ||||
import sentry_sdk | import sentry_sdk | ||||
from typing_extensions import TypedDict | from typing_extensions import TypedDict | ||||
from swh.core import utils | from swh.core import utils | ||||
from swh.core.config import load_from_envvar, merge_configs | from swh.core.config import load_from_envvar, merge_configs | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage | ||||
from swh.indexer.storage.interface import IndexerStorageInterface | from swh.indexer.storage.interface import IndexerStorageInterface | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Origin, Revision, Sha1Git | from swh.model.model import Directory, Origin, Sha1Git | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.factory import get_objstorage | from swh.objstorage.factory import get_objstorage | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
class ObjectsDict(TypedDict, total=False): | class ObjectsDict(TypedDict, total=False): | ||||
revision: List[Dict] | directory: List[Dict] | ||||
origin: List[Dict] | origin: List[Dict] | ||||
origin_visit_status: List[Dict] | origin_visit_status: List[Dict] | ||||
@contextmanager | @contextmanager | ||||
def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | ||||
"""Write the sha1's content in a temporary file. | """Write the sha1's content in a temporary file. | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): | ||||
To implement a new object type indexer, inherit from the | To implement a new object type indexer, inherit from the | ||||
BaseIndexer and implement indexing: | BaseIndexer and implement indexing: | ||||
:meth:`~BaseIndexer.run`: | :meth:`~BaseIndexer.run`: | ||||
object_ids are different depending on object. For example: sha1 for | object_ids are different depending on object. For example: sha1 for | ||||
content, sha1_git for revision, directory, release, and id for origin | content, sha1_git for revision, directory, release, and id for origin | ||||
To implement a new concrete indexer, inherit from the object level | To implement a new concrete indexer, inherit from the object level | ||||
classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, | classes: :class:`ContentIndexer`, :class:`DirectoryIndexer`, | ||||
:class:`OriginIndexer`. | :class:`OriginIndexer`. | ||||
Then you need to implement the following functions: | Then you need to implement the following functions: | ||||
:meth:`~BaseIndexer.filter`: | :meth:`~BaseIndexer.filter`: | ||||
filter out data already indexed (in storage). | filter out data already indexed (in storage). | ||||
:meth:`~BaseIndexer.index_object`: | :meth:`~BaseIndexer.index_object`: | ||||
▲ Show 20 Lines • Show All 457 Lines • ▼ Show 20 Lines | def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: | ||||
results.extend(self.index(origin.url, **kwargs)) | results.extend(self.index(origin.url, **kwargs)) | ||||
except Exception: | except Exception: | ||||
self.log.exception("Problem when processing origin %s", origin.url) | self.log.exception("Problem when processing origin %s", origin.url) | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
raise | raise | ||||
return results | return results | ||||
class RevisionIndexer(BaseIndexer[Sha1Git, Revision, TResult], Generic[TResult]): | class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Revision indexing using the run method | implements Directory indexing using the run method | ||||
Note: the :class:`RevisionIndexer` is not an instantiable object. | Note: the :class:`DirectoryIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
""" | """ | ||||
def run(self, ids: List[Sha1Git], **kwargs) -> Dict: | def run(self, ids: List[Sha1Git], **kwargs) -> Dict: | ||||
"""Given a list of sha1_gits: | """Given a list of sha1_gits: | ||||
- retrieve revisions from storage | - retrieve directories from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results | - store the results | ||||
Args: | Args: | ||||
ids: sha1_git's identifier list | ids: sha1_git's identifier list | ||||
""" | """ | ||||
if "policy_update" in kwargs: | if "policy_update" in kwargs: | ||||
warnings.warn( | warnings.warn( | ||||
"'policy_update' argument is deprecated and ignored.", | "'policy_update' argument is deprecated and ignored.", | ||||
DeprecationWarning, | DeprecationWarning, | ||||
) | ) | ||||
del kwargs["policy_update"] | del kwargs["policy_update"] | ||||
revision_ids = [ | directory_ids = [ | ||||
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | ||||
] | ] | ||||
revisions = [] | |||||
for (rev_id, rev) in zip(revision_ids, self.storage.revision_get(revision_ids)): | |||||
if not rev: | |||||
# TODO: call self.index() with rev=None? | |||||
self.log.warning( | |||||
"Revision %s not found in storage", hashutil.hash_to_hex(rev_id) | |||||
) | |||||
continue | |||||
revisions.append(rev.to_dict()) | |||||
return self.process_journal_objects({"revision": revisions}) | return self._process_directories([(dir_id, None) for dir_id in directory_ids]) | ||||
def process_journal_objects(self, objects: ObjectsDict) -> Dict: | def process_journal_objects(self, objects: ObjectsDict) -> Dict: | ||||
"""Worker function for ``JournalClient``. Expects ``objects`` to have a single | """Worker function for ``JournalClient``. Expects ``objects`` to have a single | ||||
key, ``"revision"``.""" | key, ``"directory"``.""" | ||||
assert set(objects) == {"revision"} | assert set(objects) == {"directory"} | ||||
return self._process_directories( | |||||
[(dir_["id"], Directory.from_dict(dir_)) for dir_ in objects["directory"]] | |||||
) | |||||
def _process_directories( | |||||
self, | |||||
directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]], | |||||
) -> Dict: | |||||
summary: Dict[str, Any] = {"status": "uneventful"} | summary: Dict[str, Any] = {"status": "uneventful"} | ||||
results = [] | results = [] | ||||
for rev in objects["revision"]: | # TODO: fetch raw_manifest when useful? | ||||
for (dir_id, dir_) in directories: | |||||
try: | try: | ||||
results.extend(self.index(rev["id"], Revision.from_dict(rev))) | results.extend(self.index(dir_id, dir_)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when processing revision") | self.log.exception("Problem when processing directory") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
self.results = results | self.results = results | ||||
return summary | return summary |