Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9343290
D5956.id21407.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D5956.id21407.diff
View Options
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -19,7 +19,11 @@
from swh.scheduler.utils import utcnow
from .exc import SchedulerException, StaleData, UnknownPolicy
-from .interface import ListedOriginPageToken, PaginatedListedOriginList
+from .interface import (
+ SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ ListedOriginPageToken,
+ PaginatedListedOriginList,
+)
from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics
logger = logging.getLogger(__name__)
@@ -340,7 +344,9 @@
if timestamp is None:
timestamp = utcnow()
- origin_select_cols = ", ".join(ListedOrigin.select_columns())
+ origin_select_cols = ", ".join(
+ f"lo.{col}" for col in ListedOrigin.select_columns()
+ )
query_args: List[Any] = []
@@ -350,7 +356,7 @@
where_clauses.append("enabled")
# Only schedule visits of the given type
- where_clauses.append("visit_type = %s")
+ where_clauses.append("lo.visit_type = %s")
query_args.append(visit_type)
if scheduled_cooldown:
@@ -396,8 +402,8 @@
where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
# order by increasing last_update (oldest first)
- where_clauses.append("listed_origins.last_update IS NOT NULL")
- order_by = "listed_origins.last_update"
+ where_clauses.append("lo.last_update IS NOT NULL")
+ order_by = "lo.last_update"
elif policy == "already_visited_order_by_lag":
# TODO: store "visit lag" in a materialized view?
@@ -405,10 +411,10 @@
where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL")
# ignore origins we have visited after the known last update
- where_clauses.append("listed_origins.last_update IS NOT NULL")
+ where_clauses.append("lo.last_update IS NOT NULL")
where_clauses.append(
"""
- listed_origins.last_update
+ lo.last_update
> GREATEST(
origin_visit_stats.last_eventful,
origin_visit_stats.last_uneventful
@@ -418,13 +424,27 @@
# order by decreasing visit lag
order_by = """\
- listed_origins.last_update
+ lo.last_update
- GREATEST(
origin_visit_stats.last_eventful,
origin_visit_stats.last_uneventful
)
DESC
"""
+ elif policy == SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE:
+ where_clauses.append("lo.last_update IS NULL")
+ where_clauses.append(
+ "origin_visit_stats.next_visit_queue_position IS NOT NULL"
+ )
+ where_clauses.append(
+ "vsqp.position <= origin_visit_stats.next_visit_queue_position"
+ )
+ # TODO: Determine if it's actually what we want... this chooses an interval
+ # of one day to fetch visits to schedule...
+ where_clauses.append(
+ "origin_visit_stats.next_visit_queue_position < vsqp.position + '1 day'"
+ )
+ order_by = "lo.last_update"
else:
raise UnknownPolicy(f"Unknown scheduling policy {policy}")
@@ -432,9 +452,12 @@
SELECT
{origin_select_cols}
FROM
- listed_origins
+ listed_origins lo
LEFT JOIN
origin_visit_stats USING (url, visit_type)
+ LEFT JOIN
+ visit_scheduler_queue_position vsqp
+ on vsqp.visit_type = lo.visit_type
WHERE
({") AND (".join(where_clauses)})
ORDER BY
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -16,6 +16,8 @@
ListedOriginPageToken = Tuple[str, str]
+SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE = "origins_without_last_update"
+
class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]):
"""A list of listed origins, with a continuation token"""
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -6,6 +6,7 @@
from collections import defaultdict
import copy
import datetime
+from datetime import timedelta
import inspect
import random
from typing import Any, Dict, List, Optional, Tuple
@@ -17,7 +18,11 @@
from swh.model.hashutil import hash_to_bytes
from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy
-from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface
+from swh.scheduler.interface import (
+ SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ ListedOriginPageToken,
+ SchedulerInterface,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics
from swh.scheduler.utils import utcnow
@@ -29,7 +34,7 @@
tasks_with_priority_from_template,
)
-ONEDAY = datetime.timedelta(days=1)
+ONEDAY = timedelta(days=1)
NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20}
@@ -819,7 +824,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy=policy,
- timestamp=after + datetime.timedelta(days=7),
+ timestamp=after + timedelta(days=7),
)
# We need to sort them because their 'last_scheduled' field is updated to
# exactly the same value, so the order is not deterministic
@@ -845,7 +850,7 @@
last_uneventful=None,
last_failed=None,
last_notfound=None,
- last_scheduled=base_date - datetime.timedelta(seconds=i),
+ last_scheduled=base_date - timedelta(seconds=i),
)
for i, origin in enumerate(origins[1:])
]
@@ -903,7 +908,7 @@
]
swh_scheduler.origin_visit_stats_upsert(visit_stats)
- cooldown_td = datetime.timedelta(days=cooldown)
+ cooldown_td = timedelta(days=cooldown)
cooldown_args = {
"scheduled_cooldown": None,
"failed_cooldown": None,
@@ -915,7 +920,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy="oldest_scheduled_first",
- timestamp=after + cooldown_td - datetime.timedelta(seconds=1),
+ timestamp=after + cooldown_td - timedelta(seconds=1),
**cooldown_args,
)
@@ -925,7 +930,7 @@
visit_type=visit_type,
count=len(expected) + 1,
policy="oldest_scheduled_first",
- timestamp=after + cooldown_td + datetime.timedelta(seconds=1),
+ timestamp=after + cooldown_td + timedelta(seconds=1),
**cooldown_args,
)
@@ -943,7 +948,7 @@
# Update known origins with a `last_update` field that we control
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
updated_origins = [
- attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+ attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
for i, origin in enumerate(origins)
]
updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -969,7 +974,7 @@
# Update known origins with a `last_update` field that we control
base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
updated_origins = [
- attr.evolve(origin, last_update=base_date - datetime.timedelta(seconds=i))
+ attr.evolve(origin, last_update=base_date - timedelta(seconds=i))
for i, origin in enumerate(origins)
]
updated_origins = swh_scheduler.record_listed_origins(updated_origins)
@@ -1024,6 +1029,98 @@
assert len(ret) == 5
+ def test_grab_next_visits_no_last_update(
+ self, swh_scheduler, listed_origins_by_type
+ ):
+    """Check that grab_next_visits retrieves tasks without last update information"""
+ visit_type = next(iter(listed_origins_by_type))
+
+ origins = []
+ for origin in listed_origins_by_type[visit_type]:
+ origins.append(
+ attr.evolve(origin, last_update=None)
+ ) # void the last update
+
+ assert len(origins) > 0
+
+ swh_scheduler.record_listed_origins(origins[:900])
+
+    # Since neither visit_scheduler_queue_position state nor origin_visit_stats
+    # have been prepared yet
+ actual_state = swh_scheduler.visit_scheduler_queue_position_get()
+ assert actual_state == {}
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+
+ # Nothing will get fetched
+ assert len(ret) == 0
+
+ state_position = utcnow() - timedelta(days=1)
+ # Prepare the visit_scheduler_queue_position state for the visit type concerned
+ # here
+ swh_scheduler.visit_scheduler_queue_position_set(visit_type, state_position)
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+
+    # No origin visit stats so nothing gets scheduled
+ assert len(ret) == 0
+
+ relevant_origin_index = 900
+ # Simulate the journal client updated information in the right range (1 day)
+ visit_stats = [
+ OriginVisitStats(
+ url=origin.url,
+ visit_type=origin.visit_type,
+ last_eventful=utcnow(),
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ next_visit_queue_position=state_position
+ + timedelta(days=random.choice([d * 0.1 for d in range(0, 10)])),
+ )
+ for origin in origins[:relevant_origin_index] # are relevant for test
+ ] + [
+ # those visit statuses whose queue position is "far" in the future gets
+ # ignored
+ OriginVisitStats(
+ url=origin.url,
+ visit_type=origin.visit_type,
+ last_eventful=utcnow(),
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ next_visit_queue_position=state_position
+ + timedelta(days=random.choice(range(2, 10))),
+ )
+ for origin in origins[relevant_origin_index:] # will get ignored
+ ]
+
+ swh_scheduler.origin_visit_stats_upsert(visit_stats)
+ actual_visits = swh_scheduler.grab_next_visits(
+ visit_type, len(origins), policy=SCHEDULING_POLICY_ORIGINS_NO_LAST_UPDATE,
+ )
+ assert len(actual_visits) == len(origins[:900])
+
+ visit_stats = swh_scheduler.origin_visit_stats_get(
+ (o.url, o.visit_type) for o in actual_visits
+ )
+
+ assert len(visit_stats) == len(actual_visits)
+ for visit_stat, origin in zip(visit_stats, actual_visits):
+ assert origin.url == visit_stat.url
+ assert origin.visit_type == visit_stat.visit_type
+ assert origin.last_update is None
+
+ assert (
+ state_position <= visit_stat.next_visit_queue_position
+ and visit_stat.next_visit_queue_position
+ < state_position + timedelta(days=1)
+ )
+
def test_grab_next_visits_unknown_policy(self, swh_scheduler):
unknown_policy = "non_existing_policy"
NUM_RESULTS = 5
@@ -1287,8 +1384,7 @@
OriginVisitStats(
url=visited_origin.url,
visit_type=visited_origin.visit_type,
- last_eventful=visited_origin.last_update
- - datetime.timedelta(days=1),
+ last_eventful=visited_origin.last_update - timedelta(days=1),
last_uneventful=None,
last_failed=None,
last_notfound=None,
@@ -1334,7 +1430,7 @@
ret = swh_scheduler.update_metrics(timestamp=ts)
assert all(metric.last_update == ts for metric in ret)
- second_ts = ts + datetime.timedelta(seconds=1)
+ second_ts = ts + timedelta(seconds=1)
ret = swh_scheduler.update_metrics(timestamp=second_ts)
assert all(metric.last_update == second_ts for metric in ret)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 1:24 PM (6 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3234231
Attached To
D5956: Introduce new scheduling policy to grab origins without last update
Event Timeline
Log In to Comment