Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show All 34 Lines | |||||
from swh.model.model import Directory, Origin, Sha1Git | from swh.model.model import Directory, Origin, Sha1Git | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.factory import get_objstorage | from swh.objstorage.factory import get_objstorage | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
class ObjectsDict(TypedDict, total=False): | class ObjectsDict(TypedDict, total=False): | ||||
vlorentz: What does this mean? If you want to write a docstring, it should mention these keys are names… | |||||
Done — Inline Actions — yes, well, I guess it should have been documented earlier. """Typed objects whose keys are names of Kafka topics and values are lists of the values of messages in that topic.""" ardumont: yes, well, I guess it should have been documented earlier.
I just wanted to stop seein' yellow… | |||||
"""Typed objects.""" | |||||
content: List[Dict] | |||||
directory: List[Dict] | directory: List[Dict] | ||||
origin: List[Dict] | origin: List[Dict] | ||||
origin_visit_status: List[Dict] | origin_visit_status: List[Dict] | ||||
raw_extrinsic_metadata: List[Dict] | raw_extrinsic_metadata: List[Dict] | ||||
@contextmanager | @contextmanager | ||||
def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: | ||||
▲ Show 20 Lines • Show All 225 Lines • ▼ Show 20 Lines | def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: | ||||
result of the index function. | result of the index function. | ||||
Returns: | Returns: | ||||
a summary dict of what has been inserted in the storage | a summary dict of what has been inserted in the storage | ||||
""" | """ | ||||
return {} | return {} | ||||
def process_journal_objects(self, objects: ObjectsDict) -> Dict: | |||||
"""Read swh message objects (content, origin, ...) from the journal to: | |||||
Done — Inline Actions — raise NotImplementedError vlorentz: raise NotImplementedError | |||||
- retrieve the associated objects from the storage backend (e.g. storage, | |||||
objstorage...) | |||||
- execute the associated indexing computations | |||||
- store the results in the indexer storage | |||||
""" | |||||
raise NotImplementedError() | |||||
class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | ||||
"""A content indexer working on a list of ids directly. | """A content indexer working on the journal (method `process_journal_objects`) or on | ||||
a list of ids directly (method `run`). | |||||
To work on indexer partition, use the :class:`ContentPartitionIndexer` | To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. | ||||
instead. | |||||
Note: :class:`ContentIndexer` is not an instantiable object. To | Note: :class:`ContentIndexer` is not an instantiable object. To | ||||
use it, one should inherit from this class and override the | use it, one should inherit from this class and override the | ||||
methods mentioned in the :class:`BaseIndexer` class. | methods mentioned in the :class:`BaseIndexer` class. | ||||
""" | """ | ||||
def process_journal_objects(self, objects: ObjectsDict) -> Dict: | |||||
"""Read content objects from the journal, retrieve their raw content and compute | |||||
content indexing (e.g. mimetype, fossology license, ...). | |||||
Note that once this is deployed, this supersedes the main ContentIndexer.run | |||||
method call and the class ContentPartitionIndexer. | |||||
""" | |||||
summary: Dict[str, Any] = {"status": "uneventful"} | |||||
try: | |||||
results = [] | |||||
contents = objects.get("content", []) | |||||
Done — Inline Actions — Write a TODO: use get_batch vlorentz: Write a `TODO: use get_batch` | |||||
# FIXME: with swh.objstorage > v2.0: self.objstorage.get_batch(contents) | |||||
content_data = self.objstorage.get_batch(c["sha1"] for c in contents) | |||||
for item, raw_content in zip(contents, content_data): | |||||
id_ = item["sha1"] | |||||
if not raw_content: | |||||
self.log.warning( | |||||
"Content %s not found in objstorage", hashutil.hash_to_hex(id_) | |||||
) | |||||
continue | |||||
results.extend(self.index(id_, data=raw_content)) | |||||
except Exception: | |||||
if not self.catch_exceptions: | |||||
raise | |||||
summary["status"] = "failed" | |||||
return summary | |||||
summary_persist = self.persist_index_computations(results) | |||||
self.results = results | |||||
if summary_persist: | |||||
for value in summary_persist.values(): | |||||
if value > 0: | |||||
summary["status"] = "eventful" | |||||
summary.update(summary_persist) | |||||
return summary | |||||
def run(self, ids: List[Sha1], **kwargs) -> Dict: | def run(self, ids: List[Sha1], **kwargs) -> Dict: | ||||
"""Given a list of ids: | """Given a list of ids: | ||||
- retrieve the content from the storage | - retrieve the content from the storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results | - store the results | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 353 Lines • Show Last 20 Lines |
What does this mean? If you want to write a docstring, it should mention that these keys are names of Kafka topics and the values are lists of the values of messages in that topic.