Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import inspect | import inspect | ||||
import random | import random | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional, Tuple | ||||
import uuid | import uuid | ||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy | from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin, ListedOriginPageToken, OriginVisitStats | from swh.scheduler.model import ( | ||||
ListedOrigin, | |||||
ListedOriginPageToken, | |||||
OriginVisitStats, | |||||
SchedulerMetrics, | |||||
) | |||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template | from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template | ||||
# A time interval of exactly one day.
ONEDAY = datetime.timedelta(days=1)
def subdict(d, keys=None, excl=()):
    """Return a new dict holding a subset of the entries of ``d``.

    Args:
        d: the mapping to extract entries from.
        keys: the keys to keep; when ``None``, every key of ``d`` is a
            candidate.
        excl: keys to leave out of the result, even if listed in ``keys``.

    Returns:
        A new dict mapping each retained key to ``d[key]``.

    Raises:
        KeyError: if a key from ``keys`` is neither excluded nor in ``d``.
    """
    if keys is None:
        # Iterate over the key view directly; no intermediate copy is needed
        # because ``d`` is not modified while the result is built.
        keys = d.keys()
    return {k: d[k] for k in keys if k not in excl}
def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]:
    """Return the ``(lister_id, visit_type)`` pair of ``m``, for use as a sort key."""
    lister_id, visit_type = m.lister_id, m.visit_type
    return lister_id, visit_type
def assert_metrics_equal(left, right):
    """Assert that ``left`` and ``right`` hold equal metrics, in any order.

    Both collections are sorted with ``metrics_sort_key`` before being
    compared.
    """
    ordered_left = sorted(left, key=metrics_sort_key)
    ordered_right = sorted(right, key=metrics_sort_key)
    assert ordered_left == ordered_right
class TestScheduler: | class TestScheduler: | ||||
def test_interface(self, swh_scheduler): | def test_interface(self, swh_scheduler): | ||||
"""Checks all methods of SchedulerInterface are implemented by this | """Checks all methods of SchedulerInterface are implemented by this | ||||
backend, and that they have the same signature.""" | backend, and that they have the same signature.""" | ||||
# Create an instance of the protocol (which cannot be instantiated | # Create an instance of the protocol (which cannot be instantiated | ||||
# directly, so this creates a subclass, then instantiates it) | # directly, so this creates a subclass, then instantiates it) | ||||
interface = type("_", (SchedulerInterface,), {})() | interface = type("_", (SchedulerInterface,), {})() | ||||
▲ Show 20 Lines • Show All 934 Lines • ▼ Show 20 Lines | def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: | ||||
last_eventful=None, | last_eventful=None, | ||||
last_uneventful=utcnow(), | last_uneventful=utcnow(), | ||||
last_notfound=None, | last_notfound=None, | ||||
last_failed=None, | last_failed=None, | ||||
last_snapshot=None, | last_snapshot=None, | ||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
def test_metrics_origins_known(self, swh_scheduler, listed_origins):
    """The ``origins_known`` counts must add up to the number of recorded origins."""
    swh_scheduler.record_listed_origins(listed_origins)

    metrics = swh_scheduler.update_metrics()

    total_known = sum(metric.origins_known for metric in metrics)
    assert total_known == len(listed_origins)
def test_metrics_origins_enabled(self, swh_scheduler, listed_origins):
    """Disabling one origin must show up in ``origins_enabled``.

    All origins are recorded, then the first one is recorded again with
    ``enabled=False``. In the metrics computed for its lister, the entry for
    its visit type must report exactly one origin that is known but not
    enabled; every other entry must report all known origins as enabled.
    """
    swh_scheduler.record_listed_origins(listed_origins)
    disabled_origin = attr.evolve(listed_origins[0], enabled=False)
    swh_scheduler.record_listed_origins([disabled_origin])

    ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id)

    # Guard against a vacuous pass: without a metric for the disabled
    # origin's visit type, the loop below would never check the difference.
    seen_visit_types = {metric.visit_type for metric in ret}
    assert disabled_origin.visit_type in seen_visit_types

    for metric in ret:
        if metric.visit_type == disabled_origin.visit_type:
            # We disabled one of these origins
            assert metric.origins_known - metric.origins_enabled == 1
        else:
            # But these are still all enabled
            assert metric.origins_known == metric.origins_enabled
def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins):
    """Recording a visit must lower ``origins_never_visited`` by one.

    Visit stats are upserted for the first listed origin only. In the metrics
    computed for its lister, the entry for its visit type must report exactly
    one origin that is known but no longer "never visited"; every other entry
    must still count all known origins as never visited.
    """
    swh_scheduler.record_listed_origins(listed_origins)

    # Pretend that we've recorded a visit on one origin
    visited_origin = listed_origins[0]
    swh_scheduler.origin_visit_stats_upsert(
        [
            OriginVisitStats(
                url=visited_origin.url,
                visit_type=visited_origin.visit_type,
                last_eventful=utcnow(),
                last_uneventful=None,
                last_failed=None,
                last_notfound=None,
                last_snapshot=hash_to_bytes(
                    "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                ),
            ),
        ]
    )

    ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)

    # Guard against a vacuous pass: without a metric for the visited origin's
    # visit type, the loop below would never check the difference.
    seen_visit_types = {metric.visit_type for metric in ret}
    assert visited_origin.visit_type in seen_visit_types

    for metric in ret:
        if metric.visit_type == visited_origin.visit_type:
            # We visited one of these origins
            assert metric.origins_known - metric.origins_never_visited == 1
        else:
            # But none of these have been visited
            assert metric.origins_known == metric.origins_never_visited
