Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
TypeVar, | TypeVar, | ||||
) | ) | ||||
import sentry_sdk | import sentry_sdk | ||||
from swh.core.config import merge_configs | from swh.core.config import merge_configs | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.indexer.codemeta import merge_documents | from swh.indexer.codemeta import merge_documents | ||||
from swh.indexer.indexer import ContentIndexer, OriginIndexer, RevisionIndexer | from swh.indexer.indexer import ContentIndexer, DirectoryIndexer, OriginIndexer | ||||
from swh.indexer.metadata_detector import detect_metadata | from swh.indexer.metadata_detector import detect_metadata | ||||
from swh.indexer.metadata_dictionary import MAPPINGS | from swh.indexer.metadata_dictionary import MAPPINGS | ||||
from swh.indexer.origin_head import OriginHeadIndexer | from swh.indexer.origin_head import get_head_swhid | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | ||||
from swh.indexer.storage.model import ( | from swh.indexer.storage.model import ( | ||||
ContentMetadataRow, | ContentMetadataRow, | ||||
DirectoryIntrinsicMetadataRow, | |||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | ||||
RevisionIntrinsicMetadataRow, | |||||
) | ) | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Revision, Sha1Git | from swh.model.model import Directory | ||||
from swh.model.model import ObjectType as ModelObjectType | |||||
from swh.model.model import Origin, Sha1Git | |||||
from swh.model.swhids import CoreSWHID, ObjectType | |||||
REVISION_GET_BATCH_SIZE = 10 | REVISION_GET_BATCH_SIZE = 10 | ||||
RELEASE_GET_BATCH_SIZE = 10 | |||||
ORIGIN_GET_BATCH_SIZE = 10 | ORIGIN_GET_BATCH_SIZE = 10 | ||||
T1 = TypeVar("T1") | T1 = TypeVar("T1") | ||||
T2 = TypeVar("T2") | T2 = TypeVar("T2") | ||||
def call_with_batches( | def call_with_batches( | ||||
Show All 31 Lines | def filter(self, ids): | ||||
for sha1 in ids | for sha1 in ids | ||||
) | ) | ||||
) | ) | ||||
def index( | def index( | ||||
self, | self, | ||||
id: Sha1, | id: Sha1, | ||||
data: Optional[bytes] = None, | data: Optional[bytes] = None, | ||||
log_suffix="unknown revision", | log_suffix="unknown directory", | ||||
**kwargs, | **kwargs, | ||||
) -> List[ContentMetadataRow]: | ) -> List[ContentMetadataRow]: | ||||
"""Index sha1s' content and store result. | """Index sha1s' content and store result. | ||||
Args: | Args: | ||||
id: content's identifier | id: content's identifier | ||||
data: raw content in bytes | data: raw content in bytes | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | DEFAULT_CONFIG: Dict[str, Any] = { | ||||
"tools": { | "tools": { | ||||
"name": "swh-metadata-detector", | "name": "swh-metadata-detector", | ||||
"version": "0.0.2", | "version": "0.0.2", | ||||
"configuration": {}, | "configuration": {}, | ||||
}, | }, | ||||
} | } | ||||
class RevisionMetadataIndexer(RevisionIndexer[RevisionIntrinsicMetadataRow]): | class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): | ||||
"""Revision-level indexer | """Directory-level indexer | ||||
This indexer is in charge of: | This indexer is in charge of: | ||||
- filtering revisions already indexed in revision_intrinsic_metadata table | - filtering directories already indexed in directory_intrinsic_metadata table | ||||
with defined computation tool | with defined computation tool | ||||
- retrieve all entry_files in root directory | - retrieve all entry_files in directory | ||||
- use metadata_detector for file_names containing metadata | - use metadata_detector for file_names containing metadata | ||||
- compute metadata translation if necessary and possible (depends on tool) | - compute metadata translation if necessary and possible (depends on tool) | ||||
- send sha1s to content indexing if possible | - send sha1s to content indexing if possible | ||||
- store the results for revision | - store the results for directory | ||||
""" | """ | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | self.config = merge_configs(DEFAULT_CONFIG, self.config) | ||||
def filter(self, sha1_gits): | def filter(self, sha1_gits): | ||||
"""Filter out known sha1s and return only missing ones.""" | """Filter out known sha1s and return only missing ones.""" | ||||
yield from self.idx_storage.revision_intrinsic_metadata_missing( | yield from self.idx_storage.directory_intrinsic_metadata_missing( | ||||
( | ( | ||||
{ | { | ||||
"id": sha1_git, | "id": sha1_git, | ||||
"indexer_configuration_id": self.tool["id"], | "indexer_configuration_id": self.tool["id"], | ||||
} | } | ||||
for sha1_git in sha1_gits | for sha1_git in sha1_gits | ||||
) | ) | ||||
) | ) | ||||
def index( | def index( | ||||
self, id: Sha1Git, data: Optional[Revision], **kwargs | self, id: Sha1Git, data: Optional[Directory] = None, **kwargs | ||||
) -> List[RevisionIntrinsicMetadataRow]: | ) -> List[DirectoryIntrinsicMetadataRow]: | ||||
"""Index rev by processing it and organizing result. | """Index directory by processing it and organizing result. | ||||
use metadata_detector to iterate on filenames | use metadata_detector to iterate on filenames | ||||
- if one filename detected -> sends file to content indexer | - if one filename detected -> sends file to content indexer | ||||
- if multiple file detected -> translation needed at revision level | - if multiple file detected -> translation needed at directory level | ||||
Args: | Args: | ||||
id: sha1_git of the revision | id: sha1_git of the directory | ||||
data: revision model object from storage | data: directory model object from storage | ||||
Returns: | Returns: | ||||
list: RevisionIntrinsicMetadataRow objects (at most one), each with | list: DirectoryIntrinsicMetadataRow objects (at most one), each with | ||||
fields: | fields: | ||||
- id: revision's identifier (sha1_git) | - id: directory's identifier (sha1_git) | ||||
- indexer_configuration_id: identifier of the tool used | - indexer_configuration_id: identifier of the tool used | ||||
- mappings: mappings used to translate the metadata | - mappings: mappings used to translate the metadata | ||||
- metadata: dict of retrieved metadata | - metadata: dict of retrieved metadata | ||||
""" | """ | ||||
rev = data | if data is None: | ||||
assert isinstance(rev, Revision) | dir_ = list(self.storage.directory_ls(id, recursive=False)) | ||||
else: | |||||
assert isinstance(data, Directory) | |||||
dir_ = data.to_dict() | |||||
try: | try: | ||||
root_dir = rev.directory | if [entry["type"] for entry in dir_] == ["dir"]: | ||||
dir_ls = list(self.storage.directory_ls(root_dir, recursive=False)) | |||||
if [entry["type"] for entry in dir_ls] == ["dir"]: | |||||
# If the root is just a single directory, recurse into it | # If the root is just a single directory, recurse into it | ||||
# e.g. PyPI packages, GNU tarballs | # e.g. PyPI packages, GNU tarballs | ||||
subdir = dir_ls[0]["target"] | subdir = dir_[0]["target"] | ||||
dir_ls = list(self.storage.directory_ls(subdir, recursive=False)) | dir_ = list(self.storage.directory_ls(subdir, recursive=False)) | ||||
files = [entry for entry in dir_ls if entry["type"] == "file"] | files = [entry for entry in dir_ if entry["type"] == "file"] | ||||
detected_files = detect_metadata(files) | detected_files = detect_metadata(files) | ||||
(mappings, metadata) = self.translate_revision_intrinsic_metadata( | (mappings, metadata) = self.translate_directory_intrinsic_metadata( | ||||
detected_files, | detected_files, | ||||
log_suffix="revision=%s" % hashutil.hash_to_hex(rev.id), | log_suffix="directory=%s" % hashutil.hash_to_hex(id), | ||||
) | ) | ||||
except Exception as e: | except Exception as e: | ||||
self.log.exception("Problem when indexing rev: %r", e) | self.log.exception("Problem when indexing dir: %r", e) | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
return [ | return [ | ||||
RevisionIntrinsicMetadataRow( | DirectoryIntrinsicMetadataRow( | ||||
id=rev.id, | id=id, | ||||
indexer_configuration_id=self.tool["id"], | indexer_configuration_id=self.tool["id"], | ||||
mappings=mappings, | mappings=mappings, | ||||
metadata=metadata, | metadata=metadata, | ||||
) | ) | ||||
] | ] | ||||
def persist_index_computations( | def persist_index_computations( | ||||
self, results: List[RevisionIntrinsicMetadataRow] | self, results: List[DirectoryIntrinsicMetadataRow] | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
"""Persist the results in storage. | """Persist the results in storage. | ||||
Args: | Args: | ||||
results: list of RevisionIntrinsicMetadataRow objects, each with | results: list of DirectoryIntrinsicMetadataRow objects, each with | ||||
the following fields: | the following fields: | ||||
- id: revision's identifier (sha1_git) | - id: directory's identifier (sha1_git) | ||||
- indexer_configuration_id: identifier of the tool used | - indexer_configuration_id: identifier of the tool used | ||||
- mappings: mappings used to translate the metadata | - mappings: mappings used to translate the metadata | ||||
- metadata: dict of retrieved metadata | - metadata: dict of retrieved metadata | ||||
""" | """ | ||||
# TODO: add functions in storage to keep data in | # TODO: add functions in storage to keep data in | ||||
# revision_intrinsic_metadata | # directory_intrinsic_metadata | ||||
return self.idx_storage.revision_intrinsic_metadata_add(results) | return self.idx_storage.directory_intrinsic_metadata_add(results) | ||||
def translate_revision_intrinsic_metadata( | def translate_directory_intrinsic_metadata( | ||||
self, detected_files: Dict[str, List[Any]], log_suffix: str | self, detected_files: Dict[str, List[Any]], log_suffix: str | ||||
) -> Tuple[List[Any], Any]: | ) -> Tuple[List[Any], Any]: | ||||
""" | """ | ||||
Determine plan of action to translate metadata when containing | Determine plan of action to translate metadata when containing | ||||
one or multiple detected files: | one or multiple detected files: | ||||
Args: | Args: | ||||
detected_files: dictionary mapping context names (e.g., | detected_files: dictionary mapping context names (e.g., | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | ) -> Tuple[List[Any], Any]: | ||||
self.log.exception("Exception while indexing metadata on contents") | self.log.exception("Exception while indexing metadata on contents") | ||||
sentry_sdk.capture_exception() | sentry_sdk.capture_exception() | ||||
metadata = merge_documents(metadata) | metadata = merge_documents(metadata) | ||||
return (used_mappings, metadata) | return (used_mappings, metadata) | ||||
class OriginMetadataIndexer( | class OriginMetadataIndexer( | ||||
OriginIndexer[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]] | OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] | ||||
): | ): | ||||
USE_TOOLS = False | USE_TOOLS = False | ||||
def __init__(self, config=None, **kwargs) -> None: | def __init__(self, config=None, **kwargs) -> None: | ||||
super().__init__(config=config, **kwargs) | super().__init__(config=config, **kwargs) | ||||
self.origin_head_indexer = OriginHeadIndexer(config=config) | self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) | ||||
self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) | |||||
def index_list( | def index_list( | ||||
self, origin_urls: List[str], **kwargs | self, origins: List[Origin], check_origin_known: bool = True, **kwargs | ||||
) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]: | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | ||||
head_rev_ids = [] | head_rev_ids = [] | ||||
origins_with_head = [] | head_rel_ids = [] | ||||
origins = list( | origin_heads: Dict[Origin, CoreSWHID] = {} | ||||
# Filter out origins not in the storage | |||||
if check_origin_known: | |||||
known_origins = list( | |||||
call_with_batches( | call_with_batches( | ||||
self.storage.origin_get, | self.storage.origin_get, | ||||
origin_urls, | [origin.url for origin in origins], | ||||
ORIGIN_GET_BATCH_SIZE, | ORIGIN_GET_BATCH_SIZE, | ||||
) | ) | ||||
) | ) | ||||
for origin in origins: | else: | ||||
known_origins = list(origins) | |||||
for origin in known_origins: | |||||
if origin is None: | if origin is None: | ||||
continue | continue | ||||
head_results = self.origin_head_indexer.index(origin.url) | head_swhid = get_head_swhid(self.storage, origin.url) | ||||
if head_results: | if head_swhid: | ||||
(head_result,) = head_results | origin_heads[origin] = head_swhid | ||||
origins_with_head.append(origin) | if head_swhid.object_type == ObjectType.REVISION: | ||||
head_rev_ids.append(head_result["revision_id"]) | head_rev_ids.append(head_swhid.object_id) | ||||
elif head_swhid.object_type == ObjectType.RELEASE: | |||||
head_revs = list( | head_rel_ids.append(head_swhid.object_id) | ||||
else: | |||||
assert False, head_swhid | |||||
head_revs = dict( | |||||
zip( | |||||
head_rev_ids, | |||||
call_with_batches( | call_with_batches( | ||||
self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE | self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE | ||||
), | |||||
) | |||||
) | |||||
head_rels = dict( | |||||
zip( | |||||
head_rel_ids, | |||||
call_with_batches( | |||||
self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE | |||||
), | |||||
) | ) | ||||
) | ) | ||||
assert len(head_revs) == len(head_rev_ids) | |||||
results = [] | results = [] | ||||
for (origin, rev) in zip(origins_with_head, head_revs): | for (origin, head_swhid) in origin_heads.items(): | ||||
if head_swhid.object_type == ObjectType.REVISION: | |||||
rev = head_revs[head_swhid.object_id] | |||||
if not rev: | if not rev: | ||||
self.log.warning("Missing head revision of origin %r", origin.url) | self.log.warning( | ||||
"Missing head object %s of origin %r", head_swhid, origin.url | |||||
) | |||||
continue | |||||
directory_id = rev.directory | |||||
elif head_swhid.object_type == ObjectType.RELEASE: | |||||
rel = head_rels[head_swhid.object_id] | |||||
if not rel: | |||||
self.log.warning( | |||||
"Missing head object %s of origin %r", head_swhid, origin.url | |||||
) | |||||
continue | continue | ||||
if rel.target_type != ModelObjectType.DIRECTORY: | |||||
# TODO: index releases whose target is not a directory | |||||
self.log.warning( | |||||
"Head release %s of %r has unexpected target type %s", | |||||
head_swhid, | |||||
origin.url, | |||||
rel.target_type, | |||||
) | |||||
continue | |||||
assert rel.target, rel | |||||
directory_id = rel.target | |||||
else: | |||||
assert False, head_swhid | |||||
for rev_metadata in self.revision_metadata_indexer.index(rev.id, rev): | for dir_metadata in self.directory_metadata_indexer.index(directory_id): | ||||
# There is at most one rev_metadata | # There is at most one dir_metadata | ||||
orig_metadata = OriginIntrinsicMetadataRow( | orig_metadata = OriginIntrinsicMetadataRow( | ||||
from_revision=rev_metadata.id, | from_directory=dir_metadata.id, | ||||
id=origin.url, | id=origin.url, | ||||
metadata=rev_metadata.metadata, | metadata=dir_metadata.metadata, | ||||
mappings=rev_metadata.mappings, | mappings=dir_metadata.mappings, | ||||
indexer_configuration_id=rev_metadata.indexer_configuration_id, | indexer_configuration_id=dir_metadata.indexer_configuration_id, | ||||
) | ) | ||||
results.append((orig_metadata, rev_metadata)) | results.append((orig_metadata, dir_metadata)) | ||||
return results | return results | ||||
def persist_index_computations( | def persist_index_computations( | ||||
self, | self, | ||||
results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], | results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
# Deduplicate revisions | # Deduplicate directories | ||||
rev_metadata: List[RevisionIntrinsicMetadataRow] = [] | dir_metadata: List[DirectoryIntrinsicMetadataRow] = [] | ||||
orig_metadata: List[OriginIntrinsicMetadataRow] = [] | orig_metadata: List[OriginIntrinsicMetadataRow] = [] | ||||
summary: Dict = {} | summary: Dict = {} | ||||
for (orig_item, rev_item) in results: | for (orig_item, dir_item) in results: | ||||
assert rev_item.metadata == orig_item.metadata | assert dir_item.metadata == orig_item.metadata | ||||
if rev_item.metadata and not (rev_item.metadata.keys() <= {"@context"}): | if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): | ||||
# Only store non-empty metadata sets | # Only store non-empty metadata sets | ||||
if rev_item not in rev_metadata: | if dir_item not in dir_metadata: | ||||
rev_metadata.append(rev_item) | dir_metadata.append(dir_item) | ||||
if orig_item not in orig_metadata: | if orig_item not in orig_metadata: | ||||
orig_metadata.append(orig_item) | orig_metadata.append(orig_item) | ||||
if rev_metadata: | if dir_metadata: | ||||
summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) | summary_dir = self.idx_storage.directory_intrinsic_metadata_add( | ||||
summary.update(summary_rev) | dir_metadata | ||||
) | |||||
summary.update(summary_dir) | |||||
if orig_metadata: | if orig_metadata: | ||||
summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) | ||||
summary.update(summary_ori) | summary.update(summary_ori) | ||||
return summary | return summary |