Page MenuHomeSoftware Heritage

D5956.id21407.diff
No OneTemporary

D5956.id21407.diff

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -19,7 +19,11 @@
from swh.scheduler.utils import utcnow
from .exc import SchedulerException, StaleData, UnknownPolicy
-from .interface import ListedOriginPageToken, PaginatedListedOriginList
+from .interface import (
+ SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ ListedOriginPageToken,
+ PaginatedListedOriginList,
+)
from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics
logger = logging.getLogger(__name__)
@@ -340,7 +344,9 @@
if timestamp is None:
timestamp = utcnow()
- origin_select_cols = ", ".join(ListedOrigin.select_columns())
+ origin_select_cols = ", ".join(
+ f"lo.{col}" for col in ListedOrigin.select_columns()
+ )
query_args: List[Any] = []
@@ -350,7 +356,7 @@
where_clauses.append("enabled")
# Only schedule visits of the given type
- where_clauses.append("visit_type = %s")
+ where_clauses.append("lo.visit_type = %s")
query_args.append(visit_type)
if scheduled_cooldown:
@@ -396,8 +402,8 @@
where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
# order by increasing last_update (oldest first)
- where_clauses.append("listed_origins.last_update IS NOT NULL")
- order_by = "listed_origins.last_update"
+ where_clauses.append("lo.last_update IS NOT NULL")
+ order_by = "lo.last_update"
elif policy == "already_visited_order_by_lag":
# TODO: store "visit lag" in a materialized view?
@@ -405,10 +411,10 @@
where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL")
# ignore origins we have visited after the known last update
- where_clauses.append("listed_origins.last_update IS NOT NULL")
+ where_clauses.append("lo.last_update IS NOT NULL")
where_clauses.append(
"""
- listed_origins.last_update
+ lo.last_update
> GREATEST(
origin_visit_stats.last_eventful,
origin_visit_stats.last_uneventful
@@ -418,13 +424,27 @@
# order by decreasing visit lag
order_by = """\
- listed_origins.last_update
+ lo.last_update
- GREATEST(
origin_visit_stats.last_eventful,
origin_visit_stats.last_uneventful
)
DESC
"""
+ elif policy == SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE:
+ where_clauses.append("lo.last_update IS NULL")
+ where_clauses.append(
+ "origin_visit_stats.next_visit_queue_position IS NOT NULL"
+ )
+ where_clauses.append(
+ "vsqp.position <= origin_visit_stats.next_visit_queue_position"
+ )
+ # TODO: Determine if it's actually what we want... this chooses an interval
+ # of one day to fetch visits to schedule...
+ where_clauses.append(
+ "origin_visit_stats.next_visit_queue_position < vsqp.position + '1 day'"
+ )
+ order_by = "lo.last_update"
else:
raise UnknownPolicy(f"Unknown scheduling policy {policy}")
@@ -432,9 +452,12 @@
SELECT
{origin_select_cols}
FROM
- listed_origins
+ listed_origins lo
LEFT JOIN
origin_visit_stats USING (url, visit_type)
+ LEFT JOIN
+ visit_scheduler_queue_position vsqp
+ on vsqp.visit_type = lo.visit_type
WHERE
({") AND (".join(where_clauses)})
ORDER BY
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -16,6 +16,8 @@
ListedOriginPageToken = Tuple[str, str]
+SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE = "origins_without_last_update"
+
class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]):
"""A list of listed origins, with a continuation token"""
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -6,6 +6,7 @@
from collections import defaultdict
import copy
import datetime
+from datetime import timedelta
import inspect
import random
from typing import Any, Dict, List, Optional, Tuple
@@ -17,7 +18,11 @@
from swh.model.hashutil import hash_to_bytes
from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy
-from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface
+from swh.scheduler.interface import (
+ SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ ListedOriginPageToken,
+ SchedulerInterface,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics
from swh.scheduler.utils import utcnow
@@ -29,7 +34,7 @@
tasks_with_priority_from_template,
)
-ONEDAY = datetime.timedelta(days=1)
+ONEDAY = timedelta(days=1)
NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20}
@@ -819,7 +824,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy=policy,
- timestamp=after + datetime.timedelta(days=7),
+ timestamp=after + timedelta(days=7),
)
# We need to sort them because their 'last_scheduled' field is updated to
# exactly the same value, so the order is not deterministic
@@ -845,7 +850,7 @@
last_uneventful=None,
last_failed=None,
last_notfound=None,
- last_scheduled=base_date - datetime.timedelta(seconds=i),
+ last_scheduled=base_date - timedelta(seconds=i),
)
for i, origin in enumerate(origins[1:])
]
@@ -903,7 +908,7 @@
]
swh_scheduler.origin_visit_stats_upsert(visit_stats)
- cooldown_td = datetime.timedelta(days=cooldown)
+ cooldown_td = timedelta(days=cooldown)
cooldown_args = {
"scheduled_cooldown": None,
"failed_cooldown": None,
@@ -915,7 +920,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy="oldest_scheduled_first",
- timestamp=after + cooldown_td - datetime.timedelta(seconds=1),
+ timestamp=after + cooldown_td - timedelta(seconds=1),
**cooldown_args,
)
@@ -925,7 +930,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy="oldest_scheduled_first",
- timestamp=after + cooldown_td + datetime.timedelta(seconds=1),
+ timestamp=after + cooldown_td + timedelta(seconds=1),
**cooldown_args,
)
@@ -943,7 +948,7 @@
# Update known origins with a `last_update` field that we control
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
updated_origins = [
- attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+ attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
for i, origin in enumerate(origins)
]
updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -969,7 +974,7 @@
# Update known origins with a `last_update` field that we control
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
updated_origins = [
- attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+ attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
for i, origin in enumerate(origins)
]
updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -1024,6 +1029,98 @@
assert len(ret) == 5
+ def test_grab_next_visits_no_last_update(
+ self, swh_scheduler, listed_origins_by_type
+ ):
+ """Check that grab_next_visits retrieves tasks without last update information"""
+ visit_type = next(iter(listed_origins_by_type))
+
+ origins = []
+ for origin in listed_origins_by_type[visit_type]:
+ origins.append(
+ attr.evolve(origin, last_update=None)
+ ) # void the last update
+
+ assert len(origins) > 0
+
+ swh_scheduler.record_listed_origins(origins[:900])
+
+ # Neither visit_scheduler_queue_position state nor origin_visit_stats entries
+ # have been prepared yet
+ actual_state = swh_scheduler.visit_scheduler_queue_position_get()
+ assert actual_state == {}
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+
+ # Nothing will get fetched
+ assert len(ret) == 0
+
+ state_position = utcnow() - timedelta(days=1)
+ # Prepare the visit_scheduler_queue_position state for the visit type concerned
+ # here
+ swh_scheduler.visit_scheduler_queue_position_set(visit_type, state_position)
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+
+ # No origin visit stats so nothing gets scheduled
+ assert len(ret) == 0
+
+ relevant_origin_index = 900
+ # Simulate the journal client having updated information in the right range (1 day)
+ visit_stats = [
+ OriginVisitStats(
+ url=origin.url,
+ visit_type=origin.visit_type,
+ last_eventful=utcnow(),
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ next_visit_queue_position=state_position
+ + timedelta(days=random.choice([d * 0.1 for d in range(0, 10)])),
+ )
+ for origin in origins[:relevant_origin_index] # are relevant for test
+ ] + [
+ # those visit statuses whose queue position is "far" in the future get
+ # ignored
+ OriginVisitStats(
+ url=origin.url,
+ visit_type=origin.visit_type,
+ last_eventful=utcnow(),
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ next_visit_queue_position=state_position
+ + timedelta(days=random.choice(range(2, 10))),
+ )
+ for origin in origins[relevant_origin_index:] # will get ignored
+ ]
+
+ swh_scheduler.origin_visit_stats_upsert(visit_stats)
+ actual_visits = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+ assert len(actual_visits) == len(origins[:900])
+
+ visit_stats = swh_scheduler.origin_visit_stats_get(
+ (o.url, o.visit_type) for o in actual_visits
+ )
+
+ assert len(visit_stats) == len(actual_visits)
+ for visit_stat, origin in zip(visit_stats, actual_visits):
+ assert origin.url == visit_stat.url
+ assert origin.visit_type == visit_stat.visit_type
+ assert origin.last_update is None
+
+ assert (
+ state_position <= visit_stat.next_visit_queue_position
+ and visit_stat.next_visit_queue_position
+ < state_position + timedelta(days=1)
+ )
+
def test_grab_next_visits_unknown_policy(self, swh_scheduler):
unknown_policy = "non_existing_policy"
NUM_RESULTS = 5
@@ -1287,8 +1384,7 @@
OriginVisitStats(
url=visited_origin.url,
visit_type=visited_origin.visit_type,
- last_eventful=visited_origin.last_update
- - datetime.timedelta(days=1),
+ last_eventful=visited_origin.last_update - timedelta(days=1),
last_uneventful=None,
last_failed=None,
last_notfound=None,
@@ -1334,7 +1430,7 @@
ret = swh_scheduler.update_metrics(timestamp=ts)
assert all(metric.last_update == ts for metric in ret)
- second_ts = ts + datetime.timedelta(seconds=1)
+ second_ts = ts + timedelta(seconds=1)
ret = swh_scheduler.update_metrics(timestamp=second_ts)
assert all(metric.last_update == second_ts for metric in ret)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 1:24 PM (6 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3234231

Event Timeline