def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins):
    """An origin last visited before its ``last_update`` has pending changes.

    Visit stats are upserted for the first listed origin with a
    ``last_eventful`` date one day before the origin's ``last_update``. In
    the metrics computed for its lister, the entry for its visit type must
    report one origin with pending changes; every other entry must report
    none.
    """
    swh_scheduler.record_listed_origins(listed_origins)

    # Pretend that we've recorded a visit on one origin, in the past with
    # respect to the "last update" time for the origin
    visited_origin = listed_origins[0]
    assert visited_origin.last_update is not None
    swh_scheduler.origin_visit_stats_upsert(
        [
            OriginVisitStats(
                url=visited_origin.url,
                visit_type=visited_origin.visit_type,
                last_eventful=visited_origin.last_update
                - datetime.timedelta(days=1),
                last_uneventful=None,
                last_failed=None,
                last_notfound=None,
                last_snapshot=hash_to_bytes(
                    "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                ),
            ),
        ]
    )

    ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)

    # Guard against a vacuous pass: without a metric for the visited origin's
    # visit type, the loop below would never check the expected count of 1.
    seen_visit_types = {metric.visit_type for metric in ret}
    assert visited_origin.visit_type in seen_visit_types

    for metric in ret:
        if metric.visit_type == visited_origin.visit_type:
            # We visited one of these origins, in the past
            assert metric.origins_with_pending_changes == 1
        else:
            # But none of these have been visited
            assert metric.origins_with_pending_changes == 0
def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins):
    """Updating metrics for a lister id that no origin uses returns nothing."""
    swh_scheduler.record_listed_origins(listed_origins)

    # A fresh random id, checked against every recorded origin's lister
    unknown_lister_id = uuid.uuid4()
    for origin in listed_origins:
        assert unknown_lister_id != origin.lister_id

    metrics = swh_scheduler.update_metrics(lister_id=unknown_lister_id)

    assert len(metrics) == 0
def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins):
    """Every returned metric must carry the timestamp passed to ``update_metrics``."""
    swh_scheduler.record_listed_origins(listed_origins)

    ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)

    ret = swh_scheduler.update_metrics(timestamp=ts)

    # all() is True for an empty iterable: make sure something is checked
    assert len(ret) > 0
    assert all(metric.last_update == ts for metric in ret)
def test_update_metrics_twice(self, swh_scheduler, listed_origins):
    """A second ``update_metrics`` call must return the newer timestamp.

    Metrics are updated once with a first timestamp, then again one second
    later; each call must return only metrics carrying the timestamp it was
    given.
    """
    swh_scheduler.record_listed_origins(listed_origins)

    ts = utcnow()
    ret = swh_scheduler.update_metrics(timestamp=ts)

    # all() is True for an empty iterable: make sure something is checked
    assert len(ret) > 0
    assert all(metric.last_update == ts for metric in ret)

    second_ts = ts + datetime.timedelta(seconds=1)
    ret = swh_scheduler.update_metrics(timestamp=second_ts)

    assert len(ret) > 0
    assert all(metric.last_update == second_ts for metric in ret)
def test_get_metrics(self, swh_scheduler, listed_origins):
    """``get_metrics`` must return the same metrics ``update_metrics`` returned."""
    swh_scheduler.record_listed_origins(listed_origins)

    expected = swh_scheduler.update_metrics()
    actual = swh_scheduler.get_metrics()

    assert_metrics_equal(expected, actual)
def test_get_metrics_by_lister(self, swh_scheduler, listed_origins):
    """``get_metrics(lister_id=...)`` must return that lister's metrics only.

    The expected metrics are those returned by ``update_metrics`` whose
    ``lister_id`` matches the first listed origin's.
    """
    lister_id = listed_origins[0].lister_id
    assert lister_id is not None

    swh_scheduler.record_listed_origins(listed_origins)
    all_metrics = swh_scheduler.update_metrics()

    retrieved = swh_scheduler.get_metrics(lister_id=lister_id)
    assert len(retrieved) > 0

    expected = [metric for metric in all_metrics if metric.lister_id == lister_id]
    assert_metrics_equal(expected, retrieved)
def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins):
    """``get_metrics(visit_type=...)`` must return that visit type's metrics only.

    The expected metrics are those returned by ``update_metrics`` whose
    ``visit_type`` matches the first listed origin's.
    """
    visit_type = listed_origins[0].visit_type
    assert visit_type is not None

    swh_scheduler.record_listed_origins(listed_origins)
    all_metrics = swh_scheduler.update_metrics()

    retrieved = swh_scheduler.get_metrics(visit_type=visit_type)
    assert len(retrieved) > 0

    expected = [metric for metric in all_metrics if metric.visit_type == visit_type]
    assert_metrics_equal(expected, retrieved)