Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7342996
D973.id3084.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
D973.id3084.diff
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.36
swh.loader.tar >= 0.0.37
swh.loader.core >= 0.0.32
-swh.scheduler >= 0.0.26
+swh.scheduler >= 0.0.39
swh.model >= 0.0.26
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,2 +1,3 @@
-pytest
+pytest<4
pytest-django
+swh.scheduler[testing]
diff --git a/swh/deposit/loader/tasks.py b/swh/deposit/loader/tasks.py
--- a/swh/deposit/loader/tasks.py
+++ b/swh/deposit/loader/tasks.py
@@ -3,12 +3,14 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.scheduler.task import Task
+from celery import current_app as app
+
from swh.deposit.loader.loader import DepositLoader
from swh.deposit.loader.checker import DepositChecker
-class LoadDepositArchiveTsk(Task):
+@app.task(name=__name__ + '.LoadDepositArchiveTsk')
+def load_deposit_archive(archive_url, deposit_meta_url, deposit_update_url):
"""Deposit archive loading task described by the following steps:
1. Retrieve tarball from deposit's private api and store
@@ -19,33 +21,16 @@
deposit's private update status api
"""
- task_queue = 'swh_loader_deposit'
-
- def run_task(self, *, archive_url, deposit_meta_url, deposit_update_url):
- """Import a deposit tarball into swh.
-
- Args: see :func:`DepositLoader.load`.
-
- """
- _loader = DepositLoader()
- _loader.log = self.log
- return _loader.load(archive_url=archive_url,
- deposit_meta_url=deposit_meta_url,
- deposit_update_url=deposit_update_url)
+ return DepositLoader().load(
+ archive_url=archive_url,
+ deposit_meta_url=deposit_meta_url,
+ deposit_update_url=deposit_update_url)
-class ChecksDepositTsk(Task):
- """Deposit checks task.
+@app.task(name=__name__ + '.ChecksDepositTsk')
+def check_deposit(deposit_check_url):
+ """Check a deposit's status
+ Args: see :func:`DepositChecker.check`.
"""
- task_queue = 'swh_checker_deposit'
-
- def run_task(self, deposit_check_url):
- """Check a deposit's status
-
- Args: see :func:`DepositChecker.check`.
-
- """
- _checker = DepositChecker()
- _checker.log = self.log
- return _checker.check(deposit_check_url)
+ return DepositChecker().check(deposit_check_url)
diff --git a/swh/deposit/tests/loader/conftest.py b/swh/deposit/tests/loader/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/deposit/tests/loader/conftest.py
@@ -0,0 +1,9 @@
+import pytest
+from swh.scheduler.tests.conftest import * # noqa
+
+
+@pytest.fixture(scope='session')
+def celery_includes():
+ return [
+ 'swh.deposit.loader.tasks',
+ ]
diff --git a/swh/deposit/tests/loader/test_tasks.py b/swh/deposit/tests/loader/test_tasks.py
--- a/swh/deposit/tests/loader/test_tasks.py
+++ b/swh/deposit/tests/loader/test_tasks.py
@@ -3,48 +3,39 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
from unittest.mock import patch
-from swh.deposit.loader.tasks import LoadDepositArchiveTsk, ChecksDepositTsk
+@patch('swh.deposit.loader.loader.DepositLoader.load')
+def test_deposit_load(mock_loader, swh_app, celery_session_worker):
+ mock_loader.return_value = {'status': 'eventful'}
-class TestTasks(unittest.TestCase):
- def test_check_task_name(self):
- task = LoadDepositArchiveTsk()
- self.assertEqual(task.task_queue, 'swh_loader_deposit')
+ res = swh_app.send_task(
+ 'swh.deposit.loader.tasks.LoadDepositArchiveTsk',
+ kwargs=dict(archive_url='archive_url',
+ deposit_meta_url='deposit_meta_url',
+ deposit_update_url='deposit_update_url'))
+ assert res
+ res.wait()
+ assert res.successful()
- @patch('swh.deposit.loader.loader.DepositLoader.load')
- def test_task(self, mock_loader):
- mock_loader.return_value = {'status': 'eventful'}
- task = LoadDepositArchiveTsk()
+ assert res.result == {'status': 'eventful'}
+ mock_loader.assert_called_once_with(
+ archive_url='archive_url',
+ deposit_meta_url='deposit_meta_url',
+ deposit_update_url='deposit_update_url')
- # given
- actual_result = task.run_task(
- archive_url='archive_url',
- deposit_meta_url='deposit_meta_url',
- deposit_update_url='deposit_update_url')
- self.assertEqual(actual_result, {'status': 'eventful'})
+@patch('swh.deposit.loader.checker.DepositChecker.check')
+def test_check_deposit(mock_checker, swh_app, celery_session_worker):
+ mock_checker.return_value = {'status': 'uneventful'}
- mock_loader.assert_called_once_with(
- archive_url='archive_url',
- deposit_meta_url='deposit_meta_url',
- deposit_update_url='deposit_update_url')
+ res = swh_app.send_task(
+ 'swh.deposit.loader.tasks.ChecksDepositTsk',
+ args=('check_deposit_url',))
+ assert res
+ res.wait()
+ assert res.successful()
-
-class TestTasks2(unittest.TestCase):
- def test_check_task_name(self):
- task = ChecksDepositTsk()
- self.assertEqual(task.task_queue, 'swh_checker_deposit')
-
- @patch('swh.deposit.loader.checker.DepositChecker.check')
- def test_task(self, mock_checker):
- mock_checker.return_value = {'status': 'uneventful'}
- task = ChecksDepositTsk()
-
- # given
- actual_result = task.run_task('check_deposit_url')
- self.assertEqual(actual_result, {'status': 'uneventful'})
-
- mock_checker.assert_called_once_with('check_deposit_url')
+ assert res.result == {'status': 'uneventful'}
+ mock_checker.assert_called_once_with('check_deposit_url')
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 6:37 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227356
Attached To
D973: Rewrite celery tasks as a decorated function
Event Timeline
Log In to Comment