Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_orchestrator.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import unittest | import unittest | ||||
import celery | |||||
from swh.indexer.orchestrator import BaseOrchestratorIndexer | from swh.indexer.orchestrator import BaseOrchestratorIndexer | ||||
from swh.indexer.indexer import RevisionIndexer | from swh.indexer.indexer import BaseIndexer | ||||
from swh.indexer.tests.test_utils import MockIndexerStorage | from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage | ||||
from swh.scheduler.task import Task | |||||
from . import start_worker_thread | |||||
class BaseTestIndexer(RevisionIndexer): | class BaseTestIndexer(BaseIndexer): | ||||
ADDITIONAL_CONFIG = { | ADDITIONAL_CONFIG = { | ||||
'tools': ('dict', { | 'tools': ('dict', { | ||||
'name': 'foo', | 'name': 'foo', | ||||
'version': 'bar', | 'version': 'bar', | ||||
'configuration': {} | 'configuration': {} | ||||
}), | }), | ||||
} | } | ||||
def prepare(self): | def prepare(self): | ||||
self.idx_storage = MockIndexerStorage() | self.idx_storage = MockIndexerStorage() | ||||
self.storage = MockStorage() | |||||
def check(self): | def check(self): | ||||
pass | pass | ||||
def filter(self, ids): | def filter(self, ids): | ||||
self.filtered = ids | self.filtered.append(ids) | ||||
return ids | return ids | ||||
def run(self, ids, policy_update): | |||||
return self.index(ids) | |||||
def index(self, ids): | def index(self, ids): | ||||
self.indexed = ids | self.indexed.append(ids) | ||||
return [id_ + '_indexed_by_' + self.__class__.__name__ | return [id_ + '_indexed_by_' + self.__class__.__name__ | ||||
for id_ in ids] | for id_ in ids] | ||||
def persist_index_computations(self, result, policy_update): | def persist_index_computations(self, result, policy_update): | ||||
self.persisted = result | self.persisted = result | ||||
class Indexer1(BaseTestIndexer): | class Indexer1(BaseTestIndexer): | ||||
filtered = [] | |||||
indexed = [] | |||||
def filter(self, ids): | def filter(self, ids): | ||||
return super().filter([id_ for id_ in ids if '1' in id_]) | return super().filter([id_ for id_ in ids if '1' in id_]) | ||||
class Indexer2(BaseTestIndexer): | class Indexer2(BaseTestIndexer): | ||||
filtered = [] | |||||
indexed = [] | |||||
def filter(self, ids): | def filter(self, ids): | ||||
return super().filter([id_ for id_ in ids if '2' in id_]) | return super().filter([id_ for id_ in ids if '2' in id_]) | ||||
class Indexer3(BaseTestIndexer): | class Indexer3(BaseTestIndexer): | ||||
filtered = [] | |||||
indexed = [] | |||||
def filter(self, ids): | def filter(self, ids): | ||||
return super().filter([id_ for id_ in ids if '3' in id_]) | return super().filter([id_ for id_ in ids if '3' in id_]) | ||||
class Indexer1Task(Task): | @celery.task | ||||
pass | def indexer1_task(*args, **kwargs): | ||||
return Indexer1().run(*args, **kwargs) | |||||
class Indexer2Task(Task): | @celery.task | ||||
pass | def indexer2_task(*args, **kwargs): | ||||
return Indexer2().run(*args, **kwargs) | |||||
class Indexer3Task(Task): | @celery.task | ||||
pass | def indexer3_task(self, *args, **kwargs): | ||||
return Indexer3().run(*args, **kwargs) | |||||
class TestOrchestrator12(BaseOrchestratorIndexer): | class TestOrchestrator12(BaseOrchestratorIndexer): | ||||
TASK_NAMES = { | TASK_NAMES = { | ||||
'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1Task', | 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', | ||||
'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2Task', | 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', | ||||
'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3Task', | 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', | ||||
} | } | ||||
INDEXER_CLASSES = { | INDEXER_CLASSES = { | ||||
'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', | 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', | ||||
'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', | 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', | ||||
'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', | 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', | ||||
} | } | ||||
Show All 11 Lines | def prepare(self): | ||||
'indexer2': { | 'indexer2': { | ||||
'batch_size': 2, | 'batch_size': 2, | ||||
'check_presence': True, | 'check_presence': True, | ||||
}, | }, | ||||
} | } | ||||
} | } | ||||
self.prepare_tasks() | self.prepare_tasks() | ||||
class MockedTestOrchestrator12(TestOrchestrator12): | |||||
def _run_tasks(self, celery_tasks): | def _run_tasks(self, celery_tasks): | ||||
self.running_tasks.extend(celery_tasks) | self.running_tasks.extend(celery_tasks) | ||||
class OrchestratorTest(unittest.TestCase): | class OrchestratorTest(unittest.TestCase): | ||||
maxDiff = None | |||||
def test_orchestrator_filter(self): | def test_orchestrator_filter(self): | ||||
return | |||||
with start_worker_thread(): | |||||
o = TestOrchestrator12() | o = TestOrchestrator12() | ||||
o.prepare() | o.prepare() | ||||
promises = o.run(['id12', 'id2']) | |||||
results = [] | |||||
for promise in promises: | |||||
results.append(promise.get(timeout=10)) | |||||
self.assertCountEqual( | |||||
results, | |||||
[[['id12_indexed_by_Indexer1']], | |||||
[['id12_indexed_by_Indexer2', | |||||
'id2_indexed_by_Indexer2']]]) | |||||
self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) | |||||
self.assertEqual(Indexer1.indexed, [['id12']]) | |||||
class MockedOrchestratorTest(unittest.TestCase): | |||||
maxDiff = None | |||||
def test_mocked_orchestrator_filter(self): | |||||
o = MockedTestOrchestrator12() | |||||
o.prepare() | |||||
o.run(['id12', 'id2']) | o.run(['id12', 'id2']) | ||||
self.assertCountEqual(o.running_tasks, [ | self.assertCountEqual(o.running_tasks, [ | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12'], | 'kwargs': {'ids': ['id12'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12', 'id2'], | 'kwargs': {'ids': ['id12', 'id2'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
]) | ]) | ||||
def test_orchestrator_batch(self): | def test_mocked_orchestrator_batch(self): | ||||
o = TestOrchestrator12() | o = MockedTestOrchestrator12() | ||||
o.prepare() | o.prepare() | ||||
o.run(['id12', 'id2a', 'id2b', 'id2c']) | o.run(['id12', 'id2a', 'id2b', 'id2c']) | ||||
self.assertCountEqual(o.running_tasks, [ | self.assertCountEqual(o.running_tasks, [ | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12'], | 'kwargs': {'ids': ['id12'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id12', 'id2a'], | 'kwargs': {'ids': ['id12', 'id2a'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
{'args': (), | {'args': (), | ||||
'chord_size': None, | 'chord_size': None, | ||||
'immutable': False, | 'immutable': False, | ||||
'kwargs': {'ids': ['id2b', 'id2c'], | 'kwargs': {'ids': ['id2b', 'id2c'], | ||||
'policy_update': 'ignore-dups'}, | 'policy_update': 'ignore-dups'}, | ||||
'options': {}, | 'options': {}, | ||||
'subtask_type': None, | 'subtask_type': None, | ||||
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, | 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, | ||||
]) | ]) |