diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index a114d97..29295f9 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,386 +1,397 @@
 # Copyright (C) 2016-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import os
 import logging
 import shutil
 import tempfile

 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
 from swh.storage import get_storage
 from swh.scheduler.utils import get_task


 class DiskIndexer:
     """Mixin intended to be used with other SomethingIndexer classes.

-    Indexers inheriting from this class are a category of indexers which
-    needs the disk for their computations.
+    Indexers inheriting from this class are a category of indexers
+    which need the disk for their computations.

     Note:
-        expects `self.working_directory` variable defined at runtime.
+        This expects the `self.working_directory` variable to be
+        defined at runtime.

     """
-    def __init__(self):
-        super().__init__()
-
     def write_to_temp(self, filename, data):
         """Write the sha1's content in a temporary file.

         Args:
             sha1 (str): the sha1 name
             filename (str): one of sha1's many filenames
             data (bytes): the sha1's content to write in temporary file

         Returns:
             The path to the temporary file created. That file is
             filled in with the raw content's data.

         """
         os.makedirs(self.working_directory, exist_ok=True)
         temp_dir = tempfile.mkdtemp(dir=self.working_directory)
         content_path = os.path.join(temp_dir, filename)

         with open(content_path, 'wb') as f:
             f.write(data)

         return content_path

     def cleanup(self, content_path):
         """Remove content_path from working directory.

         Args:
             content_path (str): the file to remove

         """
         temp_dir = os.path.dirname(content_path)
         shutil.rmtree(temp_dir)


 class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.

-    The main entry point is the `run` functions which is in charge to
-    trigger the computations on the ids batch received.
+    The main entry point is the :func:`run` function, which is in
+    charge of triggering the computations on the batch of dicts/ids
+    received.

     Indexers can:

     - filter out ids whose data has already been indexed.
     - retrieve ids data from storage or objstorage
-    - index this data depending on the object and store the result in storage.
+    - index this data depending on the object and store the result in
+      storage.

-    To implement a new object type indexer, inherit from the BaseIndexer and
-    implement the process of indexation:
+    To implement a new object type indexer, inherit from the
+    :class:`BaseIndexer` and implement the process of indexation:

-    def run(self, object_ids, policy_update)
+    :func:`run`:
       object_ids are different depending on object. For example: sha1 for
       content, sha1_git for revision, directory, release, and id for origin

-    To implement a new concrete indexer, inherit from the object level classes:
-    ContentIndexer, RevisionIndexer
-    (later on OriginIndexer will also be available)
+    To implement a new concrete indexer, inherit from the object level
+    classes: :class:`ContentIndexer`, :class:`RevisionIndexer` (later
+    on :class:`OriginIndexer` will also be available).

     Then you need to implement the following functions:

-    def filter(self, ids)
+    :func:`filter`:
       filter out data already indexed (in storage).
       This function is used by the orchestrator and not directly by
       the indexer (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).

-    def index_object(self, id, data)
+    :func:`index_object`:
       compute index on id with data (retrieved from the storage or
       the objstorage by the id key) and return the resulting index
       computation.

-    def persist_index_computations(self, results, policy_update)
+    :func:`persist_index_computations`:
       persist the results of multiple index computations in the
       storage.

     The new indexer implementation can also override the following functions:

-    def prepare(self)
+    :func:`prepare`:
       Configuration preparation for the indexer. When overriding, this must
-      call the super().prepare() function.
+      call the `super().prepare()` function.

-    def check(self)
+    :func:`check`:
       Configuration check for the indexer. When overriding, this must call the
-      super().check() function.
+      `super().check()` function.

-    def retrieve_tools_information(self)
+    :func:`retrieve_tools_information`:
       This should return a dict of the tool(s) to use when indexing or
       filtering.

     """
     CONFIG = 'indexer/base'

     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'host': 'uffizi',
             'cls': 'remote',
             'args': {'root': '/tmp/softwareheritage/objects',
                      'slicing': '0:2/2:4/4:6'}
         }),
         # queue to reschedule if problem (none for no rescheduling,
         # the default)
         'rescheduling_task': ('str', None),
         'objstorage': ('dict', {
             'cls': 'multiplexer',
             'args': {
                 'objstorages': [{
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '0euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '0'}
                         ]
                     }
                 }, {
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '1euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '1'}
                         ]
                     }
                 }]
             },
         }),
     }

     ADDITIONAL_CONFIG = {}

     def __init__(self):
         """Prepare and check that the indexer is ready to run.

         """
         super().__init__()
         self.prepare()
         self.check()

     def prepare(self):
         """Prepare the indexer's needed runtime configuration.
            Without this step, the indexer cannot possibly run.

         """
         self.config = self.parse_config_file(
             additional_configs=[self.ADDITIONAL_CONFIG])
         objstorage = self.config['objstorage']
         self.objstorage = get_objstorage(objstorage['cls'],
                                          objstorage['args'])
         storage = self.config['storage']
         self.storage = get_storage(storage['cls'], storage['args'])
         rescheduling_task = self.config['rescheduling_task']
         if rescheduling_task:
             self.rescheduling_task = get_task(rescheduling_task)
         else:
             self.rescheduling_task = None
         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)
         self.log = logging.getLogger('swh.indexer')
         self.tools = self.retrieve_tools_information()

     def check(self):
         """Check the indexer's configuration is ok before proceeding.
            If ok, does nothing. If not raise error.

         """
         if not self.tools:
             raise ValueError('Tools %s is unknown, cannot continue' %
                              self.config['tools'])

     def retrieve_tools_information(self):
         """Permit to define how to retrieve tool information based on
            configuration.

            Add a sensible default which can be overridden if not
            sufficient.
            (For now, all indexers use only one tool)

         """
         tool = {
             'tool_%s' % key: value
             for key, value in self.config['tools'].items()
         }
         return self.storage.indexer_configuration_get(tool)

     @abc.abstractmethod
     def filter(self, ids):
         """Filter missing ids for that particular indexer.

         Args:
             ids ([bytes]): list of ids

         Yields:
             iterator of missing ids

         """
         pass

     @abc.abstractmethod
     def index(self, id, data):
         """Index computation for the id and associated raw data.

         Args:
-            id (bytes): identifier
+            id (bytes): identifier
             data (bytes): id's data from storage or objstorage depending on
             object type

         Returns:
             a dict that makes sense for the persist_index_computations
             function.

         """
         pass

     @abc.abstractmethod
     def persist_index_computations(self, results, policy_update):
         """Persist the computation resulting from the index.

         Args:
             results ([result]): List of results. One result is the
-            result of the index function.
+              result of the index function.
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-            respectively update duplicates or ignore them
+              respectively update duplicates or ignore
+              them

         Returns:
             None

         """
         pass

     def next_step(self, results):
         """Do something else with computations results (e.g. send to
            another queue, ...).
            (This is not an abstractmethod since it is optional).

         Args:
             results ([result]): List of results (dict) as returned
-            by index function.
+              by index function.

         Returns:
             None

         """
         pass

     @abc.abstractmethod
     def run(self, ids, policy_update):
         """Given a list of ids:
+
         - retrieves the data from the storage
         - executes the indexing computations
         - stores the results (according to policy_update)

         Args:
             ids ([bytes]): id's identifier list
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         pass


 class ContentIndexer(BaseIndexer):
-    """
-    An object type indexer, inherits from the BaseIndexer and
-    implements the process of indexation for Contents using the run method
+    """An object type indexer, inherits from the :class:`BaseIndexer` and
+    implements the process of indexation for Contents using the run
+    method
+
+    Note: the :class:`ContentIndexer` is not an instantiable
+    object. To use it in another context, one should inherit from this
+    class and override the methods mentioned in the
+    :class:`BaseIndexer` class.

-    Note: the ContentIndexer is not an instantiable object
-    to use it in another context one should inherit from this class and
-    override the methods mentioned in the BaseIndexer class
     """
     def run(self, sha1s, policy_update):
         """Given a list of sha1s:
+
         - retrieve the content from the storage
         - execute the indexing computations
         - store the results (according to policy_update)

         Args:
             sha1s ([bytes]): sha1's identifier list
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-            respectively update duplicates or ignore them
+              respectively update duplicates or ignore
+              them

         """
         results = []
         try:
             for sha1 in sha1s:
                 try:
                     raw_content = self.objstorage.get(sha1)
                 except ObjNotFoundError:
                     self.log.warn('Content %s not found in objstorage' %
                                   hashutil.hash_to_hex(sha1))
                     continue
                 res = self.index(sha1, raw_content)
                 if res:  # If no results, skip it
                     results.append(res)

             self.persist_index_computations(results, policy_update)
             self.next_step(results)
         except Exception:
             self.log.exception(
                 'Problem when reading contents metadata.')
             if self.rescheduling_task:
                 self.log.warn('Rescheduling batch')
                 self.rescheduling_task.delay(sha1s, policy_update)


 class RevisionIndexer(BaseIndexer):
-    """
-    An object type indexer, inherits from the BaseIndexer and
-    implements the process of indexation for Revisions using the run method
+    """An object type indexer, inherits from the :class:`BaseIndexer` and
+    implements the process of indexation for Revisions using the run
+    method
+
+    Note: the :class:`RevisionIndexer` is not an instantiable object.
+    To use it in another context one should inherit from this class
+    and override the methods mentioned in the :class:`BaseIndexer`
+    class.

-    Note: the RevisionIndexer is not an instantiable object
-    to use it in another context one should inherit from this class and
-    override the methods mentioned in the BaseIndexer class
     """
     def run(self, sha1_gits, policy_update):
-        """
-        Given a list of sha1_gits:
+        """Given a list of sha1_gits:
+
         - retrieve revisions from storage
         - execute the indexing computations
         - store the results (according to policy_update)
+
         Args:
             sha1_gits ([bytes]): sha1_git's identifier list
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-            respectively update duplicates or ignore them
+              respectively update duplicates or ignore
+              them

         """
         results = []
         revs = self.storage.revision_get(sha1_gits)

         for rev in revs:
             if not rev:
                 self.log.warn('Revisions %s not found in storage' %
                               list(map(hashutil.hash_to_hex, sha1_gits)))
                 continue
             try:
                 res = self.index(rev)
                 if res:  # If no results, skip it
                     results.append(res)
             except Exception:
                 self.log.exception(
                     'Problem when processing revision')

         self.persist_index_computations(results, policy_update)
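
A minimal concrete indexer, for illustration of the API documented above: `filter` asks the storage which ids are still missing, `index` turns one id plus its raw data into a result dict, and `persist_index_computations` writes the batch back. This sketch is not part of the diff; `MyContentIndexer` and the `content_my_indexer_missing` / `content_my_indexer_add` storage endpoints are hypothetical names, assuming a matching endpoint pair exists for the result type::

    from swh.indexer.indexer import ContentIndexer


    class MyContentIndexer(ContentIndexer):
        """Toy indexer recording each content's length."""
        CONFIG_BASE_FILENAME = 'indexer/my_indexer'

        def filter(self, sha1s):
            # Yield only the ids not indexed yet; the endpoint name is
            # hypothetical, each indexer defines its own.
            yield from self.storage.content_my_indexer_missing((
                {
                    'id': sha1,
                    'indexer_configuration_id': self.tools['id'],
                } for sha1 in sha1s
            ))

        def index(self, sha1, raw_content):
            # Return a dict shaped for persist_index_computations.
            return {
                'id': sha1,
                'indexer_configuration_id': self.tools['id'],
                'length': len(raw_content),
            }

        def persist_index_computations(self, results, policy_update):
            # Hypothetical endpoint, mirroring content_language_add in
            # the language indexer below.
            self.storage.content_my_indexer_add(
                results, conflict_update=(policy_update == 'update-dups'))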
""" bs = io.BytesIO(raw_content) while True: chunk = bs.read(size) if not chunk: break yield chunk def _detect_encoding(raw_content): """Given a raw content, try and detect its encoding. """ detector = UniversalDetector() for chunk in _read_raw(raw_content): detector.feed(chunk) if detector.done: break detector.close() return detector.result['encoding'] def compute_language_from_chunk(encoding, length, raw_content, max_size, log=None): """Determine the raw content's language. Args: encoding (str): Encoding to use to decode the content length (int): raw_content's length raw_content (bytes): raw content to work with max_size (int): max size to split the raw content at Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: if max_size <= length: raw_content = raw_content[0:max_size] content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except UnicodeDecodeError: raise except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } def compute_language(raw_content, encoding=None, log=None): """Determine the raw content's language. Args: raw_content (bytes): raw content to work with Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: encoding = _detect_encoding(raw_content) content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } class ContentLanguageIndexer(ContentIndexer): """Indexer in charge of: + - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ CONFIG_BASE_FILENAME = 'indexer/language' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'pygments', 'version': '2.0.1+dfsg-1.1+deb8u1', 'configuration': { 'type': 'library', 'debian-package': 'python3-pygments', 'max_content_size': 10240, }, }), } def prepare(self): super().prepare() c = self.config self.max_content_size = c['tools']['configuration']['max_content_size'] def filter(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_language_missing(( { 'id': sha1, 'indexer_configuration_id': self.tools['id'], } for sha1 in sha1s )) def index(self, sha1, raw_content): """Index sha1s' content and store result. Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language """ result = { 'id': sha1, 'indexer_configuration_id': self.tools['id'], 'lang': None, } encoding = _detect_encoding(raw_content) if not encoding: return result l = len(raw_content) for i in range(0, 9): max_size = self.max_content_size + i try: result = compute_language_from_chunk( encoding, l, raw_content, max_size, log=self.log) except UnicodeDecodeError: self.log.warn('Decoding failed on wrong byte chunk at [0-%s]' ', trying again at next ending byte.' % max_size) continue # we found something, so we return it result.update({ 'id': sha1, 'indexer_configuration_id': self.tools['id'], }) break return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
         Args:
             results ([dict]): list of content_mimetype, dict with the
               following keys:
               - id (bytes): content's identifier (sha1)
               - lang (bytes): detected language
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.storage.content_language_add(
             results, conflict_update=(policy_update == 'update-dups'))
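
The language detection pipeline above can be exercised standalone: chardet guesses the encoding, the decoded text goes through pygments' `guess_lexer`, and the lexer name is normalized by `_cleanup_classname`. A quick check, assuming pygments and chardet are installed; the exact lexer pygments picks, and hence the reported language, may vary by version::

    from swh.indexer.language import compute_language

    raw = b'#!/usr/bin/env python3\nprint("hello")\n'
    print(compute_language(raw))
    # e.g. {'lang': 'python'} -- or {'lang': None} if no lexer matches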
""" for result in results: if b'binary' in result['encoding']: continue yield result['id'] def next_step(self, results): """When the computations is done, we'd like to send over only text contents to the text content orchestrator. Args: results ([dict]): List of content_mimetype results, dict with the following keys: + - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ if self.task_destination: self.task_destination.delay(list(self._filter_text(results))) @click.command() @click.option('--path', help="Path to execute index on") def main(path): with open(path, 'rb') as f: raw_content = f.read() print(compute_mimetype_encoding(raw_content)) if __name__ == '__main__': main() diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index 1411443..cbf4667 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,123 +1,124 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.core.config import SWHConfig from swh.core.utils import grouper from swh.scheduler import utils from . import TASK_NAMES, INDEXER_CLASSES def get_class(clazz): """Get a symbol class dynamically by its fully qualified name string representation. """ parts = clazz.split('.') module = '.'.join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of dispatching batch of - contents (filtered or not based on presence) to indexers. + contents (filtered or not based on presence) to indexers. That dispatch is indexer specific, so the configuration reflects it: - - when check_presence flag is true, filter out the contents - already present for that indexer, otherwise send everything + - when `check_presence` flag is true, filter out the + contents already present for that indexer, otherwise send + everything - broadcast those (filtered or not) contents to indexers in a - batch_size fashioned + `batch_size` fashioned For example:: indexers: mimetype: batch_size: 10 check_presence: false language: batch_size: 2 check_presence: true means: - send all contents received as batch of size 10 to the 'mimetype' indexer - send only unknown contents as batch of size 2 to the 'language' indexer. 
""" CONFIG_BASE_FILENAME = 'indexer/orchestrator' DEFAULT_CONFIG = { 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, 'check_presence': True, }, }), } def __init__(self): super().__init__() self.config = self.parse_config_file() indexer_names = list(self.config['indexers'].keys()) random.shuffle(indexer_names) indexers = {} tasks = {} for name in indexer_names: if name not in TASK_NAMES: raise ValueError('%s must be one of %s' % ( name, TASK_NAMES.keys())) opts = self.config['indexers'][name] indexers[name] = ( INDEXER_CLASSES[name], opts['check_presence'], opts['batch_size']) tasks[name] = utils.get_task(TASK_NAMES[name]) self.indexers = indexers self.tasks = tasks def run(self, ids): for name, (idx_class, filtering, batch_size) in self.indexers.items(): if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) ids_filtered = list(indexer_class().filter(ids)) if not ids_filtered: continue else: policy_update = 'update-dups' ids_filtered = ids celery_tasks = [] for ids_to_send in grouper(ids_filtered, batch_size): celery_task = self.tasks[name].s( ids=list(ids_to_send), policy_update=policy_update) celery_tasks.append(celery_task) group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'