Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7342977
D518.id1659.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
D518.id1659.diff
View Options
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
--- a/swh/indexer/orchestrator.py
+++ b/swh/indexer/orchestrator.py
@@ -93,6 +93,7 @@
self.tasks = tasks
def run(self, ids):
+ all_results = []
for name, (idx_class, filtering, batch_size) in self.indexers.items():
if filtering:
policy_update = 'ignore-dups'
@@ -111,10 +112,12 @@
policy_update=policy_update)
celery_tasks.append(celery_task)
- self._run_tasks(celery_tasks)
+ all_results.append(self._run_tasks(celery_tasks))
+
+ return all_results
def _run_tasks(self, celery_tasks):
- group(celery_tasks).delay()
+ return group(celery_tasks).delay()
class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
diff --git a/swh/indexer/tests/__init__.py b/swh/indexer/tests/__init__.py
--- a/swh/indexer/tests/__init__.py
+++ b/swh/indexer/tests/__init__.py
@@ -1,2 +1,24 @@
+import os
from os import path
+
+from celery import shared_task
+from celery.contrib.testing.worker import _start_worker_thread
+from celery import current_app
+
+__all__ = ['start_worker_thread']
+
DATA_DIR = path.join(path.dirname(__file__), 'data')
+
+os.environ['CELERY_BROKER_URL'] = 'memory://'
+os.environ['CELERY_RESULT_BACKEND'] = 'cache+memory://'
+
+
+def start_worker_thread():
+ return _start_worker_thread(current_app)
+
+
+# Celery's test worker asserts that a 'celery.ping' task is registered;
+# see https://github.com/celery/celery/pull/5111
+@shared_task(name='celery.ping')
+def ping():
+ return 'pong'
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
--- a/swh/indexer/tests/test_orchestrator.py
+++ b/swh/indexer/tests/test_orchestrator.py
@@ -6,12 +6,14 @@
import unittest
from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import RevisionIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage
+from swh.indexer.indexer import BaseIndexer
+from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
from swh.scheduler.task import Task
+from . import start_worker_thread
-class BaseTestIndexer(RevisionIndexer):
+
+class BaseTestIndexer(BaseIndexer):
ADDITIONAL_CONFIG = {
'tools': ('dict', {
'name': 'foo',
@@ -22,16 +24,20 @@
def prepare(self):
self.idx_storage = MockIndexerStorage()
+ self.storage = MockStorage()
def check(self):
pass
def filter(self, ids):
- self.filtered = ids
+ self.filtered.append(ids)
return ids
+ def run(self, ids, policy_update):
+ return self.index(ids)
+
def index(self, ids):
- self.indexed = ids
+ self.indexed.append(ids)
return [id_ + '_indexed_by_' + self.__class__.__name__
for id_ in ids]
@@ -40,30 +46,42 @@
class Indexer1(BaseTestIndexer):
+ filtered = []
+ indexed = []
+
def filter(self, ids):
return super().filter([id_ for id_ in ids if '1' in id_])
class Indexer2(BaseTestIndexer):
+ filtered = []
+ indexed = []
+
def filter(self, ids):
return super().filter([id_ for id_ in ids if '2' in id_])
class Indexer3(BaseTestIndexer):
+ filtered = []
+ indexed = []
+
def filter(self, ids):
return super().filter([id_ for id_ in ids if '3' in id_])
class Indexer1Task(Task):
- pass
+ def run(self, *args, **kwargs):
+ return Indexer1().run(*args, **kwargs)
class Indexer2Task(Task):
- pass
+ def run(self, *args, **kwargs):
+ return Indexer2().run(*args, **kwargs)
class Indexer3Task(Task):
- pass
+ def run(self, *args, **kwargs):
+ return Indexer3().run(*args, **kwargs)
class TestOrchestrator12(BaseOrchestratorIndexer):
@@ -98,15 +116,35 @@
}
self.prepare_tasks()
+
+class MockedTestOrchestrator12(TestOrchestrator12):
def _run_tasks(self, celery_tasks):
self.running_tasks.extend(celery_tasks)
class OrchestratorTest(unittest.TestCase):
+ def test_orchestrator_filter(self):
+ with start_worker_thread():
+ o = TestOrchestrator12()
+ o.prepare()
+ promises = o.run(['id12', 'id2'])
+ results = []
+ for promise in promises:
+ results.append(promise.get(timeout=10))
+ self.assertCountEqual(
+ results,
+ [[['id12_indexed_by_Indexer1']],
+ [['id12_indexed_by_Indexer2',
+ 'id2_indexed_by_Indexer2']]])
+ self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
+ self.assertEqual(Indexer1.indexed, [['id12']])
+
+
+class MockedOrchestratorTest(unittest.TestCase):
maxDiff = None
- def test_orchestrator_filter(self):
- o = TestOrchestrator12()
+ def test_mocked_orchestrator_filter(self):
+ o = MockedTestOrchestrator12()
o.prepare()
o.run(['id12', 'id2'])
self.assertCountEqual(o.running_tasks, [
@@ -128,8 +166,8 @@
'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'},
])
- def test_orchestrator_batch(self):
- o = TestOrchestrator12()
+ def test_mocked_orchestrator_batch(self):
+ o = MockedTestOrchestrator12()
o.prepare()
o.run(['id12', 'id2a', 'id2b', 'id2c'])
self.assertCountEqual(o.running_tasks, [
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 6:27 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218201
Attached To
D518: Add tests using Celery.
Event Timeline
Log In to Comment