diff --git a/docs/dev-info.rst b/docs/dev-info.rst --- a/docs/dev-info.rst +++ b/docs/dev-info.rst @@ -72,7 +72,7 @@ slicing: 0:1/1:5 root: /home/storage/swh-storage/ - destination_queue: swh.indexer.tasks.SWHOrchestratorTextContentsTask + destination_task: swh.indexer.tasks.SWHOrchestratorTextContentsTask rescheduling_task: swh.indexer.tasks.SWHContentMimetypeTask diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -40,7 +40,7 @@ """ ADDITIONAL_CONFIG = { - 'destination_queue': ('str', None), + 'destination_task': ('str', None), 'tools': ('dict', { 'name': 'file', 'version': '1:5.30-1+deb9u1', @@ -55,11 +55,11 @@ def prepare(self): super().prepare() - destination_queue = self.config.get('destination_queue') - if destination_queue: - self.task_destination = utils.get_task(destination_queue) + destination_task = self.config.get('destination_task') + if destination_task: + self.destination_task = utils.get_task(destination_task) else: - self.task_destination = None + self.destination_task = None self.tool = self.tools[0] def filter(self, ids): @@ -141,8 +141,8 @@ - encoding (bytes): encoding in bytes """ - if self.task_destination: - self.task_destination.delay(list(self._filter_text(results))) + if self.destination_task: + self.destination_task.delay(list(self._filter_text(results))) @click.command() diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -93,6 +93,7 @@ self.tasks = tasks def run(self, ids): + all_tasks = [] for name, (idx_class, filtering, batch_size) in self.indexers.items(): if filtering: policy_update = 'ignore-dups' @@ -111,10 +112,12 @@ policy_update=policy_update) celery_tasks.append(celery_task) - self._run_tasks(celery_tasks) + all_tasks.append(self._run_tasks(celery_tasks)) + + return all_tasks def _run_tasks(self, celery_tasks): - group(celery_tasks).delay() + return 
group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): diff --git a/swh/indexer/tests/storage/test_converters.py b/swh/indexer/tests/storage/test_converters.py --- a/swh/indexer/tests/storage/test_converters.py +++ b/swh/indexer/tests/storage/test_converters.py @@ -17,7 +17,7 @@ self.maxDiff = None @istest - def ctags_to_db(self): + def test_ctags_to_db(self): input_ctag = { 'id': b'some-id', 'indexer_configuration_id': 100, @@ -60,7 +60,7 @@ self.assertEquals(actual_ctags, expected_ctags) @istest - def db_to_ctags(self): + def test_db_to_ctags(self): input_ctags = { 'id': b'some-id', 'name': 'some-name', @@ -93,7 +93,7 @@ self.assertEquals(actual_ctags, expected_ctags) @istest - def db_to_mimetype(self): + def test_db_to_mimetype(self): input_mimetype = { 'id': b'some-id', 'tool_id': 10, @@ -121,7 +121,7 @@ self.assertEquals(actual_mimetype, expected_mimetype) @istest - def db_to_language(self): + def test_db_to_language(self): input_language = { 'id': b'some-id', 'tool_id': 20, @@ -147,7 +147,7 @@ self.assertEquals(actual_language, expected_language) @istest - def db_to_fossology_license(self): + def test_db_to_fossology_license(self): input_license = { 'id': b'some-id', 'tool_id': 20, @@ -172,7 +172,7 @@ self.assertEquals(actual_license, expected_license) @istest - def db_to_metadata(self): + def test_db_to_metadata(self): input_metadata = { 'id': b'some-id', 'tool_id': 20, diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -108,12 +108,12 @@ """ @istest - def check_config(self): + def test_check_config(self): self.assertTrue(self.storage.check_config(check_write=True)) self.assertTrue(self.storage.check_config(check_write=False)) @istest - def content_mimetype_missing(self): + def test_content_mimetype_missing(self): # given tool_id = self.tools['file']['id'] @@ -151,7 +151,7 @@ 
self.assertEqual(list(actual_missing), [self.sha1_1]) @istest - def content_mimetype_add__drop_duplicate(self): + def test_content_mimetype_add__drop_duplicate(self): # given tool_id = self.tools['file']['id'] @@ -194,7 +194,7 @@ self.assertEqual(actual_mimetypes, expected_mimetypes_v1) @istest - def content_mimetype_add__update_in_place_duplicate(self): + def test_content_mimetype_add__update_in_place_duplicate(self): # given tool_id = self.tools['file']['id'] @@ -250,7 +250,7 @@ self.assertEqual(actual_mimetypes, expected_mimetypes_v2) @istest - def content_mimetype_get(self): + def test_content_mimetype_get(self): # given tool_id = self.tools['file']['id'] @@ -280,7 +280,7 @@ self.assertEqual(actual_mimetypes, expected_mimetypes) @istest - def content_language_missing(self): + def test_content_language_missing(self): # given tool_id = self.tools['pygments']['id'] @@ -318,7 +318,7 @@ self.assertEqual(actual_missing, [self.sha1_1]) @istest - def content_language_get(self): + def test_content_language_get(self): # given tool_id = self.tools['pygments']['id'] @@ -345,7 +345,7 @@ self.assertEqual(actual_languages, expected_languages) @istest - def content_language_add__drop_duplicate(self): + def test_content_language_add__drop_duplicate(self): # given tool_id = self.tools['pygments']['id'] @@ -385,7 +385,7 @@ self.assertEqual(actual_languages, expected_languages_v1) @istest - def content_language_add__update_in_place_duplicate(self): + def test_content_language_add__update_in_place_duplicate(self): # given tool_id = self.tools['pygments']['id'] @@ -432,7 +432,7 @@ self.assertEqual(actual_languages, expected_languages_v2) @istest - def content_ctags_missing(self): + def test_content_ctags_missing(self): # given tool_id = self.tools['universal-ctags']['id'] @@ -477,7 +477,7 @@ self.assertEqual(list(actual_missing), [self.sha1_1]) @istest - def content_ctags_get(self): + def test_content_ctags_get(self): # given tool_id = self.tools['universal-ctags']['id'] @@ -531,7 
+531,7 @@ self.assertEqual(actual_ctags, expected_ctags) @istest - def content_ctags_search(self): + def test_content_ctags_search(self): # 1. given tool = self.tools['universal-ctags'] tool_id = tool['id'] @@ -641,13 +641,13 @@ }]) @istest - def content_ctags_search_no_result(self): + def test_content_ctags_search_no_result(self): actual_ctags = list(self.storage.content_ctags_search('counter')) self.assertEquals(actual_ctags, []) @istest - def content_ctags_add__add_new_ctags_added(self): + def test_content_ctags_add__add_new_ctags_added(self): # given tool = self.tools['universal-ctags'] tool_id = tool['id'] @@ -722,7 +722,7 @@ self.assertEqual(actual_ctags, expected_ctags) @istest - def content_ctags_add__update_in_place(self): + def test_content_ctags_add__update_in_place(self): # given tool = self.tools['universal-ctags'] tool_id = tool['id'] @@ -804,7 +804,7 @@ self.assertEqual(actual_ctags, expected_ctags) @istest - def content_fossology_license_get(self): + def test_content_fossology_license_get(self): # given tool = self.tools['nomos'] tool_id = tool['id'] @@ -833,7 +833,7 @@ self.assertEqual(actual_licenses, [expected_license]) @istest - def content_fossology_license_add__new_license_added(self): + def test_content_fossology_license_add__new_license_added(self): # given tool = self.tools['nomos'] tool_id = tool['id'] @@ -884,7 +884,7 @@ self.assertEqual(actual_licenses, [expected_license]) @istest - def content_fossology_license_add__update_in_place_duplicate(self): + def test_content_fossology_license_add__update_in_place_duplicate(self): # given tool = self.tools['nomos'] tool_id = tool['id'] @@ -935,7 +935,7 @@ self.assertEqual(actual_licenses, [expected_license]) @istest - def content_metadata_missing(self): + def test_content_metadata_missing(self): # given tool_id = self.tools['swh-metadata-translator']['id'] @@ -982,7 +982,7 @@ self.assertEqual(actual_missing, [self.sha1_1]) @istest - def content_metadata_get(self): + def 
test_content_metadata_get(self): # given tool_id = self.tools['swh-metadata-translator']['id'] @@ -1026,7 +1026,7 @@ self.assertEqual(actual_metadata, expected_metadata) @istest - def content_metadata_add_drop_duplicate(self): + def test_content_metadata_add_drop_duplicate(self): # given tool_id = self.tools['swh-metadata-translator']['id'] @@ -1079,7 +1079,7 @@ self.assertEqual(actual_metadata, expected_metadata_v1) @istest - def content_metadata_add_update_in_place_duplicate(self): + def test_content_metadata_add_update_in_place_duplicate(self): # given tool_id = self.tools['swh-metadata-translator']['id'] @@ -1141,7 +1141,7 @@ self.assertEqual(actual_metadata, expected_metadata_v2) @istest - def revision_metadata_missing(self): + def test_revision_metadata_missing(self): # given tool_id = self.tools['swh-metadata-detector']['id'] @@ -1198,7 +1198,7 @@ self.assertEqual(actual_missing, [self.revision_id_2]) @istest - def revision_metadata_get(self): + def test_revision_metadata_get(self): # given tool_id = self.tools['swh-metadata-detector']['id'] @@ -1241,7 +1241,7 @@ self.assertEqual(actual_metadata, expected_metadata) @istest - def revision_metadata_add_drop_duplicate(self): + def test_revision_metadata_add_drop_duplicate(self): # given tool_id = self.tools['swh-metadata-detector']['id'] @@ -1302,7 +1302,7 @@ self.assertEqual(actual_metadata, expected_metadata_v1) @istest - def revision_metadata_add_update_in_place_duplicate(self): + def test_revision_metadata_add_update_in_place_duplicate(self): # given tool_id = self.tools['swh-metadata-detector']['id'] @@ -1368,7 +1368,7 @@ self.assertEqual(actual_metadata, expected_metadata_v2) @istest - def indexer_configuration_add(self): + def test_indexer_configuration_add(self): tool = { 'tool_name': 'some-unknown-tool', 'tool_version': 'some-version', @@ -1396,7 +1396,7 @@ self.assertEqual(actual_tool, actual_tool2) @istest - def indexer_configuration_add_multiple(self): + def 
test_indexer_configuration_add_multiple(self): tool = { 'tool_name': 'some-unknown-tool', 'tool_version': 'some-version', @@ -1422,7 +1422,7 @@ self.assertIn(tool, new_tools) @istest - def indexer_configuration_get_missing(self): + def test_indexer_configuration_get_missing(self): tool = { 'tool_name': 'unknown-tool', 'tool_version': '3.1.0rc2-31-ga2cbb8c', @@ -1434,7 +1434,7 @@ self.assertIsNone(actual_tool) @istest - def indexer_configuration_get(self): + def test_indexer_configuration_get(self): tool = { 'tool_name': 'nomos', 'tool_version': '3.1.0rc2-31-ga2cbb8c', @@ -1449,7 +1449,7 @@ self.assertEqual(expected_tool, actual_tool) @istest - def indexer_configuration_metadata_get_missing_context(self): + def test_indexer_configuration_metadata_get_missing_context(self): tool = { 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', @@ -1461,7 +1461,7 @@ self.assertIsNone(actual_tool) @istest - def indexer_configuration_metadata_get(self): + def test_indexer_configuration_metadata_get(self): tool = { 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', diff --git a/swh/indexer/tests/test_language.py b/swh/indexer/tests/test_language.py --- a/swh/indexer/tests/test_language.py +++ b/swh/indexer/tests/test_language.py @@ -30,7 +30,7 @@ """ def prepare(self): self.config = { - 'destination_queue': None, + 'destination_task': None, 'rescheduling_task': None, 'tools': { 'name': 'pygments', @@ -45,7 +45,7 @@ self.idx_storage = _MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() - self.task_destination = None + self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] self.tool_config = self.config['tools']['configuration'] self.max_content_size = self.tool_config['max_content_size'] @@ -61,7 +61,7 @@ self.maxDiff = None @istest - def test_compute_language_none(self): + def test_compute_language_none(self): # given self.content = "" self.declared_language = { @@
-73,7 +73,7 @@ self.assertEqual(self.declared_language, result) @istest - def test_index_content_language_python(self): + def test_index_content_language_python(self): # given # testing python sha1s = ['02fb2c89e14f7fab46701478c83779c7beb7b069'] @@ -92,7 +92,7 @@ self.assertEqual(expected_results, results) @istest - def test_index_content_language_c(self): + def test_index_content_language_c(self): # given # testing c sha1s = ['103bc087db1d26afc3a0283f38663d081e9b01e6'] diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -27,7 +27,7 @@ self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() - self.task_destination = None + self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] @@ -63,7 +63,7 @@ self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() - self.task_destination = None + self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] @@ -89,7 +89,7 @@ } @istest - def test_compute_metadata_none(self): + def test_compute_metadata_none(self): """ testing content empty content is empty should return None @@ -106,7 +106,7 @@ self.assertEqual(declared_metadata, result) @istest - def test_compute_metadata_npm(self): + def test_compute_metadata_npm(self): """ testing only computation of metadata with hard_mapping_npm """ @@ -139,7 +139,7 @@ self.assertEqual(declared_metadata, result) @istest - def test_extract_minimal_metadata_dict(self): + def test_extract_minimal_metadata_dict(self): """ Test the creation of a coherent minimal metadata set """ @@ -197,7 +197,7 @@
self.assertEqual(expected_results, results) @istest - def test_index_content_metadata_npm(self): + def test_index_content_metadata_npm(self): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and @@ -281,7 +281,7 @@ self.assertEqual(expected_results, results) @istest - def test_detect_metadata_package_json(self): + def test_detect_metadata_package_json(self): # given df = [{ 'sha1_git': b'abc', @@ -317,7 +317,7 @@ self.assertEqual(expected_results, results) @istest - def test_revision_metadata_indexer(self): + def test_revision_metadata_indexer(self): metadata_indexer = TestRevisionMetadataIndexer() sha1_gits = [ diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -33,7 +33,7 @@ """ def prepare(self): self.config = { - 'destination_queue': None, + 'destination_task': None, 'rescheduling_task': None, 'tools': { 'name': 'file', @@ -47,9 +47,9 @@ self.idx_storage = _MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() - self.task_destination = None + self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] - self.destination_queue = self.config['destination_queue'] + # destination_task stays None (set above): results are not forwarded to any task self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] @@ -66,7 +66,7 @@ class TestMimetypeIndexerWithErrors(unittest.TestCase): @istest - def wrong_unknown_configuration_tool(self): + def test_wrong_unknown_configuration_tool(self): """Indexer with unknown configuration tool should fail the check""" with self.assertRaisesRegex(ValueError, 'Tools None is unknown'): TestMimetypeIndexerUnknownToolStorage() @@ -77,7 +77,7 @@ self.indexer = TestMimetypeIndexer() @istest - def test_index_no_update(self): + def test_index_no_update(self): # given sha1s = [
'01c9379dfc33803963d07c1ccc748d3fe4c96bb5', @@ -104,7 +104,7 @@ self.assertEquals(expected_results, self.indexer.idx_storage.state) @istest - def test_index_update(self): + def test_index_update(self): # given sha1s = [ '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', @@ -137,7 +137,7 @@ self.assertEquals(expected_results, self.indexer.idx_storage.state) @istest - def test_index_one_unknown_sha1(self): + def test_index_one_unknown_sha1(self): # given sha1s = ['688a5ef812c53907562fe379d4b3851e69c7cb15', '799a5ef812c53907562fe379d4b3851e69c7cb15', # unknown diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py --- a/swh/indexer/tests/test_orchestrator.py +++ b/swh/indexer/tests/test_orchestrator.py @@ -4,11 +4,11 @@ # See top-level LICENSE file for more information import unittest -from nose.tools import istest +import pytest from swh.indexer.orchestrator import BaseOrchestratorIndexer from swh.indexer.indexer import RevisionIndexer -from swh.indexer.tests.test_utils import MockIndexerStorage +from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage from swh.scheduler.task import Task @@ -23,6 +23,7 @@ def prepare(self): self.idx_storage = MockIndexerStorage() + self.storage = MockStorage() def check(self): pass @@ -41,30 +42,39 @@ class Indexer1(BaseTestIndexer): + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '1' in id_]) class Indexer2(BaseTestIndexer): + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '2' in id_]) class Indexer3(BaseTestIndexer): + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '3' in id_]) class Indexer1Task(Task): - pass + def run_task(self, *args, **kwargs): + Indexer1().run(*args, **kwargs) class Indexer2Task(Task): - pass + def run_task(self, *args, **kwargs): + Indexer2().run(*args, **kwargs) class Indexer3Task(Task): - pass + def run_task(self, *args, **kwargs): +
Indexer3().run(*args, **kwargs) class TestOrchestrator12(BaseOrchestratorIndexer): @@ -99,16 +109,39 @@ } self.prepare_tasks() - def _run_tasks(self, celery_tasks): + +class MockedTestOrchestrator12(TestOrchestrator12): + def _run_tasks(self, celery_tasks, *, callback=None): self.running_tasks.extend(celery_tasks) + if callback: + callback() + + +@pytest.fixture(scope='session') +def celery_config(): + return { + 'accept_content': ['application/x-msgpack', 'application/json'], + 'broker_url': 'amqp://', + 'result_backend': 'redis://' + } + + +def test_orchestrator_filter(celery_app, celery_worker): + # celery_worker is requested so that an embedded Celery worker executes the tasks + o = TestOrchestrator12() + o.prepare() + tasks = o.run(['id12', 'id2']) + for task in tasks: + task.get(timeout=1) + assert Indexer1.indexed == ['id12'] + assert Indexer2.indexed == ['id12', 'id2'] class OrchestratorTest(unittest.TestCase): maxDiff = None - @istest - def orchestrator_filter(self): - o = TestOrchestrator12() + def test_mocked_orchestrator_filter(self): + o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2']) self.assertCountEqual(o.running_tasks, [ @@ -130,9 +163,8 @@ 'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, ]) - @istest - def orchestrator_batch(self): - o = TestOrchestrator12() + def test_mocked_orchestrator_batch(self): + o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2a', 'id2b', 'id2c']) self.assertCountEqual(o.running_tasks, [