Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import inspect | import inspect | ||||
import random | import random | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from swh.scheduler.exc import StaleData | from swh.scheduler.exc import StaleData, UnknownPolicy | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin, ListedOriginPageToken | from swh.scheduler.model import ListedOrigin, ListedOriginPageToken | ||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template | from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template | ||||
ONEDAY = datetime.timedelta(days=1) | ONEDAY = datetime.timedelta(days=1) | ||||
▲ Show 20 Lines • Show All 695 Lines • ▼ Show 20 Lines | class TestScheduler: | ||||
def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: | def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: | ||||
swh_scheduler.record_listed_origins(listed_origins) | swh_scheduler.record_listed_origins(listed_origins) | ||||
ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) | ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) | ||||
assert ret.next_page_token is None | assert ret.next_page_token is None | ||||
assert len(ret.origins) == len(listed_origins) | assert len(ret.origins) == len(listed_origins) | ||||
@pytest.mark.parametrize("policy", ["oldest_scheduled_first"]) | |||||
def test_grab_next_visits(self, swh_scheduler, listed_origins, policy): | |||||
NUM_RESULTS = 5 | |||||
# Strict inequality to check that grab_next_visits doesn't return more | |||||
# results than requested | |||||
assert len(listed_origins) > NUM_RESULTS | |||||
swh_scheduler.record_listed_origins(listed_origins) | |||||
before = utcnow() | |||||
ret = swh_scheduler.grab_next_visits(NUM_RESULTS, policy=policy) | |||||
after = utcnow() | |||||
assert len(ret) == NUM_RESULTS | |||||
for origin in ret: | |||||
assert before <= origin.last_scheduled <= after | |||||
@pytest.mark.parametrize("policy", ["oldest_scheduled_first"]) | |||||
def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins, policy): | |||||
NUM_RESULTS = 5 | |||||
assert len(listed_origins) >= NUM_RESULTS | |||||
swh_scheduler.record_listed_origins(listed_origins[:NUM_RESULTS]) | |||||
ret = swh_scheduler.grab_next_visits(NUM_RESULTS + 2, policy=policy) | |||||
assert len(ret) == NUM_RESULTS | |||||
def test_grab_next_visits_unknown_policy(self, swh_scheduler): | |||||
NUM_RESULTS = 5 | |||||
with pytest.raises(UnknownPolicy, match="non_existing_policy"): | |||||
swh_scheduler.grab_next_visits(NUM_RESULTS, policy="non_existing_policy") | |||||
def _create_task_types(self, scheduler): | def _create_task_types(self, scheduler): | ||||
for tt in TASK_TYPES.values(): | for tt in TASK_TYPES.values(): | ||||
scheduler.create_task_type(tt) | scheduler.create_task_type(tt) |