diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 543c5be..0280bbc 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,235 +1,239 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import os import logging import shutil import tempfile from swh.core import hashutil from swh.core.config import SWHConfig from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.storage import get_storage class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the `run` functions which is in charge to trigger the computations on the sha1s batch receiived as parameter. Indexers can: - filter out sha1 whose data has already been indexed. - retrieve sha1's content from objstorage, index this content then store the result in storage. Thus the following interface to implement per inheriting class: - def filter_contents(self, sha1s): filter out data already indexed (in storage) - def index_content(self, sha1, content): compute index on sha1 with data content (stored by sha1 in objstorage) and store result in storage. - - def persist_index_computations(self, results): + - def persist_index_computations(self, results, policy_update): the function to store the results (as per index_content defined). """ CONFIG_BASE_FILENAME = 'indexer/base' DEFAULT_CONFIG = { 'storage': ('dict', { 'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'} }), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) storage = self.config['storage'] self.storage = get_storage(storage['cls'], storage['args']) l = logging.getLogger('requests.packages.urllib3.connectionpool') l.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') @abc.abstractmethod def filter_contents(self, sha1s): """Filter missing sha1 for that particular indexer. Args: sha1s ([bytes]): list of contents' sha1 Yields: iterator of missing sha1 """ pass @abc.abstractmethod def index_content(self, sha1, content): """Index computation for the sha1 and associated raw content. Args: sha1 (bytes): sha1 identifier content (bytes): sha1's raw content Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod - def persist_index_computations(self, results): + def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index_content function. + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them Returns: None """ pass def next_step(self, results): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned by index_content function. Returns: None """ pass - def run(self, sha1s): + def run(self, sha1s, policy_update): """Given a list of sha1s: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: sha1s ([bytes]): sha1's identifier list + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them """ results = [] for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue res = self.index_content(sha1, raw_content) results.append(res) - self.persist_index_computations(results) + self.persist_index_computations(results, policy_update) self.next_step(results) class DiskIndexer: """Mixin intended to be used with other *Indexer classes. Indexer* inheriting from this class are a category of indexers which needs the disk for their computations. Expects: self.working_directory variable defined at runtime. """ def __init__(self): super().__init__() def write_to_temp(self, filename, data): """Write the sha1's content in a temporary file. Args: sha1 (str): the sha1 name filename (str): one of sha1's many filenames data (bytes): the sha1's content to write in temporary file Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(self.working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=self.working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, 'wb') as f: f.write(data) return content_path def cleanup(self, content_path): """Remove content_path from working directory. Args: content_path (str): the file to remove """ temp_dir = os.path.dirname(content_path) shutil.rmtree(temp_dir) diff --git a/swh/indexer/language.py b/swh/indexer/language.py index 35ba852..eb065de 100644 --- a/swh/indexer/language.py +++ b/swh/indexer/language.py @@ -1,95 +1,97 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from pygments.lexers import guess_lexer from chardet import detect from .indexer import BaseIndexer def _cleanup_classname(classname): """Determine the language from the pygments' lexer names. """ return classname.lower().replace(' ', '-') def compute_language(raw_content): """Determine the raw content's language. Args: raw_content (bytes): content to determine raw content Returns: Dict with keys: - lang: None if nothing found or the possible language - decoding_failure: True if a decoding failure happened """ try: stats = detect(raw_content) encoding = stats['encoding'] content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) return { 'lang': lang } except Exception: return { 'lang': None } class ContentLanguageIndexer(BaseIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ def __init__(self): super().__init__() def filter_contents(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_language_missing(sha1s) def index_content(self, sha1, raw_content): """Index sha1s' content and store result. Args: sha1 (bytes): content's identifier raw_content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language """ result = compute_language(raw_content) result.update({ 'id': sha1, }) return result - def persist_index_computations(self, results): + def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: - results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - lang (bytes): detected language + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them """ - self.storage.content_language_add(results) + self.storage.content_language_add( + results, conflict_update=(policy_update == 'update-dups')) diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py index 2a8aec4..01378a2 100644 --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,137 +1,139 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import subprocess from swh.core import hashutil from swh.scheduler.celery_backend.config import app from .indexer import BaseIndexer, DiskIndexer def compute_mimetype_encoding(path): """Determine mimetype and encoding from file at path. Args: path: filepath to determine the mime type Returns: A dict with mimetype and encoding key and corresponding values. """ cmd = ['file', '--mime', path] properties = subprocess.check_output(cmd) if properties: res = properties.split(b': ')[1].strip().split(b'; ') mimetype = res[0] encoding = res[1].split(b'=')[1] return { 'mimetype': mimetype, 'encoding': encoding } class ContentMimetypeIndexer(BaseIndexer, DiskIndexer): """Indexer in charge of: - filtering out content already indexed - reading content from objstorage per the content's id (sha1) - computing {mimetype, encoding} from that content - store result in storage """ ADDITIONAL_CONFIG = { 'workdir': ('str', '/tmp/swh/indexer.mimetype'), 'destination_queue': ( 'str', 'swh.indexer.tasks.SWHOrchestratorTextContentsTask') } CONFIG_BASE_FILENAME = 'indexer/mimetype' def __init__(self): super().__init__() self.working_directory = self.config['workdir'] destination_queue = self.config['destination_queue'] self.task_destination = app.tasks[destination_queue] def filter_contents(self, sha1s): """Filter out known sha1s and return only missing ones. """ yield from self.storage.content_mimetype_missing(sha1s) def index_content(self, sha1, content): """Index sha1s' content and store result. Args: sha1 (bytes): content's identifier content (bytes): raw content in bytes Returns: A dict, representing a content_mimetype, with keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ filename = hashutil.hash_to_hex(sha1) content_path = self.write_to_temp( filename=filename, data=content) properties = compute_mimetype_encoding(content_path) properties.update({ 'id': sha1, }) self.cleanup(content_path) return properties - def persist_index_computations(self, results): + def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: - results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes + policy_update ([str]): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore them """ - self.storage.content_mimetype_add(results) + self.storage.content_mimetype_add( + results, conflict_update=(policy_update == 'update-dups')) def _filter_text(self, results): """Filter sha1 whose raw content is text. """ for result in results: if b'binary' in result['encoding']: continue yield result['id'] def next_step(self, results): """When the computations is done, we'd like to send over only text contents to the text content orchestrator. Args: results ([dict]): List of content_mimetype results, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes """ self.task_destination.delay(list(self._filter_text(results))) @click.command() @click.option('--path', help="Path to execute index on") def main(path): print(compute_mimetype_encoding(path)) if __name__ == '__main__': main() diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index f799283..20f6a65 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,90 +1,94 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.core.config import SWHConfig from swh.scheduler.celery_backend.config import app from . import TASK_NAMES, INDEXER_CLASSES class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of: - reading batch of contents (list of sha1s as bytes) - according to its configuration, filter or not the contents - and then broadcast those contents to indexers """ CONFIG_BASE_FILENAME = 'indexer/orchestrator' DEFAULT_CONFIG = { 'indexers': ('[str]', ['mimetype']), 'check_presence': ('bool', 'true'), } def __init__(self): super().__init__() self.config = self.parse_config_file() indexer_names = self.config['indexers'] random.shuffle(indexer_names) self.indexers = { TASK_NAMES[name]: INDEXER_CLASSES[name] for name in indexer_names } self.check_presence = self.config['check_presence'] def run_with_check(self, sha1s): - """Run with checking the presence on sha1s in db. + """Run with checking the presence on sha1s in db to filter them out. """ celery_tasks = [] for task_name, indexer_class in self.indexers.items(): indexer = indexer_class() # filter the contents per indexers sha1s_filtered = list(indexer.filter_contents(sha1s)) if not sha1s_filtered: continue - celery_task = app.tasks[task_name].s(sha1s_filtered) + # send message for indexer to compute and store results on + # filtered sha1s + celery_task = app.tasks[task_name].s(sha1s=sha1s_filtered, + policy_update='ignore-dups') celery_tasks.append(celery_task) return celery_tasks def run_no_check(self, sha1s): - """Simply broadcase sha1s to the indexers' queue. + """Simply broadcast sha1s to the indexers' queue. """ celery_tasks = [] for task_name, _ in self.indexers.items(): # send message for indexer to compute and store results - celery_task = app.tasks[task_name].s(sha1s) + celery_task = app.tasks[task_name].s(sha1s=sha1s, + policy_update='update-dups') celery_tasks.append(celery_task) return celery_tasks def run(self, sha1s): if self.check_presence: celery_tasks = self.run_with_check(sha1s) else: celery_tasks = self.run_no_check(sha1s) group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'