diff --git a/requirements-test.txt b/requirements-test.txt index 6c27c8a..80828c5 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,6 @@ pytest +pytest-mock pytest-postgresql >= 2.1.0 celery >= 4.3 hypothesis >= 3.11.0 swh.lister diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 3e2e47a..83d3c0e 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,66 +1,86 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime + from celery import current_app import celery.app.task from celery.utils.log import get_task_logger from swh.core.statsd import Statsd +def ts(): + return int(datetime.utcnow().timestamp()) + + class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ _statsd = None _log = None @property def statsd(self): if self._statsd: return self._statsd worker_name = current_app.conf.get('worker_name') if worker_name: self._statsd = Statsd(constant_tags={ 'task': self.name, 'worker': worker_name, }) return self._statsd else: - return Statsd(constant_tags={ + statsd = Statsd(constant_tags={ 'task': self.name, 'worker': 'unknown worker', }) + return statsd def __call__(self, *args, **kwargs): self.statsd.increment('swh_task_called_count') + self.statsd.gauge('swh_task_start_ts', ts()) with self.statsd.timed('swh_task_duration_seconds'): - return super().__call__(*args, **kwargs) + result = super().__call__(*args, **kwargs) + try: + status = result['status'] + if status == 'success': + status = 'eventful' if result.get('eventful') \ + else 'uneventful' + except Exception: + status = 'eventful' if result else 'uneventful' + + self.statsd.gauge( + 'swh_task_end_ts', ts(), + tags={'status': status}) + return result def on_failure(self, exc, task_id, args, kwargs, einfo): self.statsd.increment('swh_task_failure_count') def on_success(self, retval, task_id, args, kwargs): self.statsd.increment('swh_task_success_count') # this is a swh specific event. Used to attach the retval to the # task_run self.send_event('task-result', result=retval) @property def log(self): if self._log is None: self._log = get_task_logger(self.name) return self._log def run(self, *args, **kwargs): self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs) ret = super().run(*args, **kwargs) self.log.debug('%s: OK => %s', self.name, ret) return ret diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index e96f15f..6534132 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,109 +1,109 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py -TASK_NAMES = ['ping', 'multiping', 'add', 'error'] +TASK_NAMES = ['ping', 'multiping', 'add', 'error', 'echo'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } # use the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application # with the test application @pytest.fixture(scope='session') def swh_app(celery_session_app): from swh.scheduler.celery_backend import config config.app = celery_session_app yield celery_session_app @pytest.fixture def swh_scheduler_config(request, postgresql): scheduler_config = { 'db': postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() return scheduler_config @pytest.fixture def swh_scheduler(swh_scheduler_config): scheduler = get_scheduler('local', swh_scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index be7628d..2426478 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,36 +1,42 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group, shared_task @shared_task(name='swh.scheduler.tests.tasks.ping', bind=True) def ping(self, **kw): # check this is a SWHTask assert hasattr(self, 'log') assert not hasattr(self, 'run_task') assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] self.log.debug(self.name) if kw: return 'OK (kw=%s)' % kw return 'OK' @shared_task(name='swh.scheduler.tests.tasks.multiping', bind=True) def multiping(self, n=10): promise = group(ping.s(i=i) for i in range(n))() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) promise.save() return promise.id @shared_task(name='swh.scheduler.tests.tasks.error') def not_implemented(): raise NotImplementedError('Nope') @shared_task(name='swh.scheduler.tests.tasks.add') def add(x, y): return x + y + + +@shared_task(name='swh.scheduler.tests.tasks.echo') +def echo(**kw): + "Does nothing, just return the given kwargs dict" + return kw diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index a500552..78bed28 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,103 +1,163 @@ from time import sleep +from itertools import count + from celery.result import GroupResult from celery.result import AsyncResult import pytest from swh.scheduler.utils import create_task_dict from swh.scheduler.celery_backend.runner import run_ready_tasks def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.scheduler.tests.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' def test_ping_with_kw(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.scheduler.tests.tasks.ping', kwargs={'a': 1}) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" def test_multiping(swh_app, celery_session_worker): "Test that a task that spawns subtasks (group) works" res = swh_app.send_task( 'swh.scheduler.tests.tasks.multiping', kwargs={'n': 5}) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] assert len(results) == 5 for i in range(5): assert ("OK (kw={'i': %s})" % i) in results @pytest.mark.db def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type('swh-test-ping') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.ping' swh_scheduler.create_tasks([create_task_dict( 'swh-test-ping', 'oneshot')]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task['backend_id']).get() @pytest.mark.db def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type('swh-test-add') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.add' swh_scheduler.create_tasks([create_task_dict( 'swh-test-add', 'oneshot', 12, 30)]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] value = AsyncResult(id=task['backend_id']).get() assert value == 42 @pytest.mark.db def test_task_exception(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type('swh-test-error') assert task_type assert task_type['backend_name'] == 'swh.scheduler.tests.tasks.error' swh_scheduler.create_tasks([create_task_dict( 'swh-test-error', 'oneshot')]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task['backend_id']) with pytest.raises(NotImplementedError): result.get() + + +def test_statsd(swh_app, celery_session_worker, mocker): + m = mocker.patch('swh.scheduler.task.Statsd._send_to_server') + mocker.patch('swh.scheduler.task.ts', side_effect=count()) + mocker.patch('swh.core.statsd.monotonic', side_effect=count()) + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.echo') + assert res + res.wait() + assert res.successful() + assert res.result == {} + + m.assert_any_call( + 'swh_task_called_count:1|c|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_start_ts:0|g|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_end_ts:1|g|' + '#status:uneventful,task:swh.scheduler.tests.tasks.echo,' + 'worker:unknown worker') + m.assert_any_call( + 'swh_task_duration_seconds:1000|ms|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_success_count:1|c|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + + +def test_statsd_with_status(swh_app, celery_session_worker, mocker): + m = mocker.patch('swh.scheduler.task.Statsd._send_to_server') + mocker.patch('swh.scheduler.task.ts', side_effect=count()) + mocker.patch('swh.core.statsd.monotonic', side_effect=count()) + res = swh_app.send_task( + 'swh.scheduler.tests.tasks.echo', kwargs={'status': 'eventful'}) + assert res + res.wait() + assert res.successful() + assert res.result == {'status': 'eventful'} + + m.assert_any_call( + 'swh_task_called_count:1|c|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_start_ts:0|g|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_end_ts:1|g|' + '#status:eventful,task:swh.scheduler.tests.tasks.echo,' + 'worker:unknown worker') + m.assert_any_call( + 'swh_task_duration_seconds:1000|ms|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker') + m.assert_any_call( + 'swh_task_success_count:1|c|' + '#task:swh.scheduler.tests.tasks.echo,worker:unknown worker')