diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
index eafbb92..7ad5ba4 100644
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -1,31 +1,4 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-
-
-INDEXER_CLASSES = {
-    'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer',
-    'indexer_language': 'swh.indexer.language.ContentLanguageIndexer',
-    'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer',
-    'indexer_fossology_license':
-    'swh.indexer.fossology_license.ContentFossologyLicenseIndexer',
-}
-
-
-TASK_NAMES = {
-    'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents',
-    'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents',
-    'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype',
-    'indexer_language': 'swh.indexer.tasks.ContentLanguage',
-    'indexer_ctags': 'swh.indexer.tasks.Ctags',
-    'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense',
-    'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums',
-    'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata',
-    'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata',
-}
-
-
-__all__ = [
-    'INDEXER_CLASSES', 'TASK_NAMES',
-]
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 9015e4a..c5af240 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,507 +1,505 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 import os
 import logging
 import shutil
 import tempfile
 import datetime
 
 from copy import deepcopy
 
 from swh.scheduler import get_scheduler
 from swh.storage import get_storage
 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
 from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
 
 
 class DiskIndexer:
     """Mixin intended to be used with other SomethingIndexer classes.
 
     Indexers inheriting from this class are a category of indexers
     which need the disk for their computations.
 
     Note:
         This expects the `self.working_directory` variable to be
         defined at runtime.
 
     """
     def write_to_temp(self, filename, data):
         """Write the sha1's content in a temporary file.
 
         Args:
             filename (str): one of the sha1's many filenames
             data (bytes): the sha1's content to write in a temporary
               file
 
         Returns:
             The path to the temporary file created. That file is
             filled in with the raw content's data.
 
         """
         os.makedirs(self.working_directory, exist_ok=True)
         temp_dir = tempfile.mkdtemp(dir=self.working_directory)
         content_path = os.path.join(temp_dir, filename)
 
         with open(content_path, 'wb') as f:
             f.write(data)
 
         return content_path
 
     def cleanup(self, content_path):
         """Remove content_path from working directory.
 
         Args:
             content_path (str): the file to remove
 
         """
         temp_dir = os.path.dirname(content_path)
         shutil.rmtree(temp_dir)
 
 
 class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.
 
     The main entry point is the :func:`run` function which is in
     charge of triggering the computations on the batch dict/ids
     received.
 
     Indexers can:
 
     - filter out ids whose data has already been indexed.
     - retrieve ids data from storage or objstorage
     - index this data depending on the object and store the result in
       storage.
 
     To implement a new object type indexer, inherit from the
     BaseIndexer and implement indexing:
 
     :func:`run`:
       object_ids are different depending on object. For example: sha1 for
       content, sha1_git for revision, directory, release, and id for
       origin
 
     To implement a new concrete indexer, inherit from the object level
     classes: :class:`ContentIndexer`, :class:`RevisionIndexer`,
     :class:`OriginIndexer`.
 
     Then you need to implement the following functions:
 
     :func:`filter`:
-        filter out data already indexed (in storage). This function is used by
-        the orchestrator and not directly by the indexer
-        (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
+        filter out data already indexed (in storage).
 
     :func:`index_object`:
       compute index on id with data (retrieved from the storage or the
       objstorage by the id key) and return the resulting index computation.
 
     :func:`persist_index_computations`:
       persist the results of multiple index computations in the storage.
 
     The new indexer implementation can also override the following functions:
 
     :func:`prepare`:
       Configuration preparation for the indexer. When overriding, this must
       call the `super().prepare()` instruction.
 
     :func:`check`:
       Configuration check for the indexer. When overriding, this must call
       the `super().check()` instruction.
 
     :func:`register_tools`:
       This should return a dict of the tool(s) to use when indexing or
       filtering.
 
     """
     CONFIG = 'indexer/base'
 
     DEFAULT_CONFIG = {
         INDEXER_CFG_KEY: ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5007/'
             }
         }),
         'storage': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5002/',
             }
         }),
         'objstorage': ('dict', {
             'cls': 'multiplexer',
             'args': {
                 'objstorages': [{
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure',
                             'args': {
                                 'account_name': '0euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '0'}
                         ]
                     }
                 }, {
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure',
                             'args': {
                                 'account_name': '1euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '1'}
                         ]
                     }
                 }]
             },
         }),
     }
 
     ADDITIONAL_CONFIG = {}
 
     def __init__(self):
         """Prepare and check that the indexer is ready to run.
 
         """
         super().__init__()
         self.prepare()
         self.check()
 
     def prepare(self):
         """Prepare the indexer's needed runtime configuration.
            Without this step, the indexer cannot possibly run.
 
         """
         self.config = self.parse_config_file(
             additional_configs=[self.ADDITIONAL_CONFIG])
         if self.config['storage']:
             self.storage = get_storage(**self.config['storage'])
         objstorage = self.config['objstorage']
         self.objstorage = get_objstorage(objstorage['cls'],
                                          objstorage['args'])
         idx_storage = self.config[INDEXER_CFG_KEY]
         self.idx_storage = get_indexer_storage(**idx_storage)
 
         _log = logging.getLogger('requests.packages.urllib3.connectionpool')
         _log.setLevel(logging.WARN)
         self.log = logging.getLogger('swh.indexer')
         self.tools = list(self.register_tools(self.config['tools']))
 
     def check(self):
         """Check the indexer's configuration is ok before proceeding.
            If ok, does nothing. If not, raise an error.
 
         """
         if not self.tools:
             raise ValueError('Tools %s is unknown, cannot continue' %
                              self.tools)
 
     def _prepare_tool(self, tool):
         """Prepare the tool dict to be compliant with the storage api.
 
""" return {'tool_%s' % key: value for key, value in tool.items()} def register_tools(self, tools): """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools (dict/[dict]): Either a dict or a list of dict. Returns: List of dict with additional id key. Raises: ValueError if not a list nor a dict. """ tools = self.config['tools'] if isinstance(tools, list): tools = map(self._prepare_tool, tools) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError('Configuration tool(s) must be a dict or list!') return self.idx_storage.indexer_configuration_add(tools) @abc.abstractmethod def filter(self, ids): """Filter missing ids for that particular indexer. Args: ids ([bytes]): list of ids Yields: iterator of missing ids """ pass @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. Args: id (bytes): identifier data (bytes): id's data from storage or objstorage depending on object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index function. policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them Returns: None """ pass def next_step(self, results, task): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned by index function. task (dict): a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus a `result_name` key. Returns: None """ if task: if getattr(self, 'scheduler', None): scheduler = self.scheduler else: scheduler = get_scheduler(**self.config['scheduler']) task = deepcopy(task) result_name = task.pop('result_name') task['next_run'] = datetime.datetime.now() task['arguments']['kwargs'][result_name] = self.results scheduler.create_tasks([task]) @abc.abstractmethod def run(self, ids, policy_update, next_step=None, **kwargs): """Given a list of ids: - retrieves the data from the storage - executes the indexing computations - stores the results (according to policy_update) Args: ids ([bytes]): id's identifier list policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them next_step (dict): a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ pass class ContentIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Content indexing using the run method Note: the :class:`ContentIndexer` is not an instantiable object. To use it in another context, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
""" def run(self, ids, policy_update, next_step=None, **kwargs): """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([bytes]): sha1's identifier list policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them next_step (dict): a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ results = [] try: for sha1 in ids: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) self.results = results return self.next_step(results, task=next_step) except Exception: self.log.exception( 'Problem when reading contents metadata.') class OriginIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids, policy_update, parse_ids=False, next_step=None, **kwargs): """Given a list of origin ids: - retrieve origins from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or (type, url) tuples. policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them parse_ids (bool: If `True`, will try to convert `ids` from a human input to the valid type. next_step (dict): a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ if parse_ids: ids = [ o.split('+', 1) if ':' in o else int(o) # type+url or id for o in ids] results = [] for id_ in ids: if isinstance(id_, (tuple, list)): if len(id_) != 2: raise TypeError('Expected a (type, url) tuple.') (type_, url) = id_ params = {'type': type_, 'url': url} elif isinstance(id_, int): params = {'id': id_} else: raise TypeError('Invalid value in "ids": %r' % id_) origin = self.storage.origin_get(params) if not origin: self.log.warning('Origins %s not found in storage' % list(ids)) continue try: res = self.index(origin, **kwargs) if origin: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing origin %s' % id_) self.persist_index_computations(results, policy_update) self.results = results return self.next_step(results, task=next_step) class RevisionIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
""" def run(self, ids, policy_update, next_step=None): """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([bytes or str]): sha1_git's identifier list policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] ids = [id_.encode() if isinstance(id_, str) else id_ for id_ in ids] revs = self.storage.revision_get(ids) for rev in revs: if not rev: self.log.warning('Revisions %s not found in storage' % list(map(hashutil.hash_to_hex, ids))) continue try: res = self.index(rev) if res: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing revision') self.persist_index_computations(results, policy_update) self.results = results return self.next_step(results, task=next_step) diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py deleted file mode 100644 index 12416e6..0000000 --- a/swh/indexer/orchestrator.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (C) 2016-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import random - -from swh.core.config import SWHConfig -from swh.core.utils import grouper -from swh.scheduler import utils -from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_task_dict - - -def get_class(clazz): - """Get a symbol class dynamically by its fully qualified name string - representation. - - """ - parts = clazz.split('.') - module = '.'.join(parts[:-1]) - m = __import__(module) - for comp in parts[1:]: - m = getattr(m, comp) - return m - - -class BaseOrchestratorIndexer(SWHConfig): - """The indexer orchestrator is in charge of dispatching batch of - contents (filtered or not based on presence) to indexers. - - That dispatch is indexer specific, so the configuration reflects it: - - - when `check_presence` flag is true, filter out the - contents already present for that indexer, otherwise send - everything - - - broadcast those (filtered or not) contents to indexers in a - `batch_size` fashioned - - For example:: - - indexers: - indexer_mimetype: - batch_size: 10 - check_presence: false - indexer_language: - batch_size: 2 - check_presence: true - - means: - - - send all contents received as batch of size 10 to the 'mimetype' indexer - - send only unknown contents as batch of size 2 to the 'language' indexer. - - """ - CONFIG_BASE_FILENAME = 'indexer/orchestrator' - - # Overridable in child classes. - from . 
-
-    DEFAULT_CONFIG = {
-        'scheduler': {
-            'cls': 'remote',
-            'args': {
-                'url': 'http://localhost:5008',
-            },
-        },
-        'indexers': ('dict', {
-            'indexer_mimetype': {
-                'batch_size': 10,
-                'check_presence': True,
-            },
-        }),
-    }
-
-    def __init__(self):
-        super().__init__()
-        self.config = self.parse_config_file()
-        self.prepare_tasks()
-        self.prepare_scheduler()
-
-    def prepare_scheduler(self):
-        self.scheduler = get_scheduler(**self.config['scheduler'])
-
-    def prepare_tasks(self):
-        indexer_names = list(self.config['indexers'])
-        random.shuffle(indexer_names)
-        indexers = {}
-        tasks = {}
-        for name in indexer_names:
-            if name not in self.TASK_NAMES:
-                raise ValueError('%s must be one of %s' % (
-                    name, ', '.join(self.TASK_NAMES)))
-
-            opts = self.config['indexers'][name]
-            indexers[name] = (
-                self.INDEXER_CLASSES[name],
-                opts['check_presence'],
-                opts['batch_size'])
-            tasks[name] = utils.get_task(self.TASK_NAMES[name])
-
-        self.indexers = indexers
-        self.tasks = tasks
-
-    def run(self, ids):
-        for task_name, task_attrs in self.indexers.items():
-            (idx_class, filtering, batch_size) = task_attrs
-            if filtering:
-                policy_update = 'ignore-dups'
-                indexer_class = get_class(idx_class)
-                ids_filtered = list(indexer_class().filter(ids))
-                if not ids_filtered:
-                    continue
-            else:
-                policy_update = 'update-dups'
-                ids_filtered = ids
-
-            tasks = []
-            for ids_to_send in grouper(ids_filtered, batch_size):
-                tasks.append(create_task_dict(
-                    task_name,
-                    'oneshot',
-                    ids=list(ids_to_send),
-                    policy_update=policy_update,
-                ))
-            self._create_tasks(tasks)
-
-    def _create_tasks(self, tasks):
-        self.scheduler.create_tasks(tasks)
-
-
-class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
-    """Orchestrator which deals with batches of any type of content.
-
-    """
-
-
-class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer):
-    """Orchestrator which deals with batches of text contents.
-
-    """
-    CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
index 9de1aa0..54123ac 100644
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -1,217 +1,217 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import re
 import click
 import logging
 
 from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_task_dict
 from swh.indexer.indexer import OriginIndexer
 
 
 class OriginHeadIndexer(OriginIndexer):
     """Origin-level indexer.
 
     This indexer is in charge of looking up the revision that acts as the
     "head" of an origin.
 
     In git, this is usually the commit pointed to by the 'master'
     branch."""
 
     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'origin-metadata',
             'version': '0.0.1',
             'configuration': {},
         }),
     }
 
     CONFIG_BASE_FILENAME = 'indexer/origin_head'
 
     revision_metadata_task = 'revision_metadata'
     origin_intrinsic_metadata_task = 'origin_metadata'
 
     def filter(self, ids):
         yield from ids
 
     def persist_index_computations(self, results, policy_update):
         """Do nothing. The indexer's results are not persistent, they
-        should only be piped to another indexer via the orchestrator."""
+        should only be piped to another indexer."""
         pass
 
     def next_step(self, results, task):
         """Once the head is found, call the RevisionMetadataIndexer
         on these revisions, then call the OriginMetadataIndexer with
         both the origin_id and the revision metadata, so it can copy the
         revision metadata to the origin's metadata.
 
         Args:
             results (Iterable[dict]): Iterable of return values from `index`.
 
         """
         super().next_step(results, task)
         if self.revision_metadata_task is None and \
                 self.origin_intrinsic_metadata_task is None:
             return
         assert self.revision_metadata_task is not None
         assert self.origin_intrinsic_metadata_task is not None
 
         # Second task to run after this one: copy the revision's metadata
         # to the origin
         sub_task = create_task_dict(
             self.origin_intrinsic_metadata_task,
             'oneshot',
             origin_head={
                 str(result['origin_id']): result['revision_id'].decode()
                 for result in results},
             policy_update='update-dups',
         )
         del sub_task['next_run']  # Not json-serializable
 
         # First task to run after this one: index the metadata of the
         # revision
         task = create_task_dict(
             self.revision_metadata_task,
             'oneshot',
             ids=[res['revision_id'].decode() for res in results],
             policy_update='update-dups',
             next_step={
                 **sub_task,
                 'result_name': 'revisions_metadata'},
         )
         if getattr(self, 'scheduler', None):
             scheduler = self.scheduler
         else:
             scheduler = get_scheduler(**self.config['scheduler'])
         scheduler.create_tasks([task])
 
     # Dispatch
 
     def index(self, origin):
         origin_id = origin['id']
         latest_snapshot = self.storage.snapshot_get_latest(origin_id)
         method = getattr(self, '_try_get_%s_head' % origin['type'], None)
         if method is None:
             method = self._try_get_head_generic
         rev_id = method(latest_snapshot)
         if rev_id is None:
             return None
         result = {
             'origin_id': origin_id,
             'revision_id': rev_id,
         }
         return result
 
     # VCSs
 
     def _try_get_vcs_head(self, snapshot):
         try:
             if isinstance(snapshot, dict):
                 branches = snapshot['branches']
                 if branches[b'HEAD']['target_type'] == 'revision':
                     return branches[b'HEAD']['target']
         except KeyError:
             return None
 
     _try_get_hg_head = _try_get_git_head = _try_get_vcs_head
 
     # Tarballs
 
     _archive_filename_re = re.compile(
         rb'^'
         rb'(?P<pkgname>.*)[-_]'
         rb'(?P<version>[0-9]+(\.[0-9]+)*)'
         rb'(?P<preversion>[-+][a-zA-Z0-9.~]+?)?'
         rb'(?P<extension>(\.[a-zA-Z0-9]+)+)'
         rb'$')
 
     @classmethod
     def _parse_version(cls, filename):
         """Extracts the release version from an archive filename,
         to get an ordering whose maximum is likely to be the last
         version of the software
 
         >>> OriginHeadIndexer._parse_version(b'foo')
         (-inf,)
         >>> OriginHeadIndexer._parse_version(b'foo.tar.gz')
         (-inf,)
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz')
         (0, 0, 1, 0)
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz')
         (0, 0, 1, -1, 'beta2')
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz')
         (0, 0, 1, 1, 'foobar')
         """
         res = cls._archive_filename_re.match(filename)
         if res is None:
             return (float('-infinity'),)
         version = [int(n) for n in res.group('version').decode().split('.')]
         if res.group('preversion') is None:
             version.append(0)
         else:
             preversion = res.group('preversion').decode()
             if preversion.startswith('-'):
                 version.append(-1)
                 version.append(preversion[1:])
             elif preversion.startswith('+'):
                 version.append(1)
                 version.append(preversion[1:])
             else:
                 assert False, res.group('preversion')
         return tuple(version)
 
     def _try_get_ftp_head(self, snapshot):
         archive_names = list(snapshot['branches'])
         max_archive_name = max(archive_names, key=self._parse_version)
         r = self._try_resolve_target(snapshot['branches'], max_archive_name)
         return r
 
     # Generic
 
     def _try_get_head_generic(self, snapshot):
         # Works on 'deposit', 'svn', and 'pypi'.
         try:
             if isinstance(snapshot, dict):
                 branches = snapshot['branches']
         except KeyError:
             return None
         else:
             return (
                 self._try_resolve_target(branches, b'HEAD')
                 or self._try_resolve_target(branches, b'master')
             )
 
     def _try_resolve_target(self, branches, target_name):
         try:
             target = branches[target_name]
             while target['target_type'] == 'alias':
                 target = branches[target['target']]
             if target['target_type'] == 'revision':
                 return target['target']
             elif target['target_type'] == 'content':
                 return None  # TODO
             elif target['target_type'] == 'directory':
                 return None  # TODO
             elif target['target_type'] == 'release':
                 return None  # TODO
             else:
                 assert False
         except KeyError:
             return None
 
 
 @click.command()
 @click.option('--origins', '-i',
               help='Origins to lookup, in the "type+url" format',
               multiple=True)
 def main(origins):
     rev_metadata_indexer = OriginHeadIndexer()
     rev_metadata_indexer.run(origins, 'update-dups', parse_ids=True)
 
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
index 2c8d4a4..6a6ef84 100644
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -1,107 +1,85 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from swh.scheduler.task import Task as SchedulerTask
 
-from .orchestrator import OrchestratorAllContentsIndexer
-from .orchestrator import OrchestratorTextContentsIndexer
 from .mimetype import ContentMimetypeIndexer
 from .language import ContentLanguageIndexer
 from .ctags import CtagsIndexer
 from .fossology_license import ContentFossologyLicenseIndexer
 from .rehash import RecomputeChecksums
 from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer
 
 logging.basicConfig(level=logging.INFO)
 
 
 class Task(SchedulerTask):
     def run_task(self, *args, **kwargs):
         indexer = self.Indexer().run(*args, **kwargs)
         if hasattr(indexer, 'results'):  # indexer tasks
             return indexer.results
         return indexer
 
 
-class OrchestratorAllContents(Task):
-    """Main task in charge of reading batch contents (of any type) and
-    broadcasting them back to other tasks.
-
-    """
-    task_queue = 'swh_indexer_orchestrator_content_all'
-
-    Indexer = OrchestratorAllContentsIndexer
-
-
-class OrchestratorTextContents(Task):
-    """Main task in charge of reading batch contents (of type text) and
-    broadcasting them back to other tasks.
-
-    """
-    task_queue = 'swh_indexer_orchestrator_content_text'
-
-    Indexer = OrchestratorTextContentsIndexer
-
-
 class RevisionMetadata(Task):
     task_queue = 'swh_indexer_revision_metadata'
 
     serializer = 'msgpack'
 
     Indexer = RevisionMetadataIndexer
 
 
 class OriginMetadata(Task):
     task_queue = 'swh_indexer_origin_intrinsic_metadata'
 
     Indexer = OriginMetadataIndexer
 
 
 class ContentMimetype(Task):
     """Task which computes the mimetype, encoding from the sha1's content.
 
     """
     task_queue = 'swh_indexer_content_mimetype'
 
     Indexer = ContentMimetypeIndexer
 
 
 class ContentLanguage(Task):
     """Task which computes the language from the sha1's content.
 
     """
     task_queue = 'swh_indexer_content_language'
 
     def run_task(self, *args, **kwargs):
         ContentLanguageIndexer().run(*args, **kwargs)
 
 
 class Ctags(Task):
     """Task which computes ctags from the sha1's content.
 
     """
     task_queue = 'swh_indexer_content_ctags'
 
     Indexer = CtagsIndexer
 
 
 class ContentFossologyLicense(Task):
     """Task which computes licenses from the sha1's content.
 
     """
     task_queue = 'swh_indexer_content_fossology_license'
 
     Indexer = ContentFossologyLicenseIndexer
 
 
 class RecomputeChecksums(Task):
     """Task which recomputes hashes and possibly new ones.
 
     """
     task_queue = 'swh_indexer_content_rehash'
 
     Indexer = RecomputeChecksums
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
deleted file mode 100644
index fa99c7d..0000000
--- a/swh/indexer/tests/test_orchestrator.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-import celery
-
-from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import BaseIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
-from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
-
-
-class BaseTestIndexer(BaseIndexer):
-    ADDITIONAL_CONFIG = {
-        'tools': ('dict', {
-            'name': 'foo',
-            'version': 'bar',
-            'configuration': {}
-        }),
-    }
-
-    def prepare(self):
-        self.idx_storage = MockIndexerStorage()
-        self.storage = MockStorage()
-
-    def check(self):
-        pass
-
-    def filter(self, ids):
-        self.filtered.append(ids)
-        return ids
-
-    def run(self, ids, policy_update):
-        return self.index(ids)
-
-    def index(self, ids):
-        self.indexed.append(ids)
-        return [id_ + '_indexed_by_' + self.__class__.__name__
-                for id_ in ids]
-
-    def persist_index_computations(self, result, policy_update):
-        self.persisted = result
-
-
-class Indexer1(BaseTestIndexer):
-    filtered = []
-    indexed = []
-
-    def filter(self, ids):
-        return super().filter([id_ for id_ in ids if '1' in id_])
-
-
-class Indexer2(BaseTestIndexer):
-    filtered = []
-    indexed = []
-
-    def filter(self, ids):
-        return super().filter([id_ for id_ in ids if '2' in id_])
-
-
-class Indexer3(BaseTestIndexer):
-    filtered = []
-    indexed = []
-
-    def filter(self, ids):
-        return super().filter([id_ for id_ in ids if '3' in id_])
-
-
-@celery.task
-def indexer1_task(*args, **kwargs):
-    return Indexer1().run(*args, **kwargs)
-
-
-@celery.task
-def indexer2_task(*args, **kwargs):
-    return Indexer2().run(*args, **kwargs)
-
-
-@celery.task
-def indexer3_task(self, *args, **kwargs):
-    return Indexer3().run(*args, **kwargs)
-
-
-class BaseTestOrchestrator12(BaseOrchestratorIndexer):
-    TASK_NAMES = {
-        'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task',
-        'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task',
-        'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task',
-    }
-
-    INDEXER_CLASSES = {
-        'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1',
-        'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2',
-        'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3',
-    }
-
-    def __init__(self):
-        super().__init__()
-        self.running_tasks = []
-
-    def parse_config_file(self):
-        return {
-            'scheduler': {
-                'cls': 'remote',
-                'args': {
-                    'url': 'http://localhost:9999',
-                },
-            },
-            'indexers': {
-                'indexer1': {
-                    'batch_size': 2,
-                    'check_presence': True,
-                },
-                'indexer2': {
-                    'batch_size': 2,
-                    'check_presence': True,
-                },
-            }
-        }
-
-
-class MockedTestOrchestrator12(BaseTestOrchestrator12):
-    def __init__(self):
-        super().__init__()
-        self.created_tasks = []
-
-    def _create_tasks(self, celery_tasks):
-        self.created_tasks.extend(celery_tasks)
-
-    def prepare_scheduler(self):
-        pass
-
-
-class OrchestratorTest(SchedulerTestFixture, unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-        self.add_scheduler_task_type(
-            'indexer1',
-            'swh.indexer.tests.test_orchestrator.indexer1_task')
-        self.add_scheduler_task_type(
-            'indexer2',
-            'swh.indexer.tests.test_orchestrator.indexer2_task')
-
-    def test_orchestrator_filter(self):
-        o = BaseTestOrchestrator12()
-        o.scheduler = self.scheduler
-        o.run(['id12', 'id2'])
-        self.assertEqual(Indexer2.indexed, [])
-        self.assertEqual(Indexer1.indexed, [])
-        self.run_ready_tasks()
-        self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
-        self.assertEqual(Indexer1.indexed, [['id12']])
-
-
-class MockedOrchestratorTest(unittest.TestCase):
-    maxDiff = None
-
-    def test_mocked_orchestrator_filter(self):
-        o = MockedTestOrchestrator12()
-        o.run(['id12', 'id2'])
-        for task in o.created_tasks:
-            del task['next_run']  # not worth the trouble testing it properly
-        self.assertCountEqual(o.created_tasks, [
-            {'type': 'indexer1',
-             'arguments': {
-                 'args': [],
-                 'kwargs': {
-                     'ids': ['id12'],
-                     'policy_update': 'ignore-dups'}},
-             'policy': 'oneshot'},
-            {'type': 'indexer2',
-             'arguments': {
-                 'args': [],
-                 'kwargs': {
-                     'ids': ['id12', 'id2'],
-                     'policy_update': 'ignore-dups'}},
-             'policy': 'oneshot'},
-            ])
-
-    def test_mocked_orchestrator_batch(self):
-        o = MockedTestOrchestrator12()
-        o.run(['id12', 'id2a', 'id2b', 'id2c'])
-        for task in o.created_tasks:
-            del task['next_run']  # not worth the trouble testing it properly
-        self.assertCountEqual(o.created_tasks, [
-            {'type': 'indexer1',
-             'arguments': {
-                 'args': [],
-                 'kwargs': {
-                     'ids': ['id12'],
-                     'policy_update': 'ignore-dups'}},
-             'policy': 'oneshot'},
-            {'type': 'indexer2',
-             'arguments': {
-                 'args': [],
-                 'kwargs': {
-                     'ids': ['id12', 'id2a'],
-                     'policy_update': 'ignore-dups'}},
-             'policy': 'oneshot'},
-            {'type': 'indexer2',
-             'arguments': {
-                 'args': [],
-                 'kwargs': {
-                     'ids': ['id2b', 'id2c'],
-                     'policy_update': 'ignore-dups'}},
-             'policy': 'oneshot'},
-            ])
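
With the orchestrator removed, batching and dispatch go through swh.scheduler
tasks, and a concrete indexer is driven directly through its `run` method. The
sketch below shows what a minimal `ContentIndexer` subclass looks like against
the API kept by this patch; the `ContentLengthIndexer` name, its tool
description, and the `results_store` attribute are illustrative assumptions,
not code from this repository:

    from swh.indexer.indexer import ContentIndexer


    class ContentLengthIndexer(ContentIndexer):
        """Hypothetical indexer recording the length of each content."""

        ADDITIONAL_CONFIG = {
            'tools': ('dict', {
                'name': 'content-length',  # assumed tool name
                'version': '0.0.1',
                'configuration': {},
            }),
        }

        def filter(self, ids):
            # A real indexer would query self.idx_storage for the ids
            # already indexed and yield only the missing ones.
            yield from ids

        def index(self, id, data):
            # Return one dict per content; these dicts are what
            # persist_index_computations() receives as `results`.
            return {'id': id, 'length': len(data)}

        def persist_index_computations(self, results, policy_update):
            # A real indexer would write `results` to the matching
            # indexer storage endpoint, honoring policy_update
            # ('update-dups' or 'ignore-dups').
            self.results_store = results


    # Instantiation reads the indexer's configuration file (storage,
    # objstorage, indexer storage); running it indexes a batch of sha1s:
    # ContentLengthIndexer().run(ids=[...], policy_update='update-dups')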