Page MenuHomeSoftware Heritage

D5919.id21362.diff
No OneTemporary

D5919.id21362.diff

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -974,6 +974,24 @@
)
return [OriginVisitStats(**row) for row in rows]
+ @db_transaction()
+ def visit_scheduler_queue_position_get(
+ self, db=None, cur=None,
+ ) -> Dict[str, datetime.datetime]:
+ cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position")
+ return {row["visit_type"]: row["position"] for row in cur}
+
+ @db_transaction()
+ def visit_scheduler_queue_position_set(
+ self, visit_type: str, position: datetime.datetime, db=None, cur=None,
+ ) -> None:
+ query = """
+ INSERT INTO visit_scheduler_queue_position(visit_type, position)
+ VALUES(%s, %s)
+ ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position
+ """
+ cur.execute(query, (visit_type, position))
+
@db_transaction()
def update_metrics(
self,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -440,6 +440,25 @@
"""
...
+ @remote_api_endpoint("visit_scheduler/get")
+ def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]:
+ """Retrieve all current queue positions for the recurrent visit scheduler.
+
+ Returns
+ Mapping of visit type to their current queue position
+
+ """
+ ...
+
+ @remote_api_endpoint("visit_scheduler/set")
+ def visit_scheduler_queue_position_set(
+ self, visit_type: str, position: datetime.datetime
+ ) -> None:
+ """Set the current queue position of the recurrent visit scheduler for `visit_type`.
+
+ """
+ ...
+
@remote_api_endpoint("scheduler_metrics/update")
def update_metrics(
self,
diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py
--- a/swh/scheduler/journal_client.py
+++ b/swh/scheduler/journal_client.py
@@ -3,13 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
+from datetime import datetime, timedelta
+import random
from typing import Dict, List, Optional, Tuple
import attr
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.utils import utcnow
msg_type = "origin_visit_status"
@@ -26,13 +28,65 @@
return max(filtered_dates)
+def update_next_position_offset(visit_stats: Dict, increment: int) -> None:
+    """Update the next position offset according to the existing value and the increment.
+    The resulting value is clamped to be a non-negative integer.
+
+ """
+ visit_stats["next_position_offset"] = max(
+ 0, visit_stats["next_position_offset"] + increment
+ )
+
+
+def from_position_offset_to_days(position_offset: int) -> int:
+ """Compute from position offset to interval in days.
+
+ - index 0 and 1: interval 1 day
+ - index 2, 3 and 4: interval 2 days
+    - index 5 and up: interval `4^(n-4)` days for offset n, i.e. 4, 16, 64, 256, 1024, ... days
+
+ """
+ assert position_offset >= 0
+ if position_offset < 2:
+ result = 1
+ elif position_offset < 5:
+ result = 2
+ else:
+ result = 4 ** (position_offset - 4)
+ return result
+
+
def process_journal_objects(
messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
- """Read messages from origin_visit_status journal topics, then inserts them in the
- scheduler "origin_visit_stats" table.
+ """Read messages from origin_visit_status journal topic to update "origin_visit_stats"
+ information on (origin, visit_type). The goal is to compute visit stats information
+ per origin and visit_type: last_eventful, last_uneventful, last_failed,
+ last_notfound, last_snapshot, ...
+
+ Details:
- Worker function for `JournalClient.process(worker_fn)`, after
+ - This journal consumes origin visit status information for final visit status
+ ("full", "partial", "failed", "not_found"). It drops the information on non
+ final visit statuses ("ongoing", "created").
+
+ - The snapshot is used to determine the "eventful/uneventful" nature of the
+ origin visit status.
+
+      - When no snapshot is provided, the visit is considered failed, so the
+        last_failed column is updated.
+
+      - As there is no time guarantee when reading messages from the topic, the code
+        tries to keep the data in the most timely order possible.
+
+ - Compared to what is already stored in the origin_visit_stats table, only most
+ recent information is kept.
+
+ - This updates the `next_visit_queue_position` (time at which some new objects
+ are expected to be added for the origin), and `next_position_offset` (duration
+ that we expect to wait between visits of this origin).
+
+ This is a worker function to be used with `JournalClient.process(worker_fn)`, after
currification of `scheduler` and `task_names`.
"""
@@ -79,22 +133,20 @@
visit_stats_d["last_notfound"] = max_date(
msg_dict["date"], visit_stats_d.get("last_notfound")
)
- elif msg_dict["status"] == "failed":
- visit_stats_d["last_failed"] = max_date(
- msg_dict["date"], visit_stats_d.get("last_failed")
- )
- elif msg_dict["snapshot"] is None:
+ update_next_position_offset(visit_stats_d, 1) # visit less often
+ elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None:
visit_stats_d["last_failed"] = max_date(
msg_dict["date"], visit_stats_d.get("last_failed")
)
+ update_next_position_offset(visit_stats_d, 1) # visit less often
else: # visit with snapshot, something happened
if visit_stats_d["last_snapshot"] is None:
# first time visit with snapshot, we keep relevant information
visit_stats_d["last_eventful"] = msg_dict["date"]
visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
else:
- # visit with snapshot already stored, last_eventful should already be
- # stored
+ # last_snapshot is set, so an eventful visit should have previously been
+ # recorded
assert visit_stats_d["last_eventful"] is not None
latest_recorded_visit_date = max_date(
visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"]
@@ -111,6 +163,8 @@
# new eventful visit (new snapshot)
visit_stats_d["last_eventful"] = current_status_date
visit_stats_d["last_snapshot"] = msg_dict["snapshot"]
+ # Visit this origin more often in the future
+ update_next_position_offset(visit_stats_d, -2)
else:
# same snapshot as before
if (
@@ -127,6 +181,8 @@
visit_stats_d["last_eventful"] = min(
visit_stats_d["last_eventful"], current_status_date
)
+ # Visit this origin less often in the future
+ update_next_position_offset(visit_stats_d, 1)
elif (
latest_recorded_visit_date
and current_status_date == latest_recorded_visit_date
@@ -137,6 +193,22 @@
else:
# uneventful event
visit_stats_d["last_uneventful"] = current_status_date
+ # Visit this origin less often in the future
+ update_next_position_offset(visit_stats_d, 1)
+
+ # We set the next visit target to its current value + the new visit interval
+ # multiplied by a random fudge factor (picked in the -/+ 10% range).
+ days = from_position_offset_to_days(visit_stats_d["next_position_offset"])
+ random_fudge_factor = random.choice(range(-10, 11)) / 100
+ visit_interval = timedelta(days=days * (1 + random_fudge_factor))
+ # FIXME: or is this here that we use visit_scheduler_queue_position.position for
+ # the given visit type to initialize such value?
+ current_position = (
+ visit_stats_d["next_visit_queue_position"]
+ if visit_stats_d.get("next_visit_queue_position")
+ else utcnow()
+ )
+ visit_stats_d["next_visit_queue_position"] = current_position + visit_interval
scheduler.origin_visit_stats_upsert(
OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -204,6 +204,10 @@
last_snapshot = attr.ib(
type=Optional[bytes], validator=type_validator(), default=None
)
+ next_visit_queue_position = attr.ib(
+ type=Optional[datetime.datetime], validator=type_validator(), default=None
+ )
+ next_position_offset = attr.ib(type=int, validator=type_validator(), default=4)
@last_eventful.validator
def check_last_eventful(self, attribute, value):
@@ -221,6 +225,10 @@
def check_last_notfound(self, attribute, value):
check_timestamptz(value)
+ @next_visit_queue_position.validator
+ def check_next_visit_queue_position(self, attribute, value):
+ check_timestamptz(value)
+
@attr.s(frozen=True, slots=True)
class SchedulerMetrics(BaseSchedulerModel):
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -172,10 +172,16 @@
last_scheduled timestamptz,
-- last snapshot resulting from an eventful visit
last_snapshot bytea,
+ -- position in the global queue, the "time" at which we expect the origin to have new
+ -- objects
+ next_visit_queue_position timestamptz,
+ -- duration that we expect to wait between visits of this origin
+ next_position_offset int not null default 4,
primary key (url, visit_type)
);
+comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive';
comment on column origin_visit_stats.url is 'Origin URL';
comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url';
comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event';
@@ -185,6 +191,19 @@
comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last';
comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
+comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
+comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
+
+create table visit_scheduler_queue_position (
+ visit_type text not null,
+ position timestamptz not null,
+
+ primary key (visit_type)
+);
+
+comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler';
+comment on column visit_scheduler_queue_position.visit_type is 'Visit type';
+comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type';
create table scheduler_metrics (
lister_id uuid not null references listers(id),
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -70,6 +70,8 @@
"task_type/create",
"task_type/get",
"task_type/get_all",
+ "visit_scheduler/get",
+ "visit_scheduler/set",
"visit_stats/get",
"visit_stats/upsert",
)
diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py
--- a/swh/scheduler/tests/test_journal_client.py
+++ b/swh/scheduler/tests/test_journal_client.py
@@ -11,7 +11,11 @@
import pytest
from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.journal_client import max_date, process_journal_objects
+from swh.scheduler.journal_client import (
+ from_position_offset_to_days,
+ max_date,
+ process_journal_objects,
+)
from swh.scheduler.model import ListedOrigin, OriginVisitStats
from swh.scheduler.utils import utcnow
@@ -147,6 +151,8 @@
last_failed=None,
last_notfound=visit_status["date"],
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=5,
)
]
@@ -183,6 +189,8 @@
last_failed=None,
last_notfound=DATE3,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=7,
)
]
@@ -237,6 +245,8 @@
last_failed=DATE3,
last_notfound=None,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=7,
)
]
@@ -275,6 +285,8 @@
last_failed=DATE2,
last_notfound=None,
last_snapshot=None,
+ next_visit_queue_position=None,
+ next_position_offset=6,
)
]
@@ -329,6 +341,8 @@
last_failed=None,
last_notfound=None,
last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"),
+ next_visit_queue_position=None,
+ next_position_offset=0,
)
]
@@ -354,6 +368,8 @@
last_failed=DATE2,
last_notfound=DATE1,
last_snapshot=visit_status["snapshot"],
+ next_visit_queue_position=None,
+ next_position_offset=4,
)
]
)
@@ -374,6 +390,8 @@
last_failed=DATE2,
last_notfound=DATE1,
last_snapshot=visit_status["snapshot"],
+ next_visit_queue_position=None,
+ next_position_offset=5, # uneventful so visit less often
)
]
@@ -434,6 +452,8 @@
last_failed=None,
last_notfound=None,
last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=5, # uneventful, visit origin less often in the future
)
assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
@@ -647,7 +667,7 @@
def test_journal_client_origin_visit_status_several_upsert(swh_scheduler):
- """A duplicated message must be ignored
+ """An old message updates old information
"""
visit_status1 = {
@@ -676,18 +696,18 @@
{"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="foo",
- visit_type="git",
- last_eventful=DATE1,
- last_uneventful=DATE2,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- )
-
assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [
- expected_visit_stats
+ OriginVisitStats(
+ url="foo",
+ visit_type="git",
+ last_eventful=DATE1,
+ last_uneventful=DATE2,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=5,
+ )
]
@@ -735,16 +755,41 @@
{"origin_visit_status": visit_statuses}, scheduler=swh_scheduler
)
- expected_visit_stats = OriginVisitStats(
- url="cavabarder",
- visit_type="hg",
- last_eventful=DATE1,
- last_uneventful=DATE1 + 2 * ONE_YEAR,
- last_failed=None,
- last_notfound=None,
- last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
- )
-
assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [
- expected_visit_stats
+ OriginVisitStats(
+ url="cavabarder",
+ visit_type="hg",
+ last_eventful=DATE1,
+ last_uneventful=DATE1 + 2 * ONE_YEAR,
+ last_failed=None,
+ last_notfound=None,
+ last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"),
+ next_visit_queue_position=None,
+ next_position_offset=6, # 2 uneventful visits, whatever the permutation
+ )
]
+
+
+@pytest.mark.parametrize(
+ "position_offset, interval",
+ [
+ (0, 1),
+ (1, 1),
+ (2, 2),
+ (3, 2),
+ (4, 2),
+ (5, 4),
+ (6, 16),
+ (7, 64),
+ (8, 256),
+ (9, 1024),
+ (10, 4096),
+ ],
+)
+def test_journal_client_from_position_offset_to_days(position_offset, interval):
+ assert from_position_offset_to_days(position_offset) == interval
+
+
+def test_journal_client_from_position_offset_to_days_only_positive_input():
+ with pytest.raises(AssertionError):
+ from_position_offset_to_days(-1)
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1204,6 +1204,26 @@
]
)
+ def test_visit_scheduler_queue_position(
+ self, swh_scheduler, listed_origins
+ ) -> None:
+ result = swh_scheduler.visit_scheduler_queue_position_get()
+ assert result == {}
+
+ expected_result = {}
+ visit_types = set()
+ for origin in listed_origins:
+ visit_type = origin.visit_type
+ if visit_type in visit_types:
+ continue
+ visit_types.add(visit_type)
+ position = utcnow()
+ swh_scheduler.visit_scheduler_queue_position_set(visit_type, position)
+ expected_result[visit_type] = position
+
+ result = swh_scheduler.visit_scheduler_queue_position_get()
+ assert result == expected_result
+
def test_metrics_origins_known(self, swh_scheduler, listed_origins):
swh_scheduler.record_listed_origins(listed_origins)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:27 PM (1 w, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226908

Event Timeline