diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py index a2cac90..aa118b3 100644 --- a/swh/indexer/ctags.py +++ b/swh/indexer/ctags.py @@ -1,160 +1,160 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import subprocess import json from swh.model import hashutil from .language import compute_language -from .indexer import BaseIndexer, DiskIndexer +from .indexer import ContentIndexer, DiskIndexer # Options used to compute tags __FLAGS = [ '--fields=+lnz', # +l: language # +n: line number of tag definition # +z: include the symbol's kind (function, variable, ...) '--sort=no', # sort output on tag name '--links=no', # do not follow symlinks '--output-format=json', # outputs in json ] def run_ctags(path, lang=None, ctags_command='ctags'): """Run ctags on file path with optional language. Args: path: path to the file lang: language for that path (optional) Returns: ctags' output """ optional = [] if lang: optional = ['--language-force=%s' % lang] cmd = [ctags_command] + __FLAGS + optional + [path] output = subprocess.check_output(cmd, universal_newlines=True) for symbol in output.split('\n'): if not symbol: continue js_symbol = json.loads(symbol) yield { 'name': js_symbol['name'], 'kind': js_symbol['kind'], 'line': js_symbol['line'], 'lang': js_symbol['language'], } -class CtagsIndexer(BaseIndexer, DiskIndexer): +class CtagsIndexer(ContentIndexer, DiskIndexer): CONFIG_BASE_FILENAME = 'indexer/ctags' ADDITIONAL_CONFIG = { 'workdir': ('str', '/tmp/swh/indexer.ctags'), 'tools': ('dict', { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' '''--output-format=json ''' }, }), 'languages': ('dict', { 'ada': 'Ada', 'adl': None, 'agda': None, # ... }) } def prepare(self): super().prepare() self.working_directory = self.config['workdir'] self.language_map = self.config['languages'] - def filter_contents(self, sha1s): + def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_ctags_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) - def index_content(self, sha1, raw_content): + def index(self, sha1, raw_content): """Index sha1s' content and store result. Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - ctags ([dict]): ctags list of symbols """ lang = compute_language(raw_content, log=self.log)['lang'] if not lang: return None ctags_lang = self.language_map.get(lang) if not ctags_lang: return None ctags = { 'id': sha1, } filename = hashutil.hash_to_hex(sha1) content_path = self.write_to_temp( filename=filename, data=raw_content) result = run_ctags(content_path, lang=ctags_lang) ctags.update({ 'ctags': list(result), 'indexer_configuration_id': self.tools['id'], }) self.cleanup(content_path) return ctags def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - ctags ([dict]): ctags list of symbols policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.storage.content_ctags_add( results, conflict_update=(policy_update == 'update-dups')) @click.command() @click.option('--path', help="Path to execute index on") def main(path): r = list(run_ctags(path)) print(r) if __name__ == '__main__': main() diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py index 642c548..ca79dab 100644 --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -1,140 +1,140 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import subprocess from swh.model import hashutil -from .indexer import BaseIndexer, DiskIndexer +from .indexer import ContentIndexer, DiskIndexer def compute_license(path, log=None): """Determine license from file at path. Args: path: filepath to determine the license Returns: A dict with the following keys: - licenses ([str]): associated detected licenses to path - path (bytes): content filepath - tool (str): tool used to compute the output """ try: properties = subprocess.check_output(['nomossa', path], universal_newlines=True) if properties: res = properties.rstrip().split(' contains license(s) ') licenses = res[1].split(',') return { 'licenses': licenses, 'path': path, } except subprocess.CalledProcessError: if log: from os import path as __path log.exception('Problem during license detection for sha1 %s' % __path.basename(path)) return { 'licenses': [], 'path': path, } -class ContentFossologyLicenseIndexer(BaseIndexer, DiskIndexer): +class ContentFossologyLicenseIndexer(ContentIndexer, DiskIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {license, encoding} from that content - store result in storage """ ADDITIONAL_CONFIG = { 'workdir': ('str', '/tmp/swh/indexer.fossology.license'), 'tools': ('dict', { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, }), } CONFIG_BASE_FILENAME = 'indexer/fossology_license' def prepare(self): super().prepare() self.working_directory = self.config['workdir'] - def filter_contents(self, sha1s): + def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_fossology_license_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) - def index_content(self, sha1, raw_content): + def index(self, sha1, raw_content): """Index sha1s' content and store result. 
Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_license, with keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path """ filename = hashutil.hash_to_hex(sha1) content_path = self.write_to_temp( filename=filename, data=raw_content) try: properties = compute_license(path=content_path, log=self.log) properties.update({ 'id': sha1, 'indexer_configuration_id': self.tools['id'], }) finally: self.cleanup(content_path) return properties def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_license, dict with the following keys: - id (bytes): content's identifier (sha1) - license (bytes): license in bytes - path (bytes): path policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.storage.content_fossology_license_add( results, conflict_update=(policy_update == 'update-dups')) @click.command(help='Compute license for path using tool') @click.option('--tool', default='nomossa', help="Path to tool") @click.option('--path', required=1, help="Path to execute index on") def main(tool, path): print(compute_license(tool, path)) if __name__ == '__main__': main() diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 1db4251..638a952 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,306 +1,386 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import os import logging import shutil import tempfile from swh.core.config import SWHConfig from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.model import hashutil from swh.storage import get_storage from swh.scheduler.utils import get_task class DiskIndexer: """Mixin intended to be used with other *Indexer classes. Indexer* inheriting from this class are a category of indexers which needs the disk for their computations. Expects: self.working_directory variable defined at runtime. """ def __init__(self): super().__init__() def write_to_temp(self, filename, data): """Write the sha1's content in a temporary file. Args: sha1 (str): the sha1 name filename (str): one of sha1's many filenames data (bytes): the sha1's content to write in temporary file Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(self.working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=self.working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, 'wb') as f: f.write(data) return content_path def cleanup(self, content_path): """Remove content_path from working directory. Args: content_path (str): the file to remove """ temp_dir = os.path.dirname(content_path) shutil.rmtree(temp_dir) class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the `run` functions which is in charge to - trigger the computations on the sha1s batch received. + trigger the computations on the ids batch received. Indexers can: - - filter out sha1 whose data has already been indexed. - - retrieve sha1's content from objstorage, index this content then - store the result in storage. 
+ - filter out ids whose data has already been indexed. + - retrieve the ids' data from the storage or the objstorage + - index this data depending on the object type and store the result in storage. - To implement a new index, inherit from this class and implement - the following functions: + To implement a new object type indexer, inherit from the BaseIndexer and + implement the indexing process: - - def filter_contents(self, sha1s): filter out data already + - def run(self, object_ids, policy_update): object_ids are different + depending on the object type. For example: sha1 for content, sha1_git for + revision, directory, release, and id for origin + + To implement a new concrete indexer, inherit from the object-level classes: + ContentIndexer, RevisionIndexer + (later on OriginIndexer will also be available) + + Then you need to implement the following functions: + + - def filter(self, ids): filter out data already indexed (in storage). This function is used by the orchestrator and not directly by the indexer (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer). - - def index_content(self, sha1, raw_content): compute index on - sha1 with data raw_content (retrieved in the objstorage by the - sha1 key) and return the resulting index computation. + - def index(self, id, data): compute index on + id with data (retrieved from the storage or the objstorage by the + id key) and return the resulting index computation. - def persist_index_computations(self, results, policy_update): persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: - def prepare(self): Configuration preparation for the indexer. When overriding, this must call the super().prepare() function. - def check(self): Configuration check for the indexer. When overriding, this must call the super().check() function. - def retrieve_tools_information(self): This should return a dict of the tool(s) to use when indexing or filtering. """ CONFIG = 'indexer/base' DEFAULT_CONFIG = { 'storage': ('dict', { 'host': 'uffizi', 'cls': 'remote', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'} }), # queue to reschedule if problem (none for no rescheduling, # the default) 'rescheduling_task': ('str', None), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): """Prepare and check that the indexer is ready to run. """ super().__init__() self.prepare() self.check() def prepare(self): """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run.
""" self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) storage = self.config['storage'] self.storage = get_storage(storage['cls'], storage['args']) rescheduling_task = self.config['rescheduling_task'] if rescheduling_task: self.rescheduling_task = get_task(rescheduling_task) else: self.rescheduling_task = None l = logging.getLogger('requests.packages.urllib3.connectionpool') l.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') self.tools = self.retrieve_tools_information() def check(self): """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if not self.tools: raise ValueError('Tools %s is unknown, cannot continue' % self.config['tools']) def retrieve_tools_information(self): """Permit to define how to retrieve tool information based on configuration. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) """ tool = { 'tool_%s' % key: value for key, value in self.config['tools'].items() } return self.storage.indexer_configuration_get(tool) @abc.abstractmethod - def filter_contents(self, sha1s): - """Filter missing sha1 for that particular indexer. + def filter(self, ids): + """Filter missing ids for that particular indexer. Args: - sha1s ([bytes]): list of contents' sha1 + ids ([bytes]): list of ids Yields: - iterator of missing sha1 + iterator of missing ids """ pass @abc.abstractmethod - def index_content(self, sha1, content): + def index(self, id, data): """Index computation for the sha1 and associated raw content. Args: - sha1 (bytes): sha1 identifier - content (bytes): sha1's raw content + id (bytes): sha1 identifier + content (bytes): id's data from storage or objstorage depending on + object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the - result of the index_content function. + result of the index function. policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them Returns: None """ pass def next_step(self, results): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned - by index_content function. + by index function. 
Returns: None """ pass + @abc.abstractmethod + def run(self, ids, policy_update): + """Given a list of ids: + - retrieves the data from the storage + - executes the indexing computations + - stores the results (according to policy_update) + + Args: + ids ([bytes]): id's identifier list + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them + + """ + pass + + +class ContentIndexer(BaseIndexer): + """ + An object-type indexer that inherits from BaseIndexer and + implements the indexing process for contents through the run method. + + Note: ContentIndexer is not meant to be instantiated directly; + to use it in another context, refer to the instructions in + BaseIndexer + """ + def run(self, sha1s, policy_update): """Given a list of sha1s: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: sha1s ([bytes]): sha1's identifier list policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue - res = self.index_content(sha1, raw_content) + res = self.index(sha1, raw_content) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) self.next_step(results) except Exception: self.log.exception( 'Problem when reading contents metadata.') if self.rescheduling_task: self.log.warn('Rescheduling batch') self.rescheduling_task.delay(sha1s, policy_update) + + +class RevisionIndexer(BaseIndexer): + """ + An object-type indexer that inherits from BaseIndexer and + implements the indexing process for revisions through the run method. + + Note: RevisionIndexer is not meant to be instantiated directly; + to use it in another context, refer to the instructions in + BaseIndexer + """ + + def run(self, sha1_gits, policy_update): + """ + Given a list of sha1_gits: + - retrieve revisions from the storage + - execute the indexing computations + - store the results (according to policy_update) + Args: + sha1_gits ([bytes]): sha1_git's identifier list + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them + + """ + results = [] + try: + for sha1_git in sha1_gits: + try: + revs = self.storage.revision_get([sha1_git]) + except ValueError: + self.log.warn('Revision %s not found in storage' % + hashutil.hash_to_hex(sha1_git)) + continue + for rev in revs: + if rev: # If no revision, skip it + res = self.index(rev) + if res: # If no results, skip it + results.append(res) + self.persist_index_computations(results, policy_update) + except Exception: + self.log.exception( + 'Problem when processing revision') diff --git a/swh/indexer/language.py b/swh/indexer/language.py index d282c41..8028c6e 100644 --- a/swh/indexer/language.py +++ b/swh/indexer/language.py @@ -1,206 +1,206 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io from pygments.lexers import guess_lexer from pygments.util import ClassNotFound from chardet.universaldetector import UniversalDetector -from .indexer import BaseIndexer +from .indexer
import ContentIndexer def _cleanup_classname(classname): """Determine the language from the pygments' lexer names. """ return classname.lower().replace(' ', '-') def _read_raw(raw_content, size=2048): """Read raw content in chunk. """ bs = io.BytesIO(raw_content) while True: chunk = bs.read(size) if not chunk: break yield chunk def _detect_encoding(raw_content): """Given a raw content, try and detect its encoding. """ detector = UniversalDetector() for chunk in _read_raw(raw_content): detector.feed(chunk) if detector.done: break detector.close() return detector.result['encoding'] def compute_language_from_chunk(encoding, length, raw_content, max_size, log=None): """Determine the raw content's language. Args: encoding (str): Encoding to use to decode the content length (int): raw_content's length raw_content (bytes): raw content to work with max_size (int): max size to split the raw content at Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: if max_size <= length: raw_content = raw_content[0:max_size] content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except UnicodeDecodeError: raise except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } def compute_language(raw_content, encoding=None, log=None): """Determine the raw content's language. Args: raw_content (bytes): raw content to work with Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: encoding = _detect_encoding(raw_content) content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } -class ContentLanguageIndexer(BaseIndexer): +class ContentLanguageIndexer(ContentIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ CONFIG_BASE_FILENAME = 'indexer/language' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'pygments', 'version': '2.0.1+dfsg-1.1+deb8u1', 'configuration': { 'type': 'library', 'debian-package': 'python3-pygments', 'max_content_size': 10240, }, }), } def prepare(self): super().prepare() c = self.config self.max_content_size = c['tools']['configuration']['max_content_size'] - def filter_contents(self, sha1s): + def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_language_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) - def index_content(self, sha1, raw_content): + def index(self, sha1, raw_content): """Index sha1s' content and store result. 
Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language """ result = { 'id': sha1, 'indexer_configuration_id': self.tools['id'], 'lang': None, } encoding = _detect_encoding(raw_content) if not encoding: return result l = len(raw_content) for i in range(0, 9): max_size = self.max_content_size + i try: result = compute_language_from_chunk( encoding, l, raw_content, max_size, log=self.log) except UnicodeDecodeError: self.log.warn('Decoding failed on wrong byte chunk at [0-%s]' ', trying again at next ending byte.' % max_size) continue # we found something, so we return it result.update({ 'id': sha1, 'indexer_configuration_id': self.tools['id'], }) break return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.storage.content_language_add( results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index 8b07d84..7211e1d 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,84 +1,259 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from .indexer import BaseIndexer +from swh.indexer.indexer import ContentIndexer, RevisionIndexer from swh.indexer.metadata_dictionary import compute_metadata +from swh.indexer.metadata_detector import detect_metadata +from swh.indexer.metadata_detector import extract_minimal_metadata_dict +from swh.model import hashutil +from swh.objstorage.exc import ObjNotFoundError -class ContentMetadataIndexer(BaseIndexer): - """Indexer in charge of: - - filtering out content already indexed + +class ContentMetadataIndexer(ContentIndexer): + """Indexer at content level in charge of: + - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - - using the MetadataDict and a tool for each context - - store result in storage + - using the metadata_dictionary as the 'swh-metadata-translator' tool + - store result in content_metadata table """ CONFIG_BASE_FILENAME = 'indexer/metadata' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': 'npm' }, }), } + def __init__(self): + self.config = self.parse_config_file( + config_filename="~/.config/swh/storage.yml", + additional_configs=[self.ADDITIONAL_CONFIG]) + super().__init__() + def prepare(self): super().prepare() + self.results = [] - def filter_contents(self, sha1s): + def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) - def index_content(self, sha1, raw_content): + def index(self, sha1, raw_content): """Index sha1s' content and store result. 
Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: result (dict): representing a content_metadata if translation wasn't successful the translated_metadata keys will be kept as None """ result = { 'id': sha1, 'indexer_configuration_id': self.tools['id'], 'translated_metadata': None } try: context = self.tools['configuration']['context'] result['translated_metadata'] = compute_metadata( context, raw_content) + # a twisted way to keep result with indexer object for get_results + self.results.append(result) except: self.log.exception( "Problem during tool retrieval of metadata translation") return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - translated_metadata (jsonb): detected metadata policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) + + def get_results(self): + """ + Can only be called after the run method has been called. + + Returns: + results (list): list of content_metadata entries calculated + by the current indexer + """ + return self.results + + +class RevisionMetadataIndexer(RevisionIndexer): + """Indexer at Revision level in charge of: + - filtering revisions already indexed in revision_metadata table with + defined computation tool + - retrieve all entry_files in root directory + - use metadata_detector for file_names containing metadata + - compute metadata translation if necessary and possible (depends on tool) + - send sha1s to content indexing if possible + - store the results for revision + + """ + ADDITIONAL_CONFIG = { + 'destination_queue': ('str', None), + 'tools': ('dict', { + 'name': 'swh-metadata-detector', + 'version': '0.1', + 'configuration': { + 'type': 'local', + 'contexts': ['npm'] + }, + }), + } + + def prepare(self): + super().prepare() + self.tools = self.retrieve_tools_information() + + def filter(self, sha1_gits): + """Filter out known sha1s and return only missing ones. + + """ + yield from self.storage.revision_metadata_missing(( + { + 'id': sha1_git, + 'indexer_configuration_id': self.tools['id'], + } for sha1_git in sha1_gits + )) + + def index(self, rev): + """Index rev by processing it and organizing result.
+ use metadata_detector to iterate on filenames + - if one filename is detected -> send the file to the content indexer + - if multiple files are detected -> translation is needed at revision level + + Args: + rev (bytes): revision artifact from storage + + Returns: + A dict, representing a revision_metadata, with keys: + - id (bytes): rev's identifier (sha1_git) + - indexer_configuration_id (bytes): tool used + - translated_metadata (bytes): dict of retrieved metadata + + """ + try: + result = { + 'id': rev['id'], + 'indexer_configuration_id': self.tools['id'], + 'translated_metadata': None + } + + root_dir = rev['directory'] + dir_ls = self.storage.directory_ls(root_dir, recursive=True) + files = (entry for entry in dir_ls if entry['type'] == 'file') + detected_files = detect_metadata(files) + result['translated_metadata'] = self.translate_revision_metadata( + detected_files) + except Exception: + self.log.exception( + 'Problem when indexing rev') + + return result + + def persist_index_computations(self, results, policy_update): + """Persist the results in storage. + + Args: + results ([dict]): list of revision_metadata, dict with the + following keys: + - id (bytes): revision's identifier (sha1_git) + - translated_metadata: detected metadata + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them + + """ + self.storage.revision_metadata_add( + results, conflict_update=(policy_update == 'update-dups')) + + def translate_revision_metadata(self, detected_files): + """ + Determine the plan of action for translating metadata when one + or multiple metadata files are detected: + Args: + - detected_files : dict with context name and list of sha1s + (e.g : {'npm' : [sha1_1, sha1_2], + 'authors': sha1_3}) + + Returns: + - translated_metadata: dict with the CodeMeta vocabulary + """ + translated_metadata = [] + + # TODO: iterate on each context, on each file + # -> get raw_contents + # -> translate each content + for context in detected_files.keys(): + for sha1 in detected_files[context]: + try: + raw_content = self.objstorage.get(sha1) + except ObjNotFoundError: + self.log.warn('Content %s not found in objstorage' % + hashutil.hash_to_hex(sha1)) + continue + # send the raw content to the 'swh-metadata-translator' + local_metadata = compute_metadata(context, raw_content) + # aggregating metadata + translated_metadata.append(local_metadata) + # for now this method doesn't call the ContentMetadataIndexer + # due to configuration issue that should be resolved with a better + # configuration management plan, should look like this + ##################################################################### + # send sha1s to ContentMetadataIndexer + # c_metadata_indexer = ContentMetadataIndexer() + # c_metadata_indexer.run(sha1s, policy_update='ignore-dups') + # translated_metadata = c_metadata_indexer.get_results() + ##################################################################### + # open questions: + # do we keep at revision level the translated_metadata of one file? + # we have a key in the swh-metadata-translator named 'other' + # to keep undefined categories, should we delete this ?
+ extract_minimal_metadata_dict(translated_metadata) + return translated_metadata + + +def main(): + rev_metadata_indexer = RevisionMetadataIndexer() + sha1_git1 = hashutil.hash_to_bytes( + '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f') + sha1_git2 = hashutil.hash_to_bytes( + '8dbb6aeb036e7fd80664eb8bfd1507881af1ba94') + sha1_gits = [sha1_git1, sha1_git2] + rev_metadata_indexer.run(sha1_gits, 'update-dups') + + +if __name__ == '__main__': + main() diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py index 801dc60..d514081 100644 --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,154 +1,154 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from subprocess import Popen, PIPE from swh.scheduler import utils -from .indexer import BaseIndexer +from .indexer import ContentIndexer def compute_mimetype_encoding(raw_content): """Determine mimetype and encoding from the raw content. Args: raw_content (bytes): content's raw data Returns: A dict with mimetype and encoding key and corresponding values. """ with Popen(['file', '--mime', '-'], stdin=PIPE, stdout=PIPE, stderr=PIPE) as p: properties, _ = p.communicate(raw_content) if properties: res = properties.split(b': ')[1].strip().split(b'; ') mimetype = res[0] encoding = res[1].split(b'=')[1] return { 'mimetype': mimetype, 'encoding': encoding } -class ContentMimetypeIndexer(BaseIndexer): +class ContentMimetypeIndexer(ContentIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ ADDITIONAL_CONFIG = { # chained queue message, e.g: # swh.indexer.tasks.SWHOrchestratorTextContentsTask 'destination_queue': ('str', None), 'tools': ('dict', { 'name': 'file', 'version': '5.22', 'configuration': { 'command_line': 'file --mime ', }, }), } CONFIG_BASE_FILENAME = 'indexer/mimetype' def prepare(self): super().prepare() destination_queue = self.config.get('destination_queue') if destination_queue: self.task_destination = utils.get_task(destination_queue) else: self.task_destination = None self.tools = self.retrieve_tools_information() - def filter_contents(self, sha1s): + def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_mimetype_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) - def index_content(self, sha1, raw_content): + def index(self, sha1, raw_content): """Index sha1s' content and store result. Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ properties = compute_mimetype_encoding(raw_content) properties.update({ 'id': sha1, 'indexer_configuration_id': self.tools['id'], }) return properties def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.storage.content_mimetype_add( results, conflict_update=(policy_update == 'update-dups')) def _filter_text(self, results): """Filter sha1 whose raw content is text. """ for result in results: if b'binary' in result['encoding']: continue yield result['id'] def next_step(self, results): """When the computations is done, we'd like to send over only text contents to the text content orchestrator. Args: results ([dict]): List of content_mimetype results, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ if self.task_destination: self.task_destination.delay(list(self._filter_text(results))) @click.command() @click.option('--path', help="Path to execute index on") def main(path): with open(path, 'rb') as f: raw_content = f.read() print(compute_mimetype_encoding(raw_content)) if __name__ == '__main__': main() diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index 186e169..1b5cd18 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,128 +1,128 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.core.config import SWHConfig from swh.core.utils import grouper from swh.scheduler import utils from . import TASK_NAMES, INDEXER_CLASSES def get_class(clazz): """Get a symbol class dynamically by its fully qualified name string representation. """ parts = clazz.split('.') module = '.'.join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of dispatching batch of contents (filtered or not based on presence) to indexers. That dispatch is indexer specific, so the configuration reflects it: - when check_presence flag is true, filter out the contents already present for that indexer, otherwise send everything - broadcast those (filtered or not) contents to indexers in a batch_size fashioned For example: ```yaml indexers: mimetype: batch_size: 10 check_presence: false language: batch_size: 2 check_presence: true ``` means: - send all contents received as batch of size 10 to the 'mimetype' indexer - send only unknown contents as batch of size 2 to the 'language' indexer. 
""" CONFIG_BASE_FILENAME = 'indexer/orchestrator' DEFAULT_CONFIG = { 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, 'check_presence': True, }, }), } def __init__(self): super().__init__() self.config = self.parse_config_file() indexer_names = list(self.config['indexers'].keys()) random.shuffle(indexer_names) indexers = {} tasks = {} for name in indexer_names: if name not in TASK_NAMES: raise ValueError('%s must be one of %s' % ( name, TASK_NAMES.keys())) opts = self.config['indexers'][name] indexers[name] = ( INDEXER_CLASSES[name], opts['check_presence'], opts['batch_size']) tasks[name] = utils.get_task(TASK_NAMES[name]) self.indexers = indexers self.tasks = tasks def run(self, sha1s): for name, (idx_class, filtering, batch_size) in self.indexers.items(): if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) - sha1s_filtered = list(indexer_class().filter_contents(sha1s)) + sha1s_filtered = list(indexer_class().filter(sha1s)) if not sha1s_filtered: continue else: policy_update = 'update-dups' sha1s_filtered = sha1s celery_tasks = [] for sha1s_to_send in grouper(sha1s_filtered, batch_size): celery_task = self.tasks[name].s( sha1s=list(sha1s_to_send), policy_update=policy_update) celery_tasks.append(celery_task) group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 2c90bbd..630fd68 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,199 +1,278 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from nose.tools import istest from swh.indexer.metadata_dictionary import compute_metadata from swh.indexer.metadata import ContentMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage class MockStorage(): """Mock storage to simplify reading indexers' outputs. 
""" def content_metadata_add(self, metadata, conflict_update=None): self.state = metadata self.conflict_update = conflict_update + def revision_metadata_add(self, metadata, conflict_update=None): + self.state = metadata + self.conflict_update = conflict_update + def indexer_configuration_get(self, tool): - return { - 'id': 30, - 'name': 'hard_mapping_npm', - 'version': '0.1', - 'configuration': { - 'type': 'local', - 'context': 'npm' - }, - } + if tool['tool_name'] == 'swh-metadata-translator': + return { + 'id': 30, + 'name': 'swh-metadata-translator', + 'version': '0.1', + 'configuration': { + 'type': 'local', + 'context': 'npm' + }, + } + elif tool['tool_name'] == 'swh-metadata-detector': + return { + 'id': 7, + 'name': 'swh-metadata-detector', + 'version': '0.1', + 'configuration': { + 'type': 'local', + 'context': 'npm' + }, + } + + def revision_missing(self, revisions, cur=None): + pass + + def revision_get(self, revisions): + """Get all revisions from storage + Args: an iterable of revision ids + Returns: an iterable of revisions as dictionaries + (or None if the revision doesn't exist) + """ + pass + + def directory_get(self, + directories, + cur=None): + """Get information on directories. + + Args: + - directories: an iterable of directory ids + + Returns: + List of directories as dict with keys and associated values. + + """ + pass + + def directory_ls(self, directory, recursive=False, cur=None): + """Get entries for one directory. + + Args: + - directory: the directory to list entries from. + - recursive: if flag on, this list recursively from this directory. + + Returns: + List of entries for such directory. + + """ + pass class TestMetadataIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def prepare(self): self.config = { 'rescheduling_task': None, 'tools': { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': { 'type': 'local', 'context': 'npm' } } } self.storage = MockStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.task_destination = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.retrieve_tools_information() + self.results = [] + + +# class TestRevisionMetadataIndexer(RevsionMetadataIndexer): +# """Specific indexer whose configuration is enough to satisfy the +# indexing tests. 
+# """ +# def prepare(self): +# self.config = { +# 'rescheduling_task': None, +# 'tools': { +# 'name': 'swh-metadata-detector', +# 'version': '0.0.1', +# 'configuration': { +# 'type': 'local', +# 'context': 'npm' +# } +# } +# } +# self.storage = MockStorage() +# self.log = logging.getLogger('swh.indexer') +# self.objstorage = MockObjStorage() +# self.task_destination = None +# self.rescheduling_task = self.config['rescheduling_task'] +# self.tools = self.retrieve_tools_information() +# self.results = [] class Metadata(unittest.TestCase): """ Tests metadata_mock_tool tool for Metadata detection """ def setUp(self): """ shows the entire diff in the results """ self.maxDiff = None @istest def test_compute_metadata_none(self): """ testing content empty content is empty should return None """ # given content = b"" context = "npm" # None if no metadata was found or an error occurred declared_metadata = None # when result = compute_metadata(context, content) # then self.assertEqual(declared_metadata, result) @istest def test_compute_metadata_npm(self): """ testing only computation of metadata with hard_mapping_npm """ # given content = b""" { "name": "test_metadata", "version": "0.0.1", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """ declared_metadata = { 'name': 'test_metadata', 'version': '0.0.1', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} } # when result = compute_metadata("npm", content) # then self.assertEqual(declared_metadata, result) @istest def test_index_content_metadata_npm(self): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ # given sha1s = ['26a9f72a7c87cc9205725cfd879f514ff4f3d8d5', 'd4c647f0fc257591cc9ba1722484229780d1c607', '02fb2c89e14f7fab46701478c83779c7beb7b069'] # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping metadata_indexer = TestMetadataIndexer() # when metadata_indexer.run(sha1s, policy_update='ignore-dups') results = metadata_indexer.storage.state expected_results = [{ 'indexer_configuration_id': 30, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'id': '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5' }, { 'indexer_configuration_id': 30, 'translated_metadata': { 'softwareRequirements': { 'JSONStream': '~1.3.1', 'abbrev': '~1.1.0', 'ansi-regex': '~2.1.1', 'ansicolors': '~0.3.2', 'ansistyles': '~0.1.3' }, 'issueTracker': { 'url': 'https://github.com/npm/npm/issues' }, 'author': 'Isaac Z. 
Schlueter (http://blog.izs.me)', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/npm/npm' }, 'description': 'a package manager for JavaScript', 'softwareSuggestions': { 'tacks': '~1.2.6', 'tap': '~10.3.2' }, 'license': 'Artistic-2.0', 'version': '5.0.3', 'other': { 'preferGlobal': True, 'config': { 'publishtest': False } }, 'name': 'npm', 'keywords': [ 'install', 'modules', 'package manager', 'package.json' ], 'url': 'https://docs.npmjs.com/' }, 'id': 'd4c647f0fc257591cc9ba1722484229780d1c607' }, { 'indexer_configuration_id': 30, 'translated_metadata': None, 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069' }] # The assertion bellow returns False sometimes because of nested lists self.assertEqual(expected_results, results)
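For reference, the sketch below is not part of the patch: it illustrates how a concrete indexer is expected to plug into the refactored ContentIndexer interface introduced above, mirroring the pattern of TestMetadataIndexer in the tests — prepare() wires configuration and backends by hand, filter()/index()/persist_index_computations() do the object-specific work, and the inherited run() drives the loop. It assumes swh.indexer is importable; ToyObjStorage, ToyStorage, content_size_add and TrivialSizeIndexer are hypothetical stand-ins, not part of swh-indexer.

```python
# Minimal sketch, assuming swh.indexer is installed. ToyObjStorage,
# ToyStorage and TrivialSizeIndexer are hypothetical, for illustration only.
import logging

from swh.indexer.indexer import ContentIndexer


class ToyObjStorage:
    """Hypothetical in-memory object storage keyed by sha1."""
    def __init__(self, contents):
        self.contents = contents

    def get(self, sha1):
        return self.contents[sha1]


class ToyStorage:
    """Hypothetical storage that just remembers what was persisted."""
    def __init__(self):
        self.state = []

    def content_size_add(self, results, conflict_update=False):
        self.state = results


class TrivialSizeIndexer(ContentIndexer):
    """Toy indexer recording only the length of each content."""

    def prepare(self):
        # Bypass parse_config_file(), like the test indexers do, and wire
        # in the backends by hand.
        self.config = {'rescheduling_task': None}
        self.objstorage = ToyObjStorage({b'\x01' * 20: b'hello world'})
        self.storage = ToyStorage()
        self.log = logging.getLogger('swh.indexer')
        self.rescheduling_task = None
        self.tools = {'id': 1}

    def filter(self, ids):
        # Nothing is ever known in this toy setup, so index everything.
        yield from ids

    def index(self, id, data):
        return {'id': id,
                'length': len(data),
                'indexer_configuration_id': self.tools['id']}

    def persist_index_computations(self, results, policy_update):
        self.storage.content_size_add(
            results, conflict_update=(policy_update == 'update-dups'))


if __name__ == '__main__':
    indexer = TrivialSizeIndexer()
    # The inherited ContentIndexer.run() fetches each sha1 from the
    # objstorage, calls index() and persists the results.
    indexer.run([b'\x01' * 20], policy_update='ignore-dups')
    print(indexer.storage.state)
```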