Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
Show All 16 Lines | |||||
class ContentMetadataIndexer(ContentIndexer):
    """Content-level metadata indexer.

    This indexer is in charge of:

    - filtering out content already indexed in ``content_metadata``
    - reading content from objstorage with the content's id sha1
    - computing metadata by given context
    - using the metadata_dictionary as the 'swh-metadata-translator' tool
    - storing the result in the ``content_metadata`` table
    """
    def filter(self, ids):
        """Filter out known sha1s and return only missing ones.

        Args:
            ids: iterable of content sha1 identifiers

        Yields:
            sha1s not yet present in ``content_metadata`` for this tool
        """
        yield from self.idx_storage.content_metadata_missing((
            {
                'id': sha1,
                'indexer_configuration_id': self.tool['id'],
            } for sha1 in ids
        ))

    def index(self, id, data, log_suffix='unknown revision'):
        """Index sha1s' content and store result.

        Args:
            id (bytes): content's identifier
            data (bytes): raw content in bytes
            log_suffix (str): context string appended to log messages to
                identify the enclosing revision being indexed

        Returns:
            dict: dictionary representing a content_metadata. If the
            translation wasn't successful the 'metadata' key will be
            returned as None; in that case the whole result is dropped
            and None is returned instead.
        """
        result = {
            'id': id,
            'indexer_configuration_id': self.tool['id'],
            'metadata': None
        }
        try:
            # The mapping (e.g. 'npm', 'codemeta') is carried by the tool
            # configuration under the 'context' key.
            mapping_name = self.tool['tool_configuration']['context']
            log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id)
            result['metadata'] = \
                MAPPINGS[mapping_name](log_suffix).translate(data)
        except Exception:
            # Translation is best-effort: log and fall through with
            # metadata still None so the caller can skip this content.
            self.log.exception(
                "Problem during metadata translation "
                "for content %s" % hashutil.hash_to_hex(id))
        if result['metadata'] is None:
            return None
        return result

    def persist_index_computations(self, results, policy_update):
        """Persist the results in storage.

        Args:
            results ([dict]): list of content_metadata, dict with the
                following keys:

                - id (bytes): content's identifier (sha1)
                - metadata (jsonb): detected metadata

            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
                respectively update duplicates or ignore them
        """
        self.idx_storage.content_metadata_add(
            results, conflict_update=(policy_update == 'update-dups'))
