diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
 pytest
+pytest-mock
 pytest-postgresql >= 2.1.0
 celery >= 4.3
 hypothesis >= 3.11.0
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -3,6 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from datetime import datetime, timezone
+
 from celery import current_app
 import celery.app.task
 from celery.utils.log import get_task_logger
@@ -10,6 +12,35 @@
 from swh.core.statsd import Statsd
 
 
+def ts():
+    """Return the current POSIX timestamp as an integer number of seconds.
+
+    An aware UTC datetime is used on purpose: ``.timestamp()`` on the naive
+    value returned by ``datetime.utcnow()`` is interpreted as local time,
+    which skews the result by the host's UTC offset.
+    """
+    return int(datetime.now(tz=timezone.utc).timestamp())
+
+
+def _task_status(result):
+    """Return the status tag to attach to the swh_task_end_ts metric.
+
+    A result carrying a 'status' entry reports that value, except that
+    'success' is refined into 'eventful' or 'uneventful' according to its
+    'eventful' entry. Any other result is 'eventful' when truthy and
+    'uneventful' otherwise.
+    """
+    try:
+        status = result['status']
+    except (KeyError, TypeError):
+        # KeyError: mapping without a 'status' entry; TypeError: result
+        # not indexable by a str key (None, list, str, int, ...).
+        return 'eventful' if result else 'uneventful'
+    if status == 'success':
+        return 'eventful' if result.get('eventful') else 'uneventful'
+    return status
+
+
 class SWHTask(celery.app.task.Task):
     """a schedulable task (abstract class)
 
@@ -41,8 +72,15 @@
 
     def __call__(self, *args, **kwargs):
         self.statsd.increment('swh_task_called_count')
+        self.statsd.gauge('swh_task_start_ts', ts())
         with self.statsd.timed('swh_task_duration_seconds'):
-            return super().__call__(*args, **kwargs)
+            result = super().__call__(*args, **kwargs)
+        # Emitted outside the timed block so that the duration metric only
+        # covers the task itself, not this bookkeeping.
+        self.statsd.gauge(
+            'swh_task_end_ts', ts(),
+            tags={'status': _task_status(result)})
+        return result
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         self.statsd.increment('swh_task_failure_count')
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -27,7 +27,7 @@
 
 # celery tasks for testing purpose; tasks themselves should be
 # in swh/scheduler/tests/tasks.py
-TASK_NAMES = ['ping', 'multiping', 'add', 'error']
+TASK_NAMES = ['ping', 'multiping', 'add', 'error', 'echo']
 
 
 @pytest.fixture(scope='session')
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -34,3 +34,9 @@
 @shared_task(name='swh.scheduler.tests.tasks.add')
 def add(x, y):
     return x + y
+
+
+@shared_task(name='swh.scheduler.tests.tasks.echo')
+def echo(**kw):
+    "Does nothing, just returns the given kwargs dict"
+    return kw
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,4 +1,6 @@
 from time import sleep
+from itertools import count
+
 from celery.result import GroupResult
 from celery.result import AsyncResult
 
@@ -17,10 +19,19 @@
     assert res.result == 'OK'
 
 
+def test_ping_with_kw(swh_app, celery_session_worker):
+    res = swh_app.send_task(
+        'swh.scheduler.tests.tasks.ping', kwargs={'a': 1})
+    assert res
+    res.wait()
+    assert res.successful()
+    assert res.result == "OK (kw={'a': 1})"
+
+
 def test_multiping(swh_app, celery_session_worker):
     "Test that a task that spawns subtasks (group) works"
     res = swh_app.send_task(
-        'swh.scheduler.tests.tasks.multiping', n=5)
+        'swh.scheduler.tests.tasks.multiping', kwargs={'n': 5})
     assert res
 
     res.wait()
@@ -37,6 +48,7 @@
         sleep(1)
 
     results = [x.get() for x in promise.results]
+    assert len(results) == 5
     for i in range(5):
         assert ("OK (kw={'i': %s})" % i) in results
 
@@ -91,3 +103,79 @@
     result = AsyncResult(id=task['backend_id'])
     with pytest.raises(NotImplementedError):
         result.get()
+
+
+def test_statsd(swh_app, celery_session_worker, mocker):
+    m = mocker.patch('swh.scheduler.task.Statsd._send_to_server')
+    mocker.patch('swh.scheduler.task.ts', side_effect=count())
+    mocker.patch('swh.core.statsd.monotonic', side_effect=count())
+    res = swh_app.send_task(
+        'swh.scheduler.tests.tasks.echo')
+    assert res
+    res.wait()
+    assert res.successful()
+    assert res.result == {}
+
+    m.assert_any_call(
+        'swh_task_called_count:1|c|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_start_ts:0|g|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_end_ts:1|g|'
+        '#status:uneventful,task:swh.scheduler.tests.tasks.echo,'
+        'worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_duration_seconds:1000|ms|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_success_count:1|c|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+
+
+def test_statsd_with_status(swh_app, celery_session_worker, mocker):
+    m = mocker.patch('swh.scheduler.task.Statsd._send_to_server')
+    mocker.patch('swh.scheduler.task.ts', side_effect=count())
+    mocker.patch('swh.core.statsd.monotonic', side_effect=count())
+    res = swh_app.send_task(
+        'swh.scheduler.tests.tasks.echo', kwargs={'status': 'eventful'})
+    assert res
+    res.wait()
+    assert res.successful()
+    assert res.result == {'status': 'eventful'}
+
+    m.assert_any_call(
+        'swh_task_called_count:1|c|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_start_ts:0|g|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_end_ts:1|g|'
+        '#status:eventful,task:swh.scheduler.tests.tasks.echo,'
+        'worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_duration_seconds:1000|ms|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+    m.assert_any_call(
+        'swh_task_success_count:1|c|'
+        '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+
+
+def test_ts():
+    "ts() must match time.time() (catches naive-datetime bugs off UTC)"
+    import time
+    from swh.scheduler.task import ts
+    assert abs(ts() - time.time()) < 2
+
+
+def test_task_status():
+    from swh.scheduler.task import _task_status
+    assert _task_status({}) == 'uneventful'
+    assert _task_status(None) == 'uneventful'
+    assert _task_status([1]) == 'eventful'
+    assert _task_status({'status': 'failed'}) == 'failed'
+    assert _task_status({'status': 'success'}) == 'uneventful'
+    assert _task_status(
+        {'status': 'success', 'eventful': True}) == 'eventful'