Page MenuHomeSoftware Heritage

D5950.diff
No OneTemporary

D5950.diff

diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -3,13 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
from typing import Dict, List, Optional, Tuple
import attr
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
msg_type = "origin_visit_status"
@@ -36,6 +38,70 @@
)
+def from_position_offset_to_days(position_offset: int) -> int:
+ """Convert a position offset into a visit interval in days.
+
+ - index 0 and 1: interval 1 day
+ - index 2, 3 and 4: interval 2 days
+ - index n >= 5: interval `4^(n-4)` days, i.e. 4, 16, 64, 256, 1024, ...
+
+ Args:
+ position_offset: The actual position offset for a given visit stats
+
+ Returns:
+ The offset as an interval in number of days
+
+ """
+ assert position_offset >= 0
+ if position_offset < 2:
+ result = 1
+ elif position_offset < 5:
+ result = 2
+ else:
+ result = 4 ** (position_offset - 4)
+ return result
+
+
+def next_visit_queue_position(
+ queue_position_per_visit_type: Dict, visit_stats: Dict
+) -> datetime:
+ """Compute the next visit queue position for the given visit_stats.
+
+ This takes the visit_stats next_position_offset value and computes a corresponding
+ interval in days (with a random fudge factor within -/+ 10% to avoid scheduling
+ bursts for hosters). It then derives a new position from this interval and the
+ visit stats' current position in the queue.
+
+ As an implementation detail, if the visit stats do not have a queue position yet,
+ this falls back to the current global position (for the same visit type as the
+ visit stats) to compute the new position in the queue. If there is no global state
+ yet for the visit type, this starts from the value returned by the ``utcnow``
+ function.
+
+ Args:
+ queue_position_per_visit_type: The global state of the queue per visit type
+ visit_stats: The actual visit information to compute the next position for
+
+ Returns:
+ The actual next visit queue position for that visit stats
+
+ """
+ days = from_position_offset_to_days(visit_stats["next_position_offset"])
+ random_fudge_factor = random.uniform(-0.1, 0.1)
+ visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+ # Use the current queue position per visit type as starting position if none is
+ # already set
+ default_queue_position = queue_position_per_visit_type.get(
+ visit_stats["visit_type"], utcnow()
+ )
+ current_position = (
+ visit_stats["next_visit_queue_position"]
+ if visit_stats.get("next_visit_queue_position")
+ else default_queue_position
+ )
+ return current_position + visit_interval
+
+
def process_journal_objects(
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
@@ -96,6 +162,9 @@
for field in attr.fields(OriginVisitStats)
}
+ # Retrieve the global queue state
+ queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get()
+
for msg_dict in interesting_messages:
origin = msg_dict["origin"]
visit_type = msg_dict["type"]
@@ -176,6 +245,13 @@
# Visit this origin less often in the future
update_next_position_offset(visit_stats_d, 1)
+ # Update the next visit queue position (which will be used solely for origins
+ # without any last_update, cf. the dedicated scheduling policy
+ # "origins_without_last_update")
+ visit_stats_d["next_visit_queue_position"] = next_visit_queue_position(
+ queue_position_per_visit_type, visit_stats_d
+ )
+
scheduler.origin_visit_stats_upsert(
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -4,14 +4,21 @@
# See top-level LICENSE file for more information
import datetime
+from datetime import timedelta
import functools
from itertools import permutations
from unittest.mock import Mock
+import attr
import pytest
from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+ from_position_offset_to_days,
+ max_date,
+ next_visit_queue_position,
+ process_journal_objects,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats
from swh.scheduler.utils import utcnow
@@ -123,6 +130,20 @@
swh_scheduler.origin_visit_stats_upsert.assert_not_called()
+def assert_visit_stats_ok(actual_visit_stats, expected_visit_stats):
+ """Utility test function to ensure visit stats read from the backend are in the right
+ shape. The comparison of next_visit_queue_position is dealt with in dedicated
+ tests, so it is not checked by the tests calling this function.
+
+ """
+ assert len(actual_visit_stats) == len(expected_visit_stats)
+
+ for visit_stats in actual_visit_stats:
+ visit_stats = attr.evolve(visit_stats, next_visit_queue_position=None)
+
+ assert visit_stats in expected_visit_stats
+
+
def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler):
visit_status = {
"origin": "foo",
@@ -138,19 +159,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=None,
- last_notfound=visit_status["date"],
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=5,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=visit_status["date"],
+ last_snapshot=None,
+ next_position_offset=5,
+ )
+ ],
+ )
visit_statuses = [
{
@@ -176,19 +199,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=None,
- last_notfound=DATE3,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=7,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=DATE3,
+ last_snapshot=None,
+ next_position_offset=7,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler):
@@ -232,19 +257,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="bar",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=DATE3,
- last_notfound=None,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=7,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="bar",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=DATE3,
+ last_notfound=None,
+ last_snapshot=None,
+ next_position_offset=7,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler):
@@ -272,19 +299,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="bar",
- visit_type="git",
- last_eventful=None,
- last_uneventful=None,
- last_failed=DATE2,
- last_notfound=None,
- last_snapshot=None,
- next_visit_queue_position=None,
- next_position_offset=6,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="bar",
+ visit_type="git",
+ last_eventful=None,
+ last_uneventful=None,
+ last_failed=DATE2,
+ last_notfound=None,
+ last_snapshot=None,
+ next_position_offset=6,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler):
@@ -328,19 +357,21 @@
)
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE3,
- last_uneventful=None,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
- next_visit_queue_position=None,
- next_position_offset=0,
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE3,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
+ next_position_offset=0,
+ )
+ ],
+ )
def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler):
@@ -377,19 +408,21 @@
actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
[(visit_status["origin"], visit_status["type"])]
)
- assert actual_origin_visit_stats == [
- OriginVisitStats(
- url=visit_status["origin"],
- visit_type=visit_status["type"],
- last_eventful=DATE1,
- last_uneventful=visit_status["date"], # most recent date but uneventful
- last_failed=DATE2,
- last_notfound=DATE1,
- last_snapshot=visit_status["snapshot"],
- next_visit_queue_position=None,
- next_position_offset=5, # uneventful so visit less often
- )
- ]
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url=visit_status["origin"],
+ visit_type=visit_status["type"],
+ last_eventful=DATE1,
+ last_uneventful=visit_status["date"], # most recent date but uneventful
+ last_failed=DATE2,
+ last_notfound=DATE1,
+ last_snapshot=visit_status["snapshot"],
+ next_position_offset=5, # uneventful so visit less often
+ )
+ ],
+ )
VISIT_STATUSES = [
@@ -440,22 +473,23 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1 + ONE_DAY,
- last_uneventful=DATE1 + 3 * ONE_DAY,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=5, # uneventful, visit origin less often in the future
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1 + ONE_DAY,
+ last_uneventful=DATE1 + 3 * ONE_DAY,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=5, # uneventful, visit origin less often in future
+ )
+ ],
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
- ]
-
VISIT_STATUSES_1 = [
{**ovs, "date": DATE1 + n * ONE_DAY}
@@ -647,20 +681,22 @@
{"origin_visit_status": [visit_status]}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=None,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=None,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ )
+ ],
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
- ]
-
def test_journal_client_origin_visit_status_several_upsert(swh_scheduler):
"""An old message updates old information
@@ -692,19 +728,22 @@
{"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler
)
- assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=DATE2,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=5,
- )
- ]
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")])
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=DATE2,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=5,
+ )
+ ],
+ )
VISIT_STATUSES_SAME_SNAPSHOT = [
@@ -751,16 +790,120 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [
- OriginVisitStats(
- url="cavabarder",
- visit_type="hg",
- last_eventful=DATE1,
- last_uneventful=DATE1 + 2 * ONE_YEAR,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- next_visit_queue_position=None,
- next_position_offset=6, # 2 uneventful visits, whatever the permutation
- )
- ]
+ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get(
+ [("cavabarder", "hg")]
+ )
+ assert_visit_stats_ok(
+ actual_origin_visit_stats,
+ [
+ OriginVisitStats(
+ url="cavabarder",
+ visit_type="hg",
+ last_eventful=DATE1,
+ last_uneventful=DATE1 + 2 * ONE_YEAR,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_position_offset=6, # 2 uneventful visits, whatever the permutation
+ )
+ ],
+ )
+
+
+@pytest.mark.parametrize(
+ "position_offset, interval",
+ [
+ (0, 1),
+ (1, 1),
+ (2, 2),
+ (3, 2),
+ (4, 2),
+ (5, 4),
+ (6, 16),
+ (7, 64),
+ (8, 256),
+ (9, 1024),
+ (10, 4096),
+ ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+ assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+ with pytest.raises(AssertionError):
+ from_position_offset_to_days(-1)
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),]
+)
+def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ mock_now = mocker.patch("swh.scheduler.journal_client.utcnow")
+ mock_now.return_value = date_now
+
+ actual_position = next_visit_queue_position(
+ {}, {"next_position_offset": next_position_offset, "visit_type": "svn",}
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_now.called
+ assert mock_random.called
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),]
+)
+def test_next_visit_queue_position_with_state(
+ mocker, fudge_factor, next_position_offset
+):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ actual_position = next_visit_queue_position(
+ {"git": date_now},
+ {"next_position_offset": next_position_offset, "visit_type": "git",},
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_random.called
+
+
+@pytest.mark.parametrize(
+ "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),]
+)
+def test_next_visit_queue_position_with_next_visit_queue(
+ mocker, fudge_factor, next_position_offset
+):
+ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform")
+ mock_random.return_value = fudge_factor
+
+ date_now = utcnow()
+
+ actual_position = next_visit_queue_position(
+ {},
+ {
+ "next_position_offset": next_position_offset,
+ "visit_type": "hg",
+ "next_visit_queue_position": date_now,
+ },
+ )
+
+ assert actual_position == date_now + timedelta(
+ days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor)
+ )
+
+ assert mock_random.called

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 3:18 AM (8 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217779

Event Timeline