def process_journal_objects(self, objects: ObjectsDict) -> Dict:
    """Index a batch of swh objects (content, origin, ...) read from the journal.

    This implementation is only a placeholder and unconditionally raises. The
    contract a concrete implementation is expected to fulfil is to:

    - fetch the objects referenced by the journal messages from the relevant
      backend (e.g. storage, objstorage...)
    - run the associated indexing computations on them
    - write the results to the indexer storage

    Args:
        objects: journal messages, keyed by object type.

    Returns:
        Nothing here; this placeholder never returns normally.

    Raises:
        NotImplementedError: always.

    """
    raise NotImplementedError
def process_journal_objects(self, objects: ObjectsDict) -> Dict:
    """Read content objects from the journal, retrieve their raw content and compute
    content indexing (e.g. mimetype, fossology license, ...).

    The entries found under the ``"content"`` key of ``objects`` are fetched from
    ``self.objstorage`` with a single ``get_batch`` call. Each raw content is
    passed to ``self.index``, and the accumulated results are handed to
    ``self.persist_index_computations`` and kept in ``self.results``.

    Note that once this is deployed, this supersedes the main ContentIndexer.run
    method call and the class ContentPartitionIndexer.

    Args:
        objects: journal messages keyed by object type. Only the ``"content"``
            entries are read; each one is indexed by its ``"sha1"`` value.

    Returns:
        A summary dict. ``"status"`` is ``"failed"`` when fetching or indexing
        raised and ``self.catch_exceptions`` is set. It is ``"eventful"`` when
        the persistence summary has at least one value above 0, and
        ``"uneventful"`` otherwise. The persistence summary, when not empty,
        is merged into the result.

    Raises:
        Exception: any error raised while fetching or indexing, re-raised
            unchanged when ``self.catch_exceptions`` is falsy. Errors raised
            while persisting are never caught here.

    """
    summary: Dict[str, Any] = {"status": "uneventful"}
    try:
        results = []
        contents = objects.get("content", [])
        # NOTE(review): the journal entries (dicts) are handed to get_batch
        # as-is, not their sha1 -- confirm this is what the objstorage expects.
        raw_contents = self.objstorage.get_batch(contents)
        for item, raw_content in zip(contents, raw_contents):
            id_ = item["sha1"]
            # Any falsy value is skipped: this covers missing objects but also
            # empty data, which is then reported as "not found" as well.
            if not raw_content:
                self.log.warning(
                    "Content %s not found in objstorage", hashutil.hash_to_hex(id_)
                )
                continue

            results.extend(self.index(id_, data=raw_content))
    except Exception:
        if not self.catch_exceptions:
            raise
        # Do not swallow the error silently: keep the traceback in the logs
        # since the caller only gets a bare "failed" status back.
        self.log.exception("Problem when indexing contents from the journal")
        summary["status"] = "failed"
        return summary

    summary_persist = self.persist_index_computations(results)
    self.results = results
    if summary_persist:
        if any(value > 0 for value in summary_persist.values()):
            summary["status"] = "eventful"
        summary.update(summary_persist)
    return summary