Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_orchestrator.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import unittest | import unittest | ||||
import celery | |||||
from swh.indexer.orchestrator import BaseOrchestratorIndexer | from swh.indexer.orchestrator import BaseOrchestratorIndexer | ||||
from swh.indexer.indexer import BaseIndexer | from swh.indexer.indexer import BaseIndexer | ||||
from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage | from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage | ||||
from swh.scheduler.task import Task | from swh.scheduler.task import Task | ||||
from swh.scheduler.tests.celery_testing import CeleryTestFixture | from swh.scheduler.tests.celery_testing import CeleryTestFixture | ||||
from . import start_worker_thread | from . import start_worker_thread | ||||
class BaseTestIndexer(BaseIndexer): | class BaseTestIndexer(BaseIndexer): | ||||
ADDITIONAL_CONFIG = { | ADDITIONAL_CONFIG = { | ||||
'tools': ('dict', { | 'tools': ('dict', { | ||||
'name': 'foo', | 'name': 'foo', | ||||
'version': 'bar', | 'version': 'bar', | ||||
'configuration': {} | 'configuration': {} | ||||
}), | }), | ||||
} | } | ||||
Show All 40 Lines | |||||
class Indexer3(BaseTestIndexer): | class Indexer3(BaseTestIndexer): | ||||
filtered = [] | filtered = [] | ||||
indexed = [] | indexed = [] | ||||
def filter(self, ids): | def filter(self, ids): | ||||
return super().filter([id_ for id_ in ids if '3' in id_]) | return super().filter([id_ for id_ in ids if '3' in id_]) | ||||
class Indexer1Task(Task): | @celery.task | ||||
def run(self, *args, **kwargs): | def indexer1_task(*args, **kwargs): | ||||
return Indexer1().run(*args, **kwargs) | return Indexer1().run(*args, **kwargs) | ||||
class Indexer2Task(Task): | @celery.task | ||||
def run(self, *args, **kwargs): | def indexer2_task(*args, **kwargs): | ||||
return Indexer2().run(*args, **kwargs) | return Indexer2().run(*args, **kwargs) | ||||
class Indexer3Task(Task): | @celery.task | ||||
def run(self, *args, **kwargs): | def indexer3_task(self, *args, **kwargs): | ||||
return Indexer3().run(*args, **kwargs) | return Indexer3().run(*args, **kwargs) | ||||
class TestOrchestrator12(BaseOrchestratorIndexer): | class TestOrchestrator12(BaseOrchestratorIndexer): | ||||
TASK_NAMES = { | TASK_NAMES = { | ||||
'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1Task', | 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', | ||||
'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2Task', | 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', | ||||
'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3Task', | 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', | ||||
} | } | ||||
INDEXER_CLASSES = { | INDEXER_CLASSES = { | ||||
'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', | 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', | ||||
'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', | 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', | ||||
'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', | 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', | ||||
} | } | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | |||||
class MockedOrchestratorTest(unittest.TestCase): | class MockedOrchestratorTest(unittest.TestCase): | ||||
maxDiff = None | maxDiff = None | ||||
def test_mocked_orchestrator_filter(self): | def test_mocked_orchestrator_filter(self): | ||||
o = MockedTestOrchestrator12() | o = MockedTestOrchestrator12() | ||||
o.prepare() | o.prepare() | ||||
o.run(['id12', 'id2']) | o.run(['id12', 'id2']) | ||||
self.assertCountEqual(o.running_tasks, [ | self.assertCountEqual(o.running_tasks, [ | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12'], | 'kwargs': {'ids': ['id12'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12', 'id2'], | 'kwargs': {'ids': ['id12', 'id2'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
]) | ]) | ||||
def test_mocked_orchestrator_batch(self): | def test_mocked_orchestrator_batch(self): | ||||
o = MockedTestOrchestrator12() | o = MockedTestOrchestrator12() | ||||
o.prepare() | o.prepare() | ||||
o.run(['id12', 'id2a', 'id2b', 'id2c']) | o.run(['id12', 'id2a', 'id2b', 'id2c']) | ||||
self.assertCountEqual(o.running_tasks, [ | self.assertCountEqual(o.running_tasks, [ | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12'], | 'kwargs': {'ids': ['id12'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12', 'id2a'], | 'kwargs': {'ids': ['id12', 'id2a'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id2b', 'id2c'], | 'kwargs': {'ids': ['id2b', 'id2c'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
]) | ]) |