Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
Show All 15 Lines | from typing import ( | |||||||||
TypeVar, | TypeVar, | |||||||||
) | ) | |||||||||
import sentry_sdk | import sentry_sdk | |||||||||
from swh.core.config import merge_configs | from swh.core.config import merge_configs | |||||||||
from swh.core.utils import grouper | from swh.core.utils import grouper | |||||||||
from swh.indexer.codemeta import merge_documents | from swh.indexer.codemeta import merge_documents | |||||||||
from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer | from swh.indexer.indexer import ContentIndexer, DirectoryIndexer, OriginIndexer | |||||||||
from swh.indexer.metadata_detector import detect_metadata | from swh.indexer.metadata_detector import detect_metadata | |||||||||
from swh.indexer.metadata_dictionary import MAPPINGS | from swh.indexer.metadata_dictionary import MAPPINGS | |||||||||
from swh.indexer.origin_head import get_head_swhid | from swh.indexer.origin_head import get_head_swhid | |||||||||
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | |||||||||
from swh.indexer.storage.model import ( | from swh.indexer.storage.model import ( | |||||||||
ContentMetadataRow, | ContentMetadataRow, | |||||||||
DirectoryIntrinsicMetadataRow, | ||||||||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | |||||||||
RevisionIntrinsicMetadataRow, | ||||||||||
) | ) | |||||||||
from swh.model import hashutil | from swh.model import hashutil | |||||||||
from swh.model.model import Origin, Revision, Sha1Git | from swh.model.model import Directory, Origin, Sha1Git | |||||||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | |||||||||
REVISION_GET_BATCH_SIZE = 10 | REVISION_GET_BATCH_SIZE = 10 | |||||||||
ORIGIN_GET_BATCH_SIZE = 10 | ORIGIN_GET_BATCH_SIZE = 10 | |||||||||
T1 = TypeVar("T1") | T1 = TypeVar("T1") | |||||||||
T2 = TypeVar("T2") | T2 = TypeVar("T2") | |||||||||
Show All 34 Lines | def filter(self, ids): | |||||||||
for sha1 in ids | for sha1 in ids | |||||||||
) | ) | |||||||||
) | ) | |||||||||
def index( | def index( | |||||||||
self, | self, | |||||||||
id: Sha1, | id: Sha1, | |||||||||
data: Optional[bytes] = None, | data: Optional[bytes] = None, | |||||||||
log_suffix="unknown revision", | log_suffix="unknown directory", | |||||||||
**kwargs, | **kwargs, | |||||||||
) -> List[ContentMetadataRow]: | ) -> List[ContentMetadataRow]: | |||||||||
"""Index sha1s' content and store result. | """Index sha1s' content and store result. | |||||||||
Args: | Args: | |||||||||
id: content's identifier | id: content's identifier | |||||||||
data: raw content in bytes | data: raw content in bytes | |||||||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | DEFAULT_CONFIG: Dict[str, Any] = { | |||||||||
"tools": { | "tools": { | |||||||||
"name": "swh-metadata-detector", | "name": "swh-metadata-detector", | |||||||||
"version": "0.0.2", | "version": "0.0.2", | |||||||||
"configuration": {}, | "configuration": {}, | |||||||||
}, | }, | |||||||||
} | } | |||||||||
class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): | class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): | |||||||||
"""Revision-level indexer | """Directory-level indexer | |||||||||
This indexer is in charge of: | This indexer is in charge of: | |||||||||
- filtering revisions already indexed in revision_intrinsic_metadata table | - filtering directories already indexed in directory_intrinsic_metadata table | |||||||||
with defined computation tool | with defined computation tool | |||||||||
- retrieve all entry_files in root directory | - retrieve all entry_files in directory | |||||||||
- use metadata_detector for file_names containing metadata | - use metadata_detector for file_names containing metadata | |||||||||
- compute metadata translation if necessary and possible (depends on tool) | - compute metadata translation if necessary and possible (depends on tool) | |||||||||
- send sha1s to content indexing if possible | - send sha1s to content indexing if possible | |||||||||
- store the results for revision | - store the results for directory | |||||||||
""" | """ | |||||||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | |||||||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | |||||||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | self.config = merge_configs(DEFAULT_CONFIG, self.config) | |||||||||
def filter(self, sha1_gits): | def filter(self, sha1_gits): | |||||||||
"""Filter out known sha1s and return only missing ones.""" | """Filter out known sha1s and return only missing ones.""" | |||||||||
yield from self.idx_storage.revision_intrinsic_metadata_missing( | yield from self.idx_storage.directory_intrinsic_metadata_missing( | |||||||||
( | ( | |||||||||
{ | { | |||||||||
"id": sha1_git, | "id": sha1_git, | |||||||||
"indexer_configuration_id": self.tool["id"], | "indexer_configuration_id": self.tool["id"], | |||||||||
} | } | |||||||||
for sha1_git in sha1_gits | for sha1_git in sha1_gits | |||||||||
) | ) | |||||||||
) | ) | |||||||||
def index( | def index( | |||||||||
self, id: Sha1Git, data: Optional[Revision], **kwargs | self, id: Sha1Git, data: Optional[Directory] = None, **kwargs | |||||||||
) -> List[RevisionIntrinsicMetadataRow]: | ) -> List[DirectoryIntrinsicMetadataRow]: | |||||||||
"""Index rev by processing it and organizing result. | """Index directory by processing it and organizing result. | |||||||||
use metadata_detector to iterate on filenames | use metadata_detector to iterate on filenames | |||||||||
- if one filename detected -> sends file to content indexer | - if one filename detected -> sends file to content indexer | |||||||||
- if multiple file detected -> translation needed at revision level | - if multiple file detected -> translation needed at directory level | |||||||||
Args: | Args: | |||||||||
id: sha1_git of the revision | id: sha1_git of the directory | |||||||||
data: revision model object from storage | data: directory model object from storage | |||||||||
Returns: | Returns: | |||||||||
dict: dictionary representing a revision_intrinsic_metadata, with | dict: dictionary representing a directory_intrinsic_metadata, with | |||||||||
keys: | keys: | |||||||||
- id (str): rev's identifier (sha1_git) | - id (str): directory's identifier (sha1_git) | |||||||||
ardumontUnsubmitted Done Inline Actions
ardumont: | ||||||||||
- indexer_configuration_id (bytes): tool used | - indexer_configuration_id (bytes): tool used | |||||||||
- metadata: dict of retrieved metadata | - metadata: dict of retrieved metadata | |||||||||
""" | """ | |||||||||
rev = data | if data is None: | |||||||||
assert isinstance(rev, Revision) | dir_ = list(self.storage.directory_ls(id, recursive=False)) | |||||||||
else: | ||||||||||
assert isinstance(data, Directory) | ||||||||||
dir_ = data.to_dict() | ||||||||||
try: | try: | |||||||||
root_dir = rev.directory | if [entry["type"] for entry in dir_] == ["dir"]: | |||||||||
dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) | ||||||||||
if [entry["type"] for entry in dir_ls] == ["dir"]: | ||||||||||
# If the root is just a single directory, recurse into it | # If the root is just a single directory, recurse into it | |||||||||
# eg. PyPI packages, GNU tarballs | # eg. PyPI packages, GNU tarballs | |||||||||
subdir = dir_ls[0]["target"] | subdir = dir_[0]["target"] | |||||||||
dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) | dir_ = list(self.storage.directory_ls(subdir, recursive=False)) | |||||||||
files = [entry for entry in dir_ls if entry["type"] == "file"] | files = [entry for entry in dir_ if entry["type"] == "file"] | |||||||||
detected_files = detect_metadata(files) | detected_files = detect_metadata(files) | |||||||||
(mappings, metadata) = self.translate_revision_intrinsic_metadata( | (mappings, metadata) = self.translate_directory_intrinsic_metadata( | |||||||||
detected_files, | detected_files, | |||||||||
log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), | log_suffix="directory=%s" % hashutil.hash_to_hex(id), | |||||||||
) | ) | |||||||||
except Exception as e: | except Exception as e: | |||||||||
self.log.exception("Problem when indexing rev: %r", e) | self.log.exception("Problem when indexing dir: %r", e) | |||||||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | |||||||||
return [ | return [ | |||||||||
RevisionIntrinsicMetadataRow( | DirectoryIntrinsicMetadataRow( | |||||||||
id=rev.id, | id=id, | |||||||||
indexer_configuration_id=self.tool["id"], | indexer_configuration_id=self.tool["id"], | |||||||||
mappings=mappings, | mappings=mappings, | |||||||||
metadata=metadata, | metadata=metadata, | |||||||||
) | ) | |||||||||
] | ] | |||||||||
def persist_index_computations( | def persist_index_computations( | |||||||||
self, results: List[RevisionIntrinsicMetadataRow] | self, results: List[DirectoryIntrinsicMetadataRow] | |||||||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | |||||||||
"""Persist the results in storage. | """Persist the results in storage. | |||||||||
Args: | Args: | |||||||||
results: list of content_mimetype, dict with the | results: list of content_mimetype, dict with the | |||||||||
following keys: | following keys: | |||||||||
- id (bytes): content's identifier (sha1) | - id (bytes): content's identifier (sha1) | |||||||||
- mimetype (bytes): mimetype in bytes | - mimetype (bytes): mimetype in bytes | |||||||||
- encoding (bytes): encoding in bytes | - encoding (bytes): encoding in bytes | |||||||||
""" | """ | |||||||||
# TODO: add functions in storage to keep data in | # TODO: add functions in storage to keep data in | |||||||||
# revision_intrinsic_metadata | # directory_intrinsic_metadata | |||||||||
return self.idx_storage.revision_intrinsic_metadata_add(results) | return self.idx_storage.directory_intrinsic_metadata_add(results) | |||||||||
def translate_revision_intrinsic_metadata( | def translate_directory_intrinsic_metadata( | |||||||||
self, detected_files: Dict[str, List[Any]], log_suffix: str | self, detected_files: Dict[str, List[Any]], log_suffix: str | |||||||||
) -> Tuple[List[Any], Any]: | ) -> Tuple[List[Any], Any]: | |||||||||
""" | """ | |||||||||
Determine plan of action to translate metadata when containing | Determine plan of action to translate metadata when containing | |||||||||
one or multiple detected files: | one or multiple detected files: | |||||||||
Args: | Args: | |||||||||
detected_files: dictionary mapping context names (e.g., | detected_files: dictionary mapping context names (e.g., | |||||||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | ) -> Tuple[List[Any], Any]: | |||||||||
self.log.exception("Exception while indexing metadata on contents") | self.log.exception("Exception while indexing metadata on contents") | |||||||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | |||||||||
metadata = merge_documents(metadata) | metadata = merge_documents(metadata) | |||||||||
return (used_mappings, metadata) | return (used_mappings, metadata) | |||||||||
class OriginMetadataIndexer( | class OriginMetadataIndexer( | |||||||||
OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] | OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] | |||||||||
): | ): | |||||||||
USE_TOOLS = False | USE_TOOLS = False | |||||||||
def __init__(self, config=None, **kwargs) -> None: | def __init__(self, config=None, **kwargs) -> None: | |||||||||
super().__init__(config=config, **kwargs) | super().__init__(config=config, **kwargs) | |||||||||
self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) | self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) | |||||||||
def index_list( | def index_list( | |||||||||
self, origins: List[Origin], check_origin_known: bool = True, **kwargs | self, origins: List[Origin], check_origin_known: bool = True, **kwargs | |||||||||
) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | |||||||||
head_rev_ids = [] | head_rev_ids = [] | |||||||||
origins_with_head = [] | origins_with_head = [] | |||||||||
# Filter out origins not in the storage | # Filter out origins not in the storage | |||||||||
if check_origin_known: | if check_origin_known: | |||||||||
known_origins = list( | known_origins = list( | |||||||||
call_with_batches( | call_with_batches( | |||||||||
self.storage.origin_get, | self.storage.origin_get, | |||||||||
Show All 22 Lines | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | |||||||||
assert len(head_revs) == len(head_rev_ids) | assert len(head_revs) == len(head_rev_ids) | |||||||||
results = [] | results = [] | |||||||||
for (origin, rev) in zip(origins_with_head, head_revs): | for (origin, rev) in zip(origins_with_head, head_revs): | |||||||||
if not rev: | if not rev: | |||||||||
self.log.warning("Missing head revision of origin %r", origin.url) | self.log.warning("Missing head revision of origin %r", origin.url) | |||||||||
continue | continue | |||||||||
for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): | for dir_metadata in self.directory_metadata_indexer.index(rev.directory): | |||||||||
# There is at most one rev_metadata | # There is at most one dir_metadata | |||||||||
orig_metadata = OriginIntrinsicMetadataRow( | orig_metadata = OriginIntrinsicMetadataRow( | |||||||||
from_revision=rev_metadata.id, | from_directory=dir_metadata.id, | |||||||||
id=origin.url, | id=origin.url, | |||||||||
metadata=rev_metadata.metadata, | metadata=dir_metadata.metadata, | |||||||||
mappings=rev_metadata.mappings, | mappings=dir_metadata.mappings, | |||||||||
indexer_configuration_id=rev_metadata.indexer_configuration_id, | indexer_configuration_id=dir_metadata.indexer_configuration_id, | |||||||||
) | ) | |||||||||
results.append((orig_metadata, rev_metadata)) | results.append((orig_metadata, dir_metadata)) | |||||||||
return results | return results | |||||||||
def persist_index_computations( | def persist_index_computations( | |||||||||
self, | self, | |||||||||
results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], | results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], | |||||||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | |||||||||
# Deduplicate revisions | # Deduplicate directories | |||||||||
rev_metadata: List[RevisionIntrinsicMetadataRow] = [] | dir_metadata: List[DirectoryIntrinsicMetadataRow] = [] | |||||||||
orig_metadata: List[OriginIntrinsicMetadataRow] = [] | orig_metadata: List[OriginIntrinsicMetadataRow] = [] | |||||||||
summary: Dict = {} | summary: Dict = {} | |||||||||
for (orig_item, rev_item) in results: | for (orig_item, dir_item) in results: | |||||||||
assert rev_item.metadata == orig_item.metadata | assert dir_item.metadata == orig_item.metadata | |||||||||
if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): | if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): | |||||||||
# Only store non-empty metadata sets | # Only store non-empty metadata sets | |||||||||
if rev_item not in rev_metadata: | if dir_item not in dir_metadata: | |||||||||
rev_metadata.append(rev_item) | dir_metadata.append(dir_item) | |||||||||
if orig_item not in orig_metadata: | if orig_item not in orig_metadata: | |||||||||
orig_metadata.append(orig_item) | orig_metadata.append(orig_item) | |||||||||
if rev_metadata: | if dir_metadata: | |||||||||
summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) | summary_dir = self.idx_storage.directory_intrinsic_metadata_add( | |||||||||
summary.update(summary_rev) | dir_metadata | |||||||||
) | ||||||||||
summary.update(summary_dir) | ||||||||||
if orig_metadata: | if orig_metadata: | |||||||||
summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | |||||||||
summary.update(summary_ori) | summary.update(summary_ori) | |||||||||
return summary | return summary |