diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -974,6 +974,24 @@ ) return [OriginVisitStats(**row) for row in rows] + @db_transaction() + def visit_scheduler_queue_position_get( + self, db=None, cur=None, + ) -> Dict[str, datetime.datetime]: + cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") + return {row["visit_type"]: row["position"] for row in cur} + + @db_transaction() + def visit_scheduler_queue_position_set( + self, visit_type: str, position: datetime.datetime, db=None, cur=None, + ) -> None: + query = """ + INSERT INTO visit_scheduler_queue_position(visit_type, position) + VALUES(%s, %s) + ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position + """ + cur.execute(query, (visit_type, position)) + @db_transaction() def update_metrics( self, diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -440,6 +440,25 @@ """ ... + @remote_api_endpoint("visit_scheduler/get") + def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]: + """Retrieve all current queue positions for the recurrent visit scheduler. + + Returns: + Mapping of each visit type to its current queue position + + """ + ... + + @remote_api_endpoint("visit_scheduler/set") + def visit_scheduler_queue_position_set( + self, visit_type: str, position: datetime.datetime + ) -> None: + """Set the current queue position of the recurrent visit scheduler for `visit_type`. + + """ + ... 
+ @remote_api_endpoint("scheduler_metrics/update") def update_metrics( self, diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -3,13 +3,15 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime +from datetime import datetime, timedelta +import random from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import OriginVisitStats +from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" @@ -26,13 +28,65 @@ return max(filtered_dates) +def update_next_position_offset(visit_stats: Dict, increment: int) -> None: + """Update the next position offset according to existing value and the increment. The + resulting value is clamped so that it never goes below zero. + + """ + visit_stats["next_position_offset"] = max( + 0, visit_stats["next_position_offset"] + increment + ) + + +def from_position_offset_to_days(position_offset: int) -> int: + """Convert a non-negative position offset to a visit interval in days. + + - index 0 and 1: interval 1 day + - index 2, 3 and 4: interval 2 days + - index n of 5 and up: interval `4^(n-4)` days, i.e. 4, 16, 64, 256, 1024, ... + + """ + assert position_offset >= 0 + if position_offset < 2: + result = 1 + elif position_offset < 5: + result = 2 + else: + result = 4 ** (position_offset - 4) + return result + + def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: - """Read messages from origin_visit_status journal topics, then inserts them in the - scheduler "origin_visit_stats" table. + """Read messages from origin_visit_status journal topic to update "origin_visit_stats" + information on (origin, visit_type). 
The goal is to compute visit stats information + per origin and visit_type: last_eventful, last_uneventful, last_failed, + last_notfound, last_snapshot, ... + + Details: - Worker function for `JournalClient.process(worker_fn)`, after + - This journal client consumes origin visit status information for final + visit statuses ("full", "partial", "failed", "not_found"). It drops the + information on non-final visit statuses ("ongoing", "created"). + + - The snapshot is used to determine the "eventful/uneventful" nature of the + origin visit status. + + - When no snapshot is provided, the visit is considered as failed so the + last_failed column is updated. + + - As there is no time guarantee when reading messages from the topic, the code + tries to keep the data as chronologically ordered as possible. + + - Compared to what is already stored in the origin_visit_stats table, only + the most recent information is kept. + + - This updates the `next_visit_queue_position` (time at which some new objects + are expected to be added for the origin), and `next_position_offset` (index + used to derive the interval we expect to wait between visits of this origin). + + This is a worker function to be used with `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. 
""" @@ -79,22 +133,20 @@ visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) - elif msg_dict["status"] == "failed": - visit_stats_d["last_failed"] = max_date( - msg_dict["date"], visit_stats_d.get("last_failed") - ) - elif msg_dict["snapshot"] is None: + update_next_position_offset(visit_stats_d, 1) # visit less often + elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) + update_next_position_offset(visit_stats_d, 1) # visit less often else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: - # visit with snapshot already stored, last_eventful should already be - # stored + # last_snapshot is set, so an eventful visit should have previously been + # recorded assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] @@ -111,6 +163,8 @@ # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] + # Visit this origin more often in the future + update_next_position_offset(visit_stats_d, -2) else: # same snapshot as before if ( @@ -127,6 +181,8 @@ visit_stats_d["last_eventful"] = min( visit_stats_d["last_eventful"], current_status_date ) + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) elif ( latest_recorded_visit_date and current_status_date == latest_recorded_visit_date @@ -137,6 +193,22 @@ else: # uneventful event visit_stats_d["last_uneventful"] = current_status_date + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) + + # We set the next 
visit target to its current value + the new visit interval + # multiplied by a random fudge factor (picked in the -/+ 10% range). + days = from_position_offset_to_days(visit_stats_d["next_position_offset"]) + random_fudge_factor = random.choice(range(-10, 11)) / 100 + visit_interval = timedelta(days=days * (1 + random_fudge_factor)) + # FIXME: should visit_scheduler_queue_position.position for the given visit + # type be used here to initialize this value, instead of utcnow()? + current_position = ( + visit_stats_d["next_visit_queue_position"] + if visit_stats_d.get("next_visit_queue_position") + else utcnow() + ) + visit_stats_d["next_visit_queue_position"] = current_position + visit_interval scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -204,6 +204,10 @@ last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) + next_visit_queue_position = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None + ) + next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) @last_eventful.validator def check_last_eventful(self, attribute, value): @@ -221,6 +225,10 @@ def check_last_notfound(self, attribute, value): check_timestamptz(value) + @next_visit_queue_position.validator + def check_next_visit_queue_position(self, attribute, value): + check_timestamptz(value) + @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -172,10 +172,16 @@ last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, + -- position in the global queue, the "time" at which we expect the origin to have new + -- objects + 
next_visit_queue_position timestamptz, + -- duration that we expect to wait between visits of this origin + next_position_offset int not null default 4, primary key (url, visit_type) ); +comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive'; comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; @@ -185,6 +191,19 @@ comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; +comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; +comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; + +create table visit_scheduler_queue_position ( + visit_type text not null, + position timestamptz not null, + + primary key (visit_type) +); + +comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler'; +comment on column visit_scheduler_queue_position.visit_type is 'Visit type'; +comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type'; create table scheduler_metrics ( lister_id uuid not null references listers(id), diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -70,6 +70,8 @@ "task_type/create", "task_type/get", "task_type/get_all", + "visit_scheduler/get", + "visit_scheduler/set", "visit_stats/get", "visit_stats/upsert", ) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- 
a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -11,7 +11,11 @@ import pytest from swh.model.hashutil import hash_to_bytes -from swh.scheduler.journal_client import max_date, process_journal_objects +from swh.scheduler.journal_client import ( + from_position_offset_to_days, + max_date, + process_journal_objects, +) from swh.scheduler.model import ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow @@ -147,6 +151,8 @@ last_failed=None, last_notfound=visit_status["date"], last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=5, ) ] @@ -183,6 +189,8 @@ last_failed=None, last_notfound=DATE3, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] @@ -237,6 +245,8 @@ last_failed=DATE3, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] @@ -275,6 +285,8 @@ last_failed=DATE2, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=6, ) ] @@ -329,6 +341,8 @@ last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_visit_queue_position=None, + next_position_offset=0, ) ] @@ -354,6 +368,8 @@ last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=4, ) ] ) @@ -374,6 +390,8 @@ last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=5, # uneventful so visit less often ) ] @@ -434,6 +452,8 @@ last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=5, # uneventful, visit origin less often in the future ) assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ @@ -647,7 +667,7 @@ def 
test_journal_client_origin_visit_status_several_upsert(swh_scheduler): - """A duplicated message must be ignored + """An old message updates old information """ visit_status1 = { @@ -676,18 +696,18 @@ {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=DATE2, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=5, + ) ] @@ -735,16 +755,41 @@ {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [ - expected_visit_stats + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1, + last_uneventful=DATE1 + 2 * ONE_YEAR, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=6, # 2 uneventful visits, whatever the permutation + ) ] + + +@pytest.mark.parametrize( + "position_offset, interval", + [ + (0, 1), + (1, 1), + (2, 2), + (3, 2), + (4, 2), + (5, 4), + (6, 16), + (7, 64), + (8, 256), + (9, 1024), + (10, 4096), + ], +) +def test_journal_client_from_position_offset_to_days(position_offset, interval): + assert 
from_position_offset_to_days(position_offset) == interval + + +def test_journal_client_from_position_offset_to_days_only_positive_input(): + with pytest.raises(AssertionError): + from_position_offset_to_days(-1) diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1204,6 +1204,26 @@ ] ) + def test_visit_scheduler_queue_position( + self, swh_scheduler, listed_origins + ) -> None: + result = swh_scheduler.visit_scheduler_queue_position_get() + assert result == {} + + expected_result = {} + visit_types = set() + for origin in listed_origins: + visit_type = origin.visit_type + if visit_type in visit_types: + continue + visit_types.add(visit_type) + position = utcnow() + swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) + expected_result[visit_type] = position + + result = swh_scheduler.visit_scheduler_queue_position_get() + assert result == expected_result + def test_metrics_origins_known(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins)