diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -303,15 +303,31 @@ ) ) + from swh.indexer.indexer import BaseIndexer + + idx: BaseIndexer if indexer in ("origin-intrinsic-metadata", "*"): from swh.indexer.metadata import OriginMetadataIndexer object_types.add("origin_visit_status") idx = OriginMetadataIndexer() + + if indexer in ("content-mimetype", "*"): + from swh.indexer.mimetype import MimetypeIndexer + + object_types.add("content") + idx = MimetypeIndexer() + + if indexer in ("content-fossology-license", "*"): + from swh.indexer.fossology_license import FossologyLicenseIndexer + + object_types.add("content") + idx = FossologyLicenseIndexer() + + if idx: idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) - - if not worker_fns: + else: raise click.ClickException(f"Unknown indexer: {indexer}") client = get_journal_client( diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -41,6 +41,9 @@ class ObjectsDict(TypedDict, total=False): + """Typed objects.""" + + content: List[Dict] directory: List[Dict] origin: List[Dict] origin_visit_status: List[Dict] @@ -282,12 +285,15 @@ """ return {} + def process_journal_objects(self, objects: ObjectsDict) -> Dict: + """Compute indexer result out of the journal.""" + return {} + class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on a list of ids directly. - To work on indexer partition, use the :class:`ContentPartitionIndexer` - instead. + To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the @@ -295,6 +301,41 @@ """ + def process_journal_objects(self, objects: ObjectsDict) -> Dict: + """Compute content indexer out of the journal. + + Note that when we will deploy this, this renders the main run method obsolete. + + """ + summary: Dict[str, Any] = {"status": "uneventful"} + try: + results = [] + for item in objects.get("content", []): + id_ = item["sha1"] + try: + raw_content = self.objstorage.get(item) + except ObjNotFoundError: + self.log.warning( + "Content %s not found in objstorage", hashutil.hash_to_hex(id_) + ) + continue + + results.extend(self.index(id_, data=raw_content)) + except Exception: + if not self.catch_exceptions: + raise + summary["status"] = "failed" + return summary + + summary_persist = self.persist_index_computations(results) + self.results = results + if summary_persist: + for value in summary_persist.values(): + if value > 0: + summary["status"] = "eventful" + summary.update(summary_persist) + return summary + def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: