Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
# Copyright (C) 2017-2018 The Software Heritage developers | # Copyright (C) 2017-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from typing import Any, List, Dict, Tuple, Callable, Generator | |||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.indexer.codemeta import merge_documents | from swh.indexer.codemeta import merge_documents | ||||
from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer | from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer | ||||
from swh.indexer.origin_head import OriginHeadIndexer | from swh.indexer.origin_head import OriginHeadIndexer | ||||
from swh.indexer.metadata_dictionary import MAPPINGS | from swh.indexer.metadata_dictionary import MAPPINGS | ||||
from swh.indexer.metadata_detector import detect_metadata | from swh.indexer.metadata_detector import detect_metadata | ||||
from swh.indexer.storage import INDEXER_CFG_KEY | from swh.indexer.storage import INDEXER_CFG_KEY | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
REVISION_GET_BATCH_SIZE = 10 | REVISION_GET_BATCH_SIZE = 10 | ||||
ORIGIN_GET_BATCH_SIZE = 10 | ORIGIN_GET_BATCH_SIZE = 10 | ||||
def call_with_batches(
    f: Callable[[List[Dict[str, Any]]], Dict[str, Any]],
    args: List[Dict[str, str]], batch_size: int
) -> Generator[Dict[str, Any], None, None]:
    """Calls a function with batches of args, and concatenates the results.

    Args:
        f: function invoked once per batch; expected to return an
            iterable of result dicts.
        args: full list of arguments to process.
        batch_size: maximum number of elements per batch.

    Yields:
        The elements produced by each call to ``f``, in batch order.
    """
    groups = grouper(args, batch_size)
    for group in groups:
        # grouper yields lazy groups; materialize each one before the call
        yield from f(list(group))
class ContentMetadataIndexer(ContentIndexer): | class ContentMetadataIndexer(ContentIndexer): | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def index(self, id, data, log_suffix='unknown revision'): | ||||
except Exception: | except Exception: | ||||
self.log.exception( | self.log.exception( | ||||
"Problem during metadata translation " | "Problem during metadata translation " | ||||
"for content %s" % hashutil.hash_to_hex(id)) | "for content %s" % hashutil.hash_to_hex(id)) | ||||
if result['metadata'] is None: | if result['metadata'] is None: | ||||
return None | return None | ||||
return result | return result | ||||
def persist_index_computations(
        self, results: List[Dict], policy_update: str
) -> None:
    """Persist the results in storage.

    Args:
        results: list of content_metadata dicts, with the
            following keys:

            - id (bytes): content's identifier (sha1)
            - metadata (jsonb): detected metadata
        policy_update: either 'update-dups' or 'ignore-dups' to
            respectively update duplicates or ignore them
    """
    self.idx_storage.content_metadata_add(
        results, conflict_update=(policy_update == 'update-dups'))
class RevisionMetadataIndexer(RevisionIndexer): | class RevisionMetadataIndexer(RevisionIndexer): | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def index(self, rev): | ||||
log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id'])) | log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id'])) | ||||
result['mappings'] = mappings | result['mappings'] = mappings | ||||
result['metadata'] = metadata | result['metadata'] = metadata | ||||
except Exception as e: | except Exception as e: | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when indexing rev: %r', e) | 'Problem when indexing rev: %r', e) | ||||
return result | return result | ||||
def persist_index_computations(
        self, results: List[Dict], policy_update: str
) -> None:
    """Persist the results in storage.

    Args:
        results: list of revision_intrinsic_metadata dicts, with the
            following keys:

            - id (bytes): revision's identifier
            - metadata (jsonb): detected metadata
            - mappings ([str]): names of the mappings used to translate
              the metadata
        policy_update: either 'update-dups' or 'ignore-dups' to
            respectively update duplicates or ignore them
    """
    # TODO: add functions in storage to keep data in
    # revision_intrinsic_metadata
    self.idx_storage.revision_intrinsic_metadata_add(
        results, conflict_update=(policy_update == 'update-dups'))