class RevisionMetadataIndexer(RevisionIndexer):
    """Revision-level metadata indexer.

    This indexer is in charge of:

    - filtering revisions already indexed in the
      ``revision_intrinsic_metadata`` table with the defined computation tool
    - retrieving all entry_files in the root directory
    - using metadata_detector for file_names containing metadata
    - computing metadata translation if necessary and possible
      (depends on tool)
    - sending sha1s to content indexing if possible
    - storing the results for the revision
    """
    ADDITIONAL_CONFIG = {
        'tools': ('dict', {
            'name': 'swh-metadata-detector',
            'version': '0.0.2',
            'configuration': {
            },
        }),
    }

    def filter(self, sha1_gits):
        """Filter out known sha1s and return only missing ones.

        Args:
            sha1_gits: iterable of revision identifiers (sha1_git)

        Yields:
            sha1_gits not yet present in ``revision_intrinsic_metadata``
            for this tool
        """
        yield from self.idx_storage.revision_intrinsic_metadata_missing((
            {
                'id': sha1_git,
                'indexer_configuration_id': self.tool['id'],
            } for sha1_git in sha1_gits
        ))

    def index(self, rev):
        """Index rev by processing it and organizing result.

        Uses metadata_detector to iterate on filenames:

        - if one filename detected -> sends file to content indexer
        - if multiple files detected -> translation needed at revision level

        Args:
            rev (dict): revision artifact from storage

        Returns:
            dict: dictionary representing a revision_intrinsic_metadata,
            with keys:

            - id (str): rev's identifier (sha1_git)
            - indexer_configuration_id (bytes): tool used
            - mappings ([str]): list of mappings used
            - metadata: dict of retrieved metadata
        """
        result = {
            'id': rev['id'],
            'indexer_configuration_id': self.tool['id'],
            'mappings': None,
            'metadata': None
        }
        try:
            root_dir = rev['directory']
            dir_ls = self.storage.directory_ls(root_dir, recursive=False)
            files = [entry for entry in dir_ls if entry['type'] == 'file']
            detected_files = detect_metadata(files)
            (mappings, metadata) = self.translate_revision_intrinsic_metadata(
                detected_files,
                log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id']))
            result['mappings'] = mappings
            result['metadata'] = metadata
        except Exception as e:
            # Best-effort: return the result shell (mappings/metadata None)
            # rather than aborting the whole batch.
            self.log.exception(
                'Problem when indexing rev: %r', e)
        return result

    def persist_index_computations(self, results, policy_update):
        """Persist the results in storage.

        Args:
            results ([dict]): list of revision_intrinsic_metadata, dict
                with the following keys:

                - id (bytes): revision's identifier (sha1_git)
                - mappings ([str]): list of mappings used
                - metadata (jsonb): detected metadata

            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
                respectively update duplicates or ignore them
        """
        # TODO: add functions in storage to keep data in
        # revision_intrinsic_metadata
        self.idx_storage.revision_intrinsic_metadata_add(
            results, conflict_update=(policy_update == 'update-dups'))

    def translate_revision_intrinsic_metadata(
            self, detected_files, log_suffix):
        """Determine the plan of action to translate metadata when
        containing one or multiple detected files.

        Args:
            detected_files (dict): dictionary mapping context names (e.g.,
                "npm", "authors") to list of sha1
            log_suffix (str): context string appended to log messages to
                identify the revision being indexed

        Returns:
            (List[str], dict): list of mappings used and dict with
            translated metadata according to the CodeMeta vocabulary
        """
        used_mappings = [MAPPINGS[context].name for context in detected_files]
        metadata = []
        tool = {
            'name': 'swh-metadata-translator',
            'version': '0.0.2',
            'configuration': {
            },
        }
        # TODO: iterate on each context, on each file
        # -> get raw_contents
        # -> translate each content
        config = {
            k: self.config[k]
            for k in [INDEXER_CFG_KEY, 'objstorage', 'storage']
        }
        config['tools'] = [tool]
        for context in detected_files.keys():
            # Each context gets its own config copy so that the 'context'
            # tool configuration does not leak between iterations.
            cfg = deepcopy(config)
            cfg['tools'][0]['configuration']['context'] = context
            c_metadata_indexer = ContentMetadataIndexer(config=cfg)
            # sha1s that are in content_metadata table
            sha1s_in_storage = []
            metadata_generator = self.idx_storage.content_metadata_get(
                detected_files[context])
            for c in metadata_generator:
                # extracting metadata
                sha1 = c['id']
                sha1s_in_storage.append(sha1)
                local_metadata = c['metadata']
                # local metadata is aggregated
                if local_metadata:
                    metadata.append(local_metadata)

            sha1s_filtered = [item for item in detected_files[context]
                              if item not in sha1s_in_storage]

            if sha1s_filtered:
                # content indexing
                try:
                    c_metadata_indexer.run(sha1s_filtered,
                                           policy_update='ignore-dups',
                                           log_suffix=log_suffix)
                    # on the fly possibility:
                    for result in c_metadata_indexer.results:
                        local_metadata = result['metadata']
                        metadata.append(local_metadata)

                except Exception:
                    self.log.exception(
                        "Exception while indexing metadata on contents")

        # transform metadata into min set with swh-metadata-detector
        min_metadata = extract_minimal_metadata_dict(metadata)
        return (used_mappings, min_metadata)
class OriginMetadataIndexer(OriginIndexer): | class OriginMetadataIndexer(OriginIndexer): | ||||
ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG | ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG | ||||
USE_TOOLS = False | USE_TOOLS = False | ||||
Show All 19 Lines | def index_list(self, origins): | ||||
if not rev: | if not rev: | ||||
self.log.warning('Missing head revision of origin %r', | self.log.warning('Missing head revision of origin %r', | ||||
origin) | origin) | ||||
continue | continue | ||||
rev_metadata = self.revision_metadata_indexer.index(rev) | rev_metadata = self.revision_metadata_indexer.index(rev) | ||||
orig_metadata = { | orig_metadata = { | ||||
'from_revision': rev_metadata['id'], | 'from_revision': rev_metadata['id'], | ||||
'origin_id': origin['id'], | 'id': origin['id'], | ||||
'metadata': rev_metadata['translated_metadata'], | 'metadata': rev_metadata['metadata'], | ||||
'mappings': rev_metadata['mappings'], | 'mappings': rev_metadata['mappings'], | ||||
'indexer_configuration_id': | 'indexer_configuration_id': | ||||
rev_metadata['indexer_configuration_id'], | rev_metadata['indexer_configuration_id'], | ||||
} | } | ||||
results.append((orig_metadata, rev_metadata)) | results.append((orig_metadata, rev_metadata)) | ||||
return results | return results | ||||
def persist_index_computations(self, results, policy_update): | def persist_index_computations(self, results, policy_update): | ||||
Show All 15 Lines | def persist_index_computations(self, results, policy_update): | ||||
orig_metadata.append(orig_item) | orig_metadata.append(orig_item) | ||||
else: | else: | ||||
if rev_item not in revs_to_delete: | if rev_item not in revs_to_delete: | ||||
revs_to_delete.append(rev_item) | revs_to_delete.append(rev_item) | ||||
if orig_item not in origs_to_delete: | if orig_item not in origs_to_delete: | ||||
origs_to_delete.append(orig_item) | origs_to_delete.append(orig_item) | ||||
if rev_metadata: | if rev_metadata: | ||||
self.idx_storage.revision_metadata_add( | self.idx_storage.revision_intrinsic_metadata_add( | ||||
rev_metadata, conflict_update=conflict_update) | rev_metadata, conflict_update=conflict_update) | ||||
if orig_metadata: | if orig_metadata: | ||||
self.idx_storage.origin_intrinsic_metadata_add( | self.idx_storage.origin_intrinsic_metadata_add( | ||||
orig_metadata, conflict_update=conflict_update) | orig_metadata, conflict_update=conflict_update) | ||||
# revs_to_delete should always be empty unless we changed a mapping | # revs_to_delete should always be empty unless we changed a mapping | ||||
# to detect less files. | # to detect less files. | ||||
# However, origs_to_delete may be empty whenever an upstream deletes | # However, origs_to_delete may be empty whenever an upstream deletes | ||||
# a metadata file. | # a metadata file. | ||||
if origs_to_delete: | if origs_to_delete: | ||||
self.idx_storage.origin_intrinsic_metadata_delete(origs_to_delete) | self.idx_storage.origin_intrinsic_metadata_delete(origs_to_delete) | ||||
if revs_to_delete: | if revs_to_delete: | ||||
self.idx_storage.revision_metadata_delete(revs_to_delete) | self.idx_storage.revision_intrinsic_metadata_delete(revs_to_delete) |