Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066234
D5950.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
D5950.diff
View Options
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -3,13 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
from typing import Dict, List, Optional, Tuple
import attr
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
msg_type = "origin_visit_status"
@@ -36,6 +38,70 @@
)
+def from_position_offset_to_days(position_offset: int) -> int:
+ """Compute position offset to interval in days.
+
+ - index 0 and 1: interval 1 day
+ - index 2, 3 and 4: interval 2 days
+ - index 5 and up: interval `4^(n-4)` days for n in (4, 16, 64, 256, 1024, ...)
+
+ Args:
+ position_offset: The actual position offset for a given visit stats
+
+ Returns:
+ The offset as an interval in number of days
+
+ """
+ assert position_offset >= 0
+ if position_offset < 2:
+ result = 1
+ elif position_offset < 5:
+ result = 2
+ else:
+ result = 4 ** (position_offset - 4)
+ return result
+
+
+def next_visit_queue_position(
+ queue_position_per_visit_type: Dict, visit_stats: Dict
+) -> datetime:
+ """Compute the next visit queue position for the given visit_stats.
+
+ This takes the visit_stats next_position_offset value and compute a corresponding
+ interval in days (with a random fudge factor of -/+ 10% range to avoid scheduling
+ burst for hosters). Then computes out of this visit interval and the current visit
+ stats's position in the queue a new position.
+
+ As an implementation detail, if the visit stats does not have a queue position yet,
+ this fallbacks to use the current global position (for the same visit type as the
+ visit stats) to compute the new position in the queue. If there is no global state
+ yet for the visit type, this starts up using the ``utcnow`` function as default
+ value.
+
+ Args:
+ queue_position_per_visit_type: The global state of the queue per visit type
+ visit_stats: The actual visit information to compute the next position for
+
+ Returns:
+ The actual next visit queue position for that visit stats
+
+ """
+ days = from_position_offset_to_days(visit_stats["next_position_offset"])
+ random_fudge_factor = random.uniform(-0.1, 0.1)
+ visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+ # Use the current queue position per visit type as starting position if none is
+ # already set
+ default_queue_position = queue_position_per_visit_type.get(
+ visit_stats["visit_type"], utcnow()
+ )
+ current_position = (
+ visit_stats["next_visit_queue_position"]
+ if visit_stats.get("next_visit_queue_position")
+ else default_queue_position
+ )
+ return current_position + visit_interval
+
+
def process_journal_objects(
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
@@ -96,6 +162,9 @@
for field in attr.fields(OriginVisitStats)
}
+ # Retrieve the global queue state
+ queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get()
+
for msg_dict in interesting_messages:
origin = msg_dict["origin"]
visit_type = msg_dict["type"]
@@ -176,6 +245,13 @@
# Visit this origin less often in the future
update_next_position_offset(visit_stats_d, 1)
+ # Update the next visit queue position (which will be used solely for origin
+ # without any last_update, cf. the dedicated scheduling policy
+ # "origins_without_last_update")
+ visit_stats_d["next_visit_queue_position"] = next_visit_queue_position(
+ queue_position_per_visit_type, visit_stats_d
+ )
+
scheduler.origin_visit_stats_upsert(
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -4,14 +4,21 @@
# See top-level LICENSE file for more information
import datetime
+from datetime import timedelta
import functools
from itertools import permutations
from unittest.mock import Mock
+import attr
import pytest
from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+ from_position_offset_to_days,
+ max_date,
+ next_visit_queue_position,
+ process_journal_objects,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats
from swh.scheduler.utils import utcnow
@@ -123,6 +130,20 @@
swh_scheduler.origin_visit_stats_upsert.assert_not_called()
+def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats):
+ """Utility test function to ensure visits stats read from the backend are in the right
+ shape. The comparison on the next_visit_queue_position will be dealt with in
+ dedicated tests so it's not tested in tests that are calling this function.
+
+ """
+ assert len(actual_visit_stats) == len(expected_visit_stats)
+
+ for visit_stats in actual_visit_stats:
+ visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None)
+
+ assert visit_stats in expected_visit_stats
+
+
def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler):
visit_status = {
"origin": "foo",
@@ -138,19 +159,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=None,
- last_notfound=visit_status["date"],
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=5,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=visit_status["date"],
+ last_snapshot=None,
+ next_position_offset=5,
+ )
+ ],
+ )
visit_statuses = [
{
@@ -176,19 +199,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=None,
- last_notfound=DATE3,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=7,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=DATE3,
+ last_snapshot=None,
+ next_position_offset=7,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler):
@@ -232,19 +257,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="bar",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=DATE3,
- last_notfound=None,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=7,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="bar",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=DATE3,
+ last_notfound=None,
+ last_snapshot=None,
+ next_position_offset=7,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler):
@@ -272,19 +299,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="bar",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=DATE2,
- last_notfound=None,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=6,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="bar",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=DATE2,
+ last_notfound=None,
+ last_snapshot=None,
+ next_position_offset=6,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler):
@@ -328,19 +357,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE3,
- last_uneventful=None,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
- next_visit_queue_position=None,
- next_position_offset=0,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE3,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
+ next_position_offset=0,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler):
@@ -377,19 +408,21 @@
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
[(visit_status["origin"], visit_status["type"])]
)
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url=visit_status["origin"],
- visit_type=visit_status["type"],
- last_eventful=DATE1,
- last_uneventful=visit_status["date"], # most recent date but uneventful
- last_failed=DATE2,
- last_notfound=DATE1,
- last_snapshot=visit_status["snapshot"],
- next_visit_queue_position=None,
- next_position_offset=5, # uneventful so visit less often
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url=visit_status["origin"],
+ visit_type=visit_status["type"],
+ last_eventful=DATE1,
+ last_uneventful=visit_status["date"], # most recent date but uneventful
+ last_failed=DATE2,
+ last_notfound=DATE1,
+ last_snapshot=visit_status["snapshot"],
+ next_position_offset=5, # uneventful so visit less often
+ )
+ ],
+ )
VISIT_STATUSES = [
@@ -440,22 +473,23 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1 + ONE_DAY,
- last_uneventful=DATE1 + 3 * ONE_DAY,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=5, # uneventful, visit origin less often in the future
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1 + ONE_DAY,
+ last_uneventful=DATE1 + 3 * ONE_DAY,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=5, # uneventful, visit origin less often in future
+ )
+ ],
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
- ]
-
VISIT_STATUSES_1 = [
{**ovs, "date": DATE1 + n * ONE_DAY}
@@ -647,20 +681,22 @@
{"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=None,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ )
+ ],
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
- ]
-
def test_journal_client_origin_visit_status_several_upsert(swh_scheduler):
"""An old message updates old information
@@ -692,19 +728,22 @@
{"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=DATE2,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=5,
- )
- ]
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=DATE2,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=5,
+ )
+ ],
+ )
VISIT_STATUSES_SAME_SNAPSHOT = [
@@ -751,16 +790,120 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [
- OriginVisitStats(
- url="cavabarder",
- visit_type="hg",
- last_eventful=DATE1,
- last_uneventful=DATE1 + 2 * ONE_YEAR,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=6, # 2 uneventful visits, whatever the permutation
- )
- ]
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+ [("cavabarder", "hg")]
+ )
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="cavabarder",
+ visit_type="hg",
+ last_eventful=DATE1,
+ last_uneventful=DATE1 + 2 * ONE_YEAR,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=6, # 2 uneventful visits, whatever the permutation
+ )
+ ],
+ )
+
+
+@pytest.mark.parametrize(
+ "position_offset, interval",
+ [
+ (0, 1),
+ (1, 1),
+ (2, 2),
+ (3, 2),
+ (4, 2),
+ (5, 4),
+ (6, 16),
+ (7, 64),
+ (8, 256),
+ (9, 1024),
+ (10, 4096),
+ ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+ assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+ with pytest.raises(AssertionError):
+ from_position_offset_to_days(-1)
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),]
+)
+def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ mock_now = mocker.patch("swh.scheduler.journal_client.utcnow")
+ mock_now.return_value = date_now
+
+ actual_position = next_visit_queue_position(
+ {}, {"next_position_offset": next_position_offset, "visit_type": "svn",}
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_now.called
+ assert mock_random.called
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),]
+)
+def test_next_visit_queue_position_with_state(
+ mocker, fudge_factor, next_position_offset
+):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ actual_position = next_visit_queue_position(
+ {"git": date_now},
+ {"next_position_offset": next_position_offset, "visit_type": "git",},
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_random.called
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),]
+)
+def test_next_visit_queue_position_with_next_visit_queue(
+ mocker, fudge_factor, next_position_offset
+):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ actual_position = next_visit_queue_position(
+ {},
+ {
+ "next_position_offset": next_position_offset,
+ "visit_type": "hg",
+ "next_visit_queue_position": date_now,
+ },
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_random.called
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 3:18 AM (11 w, 14 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217779
Attached To
D5950: journal_client: Compute next position for origin visit
Event Timeline
Log In to Comment