def translate_revision_intrinsic_metadata( | def translate_revision_intrinsic_metadata( | ||||
self, detected_files, log_suffix): | self, detected_files: Dict[str, List[Any]], log_suffix: str | ||||
) -> Tuple[List[Any], List[Any]]: | |||||
""" | """ | ||||
Determine plan of action to translate metadata when containing | Determine plan of action to translate metadata when containing | ||||
one or multiple detected files: | one or multiple detected files: | ||||
Args: | Args: | ||||
detected_files (dict): dictionary mapping context names (e.g., | detected_files: dictionary mapping context names (e.g., | ||||
"npm", "authors") to list of sha1 | "npm", "authors") to list of sha1 | ||||
Returns: | Returns: | ||||
(List[str], dict): list of mappings used and dict with | (List[str], dict): list of mappings used and dict with | ||||
translated metadata according to the CodeMeta vocabulary | translated metadata according to the CodeMeta vocabulary | ||||
""" | """ | ||||
used_mappings = [MAPPINGS[context].name for context in detected_files] | used_mappings = [MAPPINGS[context].name for context in detected_files] | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | ) -> Tuple[List[Any], List[Any]]: | ||||
return (used_mappings, metadata) | return (used_mappings, metadata) | ||||
class OriginMetadataIndexer(OriginIndexer): | class OriginMetadataIndexer(OriginIndexer): | ||||
ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG | ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG | ||||
USE_TOOLS = False | USE_TOOLS = False | ||||
def __init__(self, config=None, **kwargs) -> None:
    """Set up this indexer together with the two sub-indexers it
    drives: one that resolves each origin's head revision, and one
    that extracts intrinsic metadata from that revision.
    """
    super().__init__(config=config, **kwargs)
    # Both sub-indexers share this indexer's configuration.
    self.revision_metadata_indexer = RevisionMetadataIndexer(config=config)
    self.origin_head_indexer = OriginHeadIndexer(config=config)
def index_list(self, origin_urls): | def index_list(self, origin_urls): | ||||
head_rev_ids = [] | head_rev_ids = [] | ||||
origins_with_head = [] | origins_with_head = [] | ||||
origins = list(call_with_batches( | origins = list(call_with_batches( | ||||
Show All 24 Lines | def index_list(self, origin_urls): | ||||
'metadata': rev_metadata['metadata'], | 'metadata': rev_metadata['metadata'], | ||||
'mappings': rev_metadata['mappings'], | 'mappings': rev_metadata['mappings'], | ||||
'indexer_configuration_id': | 'indexer_configuration_id': | ||||
rev_metadata['indexer_configuration_id'], | rev_metadata['indexer_configuration_id'], | ||||
} | } | ||||
results.append((orig_metadata, rev_metadata)) | results.append((orig_metadata, rev_metadata)) | ||||
return results | return results | ||||
def persist_index_computations(self, results, policy_update): | def persist_index_computations( | ||||
self, results: Dict, policy_update: str | |||||
) -> None: | |||||
conflict_update = (policy_update == 'update-dups') | conflict_update = (policy_update == 'update-dups') | ||||
# Deduplicate revisions | # Deduplicate revisions | ||||
rev_metadata = [] | rev_metadata: List[Any] = [] | ||||
orig_metadata = [] | orig_metadata: List[Any] = [] | ||||
revs_to_delete = [] | revs_to_delete: List[Any] = [] | ||||
origs_to_delete = [] | origs_to_delete: List[Any] = [] | ||||
for (orig_item, rev_item) in results: | for (orig_item, rev_item) in results: | ||||
assert rev_item['metadata'] == orig_item['metadata'] | assert rev_item['metadata'] == orig_item['metadata'] | ||||
if not rev_item['metadata'] or \ | if not rev_item['metadata'] or \ | ||||
rev_item['metadata'].keys() <= {'@context'}: | rev_item['metadata'].keys() <= {'@context'}: | ||||
# If we didn't find any metadata, don't store a DB record | # If we didn't find any metadata, don't store a DB record | ||||
# (and delete existing ones, if any) | # (and delete existing ones, if any) | ||||
if rev_item not in revs_to_delete: | if rev_item not in revs_to_delete: | ||||
revs_to_delete.append(rev_item) | revs_to_delete.append(rev_item) | ||||
Show All 23 Lines |
sorry I missed this earlier, but you also forgot yield type for this generator