diff --git a/swh/indexer/file_properties.py b/swh/indexer/file_properties.py index 25bdb29..6067745 100644 --- a/swh/indexer/file_properties.py +++ b/swh/indexer/file_properties.py @@ -1,39 +1,107 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import subprocess +from swh.core import hashutil + +from .indexer import BaseIndexer, DiskIndexer + def run_file_properties(path): """Determine mimetype and encoding from file at path. Args: path: filepath to determine the mime type Returns: A dict with mimetype and encoding key and corresponding values. """ cmd = ['file', '--mime-type', '--mime-encoding', path] - properties = subprocess.check_output(cmd, universal_newlines=True) + properties = subprocess.check_output(cmd) if properties: - res = properties.split(': ')[1].strip().split('; ') + res = properties.split(b': ')[1].strip().split(b'; ') mimetype = res[0] - encoding = res[1].split('=')[1] + encoding = res[1].split(b'=')[1] return { 'mimetype': mimetype, 'encoding': encoding } +class ContentMimetypeIndexer(BaseIndexer, DiskIndexer): + """Indexer in charge of: + - filtering out content already indexed + - reading content from objstorage per the content's id (sha1) + - computing {mimetype, encoding} from that content + - store result in storage + + """ + ADDITIONAL_CONFIG = { + 'workdir': ('str', '/tmp/swh/worker.file.properties'), + } + + def __init__(self): + super().__init__() + self.working_directory = self.config['workdir'] + + def filter_contents(self, sha1s): + """Filter out known sha1s and return only missing ones. + + """ + yield from self.storage.content_mimetype_missing(sha1s) + + def index_content(self, sha1, content): + """Index sha1s' content and store result. + + Args: + sha1 (bytes): content's identifier + content (bytes): raw content in bytes + + Returns: + A dict, representing a content_mimetype, with keys: + - id (bytes): content's identifier (sha1) + - mimetype (bytes): mimetype in bytes + - encoding (bytes): encoding in bytes + + """ + filename = hashutil.hash_to_hex(sha1) + content_path = self.write_to_temp( + filename=filename, + data=content) + + properties = run_file_properties(content_path) + properties.update({ + 'id': sha1, + }) + + self.cleanup(content_path) + return properties + + def persist_index_computations(self, results): + """Persist the results in storage. + + Args: + + results ([dict]): list of content_mimetype, dict with the + following keys: + - id (bytes): content's identifier (sha1) + - mimetype (bytes): mimetype in bytes + - encoding (bytes): encoding in bytes + + """ + self.storage.content_mimetype_add(results) + + @click.command() @click.option('--path', help="Path to execute index on") def main(path): print(run_file_properties(path)) if __name__ == '__main__': main() diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py new file mode 100644 index 0000000..67f1284 --- /dev/null +++ b/swh/indexer/indexer.py @@ -0,0 +1,178 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import abc +import os +import shutil +import tempfile + +from swh.core import SWHConfig +from swh.storage import get_storage +from swh.objstorage import get_objstorage + + +class BaseIndexer(SWHConfig, + metaclass=abc.ABCMeta): + """Base class for indexers to inherit from. + Indexers can: + - filter out sha1 whose data has already been indexed. + - retrieve sha1's content from objstorage, index this content then + store the result in storage. + + Thus the following interface to implement per inheriting class: + - def filter: filter out data already indexed (in storage) + - def index: compute index on data (stored by sha1 in + objstorage) and store result in storage. + + """ + CONFIG_BASE_FILENAME = 'indexer/indexer' + + DEFAULT_CONFIG = { + 'storage': ('dict', { + 'host': 'uffizi', + 'cls': 'pathslicing', + 'args': {'root': '/tmp/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'} + }), + 'objstorage': ('dict', { + 'cls': 'multiplexer', + 'args': { + 'objstorages': [{ + 'cls': 'filtered', + 'args': { + 'storage_conf': { + 'cls': 'azure-storage', + 'args': { + 'account_name': '0euwestswh', + 'api_secret_key': 'secret', + 'container_name': 'contents' + } + }, + 'filters_conf': [ + {'type': 'readonly'}, + {'type': 'prefix', 'prefix': '0'} + ] + } + }, { + 'cls': 'filtered', + 'args': { + 'storage_conf': { + 'cls': 'azure-storage', + 'args': { + 'account_name': '1euwestswh', + 'api_secret_key': 'secret', + 'container_name': 'contents' + } + }, + 'filters_conf': [ + {'type': 'readonly'}, + {'type': 'prefix', 'prefix': '1'} + ] + } + }] + }, + }), + } + + ADDITIONAL_CONFIG = {} + + def __init__(self): + super().__init__() + self.config = self.parse_config_file( + additional_configs=[self.ADDITIONAL_CONFIG]) + objstorage = self.config['objstorage'] + self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) + storage = self.config['storage'] + self.storage = get_storage(storage['cls'], storage['args']) + + @abc.abstractmethod + def filter_contents(self, sha1s): + """Filter missing sha1 for that particular indexer. + + Args: + sha1s ([bytes]): list of contents' sha1 + + Yields: + iterator of missing sha1 + + """ + pass + + def index_contents(self, sha1s): + """Given a list of sha1s: + - retrieve the content from the storage + - execute the indexing computations + - store the results + + """ + results = [] + for sha1 in sha1s: + content = self.objstorage.get(sha1) + res = self.index_content(sha1, content) + results.append(res) + + self.persist_index_computations(results) + + @abc.abstractmethod + def index_content(self, sha1, content): + pass + + @abc.abstractmethod + def persist_index_computations(self, results): + """Persist the computation resulting from the index. + + Args: + results ([result]): List of results. One result is the + result of the index_content function. + + """ + pass + + +class DiskIndexer: + """Mixin intended to be used with other *Indexer Class. + + Indexer* inheriting from this class are a category of indexers + which needs the disk for their computations. + + Expects: + Have the self.working_directory defined at runtime. + + """ + def __init__(self): + super().__init__() + + def write_to_temp(self, filename, data): + """Write the sha1's content in a temporary file. + + Args: + sha1 (str): the sha1 name + filename (str): one of sha1's many filenames + data (bytes): the sha1's content to write in temporary + file + + Returns: + The path to the temporary file created. That file is + filled in with the raw content's data. + + """ + os.makedirs(self.working_directory, exist_ok=True) + temp_dir = tempfile.mkdtemp(dir=self.working_directory) + content_path = os.path.join(temp_dir, filename) + + with open(content_path, 'wb') as f: + f.write(data) + + return content_path + + def cleanup(self, content_path): + """Remove content_path from working directory. + + Args: + content_path (str): the file to remove + + """ + temp_dir = os.path.dirname(content_path) + shutil.rmtree(temp_dir) diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py index 819bf64..41e6ade 100644 --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,48 +1,17 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task -from .worker import ReaderWorker, FilePropertiesWorker, LanguageWorker -from .worker import CtagsWorker +from .file_properties import ContentMimetypeIndexer -class SWHReaderTask(Task): - """Main task that read from storage the sha1's content. +class SWHContentMimetypeTask(Task): + """Main task which computes the mimetype, encoding from the sha1's content. """ - task_queue = 'swh_indexer_worker_reader' + task_queue = 'swh_indexer_worker_content_mimetype' def run(self, *args, **kwargs): - ReaderWorker().run(*args, **kwargs) - - -class SWHFilePropertiesTask(Task): - """Main task which computes the mime type from the sha1's content. - - """ - task_queue = 'swh_indexer_worker_file_properties' - - def run(self, *args, **kwargs): - FilePropertiesWorker().run(*args, **kwargs) - - -class SWHLanguageTask(Task): - """Main task which computes the language from the sha1's content. - - """ - task_queue = 'swh_indexer_worker_language' - - def run(self, *args, **kwargs): - LanguageWorker().run(*args, **kwargs) - - -class SWHCtagsTask(Task): - """Main task which computes the ctags from the sha1's content. - - """ - task_queue = 'swh_indexer_worker_ctags' - - def run(self, *args, **kwargs): - CtagsWorker().run(*args, **kwargs) + ContentMimetypeIndexer().run(*args, **kwargs) diff --git a/swh/indexer/worker.py b/swh/indexer/worker.py deleted file mode 100644 index 24f0b3e..0000000 --- a/swh/indexer/worker.py +++ /dev/null @@ -1,353 +0,0 @@ -# Copyright (C) 2016 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import abc -import os -import shutil -import tempfile - -from . import file_properties, converters, language, ctags -from .storage import Storage - -from swh.scheduler.celery_backend.config import app -from swh.core import hashutil -from swh.core.config import SWHConfig -from swh.core.serializers import msgpack_dumps, msgpack_loads -from swh.objstorage import get_objstorage - - -class BaseWorker(SWHConfig, metaclass=abc.ABCMeta): - """Base worker for the indexing computations. - - Inherit from this class and override the following properties: - - ADDITIONAL_CONFIG: to add a dictionary of extra configuration - - CONFIG_BASE_FILENAME: the default configuration file to lookup for - - def compute(self, *args, **kwargs): method in charge of doing - the actual computation on sha1 or sha1's content. - - """ - DEFAULT_CONFIG = { - 'db': ('dict', { - 'conn': 'mongodb://mongodb0.example.net:27017', - 'name': 'content', - }), - 'next_task_queue': ('str', 'next.plugged.task.queue'), - } - - ADDITIONAL_CONFIG = {} - - CONFIG_BASE_FILENAME = 'indexer/worker' - - def __init__(self): - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - # Next task queue to send message too (can be empty for final step) - next_task_queue = self.config.get('next_task_queue', None) - if not next_task_queue: - next_task_queue = None - self.next_task_queue = next_task_queue - - @abc.abstractmethod - def compute(self, content): - """Method in charge of actual computations on the dictionary content. - - Args: - - content (dict): a content with at least the 'sha1' key filled in. - - Returns: - The updated content - - """ - pass - - def encode(self, content): - content_copy = content - if 'data' in content: - content_copy['data'] = msgpack_dumps(content['data']) - - return content_copy - - def decode(self, content): - content_copy = content - if 'data' in content: - content_copy['data'] = msgpack_loads(content['data']) - - return content_copy - - def compute_contents(self, contents_packed): - """Compute what's necessary for each content and yield the updated - content. - - """ - for content_packed in contents_packed: - content = self.decode(content_packed) - content_updated = self.compute(content) - yield self.encode(content_updated) - - def run(self, contents_packed, task_destination=None, **kwargs): - """Compute from a sha1 or sha1's content and then propagate the result - to another queue. - - """ - contents_updated_pack = list(self.compute_contents(contents_packed)) - if task_destination: - task = app.tasks[task_destination] - task.delay(contents_updated_pack, self.next_task_queue) - - -class ReaderWorker(BaseWorker): - """Class in charge of reading the sha1's content from objstorage and - flush its contents in another queue. - - Note: The default config below demonstrates a configuration for a - multiplexer objstorage. One which can only read from multiple - objstorages based on the sha1's prefix. - - """ - CONFIG_BASE_FILENAME = 'indexer/reader' - - ADDITIONAL_CONFIG = { - 'next_task_queue': ( - 'str', 'swh.indexer.tasks.SWHFilePropertiesTask'), - 'objstorage': ('dict', { - 'cls': 'multiplexer', - 'args': { - 'objstorages': [{ - 'cls': 'filtered', - 'args': { - 'storage_conf': { - 'cls': 'azure-storage', - 'args': { - 'account_name': '0euwestswh', - 'api_secret_key': 'secret', - 'container_name': 'contents' - } - }, - 'filters_conf': [ - {'type': 'readonly'}, - {'type': 'prefix', 'prefix': '0'} - ] - } - }, { - 'cls': 'filtered', - 'args': { - 'storage_conf': { - 'cls': 'azure-storage', - 'args': { - 'account_name': '1euwestswh', - 'api_secret_key': 'secret', - 'container_name': 'contents' - } - }, - 'filters_conf': [ - {'type': 'readonly'}, - {'type': 'prefix', 'prefix': '1'} - ] - } - }] - }, - }), - } - - def __init__(self): - super().__init__() - objstorage = self.config['objstorage'] - self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) - - def compute(self, content): - """Compute from the sha1 its content and returns it. - - """ - content_copy = content.copy() - sha1 = hashutil.hex_to_hash(content['sha1']) - data = self.objstorage.get(sha1) - content_copy['data'] = data - return content_copy - - -class DiskWorker: - """Mixin intended to be used with other *Worker Class. - - Worker inheriting from this class are a category of workers - which needs the disk for their computations. - - Expects: - Have the self.working_directory defined at runtime. - - """ - def __init__(self): - super().__init__() - - def write_to_temp(self, filename, data): - """Write the sha1's content in a temporary file. - - Args: - sha1 (str): the sha1 name - filename (str): one of sha1's many filenames - data (bytes): the sha1's content to write in temporary - file - - Returns: - The path to the temporary file created. That file is - filled in with the raw content's data. - - """ - os.makedirs(self.working_directory, exist_ok=True) - temp_dir = tempfile.mkdtemp(dir=self.working_directory) - content_path = os.path.join(temp_dir, filename) - - with open(content_path, 'wb') as f: - f.write(data) - - return content_path - - def cleanup(self, content_path): - """Remove content_path from working directory. - - Args: - content_path (str): the file to remove - - """ - temp_dir = os.path.dirname(content_path) - shutil.rmtree(temp_dir) - - -class PersistResultWorker: - """Mixin intended to be used with other *Worker Class. - - Worker inheriting from this class are a category of workers - which are able to perist the computed data in storage. - - Expects: - Have the self.storage defined at runtime. - - """ - def save(self, content): - """Store the content in storage except for the raw data. - - Args: - content: dict with the following keys: - - sha1: content id - - data: raw data for the content - - mimetype: its mimetype - - encoding: its encoding - - """ - content_to_store = converters.content_to_storage(content) - self.storage.content_add(content_to_store) - - -class FilePropertiesWorker(BaseWorker, DiskWorker, PersistResultWorker): - """Worker in charge of computing the properties of the file content. - - """ - CONFIG_BASE_FILENAME = 'indexer/file_properties' - ADDITIONAL_CONFIG = { - 'workdir': ('str', '/tmp/swh/worker.file.properties'), - 'next_task_queue': ('str', 'swh.indexer.tasks.SWHLanguageTask'), - } - - def __init__(self): - super().__init__() - db = self.config['db'] - self.storage = Storage(db_conn=db['conn'], db_name=db['name']) - self.working_directory = self.config['workdir'] - - def compute(self, content): - """Compute the mimetype of the content, updates the content, stores - the result and return the updated result. - - """ - content_copy = content.copy() - content_path = self.write_to_temp( - filename=content['name'], - data=content['data']) - - properties = file_properties.run_file_properties(content_path) - content_copy.update(properties) - - self.save(content_copy) - self.cleanup(content_path) - - return content_copy - - -class LanguageWorker(BaseWorker, DiskWorker, PersistResultWorker): - """Worker in charge of computing the mimetype of a content. - - """ - CONFIG_BASE_FILENAME = 'indexer/language' - ADDITIONAL_CONFIG = { - 'workdir': ('str', '/tmp/swh/worker.language/'), - 'next_task_queue': ('str', 'swh.indexer.tasks.SWHCtagsTask'), - } - - def __init__(self): - super().__init__() - db = self.config['db'] - self.storage = Storage(db_conn=db['conn'], db_name=db['name']) - self.working_directory = self.config['workdir'] - - def compute(self, content): - """Compute the mimetype of the content, updates the content, stores - the result and return the updated result. - - """ - content_copy = content.copy() - encoding = content['encoding'] - if encoding == 'binary': - content_copy['lang'] = None - self.save(content_copy) - return content_copy - - content_path = self.write_to_temp( - filename=content['name'], - data=content['data']) - - lang = language.run_language(content_path, encoding=encoding) - content_copy.update(lang) - - self.save(content_copy) - self.cleanup(content_path) - - return content_copy - - -class CtagsWorker(BaseWorker, DiskWorker, PersistResultWorker): - CONFIG_BASE_FILENAME = 'indexer/ctags' - ADDITIONAL_CONFIG = { - 'workdir': ('str', '/tmp/swh/worker.ctags/'), - 'next_task_queue': ('str', ''), # empty for final step - } - - def __init__(self): - super().__init__() - db = self.config['db'] - self.storage = Storage(db_conn=db['conn'], db_name=db['name']) - self.working_directory = self.config['workdir'] - - def compute(self, content): - """Compute the mimetype of the content, updates the content, stores - the result and return the updated result. - - """ - # Bypass binary content or content with decoding error - content_copy = content.copy() - encoding = content['encoding'] - if encoding == 'binary' or 'decoding_failure' in content: - content_copy['ctags'] = None - self.save(content_copy) - return content_copy - - content_path = self.write_to_temp( - filename=content['name'], - data=content['data']) - ctagsfile = ctags.run_ctags( - path=content_path, lang=content.get('lang')) - content_copy['ctags'] = list(ctags.parse_ctags(ctagsfile)) - self.save(content_copy) - self.cleanup(content_path) - return content_copy