diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py
index b99c7b2..763c70e 100644
--- a/swh/indexer/ctags.py
+++ b/swh/indexer/ctags.py
@@ -1,164 +1,160 @@
 # Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import subprocess
 import json

 from swh.model import hashutil

 from .language import compute_language
 from .indexer import BaseIndexer, DiskIndexer


 # Options used to compute tags
 __FLAGS = [
     '--fields=+lnz',  # +l: language
                       # +n: line number of tag definition
                       # +z: include the symbol's kind (function, variable, ...)
     '--sort=no',      # do not sort the output
     '--links=no',     # do not follow symlinks
     '--output-format=json',  # output in json
 ]


 def run_ctags(path, lang=None, ctags_command='ctags'):
     """Run ctags on file path with optional language.

     Args:
         path: path to the file
         lang: language for that path (optional)

     Returns:
         ctags' output

     """
     optional = []
     if lang:
         optional = ['--language-force=%s' % lang]

     cmd = [ctags_command] + __FLAGS + optional + [path]
     output = subprocess.check_output(cmd, universal_newlines=True)

     for symbol in output.split('\n'):
         if not symbol:
             continue
         js_symbol = json.loads(symbol)
         yield {
             'name': js_symbol['name'],
             'kind': js_symbol['kind'],
             'line': js_symbol['line'],
             'lang': js_symbol['language'],
         }


 class CtagsIndexer(BaseIndexer, DiskIndexer):
     CONFIG_BASE_FILENAME = 'indexer/ctags'

     ADDITIONAL_CONFIG = {
         'workdir': ('str', '/tmp/swh/indexer.ctags'),
-        'tool': ('dict', {
+        'tools': ('dict', {
             'name': 'universal-ctags',
             'version': '~git7859817b',
-            'command': '/usr/bin/ctags',
+            'configuration': {
+                'command_line': '''ctags --fields=+lnz --sort=no --links=no '''
+                                '''--output-format=json '''
+            },
         }),
         'languages': ('dict', {
             'ada': 'Ada',
             'adl': None,
             'agda': None,
             # ...
         })
     }

     def __init__(self):
         super().__init__()
         self.working_directory = self.config['workdir']
         self.language_map = self.config['languages']
-        self.ctags_command = self.config['tool']['command']
-        self.tool_name = self.config['tool']['name']
-        self.tool_version = self.config['tool']['version']

     def filter_contents(self, sha1s):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.content_ctags_missing((
             {
                 'id': sha1,
-                'tool_name': self.tool_name,
-                'tool_version': self.tool_version
+                'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))

     def index_content(self, sha1, raw_content):
         """Index sha1s' content and store result.

         Args:
             sha1 (bytes): content's identifier
             raw_content (bytes): raw content in bytes

         Returns:
             A dict, representing a content_ctags, with keys:
             - id (bytes): content's identifier (sha1)
             - ctags ([dict]): ctags list of symbols

         """
         lang = compute_language(raw_content)['lang']

         if not lang:
             return None

         ctags_lang = self.language_map.get(lang)

         if not ctags_lang:
             return None

         ctags = {
             'id': sha1,
         }

         filename = hashutil.hash_to_hex(sha1)
         content_path = self.write_to_temp(
             filename=filename,
             data=raw_content)

-        result = run_ctags(content_path,
-                           lang=ctags_lang,
-                           ctags_command=self.ctags_command)
+        result = run_ctags(content_path, lang=ctags_lang)
         ctags.update({
             'ctags': list(result),
-            'tool_name': self.tool_name,
-            'tool_version': self.tool_version,
+            'indexer_configuration_id': self.tools['id'],
         })

         self.cleanup(content_path)
         return ctags

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_ctags, dict with the
               following keys:
               - id (bytes): content's identifier (sha1)
               - ctags ([dict]): ctags list of symbols
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.storage.content_ctags_add(
             results, conflict_update=(policy_update == 'update-dups'))


 @click.command()
 @click.option('--path', help="Path to execute index on")
 def main(path):
     r = list(run_ctags(path))
     print(r)


 if __name__ == '__main__':
     main()
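For context, `--output-format=json` makes universal-ctags emit one JSON object per line, which is what `run_ctags` relies on when it splits the output on newlines before decoding. A minimal, self-contained sketch of that parsing step follows; the sample line is illustrative, not captured from a real run:

import json

# One JSON object per line, as produced by
# `ctags --fields=+lnz --output-format=json <file>` (illustrative sample)
sample_output = (
    '{"_type": "tag", "name": "main", "path": "hello.c", '
    '"language": "C", "line": 3, "kind": "function"}\n'
)

for symbol in sample_output.split('\n'):
    if not symbol:
        continue
    js_symbol = json.loads(symbol)
    print(js_symbol['name'], js_symbol['kind'],
          js_symbol['line'], js_symbol['language'])
    # -> main function 3 C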
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
index 68efa6c..d06703c 100644
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -1,151 +1,147 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import subprocess

 from swh.model import hashutil

 from .indexer import BaseIndexer, DiskIndexer


-def compute_license(tool, path, log=None):
+def compute_license(path, log=None):
     """Determine license from file at path.

     Args:
         path: filepath to determine the license

     Returns:
         A dict with the following keys:
         - licenses ([str]): licenses detected for that path
         - path (bytes): content filepath
-        - tool (str): tool used to compute the output

     """
     try:
-        properties = subprocess.check_output([tool, path],
+        properties = subprocess.check_output(['nomossa', path],
                                              universal_newlines=True)
         if properties:
             res = properties.rstrip().split(' contains license(s) ')
             licenses = res[1].split(',')

             return {
                 'licenses': licenses,
                 'path': path,
             }
     except subprocess.CalledProcessError:
         if log:
             from os import path as __path
             log.exception('Problem during license detection for sha1 %s' %
                           __path.basename(path))
         return {
             'licenses': [],
             'path': path,
         }


 class ContentFossologyLicenseIndexer(BaseIndexer, DiskIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
     - computing the license from that content
     - storing the result in storage

     """
     ADDITIONAL_CONFIG = {
         'workdir': ('str', '/tmp/swh/indexer.fossology.license'),
-        'tool': ('dict', {
+        'tools': ('dict', {
             'name': 'nomos',
             'version': '3.1.0rc2-31-ga2cbb8c',
-            'command': '/usr/bin/nomossa',
+            'configuration': {
+                'command_line': 'nomossa ',
+            },
         }),
     }

     CONFIG_BASE_FILENAME = 'indexer/fossology_license'

     def __init__(self):
         super().__init__()
         self.working_directory = self.config['workdir']
-        self.tool = self.config['tool']['command']
-        self.tool_name = self.config['tool']['name']
-        self.tool_version = self.config['tool']['version']

     def filter_contents(self, sha1s):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.content_fossology_license_missing((
             {
                 'id': sha1,
-                'tool_name': self.tool_name,
-                'tool_version': self.tool_version
+                'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))

-    def index_content(self, sha1, content):
+    def index_content(self, sha1, raw_content):
         """Index sha1s' content and store result.

         Args:
             sha1 (bytes): content's identifier
-            content (bytes): raw content in bytes
+            raw_content (bytes): raw content in bytes

         Returns:
             A dict, representing a content_license, with keys:
             - id (bytes): content's identifier (sha1)
             - license (bytes): license in bytes
             - path (bytes): path

         """
         filename = hashutil.hash_to_hex(sha1)
         content_path = self.write_to_temp(
             filename=filename,
-            data=content)
+            data=raw_content)

         try:
-            properties = compute_license(self.tool, path=content_path,
-                                         log=self.log)
+            properties = compute_license(path=content_path, log=self.log)
             properties.update({
                 'id': sha1,
-                'tool_name': self.tool_name,
-                'tool_version': self.tool_version,
+                'indexer_configuration_id': self.tools['id'],
             })
         finally:
             self.cleanup(content_path)

         return properties

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_license, dict with the
               following keys:
               - id (bytes): content's identifier (sha1)
               - license (bytes): license in bytes
               - path (bytes): path
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         wrong_licenses = self.storage.content_fossology_license_add(
             results,
             conflict_update=(policy_update == 'update-dups'))

         if wrong_licenses:
             for l in wrong_licenses:
                 self.log.warn('Content %s has some unknown licenses: %s' % (
                     hashutil.hash_to_hex(l['id']),
                     ','.join((name for name in l['licenses'])))
                 )


-@click.command(help='Compute license for path using tool')
-@click.option('--tool', default='nomossa', help="Path to tool")
+@click.command(help='Compute license for path')
 @click.option('--path', required=1, help="Path to execute index on")
-def main(tool, path):
-    print(compute_license(tool, path))
+def main(path):
+    print(compute_license(path))


 if __name__ == '__main__':
     main()
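The string surgery in `compute_license` assumes nomossa's one-line report format, `<path> contains license(s) <l1>,<l2>,...`. A standalone sketch of that parsing step, using an illustrative sample line rather than real tool output:

# Illustrative nomossa-style output; the real tool prints one such line
properties = '/tmp/abc123 contains license(s) GPL-2.0,LGPL-2.1\n'

res = properties.rstrip().split(' contains license(s) ')
licenses = res[1].split(',')
print({'licenses': licenses, 'path': res[0]})
# {'licenses': ['GPL-2.0', 'LGPL-2.1'], 'path': '/tmp/abc123'}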
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 6be6048..e55a5d6 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,260 +1,279 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import os
 import logging
 import shutil
 import tempfile

 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
 from swh.storage import get_storage
 from swh.scheduler.utils import get_task


 class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.

     The main entry point is the `run` function, which is in charge of
     triggering the computations on the batch of sha1s received.

     Indexers can:
     - filter out sha1s whose data has already been indexed.
     - retrieve a sha1's content from the objstorage, index this
       content, then store the result in storage.

     To implement a new indexer, inherit from this class and implement
     the following functions:

     - def filter_contents(self, sha1s): filter out data already
       indexed (in storage). This function is used by the orchestrator
       and not directly by the indexer
       (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).

     - def index_content(self, sha1, raw_content): compute the index on
       sha1 with data raw_content (retrieved from the objstorage by the
       sha1 key) and return the resulting index computation.

     - def persist_index_computations(self, results, policy_update):
       persist the results of multiple index computations in the
       storage.

     """
     CONFIG_BASE_FILENAME = 'indexer/base'

     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'host': 'uffizi',
             'cls': 'remote',
             'args': {'root': '/tmp/softwareheritage/objects',
                      'slicing': '0:2/2:4/4:6'}
         }),
         # queue to reschedule if problem (none for no rescheduling,
         # the default)
         'rescheduling_task': ('str', None),
         'objstorage': ('dict', {
             'cls': 'multiplexer',
             'args': {
                 'objstorages': [{
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '0euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '0'}
                         ]
                     }
                 }, {
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '1euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '1'}
                         ]
                     }
                 }]
             },
         }),
     }

     ADDITIONAL_CONFIG = {}

     def __init__(self):
         super().__init__()
         self.config = self.parse_config_file(
             additional_configs=[self.ADDITIONAL_CONFIG])
         objstorage = self.config['objstorage']
         self.objstorage = get_objstorage(objstorage['cls'],
                                          objstorage['args'])
         storage = self.config['storage']
         self.storage = get_storage(storage['cls'], storage['args'])
         rescheduling_task = self.config['rescheduling_task']
         if rescheduling_task:
             self.rescheduling_task = get_task(rescheduling_task)
         else:
             self.rescheduling_task = None
         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)
         self.log = logging.getLogger('swh.indexer')
+        self.tools = self.retrieve_tools_information()
+        if not self.tools:
+            raise ValueError('Tools %s is unknown, cannot continue' %
+                             self.config['tools'])
+
+    def retrieve_tools_information(self):
+        """Define how to retrieve tool information based on the
+        configuration.
+
+        Provides a sensible default, which can be overridden when
+        insufficient. (For now, all indexers use only one tool.)
+
+        """
+        tool = {
+            'tool_%s' % key: value for key, value
+            in self.config['tools'].items()
+        }
+        return self.storage.indexer_configuration_get(tool)

     @abc.abstractmethod
     def filter_contents(self, sha1s):
         """Filter missing sha1s for that particular indexer.

         Args:
             sha1s ([bytes]): list of contents' sha1

         Yields:
             iterator of missing sha1

         """
         pass

     @abc.abstractmethod
     def index_content(self, sha1, content):
         """Index computation for the sha1 and associated raw content.

         Args:
             sha1 (bytes): sha1 identifier
             content (bytes): sha1's raw content

         Returns:
             a dict that makes sense for the
             persist_index_computations function.

         """
         pass

     @abc.abstractmethod
     def persist_index_computations(self, results, policy_update):
         """Persist the computation resulting from the index.

         Args:
             results ([result]): List of results. One result is the
               result of the index_content function.
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         Returns:
             None

         """
         pass

     def next_step(self, results):
         """Do something else with the computations' results (e.g. send to
         another queue, ...).

         (This is not an abstractmethod since it is optional.)

         Args:
             results ([result]): List of results (dict) as returned
               by the index_content function.

         Returns:
             None

         """
         pass

     def run(self, sha1s, policy_update):
         """Given a list of sha1s:
         - retrieve the content from the storage
         - execute the indexing computations
         - store the results (according to policy_update)

         Args:
             sha1s ([bytes]): sha1's identifier list
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         results = []
         try:
             for sha1 in sha1s:
                 try:
                     raw_content = self.objstorage.get(sha1)
                 except ObjNotFoundError:
                     self.log.warn('Content %s not found in objstorage' %
                                   hashutil.hash_to_hex(sha1))
                     continue
                 res = self.index_content(sha1, raw_content)
                 if res:  # if no result, skip it
                     results.append(res)

             self.persist_index_computations(results, policy_update)
             self.next_step(results)
         except Exception:
             self.log.exception(
                 'Problem when reading contents metadata.')
             if self.rescheduling_task:
                 self.log.warn('Rescheduling batch')
                 self.rescheduling_task.delay(sha1s, policy_update)


 class DiskIndexer:
     """Mixin intended to be used with other *Indexer classes.

     Indexers inheriting from this class are a category of indexers
     which need the disk for their computations.

     Expects:
         self.working_directory variable defined at runtime.

     """
     def __init__(self):
         super().__init__()

     def write_to_temp(self, filename, data):
         """Write the sha1's content in a temporary file.

         Args:
             filename (str): one of sha1's many filenames
             data (bytes): the sha1's content to write in a temporary file

         Returns:
             The path to the temporary file created. That file is
             filled in with the raw content's data.

         """
         os.makedirs(self.working_directory, exist_ok=True)
         temp_dir = tempfile.mkdtemp(dir=self.working_directory)
         content_path = os.path.join(temp_dir, filename)

         with open(content_path, 'wb') as f:
             f.write(data)

         return content_path

     def cleanup(self, content_path):
         """Remove content_path from the working directory.

         Args:
             content_path (str): the file to remove

         """
         temp_dir = os.path.dirname(content_path)
         shutil.rmtree(temp_dir)
diff --git a/swh/indexer/language.py b/swh/indexer/language.py
index aee6891..8ab6e54 100644
--- a/swh/indexer/language.py
+++ b/swh/indexer/language.py
@@ -1,150 +1,151 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import io

 from pygments.lexers import guess_lexer
 from chardet.universaldetector import UniversalDetector

 from .indexer import BaseIndexer


 def _cleanup_classname(classname):
     """Determine the language from the pygments' lexer names.

     """
     return classname.lower().replace(' ', '-')


 def _read_raw(raw_content, size=2048):
     """Read raw content in chunks.

     """
     bs = io.BytesIO(raw_content)
     while True:
         chunk = bs.read(size)
         if not chunk:
             break
         yield chunk


 def _detect_encoding(raw_content):
     """Given a raw content, try and detect its encoding.

     """
     detector = UniversalDetector()
     for chunk in _read_raw(raw_content):
         detector.feed(chunk)
         if detector.done:
             break
     detector.close()
     return detector.result['encoding']


 def compute_language(raw_content, log=None):
     """Determine the raw content's language.

     Args:
         raw_content (bytes): content from which to determine the language

     Returns:
         Dict with keys:
         - lang: None if nothing found or the possible language
         - decoding_failure: True if a decoding failure happened

     """
     try:
         encoding = _detect_encoding(raw_content)
         content = raw_content.decode(encoding)
         lang = _cleanup_classname(
             guess_lexer(content).name)
         return {
             'lang': lang
         }
     except Exception:
         if log:
             log.exception('Problem during language detection, skipping')
         return {
             'lang': None
         }


 class ContentLanguageIndexer(BaseIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
     - computing the language from that content
     - storing the result in storage

     """
     CONFIG_BASE_FILENAME = 'indexer/language'

     ADDITIONAL_CONFIG = {
-        'tool': ('dict', {
+        'tools': ('dict', {
             'name': 'pygments',
             'version': '2.0.1+dfsg-1.1+deb8u1',
-            'max_content_size': 10240,
+            'configuration': {
+                'type': 'library',
+                'debian-package': 'python3-pygments',
+                'max_content_size': 10240,
+            },
         }),
     }

     def __init__(self):
         super().__init__()
-        self.tool_name = self.config['tool']['name']
-        self.tool_version = self.config['tool']['version']
-        self.max_content_size = self.config['tool']['max_content_size']
+        c = self.config
+        self.max_content_size = c['tools']['configuration']['max_content_size']

     def filter_contents(self, sha1s):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.content_language_missing((
             {
                 'id': sha1,
-                'tool_name': self.tool_name,
-                'tool_version': self.tool_version
+                'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))

     def index_content(self, sha1, raw_content):
         """Index sha1s' content and store result.

         Args:
             sha1 (bytes): content's identifier
             raw_content (bytes): raw content in bytes

         Returns:
             A dict, representing a content_language, with keys:
             - id (bytes): content's identifier (sha1)
             - lang (bytes): detected language

         """
         l = len(raw_content)
         if self.max_content_size <= l:
             raw_content = raw_content[0:self.max_content_size]
         result = compute_language(raw_content, log=self.log)
         result.update({
             'id': sha1,
-            'tool_name': self.tool_name,
-            'tool_version': self.tool_version,
+            'indexer_configuration_id': self.tools['id'],
         })

         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_language, dict with the
               following keys:
               - id (bytes): content's identifier (sha1)
               - lang (bytes): detected language
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.storage.content_language_add(
             results, conflict_update=(policy_update == 'update-dups'))
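For reference, the language pipeline chains chardet's incremental encoding detector with pygments' lexer guessing. A minimal standalone sketch of that chain, assuming the same libraries the module imports are installed; the sample content and the printed lexer name are illustrative:

from chardet.universaldetector import UniversalDetector
from pygments.lexers import guess_lexer

raw_content = b'def hello():\n    return "world"\n'

detector = UniversalDetector()
detector.feed(raw_content)
detector.close()
encoding = detector.result['encoding'] or 'utf-8'  # chardet may return None

lexer_name = guess_lexer(raw_content.decode(encoding)).name
print(lexer_name.lower().replace(' ', '-'))  # e.g. 'python'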
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
index e11fbb7..16890bf 100644
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,154 +1,151 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click

 from subprocess import Popen, PIPE

 from swh.scheduler import utils

 from .indexer import BaseIndexer


 def compute_mimetype_encoding(raw_content):
     """Determine the mimetype and encoding from the raw content.

     Args:
         raw_content (bytes): content's raw data

     Returns:
         A dict with mimetype and encoding keys and their corresponding
         values.

     """
     with Popen(['file', '--mime', '-'], stdin=PIPE,
                stdout=PIPE, stderr=PIPE) as p:
         properties, _ = p.communicate(raw_content)
         if properties:
             res = properties.split(b': ')[1].strip().split(b'; ')
             mimetype = res[0]
             encoding = res[1].split(b'=')[1]
             return {
                 'mimetype': mimetype,
                 'encoding': encoding
             }


 class ContentMimetypeIndexer(BaseIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
     - computing {mimetype, encoding} from that content
     - storing the result in storage

     """
     ADDITIONAL_CONFIG = {
         # chained queue message, e.g:
         # swh.indexer.tasks.SWHOrchestratorTextContentsTask
         'destination_queue': ('str', None),
-        'tool': ('dict', {
+        'tools': ('dict', {
             'name': 'file',
-            'version': '5.22'
+            'version': '5.22',
+            'configuration': 'file --mime ',
         }),
     }

     CONFIG_BASE_FILENAME = 'indexer/mimetype'

     def __init__(self):
         super().__init__()
         destination_queue = self.config.get('destination_queue')
         if destination_queue:
             self.task_destination = utils.get_task(destination_queue)
         else:
             self.task_destination = None
-        self.tool_name = self.config['tool']['name']
-        self.tool_version = self.config['tool']['version']

     def filter_contents(self, sha1s):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.content_mimetype_missing((
             {
                 'id': sha1,
-                'tool_name': self.tool_name,
-                'tool_version': self.tool_version
+                'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))

     def index_content(self, sha1, raw_content):
         """Index sha1s' content and store result.

         Args:
             sha1 (bytes): content's identifier
             raw_content (bytes): raw content in bytes

         Returns:
             A dict, representing a content_mimetype, with keys:
             - id (bytes): content's identifier (sha1)
             - mimetype (bytes): mimetype in bytes
             - encoding (bytes): encoding in bytes

         """
         properties = compute_mimetype_encoding(raw_content)
         properties.update({
             'id': sha1,
-            'tool_name': self.tool_name,
-            'tool_version': self.tool_version,
+            'indexer_configuration_id': self.tools['id'],
         })

         return properties

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_mimetype, dict with the
               following keys:
               - id (bytes): content's identifier (sha1)
               - mimetype (bytes): mimetype in bytes
               - encoding (bytes): encoding in bytes
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.storage.content_mimetype_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def _filter_text(self, results):
         """Filter sha1s whose raw content is text.

         """
         for result in results:
             if b'binary' in result['encoding']:
                 continue
             yield result['id']

     def next_step(self, results):
         """Once the computations are done, send only the text contents over
         to the text content orchestrator.

         Args:
             results ([dict]): List of content_mimetype results, dicts
               with the following keys:
               - id (bytes): content's identifier (sha1)
               - mimetype (bytes): mimetype in bytes
               - encoding (bytes): encoding in bytes

         """
         if self.task_destination:
             self.task_destination.delay(list(self._filter_text(results)))


 @click.command()
 @click.option('--path', help="Path to execute index on")
 def main(path):
     with open(path, 'rb') as f:
         raw_content = f.read()
     print(compute_mimetype_encoding(raw_content))


 if __name__ == '__main__':
     main()