swh/indexer/metadata.py
 # Copyright (C) 2017-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from copy import deepcopy
 from swh.core.utils import grouper
+from swh.objstorage.exc import ObjNotFoundError
-from swh.model import hashutil
 from swh.indexer.codemeta import merge_documents
-from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer
+from swh.indexer.indexer import RevisionIndexer, OriginIndexer
 from swh.indexer.origin_head import OriginHeadIndexer
 from swh.indexer.metadata_dictionary import MAPPINGS
 from swh.indexer.metadata_detector import detect_metadata
-from swh.indexer.storage import INDEXER_CFG_KEY
+from swh.model import hashutil


 REVISION_GET_BATCH_SIZE = 10
 ORIGIN_GET_BATCH_SIZE = 10


 def call_with_batches(f, args, batch_size):
     """Calls a function with batches of args, and concatenates the results.
     """
     groups = grouper(args, batch_size)
     for group in groups:
         yield from f(list(group))
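
Note: the point of call_with_batches is that f only ever sees batch_size args per call while the result stream comes back flattened. A minimal self-contained sketch of that behavior, with a stand-in for swh.core.utils.grouper and an invented fetch function (names below are illustrative, not from this diff):

from itertools import islice

def grouper(iterable, n):
    # stand-in for swh.core.utils.grouper: yields chunks of up to n items
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

def call_with_batches(f, args, batch_size):
    """Calls a function with batches of args, and concatenates the results."""
    for group in grouper(args, batch_size):
        yield from f(list(group))

def fetch(ids):
    # invented stand-in for a batched backend call such as revision_get
    return [{'id': i} for i in ids]

# 5 args with batch_size=2 -> fetch is called 3 times, results are flattened
assert [d['id'] for d in call_with_batches(fetch, range(5), 2)] == [0, 1, 2, 3, 4]
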
-class ContentMetadataIndexer(ContentIndexer):
-    """Content-level indexer
-
-    This indexer is in charge of:
-
-    - filtering out content already indexed in content_metadata
-    - reading content from objstorage with the content's id sha1
-    - computing metadata by given context
-    - using the metadata_dictionary as the 'swh-metadata-translator' tool
-    - store result in content_metadata table
-    """
-    def filter(self, ids):
-        """Filter out known sha1s and return only missing ones.
-        """
-        yield from self.idx_storage.content_metadata_missing((
-            {
-                'id': sha1,
-                'indexer_configuration_id': self.tool['id'],
-            } for sha1 in ids
-        ))
-
-    def index(self, id, data, log_suffix='unknown revision'):
-        """Index sha1s' content and store result.
-
-        Args:
-            id (bytes): content's identifier
-            data (bytes): raw content in bytes
-
-        Returns:
-            dict: dictionary representing a content_metadata. If the
-            translation wasn't successful the metadata keys will
-            be returned as None
-
-        """
-        result = {
-            'id': id,
-            'indexer_configuration_id': self.tool['id'],
-            'metadata': None
-        }
-        try:
-            mapping_name = self.tool['tool_configuration']['context']
-            log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id)
-            result['metadata'] = \
-                MAPPINGS[mapping_name](log_suffix).translate(data)
-        except Exception:
-            self.log.exception(
-                "Problem during metadata translation "
-                "for content %s" % hashutil.hash_to_hex(id))
-        if result['metadata'] is None:
-            return None
-        return result
-
-    def persist_index_computations(self, results, policy_update):
-        """Persist the results in storage.
-
-        Args:
-            results ([dict]): list of content_metadata, dict with the
-              following keys:
-              - id (bytes): content's identifier (sha1)
-              - metadata (jsonb): detected metadata
-            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-              respectively update duplicates or ignore them
-        """
-        self.idx_storage.content_metadata_add(
-            results, conflict_update=(policy_update == 'update-dups'))
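
The removed filter() delegates to idx_storage.content_metadata_missing, feeding it {'id', 'indexer_configuration_id'} dicts and getting back only the ids not yet indexed with this tool. A sketch of that contract, under the assumption that the backing content_metadata table can be modeled as a set of (sha1, tool_id) pairs; content_metadata_missing below is a stand-in, not the real storage API:

def content_metadata_missing(rows, indexed):
    # stand-in for idx_storage.content_metadata_missing: `indexed` plays
    # the role of the content_metadata table, as (sha1, tool_id) pairs
    for row in rows:
        if (row['id'], row['indexer_configuration_id']) not in indexed:
            yield row['id']

tool_id = 42                                  # hypothetical tool id
indexed = {(b'\x01' * 20, tool_id)}           # this sha1 is already indexed
ids = [b'\x01' * 20, b'\x02' * 20]

missing = list(content_metadata_missing(
    ({'id': sha1, 'indexer_configuration_id': tool_id} for sha1 in ids),
    indexed))
assert missing == [b'\x02' * 20]              # only the unknown sha1 remains
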
 class RevisionMetadataIndexer(RevisionIndexer):
     """Revision-level indexer

     This indexer is in charge of:

     - filtering revisions already indexed in revision_intrinsic_metadata table
       with defined computation tool
     - retrieve all entry_files in root directory

@@ (98 unchanged lines folded) @@ def translate_revision_intrinsic_metadata(
             detected_files (dict): dictionary mapping context names (e.g.,
                 "npm", "authors") to list of sha1

         Returns:
             (List[str], dict): list of mappings used and dict with
                 translated metadata according to the CodeMeta vocabulary

         """
-        used_mappings = [MAPPINGS[context].name for context in detected_files]
+        used_mappings = [MAPPINGS[mapping_name].name
+                         for mapping_name in detected_files]
         metadata = []
-        tool = {
-            'name': 'swh-metadata-translator',
-            'version': '0.0.2',
-            'configuration': {
-            },
-        }
-        # TODO: iterate on each context, on each file
-        # -> get raw_contents
-        # -> translate each content
-        config = {
-            k: self.config[k]
-            for k in [INDEXER_CFG_KEY, 'objstorage', 'storage']
-        }
-        config['tools'] = [tool]
-        for context in detected_files.keys():
-            cfg = deepcopy(config)
-            cfg['tools'][0]['configuration']['context'] = context
-            c_metadata_indexer = ContentMetadataIndexer(config=cfg)
-            # sha1s that are in content_metadata table
-            sha1s_in_storage = []
-            metadata_generator = self.idx_storage.content_metadata_get(
-                detected_files[context])
-            for c in metadata_generator:
-                # extracting metadata
-                sha1 = c['id']
-                sha1s_in_storage.append(sha1)
-                local_metadata = c['metadata']
-                # local metadata is aggregated
-                if local_metadata:
-                    metadata.append(local_metadata)
-
-            sha1s_filtered = [item for item in detected_files[context]
-                              if item not in sha1s_in_storage]
-
-            if sha1s_filtered:
-                # content indexing
-                try:
-                    c_metadata_indexer.run(sha1s_filtered,
-                                           policy_update='ignore-dups',
-                                           log_suffix=log_suffix)
-                    # on the fly possibility:
-                    for result in c_metadata_indexer.results:
-                        local_metadata = result['metadata']
-                        metadata.append(local_metadata)
-                except Exception:
-                    self.log.exception(
-                        "Exception while indexing metadata on contents")
-
-        metadata = merge_documents(metadata)
-        return (used_mappings, metadata)
+        for (mapping_name, sha1s) in detected_files.items():
+            for sha1 in sha1s:
+                result = self.index_content(
+                    sha1, mapping_name, log_suffix=log_suffix)
+                print(result)

ardumont: Please remove the print ;)

+                if result:
+                    metadata.append(result)
+
+        metadata = merge_documents(metadata)
+        return (used_mappings, metadata)
+
+    def index_content(self, id, mapping_name, log_suffix='unknown revision'):
+        """Translate the content of one sha1 with the given mapping.
+
+        Args:
+            id (bytes): content's identifier
+            mapping_name (str): name of the mapping to use, as a key
+                of MAPPINGS
+
+        Returns:
+            dict: the translated metadata, or None if the content is
+            missing from the objstorage or the translation was not
+            successful
+
+        """
+        try:
+            raw_content = self.objstorage.get(id)
+        except ObjNotFoundError:
+            self.log.warning('Content %s not found in objstorage' %
+                             hashutil.hash_to_hex(id))
+            return
+        try:
+            log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id)
+            return MAPPINGS[mapping_name](log_suffix).translate(raw_content)
+        except Exception:
+            self.log.exception(
+                "Problem during metadata translation "
+                "for content %s" % hashutil.hash_to_hex(id))
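
The net effect of the new method body: one translation attempt per (mapping_name, sha1) pair, failures dropped, survivors merged. A sketch of just that loop shape; index_content is passed in as a callable and merge_documents is replaced by an invented dict-folding stub (the real one, in swh.indexer.codemeta, merges CodeMeta documents):

def merge_documents_stub(docs):
    # invented stand-in: folds the translated dicts into one document
    merged = {}
    for doc in docs:
        merged.update(doc)
    return merged

def translate_all(detected_files, index_content):
    # mirrors the new loop: one translation per (mapping_name, sha1) pair
    metadata = []
    for mapping_name, sha1s in detected_files.items():
        for sha1 in sha1s:
            result = index_content(sha1, mapping_name)
            if result:                 # failed translations return None
                metadata.append(result)
    return merge_documents_stub(metadata)

doc = translate_all(
    {'NpmMapping': [b'a' * 20], 'CodemetaMapping': [b'b' * 20]},
    lambda sha1, name: {'via': name} if name == 'NpmMapping' else None)
assert doc == {'via': 'NpmMapping'}   # the failed mapping was dropped
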
 class OriginMetadataIndexer(OriginIndexer):
     ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG

     USE_TOOLS = False

     def __init__(self, config=None, **kwargs):
@@ (81 unchanged lines folded) @@
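
Both the removed and the added code drive translators through the same calling convention, MAPPINGS[name](log_suffix).translate(raw_content): instantiate the mapping class with a logging suffix, get back a dict or None. An invented translator following that convention (real mappings live in swh.indexer.metadata_dictionary):

import json

class FakeNpmMapping:
    # invented example following the MAPPINGS calling convention
    name = 'npm'

    def __init__(self, log_suffix=''):
        self.log_suffix = log_suffix

    def translate(self, raw_content):
        # turn a raw package.json into a (simplified) metadata dict
        try:
            package = json.loads(raw_content.decode())
        except (UnicodeDecodeError, ValueError):
            return None
        return {'name': package.get('name')}

MAPPINGS = {'NpmMapping': FakeNpmMapping}

doc = MAPPINGS['NpmMapping']('revision_id=1234').translate(b'{"name": "leftpad"}')
assert doc == {'name': 'leftpad'}
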