diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index 25d2332..31e8956 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,300 +1,300 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from swh.indexer.indexer import ContentIndexer, RevisionIndexer from swh.indexer.metadata_dictionary import compute_metadata from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.storage import INDEXER_CFG_KEY from swh.model import hashutil class ContentMetadataIndexer(ContentIndexer): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ CONFIG_BASE_FILENAME = 'indexer/metadata' def __init__(self, tool, config): # twisted way to use the exact same config of RevisionMetadataIndexer # object that uses internally ContentMetadataIndexer self.config = config self.config['tools'] = tool super().__init__() def prepare(self): self.results = [] if self.config[INDEXER_CFG_KEY]: self.idx_storage = self.config[INDEXER_CFG_KEY] if self.config['objstorage']: self.objstorage = self.config['objstorage'] _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') self.tools = self.register_tools(self.config['tools']) # NOTE: only one tool so far, change when no longer true self.tool = self.tools[0] def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the translated_metadata keys will be returned as None """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } try: context = self.tool['tool_configuration']['context'] result['translated_metadata'] = compute_metadata(context, data) # a twisted way to keep result with indexer object for get_results self.results.append(result) except Exception: self.log.exception( "Problem during tool retrieval of metadata translation") return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - translated_metadata (jsonb): detected metadata policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def get_results(self): """can be called only if run method was called before Returns: list: list of content_metadata entries calculated by current indexer """ return self.results class RevisionMetadataIndexer(RevisionIndexer): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_metadata table with defined computation tool - retrieve all entry_files in root directory - - use metadata_detector for file_names containig metadata + - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ CONFIG_BASE_FILENAME = 'indexer/metadata' ADDITIONAL_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'tools': ('dict', { 'name': 'swh-metadata-detector', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': ['npm', 'codemeta'] }, }), } def prepare(self): super().prepare() self.tool = self.tools[0] def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.revision_metadata_missing(( { 'id': sha1_git, 'indexer_configuration_id': self.tool['id'], } for sha1_git in sha1_gits )) def index(self, rev): """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: rev (bytes): revision artifact from storage Returns: dict: dictionary representing a revision_metadata, with keys: - id (bytes): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - translated_metadata (bytes): dict of retrieved metadata """ try: result = { 'id': rev['id'], 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } root_dir = rev['directory'] dir_ls = self.storage.directory_ls(root_dir, recursive=False) files = (entry for entry in dir_ls if entry['type'] == 'file') detected_files = detect_metadata(files) result['translated_metadata'] = self.translate_revision_metadata( detected_files) except Exception as e: self.log.exception( 'Problem when indexing rev') return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ # TODO: add functions in storage to keep data in revision_metadata self.idx_storage.revision_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def translate_revision_metadata(self, detected_files): """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files (dict): dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: dict: dict with translated metadata according to the CodeMeta vocabulary """ translated_metadata = [] tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': None }, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = { INDEXER_CFG_KEY: self.idx_storage, 'objstorage': self.objstorage } for context in detected_files.keys(): tool['configuration']['context'] = context c_metadata_indexer = ContentMetadataIndexer(tool, config) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context]) for c in metadata_generator: # extracting translated_metadata sha1 = c['id'] sha1s_in_storage.append(sha1) local_metadata = c['translated_metadata'] # local metadata is aggregated if local_metadata: translated_metadata.append(local_metadata) sha1s_filtered = [item for item in detected_files[context] if item not in sha1s_in_storage] if sha1s_filtered: # schedule indexation of content try: c_metadata_indexer.run(sha1s_filtered, policy_update='ignore-dups') # on the fly possibility: results = c_metadata_indexer.get_results() for result in results: local_metadata = result['translated_metadata'] translated_metadata.append(local_metadata) except Exception as e: self.log.warn("""Exception while indexing content""", e) # transform translated_metadata into min set with swh-metadata-detector min_metadata = extract_minimal_metadata_dict(translated_metadata) return min_metadata @click.command() @click.option('--revs', '-i', default=['8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', '026040ea79dec1b49b4e3e7beda9132b6b26b51b', '9699072e21eded4be8d45e3b8d543952533fa190'], help='Default sha1_git to lookup', multiple=True) def main(revs): _git_sha1s = list(map(hashutil.hash_to_bytes, revs)) rev_metadata_indexer = RevisionMetadataIndexer() rev_metadata_indexer.run(_git_sha1s, 'update-dups') if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main() diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py index 4530e2d..464ffa1 100644 --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -1,189 +1,189 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import itertools from collections import defaultdict from swh.core import utils from swh.core.config import SWHConfig from swh.model import hashutil from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.storage import get_storage from swh.scheduler.utils import get_task class RecomputeChecksums(SWHConfig): """Class in charge of (re)computing content's hashes. Hashes to compute are defined across 2 configuration options: compute_checksums ([str]) list of hash algorithms that py:func:`swh.model.hashutil.hash_data` function should be able to deal with. For variable-length checksums, a desired checksum length should also be provided. Their format is : e.g: blake2:512 recompute_checksums (bool) a boolean to notify that we also want to recompute potential existing hashes specified in compute_checksums. Default to False. """ DEFAULT_CONFIG = { # The storage to read from or update metadata to 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), # The objstorage to read contents' data from 'objstorage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6', }, }), # the set of checksums that should be computed. # Examples: 'sha1_git', 'blake2b512', 'blake2s256' 'compute_checksums': ( 'list[str]', []), # whether checksums that already exist in the DB should be # recomputed/updated or left untouched 'recompute_checksums': ('bool', False), # Number of contents to retrieve blobs at the same time 'batch_size_retrieve_content': ('int', 10), # Number of contents to update at the same time 'batch_size_update': ('int', 100), # Rescheduling task on error (if None, nothing is done) 'rescheduling_task': ('str', None), } CONFIG_BASE_FILENAME = 'indexer/rehash' def __init__(self): self.config = self.parse_config_file() self.storage = get_storage(**self.config['storage']) self.objstorage = get_objstorage(**self.config['objstorage']) self.compute_checksums = self.config['compute_checksums'] self.recompute_checksums = self.config[ 'recompute_checksums'] self.batch_size_retrieve_content = self.config[ 'batch_size_retrieve_content'] self.batch_size_update = self.config[ 'batch_size_update'] self.log = logging.getLogger('swh.indexer.rehash') rescheduling_task = self.config['rescheduling_task'] if rescheduling_task: self.rescheduling_task = get_task(rescheduling_task) else: self.rescheduling_task = None if not self.compute_checksums: raise ValueError('Checksums list should not be empty.') def _read_content_ids(self, contents): """Read the content identifiers from the contents. """ for c in contents: h = c['sha1'] if isinstance(h, str): h = hashutil.hash_to_bytes(h) yield h def get_new_contents_metadata(self, all_contents): """Retrieve raw contents and compute new checksums on the contents. Unknown or corrupted contents are skipped. Args: all_contents ([dict]): List of contents as dictionary with the necessary primary keys checksum_algorithms ([str]): List of checksums to compute Yields: tuple of: content to update, list of checksums computed """ content_ids = self._read_content_ids(all_contents) for contents in utils.grouper(content_ids, self.batch_size_retrieve_content): contents_iter = itertools.tee(contents, 2) try: content_metadata = self.storage.content_get_metadata( [s for s in contents_iter[0]]) except Exception: self.log.exception( 'Problem when reading contents metadata.') if self.rescheduling_task: self.log.warn('Rescheduling batch.') cs = [{'sha1': sha1} for sha1 in contents_iter[1]] self.rescheduling_task.delay(cs) continue for content in content_metadata: if self.recompute_checksums: # Recompute checksums provided # in compute_checksums options checksums_to_compute = list(self.compute_checksums) - else: # Compute checkums provided in compute_checksums + else: # Compute checksums provided in compute_checksums # options not already defined for that content checksums_to_compute = [h for h in self.compute_checksums if not content.get(h)] if not checksums_to_compute: # Nothing to recompute continue try: raw_content = self.objstorage.get(content['sha1']) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage!' % content['sha1']) continue # Actually computing the checksums for that content updated_content = hashutil.hash_data( raw_content, algorithms=checksums_to_compute) content.update(updated_content) yield content, checksums_to_compute def run(self, contents): """Given a list of content: - (re)compute a given set of checksums on contents available in our object storage - update those contents with the new metadata Args: contents (dict): contents as dictionary with necessary keys. key present in such dictionary should be the ones defined in the 'primary_key' option. """ for data in utils.grouper( self.get_new_contents_metadata(contents), self.batch_size_update): groups = defaultdict(list) for content, keys_to_update in data: keys = ','.join(keys_to_update) groups[keys].append(content) for keys_to_update, contents in groups.items(): keys = keys_to_update.split(',') try: self.storage.content_update(contents, keys=keys) except Exception: self.log.exception('Problem during update.') if self.rescheduling_task: self.log.warn('Rescheduling batch.') cs = [{'sha1': c['sha1']} for c in contents] self.rescheduling_task.delay(cs) continue diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 2953bfc..00974d5 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,305 +1,305 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from nose.tools import istest from swh.indexer.metadata_dictionary import compute_metadata from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata import ContentMetadataIndexer from swh.indexer.metadata import RevisionMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage, MockStorage from swh.indexer.tests.test_utils import MockIndexerStorage class TestContentMetadataIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def prepare(self): self.config.update({ 'rescheduling_task': None, }) self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.task_destination = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class TestRevisionMetadataIndexer(RevisionMetadataIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. """ def prepare(self): self.config = { 'rescheduling_task': None, 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:9999', } }, 'tools': { 'name': 'swh-metadata-detector', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': 'npm' } } } self.storage = MockStorage() self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.task_destination = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class Metadata(unittest.TestCase): """ Tests metadata_mock_tool tool for Metadata detection """ def setUp(self): """ shows the entire diff in the results """ self.maxDiff = None self.content_tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': 'npm' } } @istest def test_compute_metadata_none(self): """ testing content empty content is empty should return None """ # given content = b"" context = "npm" # None if no metadata was found or an error occurred declared_metadata = None # when result = compute_metadata(context, content) # then self.assertEqual(declared_metadata, result) @istest def test_compute_metadata_npm(self): """ testing only computation of metadata with hard_mapping_npm """ # given content = b""" { "name": "test_metadata", "version": "0.0.1", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """ declared_metadata = { 'name': 'test_metadata', 'version': '0.0.1', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} } # when result = compute_metadata("npm", content) # then self.assertEqual(declared_metadata, result) @istest def test_index_content_metadata_npm(self): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ # given sha1s = ['26a9f72a7c87cc9205725cfd879f514ff4f3d8d5', 'd4c647f0fc257591cc9ba1722484229780d1c607', '02fb2c89e14f7fab46701478c83779c7beb7b069'] # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping metadata_indexer = TestContentMetadataIndexer( tool=self.content_tool, config={}) # when metadata_indexer.run(sha1s, policy_update='ignore-dups') results = metadata_indexer.idx_storage.state expected_results = [{ 'indexer_configuration_id': 30, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'id': '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5' }, { 'indexer_configuration_id': 30, 'translated_metadata': { 'softwareRequirements': { 'JSONStream': '~1.3.1', 'abbrev': '~1.1.0', 'ansi-regex': '~2.1.1', 'ansicolors': '~0.3.2', 'ansistyles': '~0.1.3' }, 'issueTracker': { 'url': 'https://github.com/npm/npm/issues' }, 'author': 'Isaac Z. Schlueter (http://blog.izs.me)', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/npm/npm' }, 'description': 'a package manager for JavaScript', 'softwareSuggestions': { 'tacks': '~1.2.6', 'tap': '~10.3.2' }, 'license': 'Artistic-2.0', 'version': '5.0.3', 'other': { 'preferGlobal': True, 'config': { 'publishtest': False } }, 'name': 'npm', 'keywords': [ 'install', 'modules', 'package manager', 'package.json' ], 'url': 'https://docs.npmjs.com/' }, 'id': 'd4c647f0fc257591cc9ba1722484229780d1c607' }, { 'indexer_configuration_id': 30, 'translated_metadata': None, 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069' }] - # The assertion bellow returns False sometimes because of nested lists + # The assertion below returns False sometimes because of nested lists self.assertEqual(expected_results, results) @istest def test_detect_metadata_package_json(self): # given df = [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'cde' }] # when results = detect_metadata(df) expected_results = { 'npm': [ b'cde' ] } # then self.assertEqual(expected_results, results) @istest def test_revision_metadata_indexer(self): metadata_indexer = TestRevisionMetadataIndexer() sha1_gits = [ b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', ] metadata_indexer.run(sha1_gits, 'update-dups') results = metadata_indexer.idx_storage.state expected_results = [{ 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': { 'identifier': None, 'maintainer': None, 'url': [ 'https://github.com/librariesio/yarn-parser#readme' ], 'codeRepository': [{ 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }], 'author': ['Andrew Nesbitt'], 'license': ['AGPL-3.0'], 'version': ['1.0.0'], 'description': [ 'Tiny web service for parsing yarn.lock files' ], 'relatedLink': None, 'developmentStatus': None, 'operatingSystem': None, 'issueTracker': [{ 'url': 'https://github.com/librariesio/yarn-parser/issues' }], 'softwareRequirements': [{ 'express': '^4.14.0', 'yarn': '^0.21.0', 'body-parser': '^1.15.2' }], 'name': ['yarn-parser'], 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], 'type': None, 'email': None }, 'indexer_configuration_id': 7 }] # then self.assertEqual(expected_results, results)