Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
# Copyright (C) 2016-2021 The Software Heritage developers | # Copyright (C) 2016-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import logging | import logging | ||||
import os | import os | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Dict, | Dict, | ||||
Generic, | Generic, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Set, | Set, | ||||
Tuple, | |||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
import warnings | import warnings | ||||
import sentry_sdk | import sentry_sdk | ||||
from typing_extensions import TypedDict | |||||
from swh.core import utils | from swh.core import utils | ||||
from swh.core.config import load_from_envvar, merge_configs | from swh.core.config import load_from_envvar, merge_configs | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage | ||||
from swh.indexer.storage.interface import IndexerStorageInterface | from swh.indexer.storage.interface import IndexerStorageInterface | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Revision, Sha1Git | from swh.model.model import Directory, Origin, Sha1Git | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.factory import get_objstorage | from swh.objstorage.factory import get_objstorage | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
class ObjectsDict(TypedDict, total=False):
    """Shape of the ``objects`` mapping handed to ``process_journal_objects``
    by a journal client: each present key maps an object type name to the
    list of object dicts received for that type. All keys are optional
    (``total=False``)."""

    directory: List[Dict]
    origin: List[Dict]
    origin_visit_status: List[Dict]
@contextmanager | @contextmanager | ||||
def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | ||||
"""Write the sha1's content in a temporary file. | """Write the sha1's content in a temporary file. | ||||
Args: | Args: | ||||
filename: one of sha1's many filenames | filename: one of sha1's many filenames | ||||
data: the sha1's content to write in temporary | data: the sha1's content to write in temporary | ||||
file | file | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): | ||||
To implement a new object type indexer, inherit from the | To implement a new object type indexer, inherit from the | ||||
BaseIndexer and implement indexing: | BaseIndexer and implement indexing: | ||||
:meth:`~BaseIndexer.run`: | :meth:`~BaseIndexer.run`: | ||||
object_ids are different depending on object. For example: sha1 for | object_ids are different depending on object. For example: sha1 for | ||||
content, sha1_git for revision, directory, release, and id for origin | content, sha1_git for revision, directory, release, and id for origin | ||||
To implement a new concrete indexer, inherit from the object level | To implement a new concrete indexer, inherit from the object level | ||||
classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, | classes: :class:`ContentIndexer`, :class:`DirectoryIndexer`, | ||||
:class:`OriginIndexer`. | :class:`OriginIndexer`. | ||||
Then you need to implement the following functions: | Then you need to implement the following functions: | ||||
:meth:`~BaseIndexer.filter`: | :meth:`~BaseIndexer.filter`: | ||||
filter out data already indexed (in storage). | filter out data already indexed (in storage). | ||||
:meth:`~BaseIndexer.index_object`: | :meth:`~BaseIndexer.index_object`: | ||||
▲ Show 20 Lines • Show All 407 Lines • ▼ Show 20 Lines | def run(self, origin_urls: List[str], **kwargs) -> Dict: | ||||
""" | """ | ||||
if "policy_update" in kwargs: | if "policy_update" in kwargs: | ||||
warnings.warn( | warnings.warn( | ||||
"'policy_update' argument is deprecated and ignored.", | "'policy_update' argument is deprecated and ignored.", | ||||
DeprecationWarning, | DeprecationWarning, | ||||
) | ) | ||||
del kwargs["policy_update"] | del kwargs["policy_update"] | ||||
origins = [{"url": url} for url in origin_urls] | |||||
return self.process_journal_objects({"origin": origins}) | |||||
def process_journal_objects(self, objects: ObjectsDict) -> Dict:
    """Worker function for ``JournalClient``. Expects ``objects`` to have a single
    key, either ``origin`` or ``"origin_visit_status"``."""
    # Collect origins to index: every origin object as-is, plus the origin of
    # every visit status that reached the "full" state.
    full_visit_origins = [
        Origin(url=status["origin"])
        for status in objects.get("origin_visit_status", [])
        if status["status"] == "full"
    ]
    plain_origins = [Origin(url=origin["url"]) for origin in objects.get("origin", [])]
    origins = full_visit_origins + plain_origins

    summary: Dict[str, Any] = {"status": "uneventful"}
    try:
        results = self.index_list(
            origins,
            check_origin_known=False,
            # no need to check they exist, as we just received either an origin or
            # visit status; which cannot be created by swh-storage unless the origin
            # already exists
        )
    except Exception:
        if not self.catch_exceptions:
            raise
        summary["status"] = "failed"
        return summary

    summary_persist = self.persist_index_computations(results)
    self.results = results
    if summary_persist:
        # Any positive counter from the persistence layer flips the status.
        if any(value > 0 for value in summary_persist.values()):
            summary["status"] = "eventful"
        summary.update(summary_persist)
    return summary
def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]:
    """Index each origin in ``origins``, accumulating and returning all
    per-origin results. Logs and reports to Sentry before re-raising on the
    first failure."""
    collected: List[TResult] = []
    for origin in origins:
        try:
            collected.extend(self.index(origin.url, **kwargs))
        except Exception:
            self.log.exception("Problem when processing origin %s", origin.url)
            sentry_sdk.capture_exception()
            raise
    return collected
class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]):
    """An object type indexer, inherits from the :class:`BaseIndexer` and
    implements Directory indexing using the run method

    Note: the :class:`DirectoryIndexer` is not an instantiable object.
    To use it in another context one should inherit from this class
    and override the methods mentioned in the :class:`BaseIndexer`
    class.
    """

    def run(self, ids: List[Sha1Git], **kwargs) -> Dict:
        """Given a list of sha1_gits:

        - retrieve directories from storage
        - execute the indexing computations
        - store the results

        Args:
            ids: sha1_git's identifier list
        """
        if "policy_update" in kwargs:
            warnings.warn(
                "'policy_update' argument is deprecated and ignored.",
                DeprecationWarning,
            )
            del kwargs["policy_update"]

        # Accept hex strings as well as raw bytes identifiers.
        directory_ids = [
            hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids
        ]

        # Directory objects themselves are not fetched here; the indexer is
        # handed (id, None) pairs and may load what it needs in index().
        return self._process_directories([(dir_id, None) for dir_id in directory_ids])

    def process_journal_objects(self, objects: ObjectsDict) -> Dict:
        """Worker function for ``JournalClient``. Expects ``objects`` to have a single
        key, ``"directory"``."""
        assert set(objects) == {"directory"}
        return self._process_directories(
            [(dir_["id"], Directory.from_dict(dir_)) for dir_ in objects["directory"]]
        )

    def _process_directories(
        self,
        directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]],
    ) -> Dict:
        """Index the given (id, directory-or-None) pairs, persist the results
        and return a run summary dict."""
        summary: Dict[str, Any] = {"status": "uneventful"}
        results: List[TResult] = []

        # TODO: fetch raw_manifest when useful?
        for (dir_id, dir_) in directories:
            try:
                results.extend(self.index(dir_id, dir_))
            except Exception:
                if not self.catch_exceptions:
                    raise
                self.log.exception("Problem when processing directory")
                sentry_sdk.capture_exception()
                summary["status"] = "failed"
                # Abort on first failure; partial results are not persisted.
                return summary

        summary_persist = self.persist_index_computations(results)
        if summary_persist:
            if any(value > 0 for value in summary_persist.values()):
                summary["status"] = "eventful"
            summary.update(summary_persist)
        self.results = results
        return summary