Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345620
D5919.id21362.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
18 KB
Subscribers
None
D5919.id21362.diff
View Options
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -974,6 +974,24 @@
)
return [OriginVisitStats(**row) for row in rows]
+ @db_transaction()
+ def visit_scheduler_queue_position_get(
+ self, db=None, cur=None,
+ ) -> Dict[str, datetime.datetime]:
+ cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position")
+ return {row["visit_type"]: row["position"] for row in cur}
+
+ @db_transaction()
+ def visit_scheduler_queue_position_set(
+ self, visit_type: str, position: datetime.datetime, db=None, cur=None,
+ ) -> None:
+ query = """
+ INSERT INTO visit_scheduler_queue_position(visit_type, position)
+ VALUES(%s, %s)
+ ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position
+ """
+ cur.execute(query, (visit_type, position))
+
@db_transaction()
def update_metrics(
self,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -440,6 +440,25 @@
"""
...
+ @remote_api_endpoint("visit_scheduler/get")
+ def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]:
+ """Retrieve all current queue positions for the recurrent visit scheduler.
+
+    Returns:
+        Mapping of each visit type to its current queue position
+
+ """
+ ...
+
+ @remote_api_endpoint("visit_scheduler/set")
+ def visit_scheduler_queue_position_set(
+ self, visit_type: str, position: datetime.datetime
+ ) -> None:
+ """Set the current queue position of the recurrent visit scheduler for `visit_type`.
+
+ """
+ ...
+
@remote_api_endpoint("scheduler_metrics/update")
def update_metrics(
self,
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -3,13 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
from typing import Dict, List, Optional, Tuple
import attr
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
msg_type = "origin_visit_status"
@@ -26,13 +28,65 @@
return max(filtered_dates)
+def update_next_position_offset(visit_stats: Dict, increment: int) -> None:
+ """Update the next position offset according to existing value and the increment. The
+    resulting value is clamped to a non-negative integer (never below 0).
+
+ """
+ visit_stats["next_position_offset"] = max(
+ 0, visit_stats["next_position_offset"] + increment
+ )
+
+
+def from_position_offset_to_days(position_offset: int) -> int:
+ """Compute from position offset to interval in days.
+
+ - index 0 and 1: interval 1 day
+ - index 2, 3 and 4: interval 2 days
+    - index 5 and up: interval `4^(n-4)` days for offset n, i.e. (4, 16, 64, 256, 1024, ...) days
+
+ """
+ assert position_offset >= 0
+ if position_offset < 2:
+ result = 1
+ elif position_offset < 5:
+ result = 2
+ else:
+ result = 4 ** (position_offset - 4)
+ return result
+
+
def process_journal_objects(
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
- """Read messages from origin_visit_status journal topics, then inserts them in the
- scheduler "origin_visit_stats" table.
+ """Read messages from origin_visit_status journal topic to update "origin_visit_stats"
+ information on (origin, visit_type). The goal is to compute visit stats information
+ per origin and visit_type: last_eventful, last_uneventful, last_failed,
+ last_notfound, last_snapshot, ...
+
+ Details:
- Worker function for `JournalClient.process(worker_fn)`, after
+ - This journal consumes origin visit status information for final visit status
+ ("full", "partial", "failed", "not_found"). It drops the information on non
+ final visit statuses ("ongoing", "created").
+
+ - The snapshot is used to determine the "eventful/uneventful" nature of the
+ origin visit status.
+
+ - When no snapshot is provided, the visit is considered as failed so the
+ last_failed column is updated.
+
+ - As there is no time guarantee when reading message from the topic, the code
+ tries to keep the data in the most timely ordered as possible.
+
+ - Compared to what is already stored in the origin_visit_stats table, only most
+ recent information is kept.
+
+ - This updates the `next_visit_queue_position` (time at which some new objects
+ are expected to be added for the origin), and `next_position_offset` (duration
+ that we expect to wait between visits of this origin).
+
+ This is a worker function to be used with `JournalClient.process(worker_fn)`, after
currification of `scheduler` and `task_names`.
"""
@@ -79,22 +133,20 @@
visit_stats_d["last_notfound"] = max_date(
msg_dict["date"], visit_stats_d.get("last_notfound")
)
- elif msg_dict["status"] == "failed":
- visit_stats_d["last_failed"] = max_date(
- msg_dict["date"], visit_stats_d.get("last_failed")
- )
- elif msg_dict["snapshot"] is None:
+ update_next_position_offset(visit_stats_d, 1) # visit less often
+ elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None:
visit_stats_d["last_failed"] = max_date(
msg_dict["date"], visit_stats_d.get("last_failed")
)
+ update_next_position_offset(visit_stats_d, 1) # visit less often
else: # visit with snapshot, something happened
if visit_stats_d["last_snapshot"] is None:
# first time visit with snapshot, we keep relevant information
visit_stats_d["last_eventful"] = msg_dict["date"]
visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
else:
- # visit with snapshot already stored, last_eventful should already be
- # stored
+ # last_snapshot is set, so an eventful visit should have previously been
+ # recorded
assert visit_stats_d["last_eventful"] is not None
latest_recorded_visit_date = max_date(
visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"]
@@ -111,6 +163,8 @@
# new eventful visit (new snapshot)
visit_stats_d["last_eventful"] = current_status_date
visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
+ # Visit this origin more often in the future
+ update_next_position_offset(visit_stats_d, -2)
else:
# same snapshot as before
if (
@@ -127,6 +181,8 @@
visit_stats_d["last_eventful"] = min(
visit_stats_d["last_eventful"], current_status_date
)
+ # Visit this origin less often in the future
+ update_next_position_offset(visit_stats_d, 1)
elif (
latest_recorded_visit_date
and current_status_date == latest_recorded_visit_date
@@ -137,6 +193,22 @@
else:
# uneventful event
visit_stats_d["last_uneventful"] = current_status_date
+ # Visit this origin less often in the future
+ update_next_position_offset(visit_stats_d, 1)
+
+ # We set the next visit target to its current value + the new visit interval
+ # multiplied by a random fudge factor (picked in the -/+ 10% range).
+ days = from_position_offset_to_days(visit_stats_d["next_position_offset"])
+ random_fudge_factor = random.choice(range(-10, 11)) / 100
+ visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+ # FIXME: or is this here that we use visit_scheduler_queue_position.position for
+ # the given visit type to initialize such value?
+ current_position = (
+ visit_stats_d["next_visit_queue_position"]
+ if visit_stats_d.get("next_visit_queue_position")
+ else utcnow()
+ )
+ visit_stats_d["next_visit_queue_position"] = current_position + visit_interval
scheduler.origin_visit_stats_upsert(
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -204,6 +204,10 @@
last_snapshot = attr.ib(
type=Optional[bytes], validator=type_validator(), default=None
)
+ next_visit_queue_position = attr.ib(
+ type=Optional[datetime.datetime], validator=type_validator(), default=None
+ )
+ next_position_offset = attr.ib(type=int, validator=type_validator(), default=4)
@last_eventful.validator
def check_last_eventful(self, attribute, value):
@@ -221,6 +225,10 @@
def check_last_notfound(self, attribute, value):
check_timestamptz(value)
+ @next_visit_queue_position.validator
+ def check_next_visit_queue_position(self, attribute, value):
+ check_timestamptz(value)
+
@attr.s(frozen=True, slots=True)
class SchedulerMetrics(BaseSchedulerModel):
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -172,10 +172,16 @@
last_scheduled timestamptz,
-- last snapshot resulting from an eventful visit
last_snapshot bytea,
+ -- position in the global queue, the "time" at which we expect the origin to have new
+ -- objects
+ next_visit_queue_position timestamptz,
+ -- duration that we expect to wait between visits of this origin
+ next_position_offset int not null default 4,
primary key (url, visit_type)
);
+comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive';
comment on column origin_visit_stats.url is 'Origin URL';
comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url';
comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
@@ -185,6 +191,19 @@
comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last';
comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
+comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
+comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
+
+create table visit_scheduler_queue_position (
+ visit_type text not null,
+ position timestamptz not null,
+
+ primary key (visit_type)
+);
+
+comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler';
+comment on column visit_scheduler_queue_position.visit_type is 'Visit type';
+comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type';
create table scheduler_metrics (
lister_id uuid not null references listers(id),
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -70,6 +70,8 @@
"task_type/create",
"task_type/get",
"task_type/get_all",
+ "visit_scheduler/get",
+ "visit_scheduler/set",
"visit_stats/get",
"visit_stats/upsert",
)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -11,7 +11,11 @@
import pytest
from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+ from_position_offset_to_days,
+ max_date,
+ process_journal_objects,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats
from swh.scheduler.utils import utcnow
@@ -147,6 +151,8 @@
last_failed=None,
last_notfound=visit_status["date"],
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=5,
)
]
@@ -183,6 +189,8 @@
last_failed=None,
last_notfound=DATE3,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=7,
)
]
@@ -237,6 +245,8 @@
last_failed=DATE3,
last_notfound=None,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=7,
)
]
@@ -275,6 +285,8 @@
last_failed=DATE2,
last_notfound=None,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=6,
)
]
@@ -329,6 +341,8 @@
last_failed=None,
last_notfound=None,
last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
+ next_visit_queue_position=None,
+ next_position_offset=0,
)
]
@@ -354,6 +368,8 @@
last_failed=DATE2,
last_notfound=DATE1,
last_snapshot=visit_status["snapshot"],
+ next_visit_queue_position=None,
+ next_position_offset=4,
)
]
)
@@ -374,6 +390,8 @@
last_failed=DATE2,
last_notfound=DATE1,
last_snapshot=visit_status["snapshot"],
+ next_visit_queue_position=None,
+ next_position_offset=5, # uneventful so visit less often
)
]
@@ -434,6 +452,8 @@
last_failed=None,
last_notfound=None,
last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=5, # uneventful, visit origin less often in the future
)
assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
@@ -647,7 +667,7 @@
def test_journal_client_origin_visit_status_several_upsert(swh_scheduler):
- """A duplicated message must be ignored
+ """An old message updates old information
"""
visit_status1 = {
@@ -676,18 +696,18 @@
{"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=DATE2,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- )
-
assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=DATE2,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=5,
+ )
]
@@ -735,16 +755,41 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="cavabarder",
- visit_type="hg",
- last_eventful=DATE1,
- last_uneventful=DATE1 + 2 * ONE_YEAR,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- )
-
assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [
- expected_visit_stats
+ OriginVisitStats(
+ url="cavabarder",
+ visit_type="hg",
+ last_eventful=DATE1,
+ last_uneventful=DATE1 + 2 * ONE_YEAR,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=6, # 2 uneventful visits, whatever the permutation
+ )
]
+
+
+@pytest.mark.parametrize(
+ "position_offset, interval",
+ [
+ (0, 1),
+ (1, 1),
+ (2, 2),
+ (3, 2),
+ (4, 2),
+ (5, 4),
+ (6, 16),
+ (7, 64),
+ (8, 256),
+ (9, 1024),
+ (10, 4096),
+ ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+ assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+ with pytest.raises(AssertionError):
+ from_position_offset_to_days(-1)
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1204,6 +1204,26 @@
]
)
+ def test_visit_scheduler_queue_position(
+ self, swh_scheduler, listed_origins
+ ) -> None:
+ result = swh_scheduler.visit_scheduler_queue_position_get()
+ assert result == {}
+
+ expected_result = {}
+ visit_types = set()
+ for origin in listed_origins:
+ visit_type = origin.visit_type
+ if visit_type in visit_types:
+ continue
+ visit_types.add(visit_type)
+ position = utcnow()
+ swh_scheduler.visit_scheduler_queue_position_set(visit_type, position)
+ expected_result[visit_type] = position
+
+ result = swh_scheduler.visit_scheduler_queue_position_get()
+ assert result == expected_result
+
def test_metrics_origins_known(self, swh_scheduler, listed_origins):
swh_scheduler.record_listed_origins(listed_origins)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:27 PM (1 w, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226908
Attached To
D5919: Start handling of recurrent loading tasks in scheduler
Event Timeline
Log In to Comment