Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
| Show All 15 Lines | from typing import ( | |||||||||
| TypeVar, | TypeVar, | |||||||||
| ) | ) | |||||||||
| import sentry_sdk | import sentry_sdk | |||||||||
| from swh.core.config import merge_configs | from swh.core.config import merge_configs | |||||||||
| from swh.core.utils import grouper | from swh.core.utils import grouper | |||||||||
| from swh.indexer.codemeta import merge_documents | from swh.indexer.codemeta import merge_documents | |||||||||
| from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer | from swh.indexer.indexer import ContentIndexer, DirectoryIndexer, OriginIndexer | |||||||||
| from swh.indexer.metadata_detector import detect_metadata | from swh.indexer.metadata_detector import detect_metadata | |||||||||
| from swh.indexer.metadata_dictionary import MAPPINGS | from swh.indexer.metadata_dictionary import MAPPINGS | |||||||||
| from swh.indexer.origin_head import get_head_swhid | from swh.indexer.origin_head import get_head_swhid | |||||||||
| from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | |||||||||
| from swh.indexer.storage.model import ( | from swh.indexer.storage.model import ( | |||||||||
| ContentMetadataRow, | ContentMetadataRow, | |||||||||
| DirectoryIntrinsicMetadataRow, | ||||||||||
| OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | |||||||||
| RevisionIntrinsicMetadataRow, | ||||||||||
| ) | ) | |||||||||
| from swh.model import hashutil | from swh.model import hashutil | |||||||||
| from swh.model.model import Origin, Revision, Sha1Git | from swh.model.model import Directory, Origin, Sha1Git | |||||||||
| from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | |||||||||
| REVISION_GET_BATCH_SIZE = 10 | REVISION_GET_BATCH_SIZE = 10 | |||||||||
| ORIGIN_GET_BATCH_SIZE = 10 | ORIGIN_GET_BATCH_SIZE = 10 | |||||||||
| T1 = TypeVar("T1") | T1 = TypeVar("T1") | |||||||||
| T2 = TypeVar("T2") | T2 = TypeVar("T2") | |||||||||
| Show All 34 Lines | def filter(self, ids): | |||||||||
| for sha1 in ids | for sha1 in ids | |||||||||
| ) | ) | |||||||||
| ) | ) | |||||||||
| def index( | def index( | |||||||||
| self, | self, | |||||||||
| id: Sha1, | id: Sha1, | |||||||||
| data: Optional[bytes] = None, | data: Optional[bytes] = None, | |||||||||
| log_suffix="unknown revision", | log_suffix="unknown directory", | |||||||||
| **kwargs, | **kwargs, | |||||||||
| ) -> List[ContentMetadataRow]: | ) -> List[ContentMetadataRow]: | |||||||||
| """Index sha1s' content and store result. | """Index sha1s' content and store result. | |||||||||
| Args: | Args: | |||||||||
| id: content's identifier | id: content's identifier | |||||||||
| data: raw content in bytes | data: raw content in bytes | |||||||||
| ▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | DEFAULT_CONFIG: Dict[str, Any] = { | |||||||||
| "tools": { | "tools": { | |||||||||
| "name": "swh-metadata-detector", | "name": "swh-metadata-detector", | |||||||||
| "version": "0.0.2", | "version": "0.0.2", | |||||||||
| "configuration": {}, | "configuration": {}, | |||||||||
| }, | }, | |||||||||
| } | } | |||||||||
| class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): | class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): | |||||||||
| """Revision-level indexer | """Directory-level indexer | |||||||||
| This indexer is in charge of: | This indexer is in charge of: | |||||||||
| - filtering revisions already indexed in revision_intrinsic_metadata table | - filtering directories already indexed in directory_intrinsic_metadata table | |||||||||
| with defined computation tool | with defined computation tool | |||||||||
| - retrieve all entry_files in root directory | - retrieve all entry_files in directory | |||||||||
| - use metadata_detector for file_names containing metadata | - use metadata_detector for file_names containing metadata | |||||||||
| - compute metadata translation if necessary and possible (depends on tool) | - compute metadata translation if necessary and possible (depends on tool) | |||||||||
| - send sha1s to content indexing if possible | - send sha1s to content indexing if possible | |||||||||
| - store the results for revision | - store the results for directory | |||||||||
| """ | """ | |||||||||
| def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | |||||||||
| super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | |||||||||
| self.config = merge_configs(DEFAULT_CONFIG, self.config) | self.config = merge_configs(DEFAULT_CONFIG, self.config) | |||||||||
| def filter(self, sha1_gits): | def filter(self, sha1_gits): | |||||||||
| """Filter out known sha1s and return only missing ones.""" | """Filter out known sha1s and return only missing ones.""" | |||||||||
| yield from self.idx_storage.revision_intrinsic_metadata_missing( | yield from self.idx_storage.directory_intrinsic_metadata_missing( | |||||||||
| ( | ( | |||||||||
| { | { | |||||||||
| "id": sha1_git, | "id": sha1_git, | |||||||||
| "indexer_configuration_id": self.tool["id"], | "indexer_configuration_id": self.tool["id"], | |||||||||
| } | } | |||||||||
| for sha1_git in sha1_gits | for sha1_git in sha1_gits | |||||||||
| ) | ) | |||||||||
| ) | ) | |||||||||
| def index( | def index( | |||||||||
| self, id: Sha1Git, data: Optional[Revision], **kwargs | self, id: Sha1Git, data: Optional[Directory] = None, **kwargs | |||||||||
| ) -> List[RevisionIntrinsicMetadataRow]: | ) -> List[DirectoryIntrinsicMetadataRow]: | |||||||||
| """Index rev by processing it and organizing result. | """Index directory by processing it and organizing result. | |||||||||
| use metadata_detector to iterate on filenames | use metadata_detector to iterate on filenames | |||||||||
| - if one filename detected -> sends file to content indexer | - if one filename detected -> sends file to content indexer | |||||||||
| - if multiple files detected -> translation needed at revision level | - if multiple files detected -> translation needed at directory level | |||||||||
| Args: | Args: | |||||||||
| id: sha1_git of the revision | id: sha1_git of the directory | |||||||||
| data: revision model object from storage | data: directory model object from storage | |||||||||
| Returns: | Returns: | |||||||||
| list: the revision_intrinsic_metadata rows (at most one), each with | list: the directory_intrinsic_metadata rows (at most one), each with | |||||||||
| fields: | fields: | |||||||||
| - id (str): rev's identifier (sha1_git) | - id (str): directory's identifier (sha1_git) | |||||||||
ardumont · Unsubmitted · Done · Inline Actions
ardumont: | ||||||||||
| - indexer_configuration_id (bytes): tool used | - indexer_configuration_id (bytes): tool used | |||||||||
| - metadata: dict of retrieved metadata | - metadata: dict of retrieved metadata | |||||||||
| """ | """ | |||||||||
| rev = data | if data is None: | |||||||||
| assert isinstance(rev, Revision) | dir_ = list(self.storage.directory_ls(id, recursive=False)) | |||||||||
| else: | ||||||||||
| assert isinstance(data, Directory) | ||||||||||
| dir_ = data.to_dict() | ||||||||||
| try: | try: | |||||||||
| root_dir = rev.directory | if [entry["type"] for entry in dir_] == ["dir"]: | |||||||||
| dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) | ||||||||||
| if [entry["type"] for entry in dir_ls] == ["dir"]: | ||||||||||
| # If the root is just a single directory, recurse into it | # If the root is just a single directory, recurse into it | |||||||||
| # eg. PyPI packages, GNU tarballs | # eg. PyPI packages, GNU tarballs | |||||||||
| subdir = dir_ls[0]["target"] | subdir = dir_[0]["target"] | |||||||||
| dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) | dir_ = list(self.storage.directory_ls(subdir, recursive=False)) | |||||||||
| files = [entry for entry in dir_ls if entry["type"] == "file"] | files = [entry for entry in dir_ if entry["type"] == "file"] | |||||||||
| detected_files = detect_metadata(files) | detected_files = detect_metadata(files) | |||||||||
| (mappings, metadata) = self.translate_revision_intrinsic_metadata( | (mappings, metadata) = self.translate_directory_intrinsic_metadata( | |||||||||
| detected_files, | detected_files, | |||||||||
| log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), | log_suffix="directory=%s" % hashutil.hash_to_hex(id), | |||||||||
| ) | ) | |||||||||
| except Exception as e: | except Exception as e: | |||||||||
| self.log.exception("Problem when indexing rev: %r", e) | self.log.exception("Problem when indexing dir: %r", e) | |||||||||
| sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | |||||||||
| return [ | return [ | |||||||||
| RevisionIntrinsicMetadataRow( | DirectoryIntrinsicMetadataRow( | |||||||||
| id=rev.id, | id=id, | |||||||||
| indexer_configuration_id=self.tool["id"], | indexer_configuration_id=self.tool["id"], | |||||||||
| mappings=mappings, | mappings=mappings, | |||||||||
| metadata=metadata, | metadata=metadata, | |||||||||
| ) | ) | |||||||||
| ] | ] | |||||||||
| def persist_index_computations( | def persist_index_computations( | |||||||||
| self, results: List[RevisionIntrinsicMetadataRow] | self, results: List[DirectoryIntrinsicMetadataRow] | |||||||||
| ) -> Dict[str, int]: | ) -> Dict[str, int]: | |||||||||
| """Persist the results in storage. | """Persist the results in storage. | |||||||||
| Args: | Args: | |||||||||
| results: list of RevisionIntrinsicMetadataRow, each with the | results: list of DirectoryIntrinsicMetadataRow, each with the | |||||||||
| following fields: | following fields: | |||||||||
| - id: revision's identifier (sha1_git) | - id: directory's identifier (sha1_git) | |||||||||
| - indexer_configuration_id: tool used | - indexer_configuration_id: tool used | |||||||||
| - mappings, metadata: mappings used and the translated metadata | - mappings, metadata: mappings used and the translated metadata | |||||||||
| """ | """ | |||||||||
| # TODO: add functions in storage to keep data in | # TODO: add functions in storage to keep data in | |||||||||
| # revision_intrinsic_metadata | # directory_intrinsic_metadata | |||||||||
| return self.idx_storage.revision_intrinsic_metadata_add(results) | return self.idx_storage.directory_intrinsic_metadata_add(results) | |||||||||
| def translate_revision_intrinsic_metadata( | def translate_directory_intrinsic_metadata( | |||||||||
| self, detected_files: Dict[str, List[Any]], log_suffix: str | self, detected_files: Dict[str, List[Any]], log_suffix: str | |||||||||
| ) -> Tuple[List[Any], Any]: | ) -> Tuple[List[Any], Any]: | |||||||||
| """ | """ | |||||||||
| Determine plan of action to translate metadata when containing | Determine plan of action to translate metadata when containing | |||||||||
| one or multiple detected files: | one or multiple detected files: | |||||||||
| Args: | Args: | |||||||||
| detected_files: dictionary mapping context names (e.g., | detected_files: dictionary mapping context names (e.g., | |||||||||
| ▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | ) -> Tuple[List[Any], Any]: | |||||||||
| self.log.exception("Exception while indexing metadata on contents") | self.log.exception("Exception while indexing metadata on contents") | |||||||||
| sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | |||||||||
| metadata = merge_documents(metadata) | metadata = merge_documents(metadata) | |||||||||
| return (used_mappings, metadata) | return (used_mappings, metadata) | |||||||||
| class OriginMetadataIndexer( | class OriginMetadataIndexer( | |||||||||
| OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] | OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] | |||||||||
| ): | ): | |||||||||
| USE_TOOLS = False | USE_TOOLS = False | |||||||||
| def __init__(self, config=None, **kwargs) -> None: | def __init__(self, config=None, **kwargs) -> None: | |||||||||
| super().__init__(config=config, **kwargs) | super().__init__(config=config, **kwargs) | |||||||||
| self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) | self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) | |||||||||
| def index_list( | def index_list( | |||||||||
| self, origins: List[Origin], check_origin_known: bool = True, **kwargs | self, origins: List[Origin], check_origin_known: bool = True, **kwargs | |||||||||
| ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | |||||||||
| head_rev_ids = [] | head_rev_ids = [] | |||||||||
| origins_with_head = [] | origins_with_head = [] | |||||||||
| # Filter out origins not in the storage | # Filter out origins not in the storage | |||||||||
| if check_origin_known: | if check_origin_known: | |||||||||
| known_origins = list( | known_origins = list( | |||||||||
| call_with_batches( | call_with_batches( | |||||||||
| self.storage.origin_get, | self.storage.origin_get, | |||||||||
| Show All 22 Lines | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | |||||||||
| assert len(head_revs) == len(head_rev_ids) | assert len(head_revs) == len(head_rev_ids) | |||||||||
| results = [] | results = [] | |||||||||
| for (origin, rev) in zip(origins_with_head, head_revs): | for (origin, rev) in zip(origins_with_head, head_revs): | |||||||||
| if not rev: | if not rev: | |||||||||
| self.log.warning("Missing head revision of origin %r", origin.url) | self.log.warning("Missing head revision of origin %r", origin.url) | |||||||||
| continue | continue | |||||||||
| for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): | for dir_metadata in self.directory_metadata_indexer.index(rev.directory): | |||||||||
| # There is at most one rev_metadata | # There is at most one dir_metadata | |||||||||
| orig_metadata = OriginIntrinsicMetadataRow( | orig_metadata = OriginIntrinsicMetadataRow( | |||||||||
| from_revision=rev_metadata.id, | from_directory=dir_metadata.id, | |||||||||
| id=origin.url, | id=origin.url, | |||||||||
| metadata=rev_metadata.metadata, | metadata=dir_metadata.metadata, | |||||||||
| mappings=rev_metadata.mappings, | mappings=dir_metadata.mappings, | |||||||||
| indexer_configuration_id=rev_metadata.indexer_configuration_id, | indexer_configuration_id=dir_metadata.indexer_configuration_id, | |||||||||
| ) | ) | |||||||||
| results.append((orig_metadata, rev_metadata)) | results.append((orig_metadata, dir_metadata)) | |||||||||
| return results | return results | |||||||||
| def persist_index_computations( | def persist_index_computations( | |||||||||
| self, | self, | |||||||||
| results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], | results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], | |||||||||
| ) -> Dict[str, int]: | ) -> Dict[str, int]: | |||||||||
| # Deduplicate revisions | # Deduplicate directories | |||||||||
| rev_metadata: List[RevisionIntrinsicMetadataRow] = [] | dir_metadata: List[DirectoryIntrinsicMetadataRow] = [] | |||||||||
| orig_metadata: List[OriginIntrinsicMetadataRow] = [] | orig_metadata: List[OriginIntrinsicMetadataRow] = [] | |||||||||
| summary: Dict = {} | summary: Dict = {} | |||||||||
| for (orig_item, rev_item) in results: | for (orig_item, dir_item) in results: | |||||||||
| assert rev_item.metadata == orig_item.metadata | assert dir_item.metadata == orig_item.metadata | |||||||||
| if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): | if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): | |||||||||
| # Only store non-empty metadata sets | # Only store non-empty metadata sets | |||||||||
| if rev_item not in rev_metadata: | if dir_item not in dir_metadata: | |||||||||
| rev_metadata.append(rev_item) | dir_metadata.append(dir_item) | |||||||||
| if orig_item not in orig_metadata: | if orig_item not in orig_metadata: | |||||||||
| orig_metadata.append(orig_item) | orig_metadata.append(orig_item) | |||||||||
| if rev_metadata: | if dir_metadata: | |||||||||
| summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) | summary_dir = self.idx_storage.directory_intrinsic_metadata_add( | |||||||||
| summary.update(summary_rev) | dir_metadata | |||||||||
| ) | ||||||||||
| summary.update(summary_dir) | ||||||||||
| if orig_metadata: | if orig_metadata: | |||||||||
| summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | |||||||||
| summary.update(summary_ori) | summary.update(summary_ori) | |||||||||
| return summary | return summary | |||||||||