self = <swh.scheduler.tests.test_scheduler.TestScheduler object at 0x7f065c7759b0>
swh_scheduler = <swh.scheduler.backend.SchedulerBackend object at 0x7f065c75feb8>
def test_interface(self, swh_scheduler):
"""Checks all methods of SchedulerInterface are implemented by this
backend, and that they have the same signature."""
# Create an instance of the protocol (which cannot be instantiated
# directly, so this creates a subclass, then instantiates it)
interface = type("_", (SchedulerInterface,), {})()
assert "create_task_type" in dir(interface)
missing_methods = []
for meth_name in dir(interface):
if meth_name.startswith("_"):
continue
interface_meth = getattr(interface, meth_name)
try:
concrete_meth = getattr(swh_scheduler, meth_name)
except AttributeError:
if not getattr(interface_meth, "deprecated_endpoint", False):
# The backend is missing a (non-deprecated) endpoint
missing_methods.append(meth_name)
continue
expected_signature = inspect.signature(interface_meth)
actual_signature = inspect.signature(concrete_meth)
> assert expected_signature == actual_signature, meth_name
E AssertionError: grab_ready_priority_tasks
E assert <Signature (t...-> List[Dict]> == <Signature (t...Type] = None)>
E +<Signature (task_type: str, timestamp: Union[datetime.datetime, NoneType] = None, num_tasks: Union[int, NoneType] = None) -> List[Dict]>
E -<Signature (task_type: str, timestamp: Union[datetime.datetime, NoneType] = None, num_tasks: Union[int, NoneType] = None)>
.tox/py3/lib/python3.7/site-packages/swh/scheduler/tests/test_scheduler.py:76: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Apr 13 2021, 6:05 PM