Page Menu | Home | Software Heritage

D2393.id8425.diff
No One | Temporary

D2393.id8425.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
pytest
+pytest-mock
pytest-postgresql >= 2.1.0
celery >= 4.3
hypothesis >= 3.11.0
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -3,6 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from datetime import datetime
+
from celery import current_app
import celery.app.task
from celery.utils.log import get_task_logger
@@ -10,6 +12,10 @@
from swh.core.statsd import Statsd
+def ts():
+ return int(datetime.utcnow().timestamp())
+
+
class SWHTask(celery.app.task.Task):
"""a schedulable task (abstract class)
@@ -34,15 +40,29 @@
})
return self._statsd
else:
- return Statsd(constant_tags={
+ statsd = Statsd(constant_tags={
'task': self.name,
'worker': 'unknown worker',
})
+ return statsd
def __call__(self, *args, **kwargs):
self.statsd.increment('swh_task_called_count')
+ self.statsd.gauge('swh_task_start_ts', ts())
with self.statsd.timed('swh_task_duration_seconds'):
- return super().__call__(*args, **kwargs)
+ result = super().__call__(*args, **kwargs)
+ try:
+ status = result['status']
+ if status == 'success':
+ status = 'eventful' if result.get('eventful') \
+ else 'uneventful'
+ except Exception:
+ status = 'eventful' if result else 'uneventful'
+
+ self.statsd.gauge(
+ 'swh_task_end_ts', ts(),
+ tags={'status': status})
+ return result
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.statsd.increment('swh_task_failure_count')
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -27,7 +27,7 @@
# celery tasks for testing purpose; tasks themselves should be
# in swh/scheduler/tests/tasks.py
-TASK_NAMES = ['ping', 'multiping', 'add', 'error']
+TASK_NAMES = ['ping', 'multiping', 'add', 'error', 'echo']
@pytest.fixture(scope='session')
diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py
--- a/swh/scheduler/tests/tasks.py
+++ b/swh/scheduler/tests/tasks.py
@@ -34,3 +34,9 @@
@shared_task(name='swh.scheduler.tests.tasks.add')
def add(x, y):
return x + y
+
+
+@shared_task(name='swh.scheduler.tests.tasks.echo')
+def echo(**kw):
+ "Does nothing, just return the given kwargs dict"
+ return kw
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,4 +1,6 @@
from time import sleep
+from itertools import count
+
from celery.result import GroupResult
from celery.result import AsyncResult
@@ -17,10 +19,19 @@
assert res.result == 'OK'
+def test_ping_with_kw(swh_app, celery_session_worker):
+ res = swh_app.send_task(
+ 'swh.scheduler.tests.tasks.ping', kwargs={'a': 1})
+ assert res
+ res.wait()
+ assert res.successful()
+ assert res.result == "OK (kw={'a': 1})"
+
+
def test_multiping(swh_app, celery_session_worker):
"Test that a task that spawns subtasks (group) works"
res = swh_app.send_task(
- 'swh.scheduler.tests.tasks.multiping', n=5)
+ 'swh.scheduler.tests.tasks.multiping', kwargs={'n': 5})
assert res
res.wait()
@@ -37,6 +48,7 @@
sleep(1)
results = [x.get() for x in promise.results]
+ assert len(results) == 5
for i in range(5):
assert ("OK (kw={'i': %s})" % i) in results
@@ -91,3 +103,61 @@
result = AsyncResult(id=task['backend_id'])
with pytest.raises(NotImplementedError):
result.get()
+
+
+def test_statsd(swh_app, celery_session_worker, mocker):
+ m = mocker.patch('swh.scheduler.task.Statsd._send_to_server')
+ mocker.patch('swh.scheduler.task.ts', side_effect=count())
+ mocker.patch('swh.core.statsd.monotonic', side_effect=count())
+ res = swh_app.send_task(
+ 'swh.scheduler.tests.tasks.echo')
+ assert res
+ res.wait()
+ assert res.successful()
+ assert res.result == {}
+
+ m.assert_any_call(
+ 'swh_task_called_count:1|c|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_start_ts:0|g|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_end_ts:1|g|'
+ '#status:uneventful,task:swh.scheduler.tests.tasks.echo,'
+ 'worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_duration_seconds:1000|ms|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_success_count:1|c|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+
+
+def test_statsd_with_status(swh_app, celery_session_worker, mocker):
+ m = mocker.patch('swh.scheduler.task.Statsd._send_to_server')
+ mocker.patch('swh.scheduler.task.ts', side_effect=count())
+ mocker.patch('swh.core.statsd.monotonic', side_effect=count())
+ res = swh_app.send_task(
+ 'swh.scheduler.tests.tasks.echo', kwargs={'status': 'eventful'})
+ assert res
+ res.wait()
+ assert res.successful()
+ assert res.result == {'status': 'eventful'}
+
+ m.assert_any_call(
+ 'swh_task_called_count:1|c|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_start_ts:0|g|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_end_ts:1|g|'
+ '#status:eventful,task:swh.scheduler.tests.tasks.echo,'
+ 'worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_duration_seconds:1000|ms|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')
+ m.assert_any_call(
+ 'swh_task_success_count:1|c|'
+ '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 2:27 PM (11 w, 16 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222401

Event Timeline