diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py new file mode 100644 index 0000000..014c42b --- /dev/null +++ b/swh/indexer/__init__.py @@ -0,0 +1,22 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from .file_properties import ContentMimetypeIndexer + + +INDEXER_CLASSES = { + 'mimetype': ContentMimetypeIndexer, +} + + +TASK_NAMES = { + 'orchestrator': 'swh.indexer.tasks.SWHOrchestratorTask', + 'mimetype': 'swh.indexer.tasks.SWHContentMimetypeTask', +} + + +__all__ = [ + 'INDEXER_CLASSES', 'TASK_NAMES' +] diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 67f1284..4b08139 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,178 +1,184 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import os import shutil import tempfile -from swh.core import SWHConfig -from swh.storage import get_storage +from swh.core.config import SWHConfig from swh.objstorage import get_objstorage +from swh.storage import get_storage class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): """Base class for indexers to inherit from. Indexers can: - filter out sha1 whose data has already been indexed. - retrieve sha1's content from objstorage, index this content then store the result in storage. Thus the following interface to implement per inheriting class: - def filter: filter out data already indexed (in storage) - def index: compute index on data (stored by sha1 in objstorage) and store result in storage. 
""" CONFIG_BASE_FILENAME = 'indexer/indexer' DEFAULT_CONFIG = { 'storage': ('dict', { 'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'} }), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure-storage', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) storage = self.config['storage'] self.storage = get_storage(storage['cls'], storage['args']) @abc.abstractmethod def filter_contents(self, sha1s): """Filter missing sha1 for that particular indexer. Args: sha1s ([bytes]): list of contents' sha1 Yields: iterator of missing sha1 """ pass def index_contents(self, sha1s): """Given a list of sha1s: - retrieve the content from the storage - execute the indexing computations - store the results """ results = [] for sha1 in sha1s: content = self.objstorage.get(sha1) res = self.index_content(sha1, content) results.append(res) self.persist_index_computations(results) @abc.abstractmethod def index_content(self, sha1, content): pass @abc.abstractmethod def persist_index_computations(self, results): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index_content function. 
""" pass + def run(self, sha1s): + """Main entry point for the base indexer. + + """ + self.index_contents(sha1s) + class DiskIndexer: """Mixin intended to be used with other *Indexer Class. Indexer* inheriting from this class are a category of indexers which needs the disk for their computations. Expects: Have the self.working_directory defined at runtime. """ def __init__(self): super().__init__() def write_to_temp(self, filename, data): """Write the sha1's content in a temporary file. Args: sha1 (str): the sha1 name filename (str): one of sha1's many filenames data (bytes): the sha1's content to write in temporary file Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(self.working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=self.working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, 'wb') as f: f.write(data) return content_path def cleanup(self, content_path): """Remove content_path from working directory. Args: content_path (str): the file to remove """ temp_dir = os.path.dirname(content_path) shutil.rmtree(temp_dir) diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py new file mode 100644 index 0000000..a3dbae6 --- /dev/null +++ b/swh/indexer/orchestrator.py @@ -0,0 +1,46 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random + +from swh.core.config import SWHConfig +from swh.scheduler.celery_backend.config import app + +from . 
import TASK_NAMES, INDEXER_CLASSES + + +class OrchestratorIndexer(SWHConfig): + """The indexer orchestrator is in charge of: + - reading messages (list of sha1s as bytes) + - according to its configuration, broadcast those messages to indexers + - possibly filtering by indexer beforehand + + """ + CONFIG_BASE_FILENAME = 'indexer/orchestrator' + + DEFAULT_CONFIG = { + 'indexers': ('[str]', ['mimetype']), + } + + def __init__(self): + super().__init__() + self.config = self.parse_config_file() + indexer_names = self.config['indexers'] + random.shuffle(indexer_names) + self.indexers = { + TASK_NAMES[name]: INDEXER_CLASSES[name] + for name in indexer_names + if name in app.tasks + } + + def run(self, sha1s): + for task_name, indexer_class in self.indexers.items(): + indexer = indexer_class() + + # first filter the contents per indexers + sha1s_filtered = indexer.filter_contents(sha1s) + + # now send the message for the indexer to compute and store results + app.tasks[task_name].delay(sha1s_filtered) diff --git a/swh/indexer/producer.py b/swh/indexer/producer.py index ef03140..8822997 100755 --- a/swh/indexer/producer.py +++ b/swh/indexer/producer.py @@ -1,124 +1,84 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os import random from swh.core.config import SWHConfig -from swh.core import hashutil, utils +from swh.core import utils from swh.objstorage import get_objstorage -from swh.storage import get_storage from swh.scheduler.celery_backend.config import app -from . 
import tasks, TASK_NAMES # noqa -task_name = 'swh.indexer.tasks.SWHReaderTask' -task_destination = 'swh.indexer.tasks.SWHFilePropertiesTask' +task_name = TASK_NAMES['orchestrator'] -reader_task = app.tasks[task_name] +orchestrator_task = app.tasks[task_name] class ContentIndexerProducer(SWHConfig): DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'remote_storage', - 'args': ['http://localhost:5000/'], - }), 'objstorage': ('dict', { 'cls': 'pathslicing', 'args': { 'slicing': '0:2/2:4/4:6', 'root': '/srv/softwareheritage/objects/' }, }), 'batch': ('int', 10), 'limit': ('str', 'none'), } - CONFIG_BASE_FILENAME = 'indexer/reader' + CONFIG_BASE_FILENAME = 'indexer/producer' def __init__(self): super().__init__() self.config = self.parse_config_file() - storage = self.config['storage'] - self.storage = get_storage(storage['cls'], storage['args']) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) self.limit = self.config['limit'] if self.limit == 'none': self.limit = None else: self.limit = int(self.limit) self.batch = self.config['batch'] - def _get_random_name(self, sha1, revision_paths, total_retry=10): - """Retrieve a random name which is utf-8 decodable. - - """ - retry = 0 - while retry <= total_retry: - name = random.choice(revision_paths) - try: - return name.decode('utf-8') - except UnicodeDecodeError as e: - print('sha1 %s with path %s is not utf-8 decodable - %s' % ( - sha1, name, e)) - pass - retry += 1 - def get_contents(self): """Read contents and retrieve randomly one possible path. 
""" - for sha1 in self.objstorage: - c = self.storage.cache_content_get({'sha1': sha1}) - if not c: - print('No reference found for %s' % sha1) - continue - revision_paths = [ - os.path.basename(path) for _, path in c['revision_paths'] - ] - - name = self._get_random_name(sha1, revision_paths) - if not name: # nothing found, drop that content (for now) - print('No valid path found for %s' % sha1) - continue - - yield { - 'sha1': hashutil.hash_to_hex(sha1), - 'name': name, - } + yield from self.objstorage def gen_sha1(self): """Generate batch of grouped sha1s from the objstorage. """ for sha1s in utils.grouper(self.get_contents(), self.batch): sha1s = list(sha1s) random.shuffle(sha1s) yield sha1s def run_with_limit(self): count = 0 for sha1s in self.gen_sha1(): count += len(sha1s) print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0])) - reader_task.delay(sha1s, task_destination) + orchestrator_task.delay(sha1s) if count >= self.limit: return def run_no_limit(self): for sha1s in self.gen_sha1(): print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0])) - reader_task.delay(sha1s, task_destination) + orchestrator_task.delay(sha1s) def run(self, *args, **kwargs): if self.limit: self.run_with_limit() else: self.run_no_limit() + if __name__ == '__main__': ContentIndexerProducer().run() diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py index 41e6ade..a487ad8 100644 --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,17 +1,30 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task + +from .orchestrator import OrchestratorIndexer from .file_properties import ContentMimetypeIndexer +class SWHOrchestratorTask(Task): + """Main task in charge of reading messages and broadcasting them back + to other tasks. 
+ + """ + task_queue = 'swh_indexer_orchestrator' + + def run(self, *args, **kwargs): + OrchestratorIndexer().run(*args, **kwargs) + + class SWHContentMimetypeTask(Task): - """Main task which computes the mimetype, encoding from the sha1's content. + """Task which computes the mimetype, encoding from the sha1's content. """ - task_queue = 'swh_indexer_worker_content_mimetype' + task_queue = 'swh_indexer_content_mimetype' def run(self, *args, **kwargs): ContentMimetypeIndexer().run(*args, **kwargs)