Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show First 20 Lines • Show All 313 Lines • ▼ Show 20 Lines | def process_journal_objects(self, objects: ObjectsDict) -> Dict: | ||||
"""Read content objects from the journal, retrieve their raw content and compute | """Read content objects from the journal, retrieve their raw content and compute | ||||
content indexing (e.g. mimetype, fossology license, ...). | content indexing (e.g. mimetype, fossology license, ...). | ||||
Note that once this is deployed, this supersedes the main ContentIndexer.run | Note that once this is deployed, this supersedes the main ContentIndexer.run | ||||
method call and the class ContentPartitionIndexer. | method call and the class ContentPartitionIndexer. | ||||
""" | """ | ||||
summary: Dict[str, Any] = {"status": "uneventful"} | summary: Dict[str, Any] = {"status": "uneventful"} | ||||
with sentry_sdk.push_scope() as scope: | |||||
try: | try: | ||||
results = [] | results = [] | ||||
contents = objects.get("content", []) | contents = objects.get("content", []) | ||||
# FIXME: with swh.objstorage > v2.0: self.objstorage.get_batch(contents) | # FIXME: with swh.objstorage > v2.0: self.objstorage.get_batch(contents) | ||||
content_data = self.objstorage.get_batch(c["sha1"] for c in contents) | content_data = self.objstorage.get_batch(c["sha1"] for c in contents) | ||||
for item, raw_content in zip(contents, content_data): | for item, raw_content in zip(contents, content_data): | ||||
id_ = item["sha1"] | id_ = item["sha1"] | ||||
scope.set_tag("swh-indexer-content-sha1", hashutil.hash_to_hex(id_)) | |||||
if not raw_content: | if not raw_content: | ||||
self.log.warning( | self.log.warning( | ||||
"Content %s not found in objstorage", hashutil.hash_to_hex(id_) | "Content %s not found in objstorage", | ||||
hashutil.hash_to_hex(id_), | |||||
) | ) | ||||
continue | continue | ||||
results.extend(self.index(id_, data=raw_content)) | results.extend(self.index(id_, data=raw_content)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
self.results = results | self.results = results | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
Show All 21 Lines | def run(self, ids: List[Sha1], **kwargs) -> Dict: | ||||
) | ) | ||||
del kwargs["policy_update"] | del kwargs["policy_update"] | ||||
sha1s = [ | sha1s = [ | ||||
hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids | ||||
] | ] | ||||
results = [] | results = [] | ||||
summary: Dict = {"status": "uneventful"} | summary: Dict = {"status": "uneventful"} | ||||
with sentry_sdk.push_scope() as scope: | |||||
try: | try: | ||||
for sha1 in sha1s: | for sha1 in sha1s: | ||||
scope.set_tag( | |||||
"swh-indexer-content-sha1", hashutil.hash_to_hex(sha1) | |||||
) | |||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warning( | self.log.warning( | ||||
"Content %s not found in objstorage" | "Content %s not found in objstorage" | ||||
% hashutil.hash_to_hex(sha1) | % hashutil.hash_to_hex(sha1) | ||||
) | ) | ||||
continue | continue | ||||
res = self.index(sha1, raw_content, **kwargs) | res = self.index(sha1, raw_content, **kwargs) | ||||
if res: # If no results, skip it | if res: # If no results, skip it | ||||
results.extend(res) | results.extend(res) | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary = self.persist_index_computations(results) | summary = self.persist_index_computations(results) | ||||
self.results = results | self.results = results | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when reading contents metadata.") | self.log.exception("Problem when reading contents metadata.") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): | ||||
"""A content partition indexer. | """A content partition indexer. | ||||
This expects as input a partition_id and a nb_partitions. This will then index the | This expects as input a partition_id and a nb_partitions. This will then index the | ||||
contents within that partition. | contents within that partition. | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | ) -> Iterator[TResult]: | ||||
start: Starting bound from range identifier | start: Starting bound from range identifier | ||||
end: End range identifier | end: End range identifier | ||||
indexed: Set of content already indexed. | indexed: Set of content already indexed. | ||||
Yields: | Yields: | ||||
indexing result as dict to persist in the indexer backend | indexing result as dict to persist in the indexer backend | ||||
""" | """ | ||||
for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): | with sentry_sdk.push_scope() as scope: | ||||
for sha1 in self._list_contents_to_index( | |||||
partition_id, nb_partitions, indexed | |||||
): | |||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warning(f"Content {sha1.hex()} not found in objstorage") | self.log.warning(f"Content {sha1.hex()} not found in objstorage") | ||||
continue | continue | ||||
scope.set_tag("swh-indexer-content-sha1", sha1) | |||||
yield from self.index(sha1, raw_content, **kwargs) | yield from self.index(sha1, raw_content, **kwargs) | ||||
def _index_with_skipping_already_done( | def _index_with_skipping_already_done( | ||||
self, partition_id: int, nb_partitions: int | self, partition_id: int, nb_partitions: int | ||||
) -> Iterator[TResult]: | ) -> Iterator[TResult]: | ||||
"""Index not already indexed contents within the partition partition_id | """Index not already indexed contents within the partition partition_id | ||||
Args: | Args: | ||||
partition_id: Index of the partition to fetch | partition_id: Index of the partition to fetch | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | def process_journal_objects(self, objects: ObjectsDict) -> Dict: | ||||
"""Worker function for ``JournalClient``.""" | """Worker function for ``JournalClient``.""" | ||||
origins = [ | origins = [ | ||||
Origin(url=status["origin"]) | Origin(url=status["origin"]) | ||||
for status in objects.get("origin_visit_status", []) | for status in objects.get("origin_visit_status", []) | ||||
if status["status"] == "full" | if status["status"] == "full" | ||||
] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])] | ] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])] | ||||
summary: Dict[str, Any] = {"status": "uneventful"} | summary: Dict[str, Any] = {"status": "uneventful"} | ||||
with sentry_sdk.push_scope() as scope: | |||||
try: | try: | ||||
results = self.index_list( | results = self.index_list( | ||||
origins, | origins, | ||||
check_origin_known=False, | # no need to check they exist, as we just received either an origin | ||||
# no need to check they exist, as we just received either an origin or | # or visit status; which cannot be created by swh-storage unless | ||||
# visit status; which cannot be created by swh-storage unless the origin | # the origin | ||||
anlambert: could save a line here by concatenating with comment line above | |||||
# already exists | # already exists | ||||
check_origin_known=False, | |||||
sentry_scope=scope, | |||||
) | ) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when processing origins") | |||||
sentry_sdk.capture_exception() | |||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
return summary | return summary | ||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
self.results = results | self.results = results | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
return summary | return summary | ||||
def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: | def index_list( | ||||
self, origins: List[Origin], *, sentry_scope=None, **kwargs | |||||
) -> List[TResult]: | |||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
try: | if sentry_scope is not None: | ||||
sentry_scope.set_tag("swh-indexer-origin-url", origin.url) | |||||
results.extend(self.index(origin.url, **kwargs)) | results.extend(self.index(origin.url, **kwargs)) | ||||
except Exception: | |||||
if not self.catch_exceptions: | |||||
raise | |||||
self.log.exception("Problem when processing origin %s", origin.url) | |||||
sentry_sdk.capture_exception() | |||||
raise | |||||
return results | return results | ||||
class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): | class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Directory indexing using the run method | implements Directory indexing using the run method | ||||
Note: the :class:`DirectoryIndexer` is not an instantiable object. | Note: the :class:`DirectoryIndexer` is not an instantiable object. | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def _process_directories( | ||||
directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]], | directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]], | ||||
) -> Dict: | ) -> Dict: | ||||
summary: Dict[str, Any] = {"status": "uneventful"} | summary: Dict[str, Any] = {"status": "uneventful"} | ||||
results = [] | results = [] | ||||
# TODO: fetch raw_manifest when useful? | # TODO: fetch raw_manifest when useful? | ||||
with sentry_sdk.push_scope() as scope: | |||||
for (dir_id, dir_) in directories: | for (dir_id, dir_) in directories: | ||||
swhid = f"swh:1:dir:{hashutil.hash_to_hex(dir_id)}" | |||||
scope.set_tag("swh-indexer-directory-swhid", swhid) | |||||
try: | try: | ||||
results.extend(self.index(dir_id, dir_)) | results.extend(self.index(dir_id, dir_)) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception("Problem when processing directory") | self.log.exception("Problem when processing directory") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
summary["status"] = "failed" | summary["status"] = "failed" | ||||
summary_persist = self.persist_index_computations(results) | summary_persist = self.persist_index_computations(results) | ||||
if summary_persist: | if summary_persist: | ||||
for value in summary_persist.values(): | for value in summary_persist.values(): | ||||
if value > 0: | if value > 0: | ||||
summary["status"] = "eventful" | summary["status"] = "eventful" | ||||
summary.update(summary_persist) | summary.update(summary_persist) | ||||
self.results = results | self.results = results | ||||
return summary | return summary |
could save a line here by concatenating with comment line above