diff --git a/PKG-INFO b/PKG-INFO index 204899c..9cdd3b7 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.39 +Version: 0.0.40 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler +Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running test manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker uses an in memory broker/result backend by default, but you can choose to use a true broker by setting `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 204899c..9cdd3b7 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.39 +Version: 0.0.40 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler +Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running test manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker uses an in memory broker/result backend by default, but you can choose to use a true broker by setting `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 3d16510..b141a01 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,62 +1,68 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import celery.app.task from celery.utils.log import get_task_logger class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ _log = None def on_failure(self, exc, task_id, args, kwargs, einfo): self.send_event('task-result-exception') def on_success(self, retval, task_id, args, kwargs): self.send_event('task-result', result=retval) @property def log(self): if self._log is None: self._log = get_task_logger(self.name) return self._log + def run(self, *args, **kwargs): + self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs) + ret = super().run(*args, **kwargs) + self.log.debug('%s: OK => %s', self.name, ret) + return ret + class Task(SWHTask): """a schedulable task (abstract class) DEPRECATED! Please use SWHTask as base for decorated functions instead. Sub-classes must implement the run_task() method. Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" return self.run_task(*args, **kwargs) def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index 5702680..b18df46 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,41 +1,31 @@ -from celery import group +from celery import group, current_app as app -from swh.scheduler.celery_backend.config import app - -@app.task(name='swh.scheduler.tests.tasks.ping', - bind=True) +@app.task(name='swh.scheduler.tests.tasks.ping', bind=True) def ping(self, **kw): # check this is a SWHTask assert hasattr(self, 'log') assert not hasattr(self, 'run_task') assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] self.log.debug(self.name) if kw: return 'OK (kw=%s)' % kw return 'OK' -@app.task(name='swh.scheduler.tests.tasks.multiping', - bind=True) +@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) def multiping(self, n=10): - self.log.debug(self.name) - promise = group(ping.s(i=i) for i in range(n))() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) promise.save() return promise.id -@app.task(name='swh.scheduler.tests.tasks.error', - bind=True) -def not_implemented(self): - self.log.debug(self.name) +@app.task(name='swh.scheduler.tests.tasks.error') +def not_implemented(): raise NotImplementedError('Nope') -@app.task(name='swh.scheduler.tests.tasks.add', - bind=True) -def add(self, x, y): - self.log.debug(self.name) +@app.task(name='swh.scheduler.tests.tasks.add') +def add(x, y): return x + y diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index accd1a0..6f9e2ab 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,90 +1,93 @@ from time import sleep from celery.result import GroupResult from celery.result import AsyncResult import pytest from swh.scheduler.utils import create_task_dict from swh.scheduler.celery_backend.runner import run_ready_tasks def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.scheduler.tests.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' def test_multiping(swh_app, celery_session_worker): "Test that a task that spawns subtasks (group) works" res = swh_app.send_task( 'swh.scheduler.tests.tasks.multiping', n=5) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] for i in range(5): assert ("OK (kw={'i': %s})" % i) in results +@pytest.mark.db def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type('swh-test-ping') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.ping' swh_scheduler.create_tasks([create_task_dict( 'swh-test-ping', 'oneshot')]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task['backend_id']).get() +@pytest.mark.db def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type('swh-test-add') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.add' swh_scheduler.create_tasks([create_task_dict( 'swh-test-add', 'oneshot', 12, 30)]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] value = AsyncResult(id=task['backend_id']).get() assert value == 42 +@pytest.mark.db def test_task_exception(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type('swh-test-error') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.error' swh_scheduler.create_tasks([create_task_dict( 'swh-test-error', 'oneshot')]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task['backend_id']) with pytest.raises(NotImplementedError): result.get() diff --git a/version.txt b/version.txt index e0d62ae..574f7df 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.39-0-gcaaa44b \ No newline at end of file +v0.0.40-0-g53136b7 \ No newline at end of file