Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show First 20 Lines • Show All 338 Lines • ▼ Show 20 Lines | def process_journal_objects(self, objects: ObjectsDict) -> Dict: | ||||
results.extend(self.index(id_, data=raw_content)) | results.extend(self.index(id_, data=raw_content)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
else: | |||||
# Reset tag after we finished processing the given content | |||||
sentry_sdk.set_tag("swh-indexer-content-sha1", "") | |||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
self.results = results | self.results = results | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def run(self, ids: List[Sha1], **kwargs) -> Dict: | ||||
summary = self.persist_index_computations(results) | summary = self.persist_index_computations(results) | ||||
self.results = results | self.results = results | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
else: | |||||
# Reset tag after we finished processing the given content | |||||
sentry_sdk.set_tag("swh-indexer-content-sha1", "") | |||||
return summary | return summary | ||||
class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | ||||
"""A content partition indexer. | """A content partition indexer. | ||||
This expects as input a partition_id and a nb_partitions. This will then index the | This expects as input a partition_id and a nb_partitions. This will then index the | ||||
contents within that partition. | contents within that partition. | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | ) -> Iterator[TResult]: | ||||
for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): | for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): | ||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warning(f"Content {sha1.hex()} not found in objstorage") | self.log.warning(f"Content {sha1.hex()} not found in objstorage") | ||||
continue | continue | ||||
sentry_sdk.set_tag("swh-indexer-content-sha1", sha1) | sentry_sdk.set_tag("swh-indexer-content-sha1", sha1) | ||||
yield from self.index(sha1, raw_content, **kwargs) | yield from self.index(sha1, raw_content, **kwargs) | ||||
sentry_sdk.set_tag("swh-indexer-content-sha1", "") | |||||
def _index_with_skipping_already_done( | def _index_with_skipping_already_done( | ||||
self, partition_id: int, nb_partitions: int | self, partition_id: int, nb_partitions: int | ||||
) -> Iterator[TResult]: | ) -> Iterator[TResult]: | ||||
"""Index not already indexed contents within the partition partition_id | """Index not already indexed contents within the partition partition_id | ||||
Args: | Args: | ||||
partition_id: Index of the partition to fetch | partition_id: Index of the partition to fetch | ||||
▲ Show 20 Lines • Show All 133 Lines • ▼ Show 20 Lines | def process_journal_objects(self, objects: ObjectsDict) -> Dict: | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
return summary | return summary | ||||
# index_list: walk `origins` in order, call self.index() on each origin's URL
# (forwarding **kwargs) and return all produced results as one flat list.
# NOTE(review): this text is a side-by-side diff capture; each row shows the
# left and right revision separated by `|`, and indentation has been stripped.
def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: | def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: | ||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
# Set the "swh-indexer-origin-url" Sentry tag to the origin URL before
# indexing it -- presumably so errors raised by self.index() can be
# attributed to that origin; confirm against the Sentry setup.
sentry_sdk.set_tag("swh-indexer-origin-url", origin.url) | sentry_sdk.set_tag("swh-indexer-origin-url", origin.url) | ||||
results.extend(self.index(origin.url, **kwargs)) | results.extend(self.index(origin.url, **kwargs)) | ||||
# NOTE(review): the row below resets the tag to "" and is present in only one
# column of the diff, i.e. in only one of the two revisions. Whether it sits
# inside or after the loop cannot be determined from this capture -- TODO
# confirm against the real file. As written it is not reached if self.index()
# raises.
sentry_sdk.set_tag("swh-indexer-origin-url", "") | |||||
return results | return results | ||||
class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): | class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Directory indexing using the run method | implements Directory indexing using the run method | ||||
Note: the :class:`DirectoryIndexer` is not an instantiable object. | Note: the :class:`DirectoryIndexer` is not an instantiable object. | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | ) -> Dict: | ||||
try: | try: | ||||
results.extend(self.index(dir_id, dir_)) | results.extend(self.index(dir_id, dir_)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when processing directory") | self.log.exception("Problem when processing directory") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
else: | |||||
sentry_sdk.set_tag("swh-indexer-directory-swhid", "") | |||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
self.results = results | self.results = results | ||||
return summary | return summary |