@pytest.fixture(scope="session")
def swh_scheduler_celery_app():
"""Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
test_app = TestApp(
set_as_current=True,
enable_logging=True,
task_cls="swh.scheduler.task:SWHTask",
config={
"accept_content": ["application/x-msgpack", "application/json"],
"task_serializer": "msgpack",
"result_serializer": "json",
},
)
with setup_default_app(test_app, use_trap=False):
> from swh.scheduler.celery_backend import config
.tox/py3/lib/python3.7/site-packages/swh/scheduler/pytest_plugin.py:84:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
import functools
import logging
import os
import pkg_resources
import traceback
from typing import Any, Dict
import urllib.parse
from celery import Celery
from celery.signals import setup_logging, celeryd_after_setup, worker_init
from celery.utils.log import ColorFormatter
from celery.worker.control import Panel
from kombu import Exchange, Queue
> from kombu.five import monotonic as _monotonic
E ModuleNotFoundError: No module named 'kombu.five'
.tox/py3/lib/python3.7/site-packages/swh/scheduler/celery_backend/config.py:20: ModuleNotFoundError
TEST RESULT
TEST RESULT
- Run At
- Sep 24 2020, 5:10 PM