Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
from datetime import timedelta | |||||
import inspect | import inspect | ||||
import random | import random | ||||
from typing import Any, Dict, List, Optional, Tuple | from typing import Any, Dict, List, Optional, Tuple | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
from psycopg2.extras import execute_values | from psycopg2.extras import execute_values | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy | from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy | ||||
from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface | from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics | from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics | ||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
from .common import ( | from .common import ( | ||||
LISTERS, | LISTERS, | ||||
TASK_TYPES, | TASK_TYPES, | ||||
TEMPLATES, | TEMPLATES, | ||||
tasks_from_template, | tasks_from_template, | ||||
tasks_with_priority_from_template, | tasks_with_priority_from_template, | ||||
) | ) | ||||
ONEDAY = datetime.timedelta(days=1) | ONEDAY = timedelta(days=1) | ||||
NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} | NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} | ||||
def subdict(d, keys=None, excl=()): | def subdict(d, keys=None, excl=()): | ||||
if keys is None: | if keys is None: | ||||
keys = [k for k in d.keys()] | keys = [k for k in d.keys()] | ||||
return {k: d[k] for k in keys if k not in excl} | return {k: d[k] for k in keys if k not in excl} | ||||
▲ Show 20 Lines • Show All 773 Lines • ▼ Show 20 Lines | ): | ||||
after = self._check_grab_next_visit_basic( | after = self._check_grab_next_visit_basic( | ||||
swh_scheduler, visit_type, policy, expected, **kwargs | swh_scheduler, visit_type, policy, expected, **kwargs | ||||
) | ) | ||||
# But a week, later, they should | # But a week, later, they should | ||||
ret = swh_scheduler.grab_next_visits( | ret = swh_scheduler.grab_next_visits( | ||||
visit_type=visit_type, | visit_type=visit_type, | ||||
count=len(expected) + 1, | count=len(expected) + 1, | ||||
policy=policy, | policy=policy, | ||||
timestamp=after + datetime.timedelta(days=7), | timestamp=after + timedelta(days=7), | ||||
) | ) | ||||
# We need to sort them because their 'last_scheduled' field is updated to | # We need to sort them because their 'last_scheduled' field is updated to | ||||
# exactly the same value, so the order is not deterministic | # exactly the same value, so the order is not deterministic | ||||
assert sorted(ret) == sorted( | assert sorted(ret) == sorted( | ||||
expected | expected | ||||
), "grab_next_visits didn't reschedule visits after a week" | ), "grab_next_visits didn't reschedule visits after a week" | ||||
def _prepare_oldest_scheduled_first_origins( | def _prepare_oldest_scheduled_first_origins( | ||||
Show All 9 Lines | ): | ||||
OriginVisitStats( | OriginVisitStats( | ||||
url=origin.url, | url=origin.url, | ||||
visit_type=origin.visit_type, | visit_type=origin.visit_type, | ||||
last_snapshot=None, | last_snapshot=None, | ||||
last_eventful=None, | last_eventful=None, | ||||
last_uneventful=None, | last_uneventful=None, | ||||
last_failed=None, | last_failed=None, | ||||
last_notfound=None, | last_notfound=None, | ||||
last_scheduled=base_date - datetime.timedelta(seconds=i), | last_scheduled=base_date - timedelta(seconds=i), | ||||
) | ) | ||||
for i, origin in enumerate(origins[1:]) | for i, origin in enumerate(origins[1:]) | ||||
] | ] | ||||
swh_scheduler.origin_visit_stats_upsert(visit_stats) | swh_scheduler.origin_visit_stats_upsert(visit_stats) | ||||
# We expect to retrieve the origin with a NULL last_scheduled | # We expect to retrieve the origin with a NULL last_scheduled | ||||
# as well as those with the oldest values (i.e. the last ones), in order. | # as well as those with the oldest values (i.e. the last ones), in order. | ||||
expected = [origins[0]] + origins[1:][::-1] | expected = [origins[0]] + origins[1:][::-1] | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | ): | ||||
last_eventful=None, | last_eventful=None, | ||||
last_uneventful=None, | last_uneventful=None, | ||||
**ovs_args, | **ovs_args, | ||||
) | ) | ||||
for i, origin in enumerate(origins) | for i, origin in enumerate(origins) | ||||
] | ] | ||||
swh_scheduler.origin_visit_stats_upsert(visit_stats) | swh_scheduler.origin_visit_stats_upsert(visit_stats) | ||||
cooldown_td = datetime.timedelta(days=cooldown) | cooldown_td = timedelta(days=cooldown) | ||||
cooldown_args = { | cooldown_args = { | ||||
"scheduled_cooldown": None, | "scheduled_cooldown": None, | ||||
"failed_cooldown": None, | "failed_cooldown": None, | ||||
"notfound_cooldown": None, | "notfound_cooldown": None, | ||||
} | } | ||||
cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td | cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td | ||||
ret = swh_scheduler.grab_next_visits( | ret = swh_scheduler.grab_next_visits( | ||||
visit_type=visit_type, | visit_type=visit_type, | ||||
count=len(expected) + 1, | count=len(expected) + 1, | ||||
policy="oldest_scheduled_first", | policy="oldest_scheduled_first", | ||||
timestamp=after + cooldown_td - datetime.timedelta(seconds=1), | timestamp=after + cooldown_td - timedelta(seconds=1), | ||||
**cooldown_args, | **cooldown_args, | ||||
) | ) | ||||
assert ret == [], f"{which_cooldown}_cooldown ignored" | assert ret == [], f"{which_cooldown}_cooldown ignored" | ||||
ret = swh_scheduler.grab_next_visits( | ret = swh_scheduler.grab_next_visits( | ||||
visit_type=visit_type, | visit_type=visit_type, | ||||
count=len(expected) + 1, | count=len(expected) + 1, | ||||
policy="oldest_scheduled_first", | policy="oldest_scheduled_first", | ||||
timestamp=after + cooldown_td + datetime.timedelta(seconds=1), | timestamp=after + cooldown_td + timedelta(seconds=1), | ||||
**cooldown_args, | **cooldown_args, | ||||
) | ) | ||||
assert sorted(ret) == sorted( | assert sorted(ret) == sorted( | ||||
expected | expected | ||||
), "grab_next_visits didn't reschedule visits after the configured cooldown" | ), "grab_next_visits didn't reschedule visits after the configured cooldown" | ||||
def test_grab_next_visits_never_visited_oldest_update_first( | def test_grab_next_visits_never_visited_oldest_update_first( | ||||
self, swh_scheduler, listed_origins_by_type, | self, swh_scheduler, listed_origins_by_type, | ||||
): | ): | ||||
visit_type, origins = self._grab_next_visits_setup( | visit_type, origins = self._grab_next_visits_setup( | ||||
swh_scheduler, listed_origins_by_type | swh_scheduler, listed_origins_by_type | ||||
) | ) | ||||
# Update known origins with a `last_update` field that we control | # Update known origins with a `last_update` field that we control | ||||
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) | base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) | ||||
updated_origins = [ | updated_origins = [ | ||||
attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) | attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) | ||||
for i, origin in enumerate(origins) | for i, origin in enumerate(origins) | ||||
] | ] | ||||
updated_origins = swh_scheduler.record_listed_origins(updated_origins) | updated_origins = swh_scheduler.record_listed_origins(updated_origins) | ||||
# We expect to retrieve origins with the oldest update date, that is | # We expect to retrieve origins with the oldest update date, that is | ||||
# origins at the end of our updated_origins list. | # origins at the end of our updated_origins list. | ||||
expected_origins = sorted(updated_origins, key=lambda o: o.last_update) | expected_origins = sorted(updated_origins, key=lambda o: o.last_update) | ||||
Show All 9 Lines | class TestScheduler: | ||||
): | ): | ||||
visit_type, origins = self._grab_next_visits_setup( | visit_type, origins = self._grab_next_visits_setup( | ||||
swh_scheduler, listed_origins_by_type | swh_scheduler, listed_origins_by_type | ||||
) | ) | ||||
# Update known origins with a `last_update` field that we control | # Update known origins with a `last_update` field that we control | ||||
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) | base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) | ||||
updated_origins = [ | updated_origins = [ | ||||
attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i)) | attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) | ||||
for i, origin in enumerate(origins) | for i, origin in enumerate(origins) | ||||
] | ] | ||||
updated_origins = swh_scheduler.record_listed_origins(updated_origins) | updated_origins = swh_scheduler.record_listed_origins(updated_origins) | ||||
# Update the visit stats with a known visit at a controlled date for | # Update the visit stats with a known visit at a controlled date for | ||||
# half the origins. Pick the date in the middle of the | # half the origins. Pick the date in the middle of the | ||||
# updated_origins' `last_update` range | # updated_origins' `last_update` range | ||||
visit_date = updated_origins[len(updated_origins) // 2].last_update | visit_date = updated_origins[len(updated_origins) // 2].last_update | ||||
Show All 38 Lines | def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins_by_type): | ||||
swh_scheduler.record_listed_origins(origins) | swh_scheduler.record_listed_origins(origins) | ||||
ret = swh_scheduler.grab_next_visits( | ret = swh_scheduler.grab_next_visits( | ||||
visit_type, len(origins) + 2, policy="oldest_scheduled_first" | visit_type, len(origins) + 2, policy="oldest_scheduled_first" | ||||
) | ) | ||||
assert len(ret) == 5 | assert len(ret) == 5 | ||||
def test_grab_next_visits_no_last_update_nor_visit_stats( | |||||
self, swh_scheduler, listed_origins_by_type | |||||
): | |||||
"""grab_next_visits should retrieve tasks without last update (nor visit stats) | |||||
""" | |||||
visit_type = next(iter(listed_origins_by_type)) | |||||
origins = [] | |||||
for origin in listed_origins_by_type[visit_type]: | |||||
origins.append( | |||||
attr.evolve(origin, last_update=None) | |||||
) # void the last update so we are in the relevant context | |||||
assert len(origins) > 0 | |||||
swh_scheduler.record_listed_origins(origins) | |||||
# Initially, we have no global queue position | |||||
actual_state = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert actual_state == {} | |||||
# nor any visit statuses | |||||
actual_visit_stats = swh_scheduler.origin_visit_stats_get( | |||||
(o.url, o.visit_type) for o in origins | |||||
) | |||||
assert len(actual_visit_stats) == 0 | |||||
ardumont: ¯\_(ツ)_/¯ | |||||
# Grab some new visits | |||||
next_visits = swh_scheduler.grab_next_visits( | |||||
visit_type, count=len(origins), policy="origins_without_last_update", | |||||
) | |||||
# we do have the one without any last update | |||||
assert len(next_visits) == len(origins) | |||||
# Now the global state got updated | |||||
actual_state = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert actual_state[visit_type] is not None | |||||
actual_visit_stats = swh_scheduler.origin_visit_stats_get( | |||||
(o.url, o.visit_type) for o in next_visits | |||||
) | |||||
# Visit stats got algo created | |||||
assert len(actual_visit_stats) == len(origins) | |||||
def test_grab_next_visits_no_last_update_with_visit_stats( | |||||
self, swh_scheduler, listed_origins_by_type | |||||
): | |||||
"""grab_next_visits should retrieve tasks without last update""" | |||||
visit_type = next(iter(listed_origins_by_type)) | |||||
origins = [] | |||||
for origin in listed_origins_by_type[visit_type]: | |||||
origins.append( | |||||
attr.evolve(origin, last_update=None) | |||||
) # void the last update so we are in the relevant context | |||||
assert len(origins) > 0 | |||||
swh_scheduler.record_listed_origins(origins) | |||||
# Initially, we have no global queue position | |||||
actual_state = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert actual_state == {} | |||||
date_now = utcnow() | |||||
# Simulate some of those origins have associated visit stats (some with an | |||||
# existing queue position and some without any) | |||||
visit_stats = ( | |||||
[ | |||||
OriginVisitStats( | |||||
url=origin.url, | |||||
visit_type=origin.visit_type, | |||||
last_eventful=utcnow(), | |||||
last_uneventful=None, | |||||
last_failed=None, | |||||
last_notfound=None, | |||||
next_visit_queue_position=date_now | |||||
+ timedelta(days=random.uniform(-10, 1)), | |||||
) | |||||
for origin in origins[:100] | |||||
] | |||||
+ [ | |||||
OriginVisitStats( | |||||
url=origin.url, | |||||
visit_type=origin.visit_type, | |||||
last_eventful=utcnow(), | |||||
last_uneventful=None, | |||||
last_failed=None, | |||||
last_notfound=None, | |||||
next_visit_queue_position=date_now | |||||
+ timedelta(days=random.uniform(1, 10)), # definitely > now() | |||||
) | |||||
for origin in origins[100:150] | |||||
] | |||||
+ [ | |||||
OriginVisitStats( | |||||
url=origin.url, | |||||
visit_type=origin.visit_type, | |||||
last_eventful=utcnow(), | |||||
last_uneventful=None, | |||||
last_failed=None, | |||||
last_notfound=None, | |||||
) | |||||
for origin in origins[150:] | |||||
] | |||||
) | |||||
swh_scheduler.origin_visit_stats_upsert(visit_stats) | |||||
# Grab next visits | |||||
actual_visits = swh_scheduler.grab_next_visits( | |||||
visit_type, count=len(origins), policy="origins_without_last_update", | |||||
) | |||||
assert len(actual_visits) == len(origins) | |||||
actual_visit_stats = swh_scheduler.origin_visit_stats_get( | |||||
(o.url, o.visit_type) for o in actual_visits | |||||
) | |||||
assert len(actual_visit_stats) == len(origins) | |||||
actual_state = swh_scheduler.visit_scheduler_queue_position_get() | |||||
assert actual_state == { | |||||
visit_type: max( | |||||
s.next_visit_queue_position | |||||
for s in actual_visit_stats | |||||
if s.next_visit_queue_position is not None | |||||
) | |||||
} | |||||
def test_grab_next_visits_unknown_policy(self, swh_scheduler): | def test_grab_next_visits_unknown_policy(self, swh_scheduler): | ||||
unknown_policy = "non_existing_policy" | unknown_policy = "non_existing_policy" | ||||
NUM_RESULTS = 5 | NUM_RESULTS = 5 | ||||
with pytest.raises(UnknownPolicy, match=unknown_policy): | with pytest.raises(UnknownPolicy, match=unknown_policy): | ||||
swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy) | swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy) | ||||
def _create_task_types(self, scheduler): | def _create_task_types(self, scheduler): | ||||
for tt in TASK_TYPES.values(): | for tt in TASK_TYPES.values(): | ||||
▲ Show 20 Lines • Show All 247 Lines • ▼ Show 20 Lines | def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins): | ||||
# respect to the "last update" time for the origin | # respect to the "last update" time for the origin | ||||
visited_origin = listed_origins[0] | visited_origin = listed_origins[0] | ||||
assert visited_origin.last_update is not None | assert visited_origin.last_update is not None | ||||
swh_scheduler.origin_visit_stats_upsert( | swh_scheduler.origin_visit_stats_upsert( | ||||
[ | [ | ||||
OriginVisitStats( | OriginVisitStats( | ||||
url=visited_origin.url, | url=visited_origin.url, | ||||
visit_type=visited_origin.visit_type, | visit_type=visited_origin.visit_type, | ||||
last_eventful=visited_origin.last_update | last_eventful=visited_origin.last_update - timedelta(days=1), | ||||
- datetime.timedelta(days=1), | |||||
last_uneventful=None, | last_uneventful=None, | ||||
last_failed=None, | last_failed=None, | ||||
last_notfound=None, | last_notfound=None, | ||||
last_snapshot=hash_to_bytes( | last_snapshot=hash_to_bytes( | ||||
"d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" | "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" | ||||
), | ), | ||||
), | ), | ||||
] | ] | ||||
Show All 29 Lines | |||||
def test_update_metrics_twice(self, swh_scheduler, listed_origins): | def test_update_metrics_twice(self, swh_scheduler, listed_origins): | ||||
swh_scheduler.record_listed_origins(listed_origins) | swh_scheduler.record_listed_origins(listed_origins) | ||||
ts = utcnow() | ts = utcnow() | ||||
ret = swh_scheduler.update_metrics(timestamp=ts) | ret = swh_scheduler.update_metrics(timestamp=ts) | ||||
assert all(metric.last_update == ts for metric in ret) | assert all(metric.last_update == ts for metric in ret) | ||||
second_ts = ts + datetime.timedelta(seconds=1) | second_ts = ts + timedelta(seconds=1) | ||||
ret = swh_scheduler.update_metrics(timestamp=second_ts) | ret = swh_scheduler.update_metrics(timestamp=second_ts) | ||||
assert all(metric.last_update == second_ts for metric in ret) | assert all(metric.last_update == second_ts for metric in ret) | ||||
def test_get_metrics(self, swh_scheduler, listed_origins): | def test_get_metrics(self, swh_scheduler, listed_origins): | ||||
swh_scheduler.record_listed_origins(listed_origins) | swh_scheduler.record_listed_origins(listed_origins) | ||||
updated = swh_scheduler.update_metrics() | updated = swh_scheduler.update_metrics() | ||||
retrieved = swh_scheduler.get_metrics() | retrieved = swh_scheduler.get_metrics() | ||||
Show All 29 Lines |
¯\_(ツ)_/¯