diff --git a/sql/updates/31.sql b/sql/updates/31.sql new file mode 100644 --- /dev/null +++ b/sql/updates/31.sql @@ -0,0 +1,25 @@ +-- SWH DB schema upgrade +-- from_version: 30 +-- to_version: 31 +-- description: make next_visit_queue_position integers + +insert into dbversion (version, release, description) + values (31, now(), 'Work In Progress'); + + +alter table + origin_visit_stats + alter column + next_visit_queue_position + type bigint + using extract(epoch from next_visit_queue_position); + +comment on column origin_visit_stats.next_visit_queue_position is + 'Position in the global per origin-type queue at which some new objects are expected to be found'; + +alter table + visit_scheduler_queue_position + alter column + "position" + type bigint + using extract(epoch from "position"); diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -452,7 +452,7 @@ INSERT INTO visit_scheduler_queue_position(visit_type, position) SELECT - visit_type, COALESCE(MAX(next_visit_queue_position), now()) + visit_type, COALESCE(MAX(next_visit_queue_position), 0) FROM selected_origins GROUP BY visit_type ON CONFLICT(visit_type) DO UPDATE @@ -1038,15 +1038,13 @@ return [OriginVisitStats(**row) for row in rows] @db_transaction() - def visit_scheduler_queue_position_get( - self, db=None, cur=None, - ) -> Dict[str, datetime.datetime]: + def visit_scheduler_queue_position_get(self, db=None, cur=None) -> Dict[str, int]: cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") return {row["visit_type"]: row["position"] for row in cur} @db_transaction() def visit_scheduler_queue_position_set( - self, visit_type: str, position: datetime.datetime, db=None, cur=None, + self, visit_type: str, position: int, db=None, cur=None, ) -> None: query = """ INSERT INTO visit_scheduler_queue_position(visit_type, position) diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- 
a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -451,7 +451,7 @@ ... @remote_api_endpoint("visit_scheduler/get") - def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]: + def visit_scheduler_queue_position_get(self,) -> Dict[str, int]: """Retrieve all current queue positions for the recurrent visit scheduler. Returns @@ -462,7 +462,7 @@ @remote_api_endpoint("visit_scheduler/set") def visit_scheduler_queue_position_set( - self, visit_type: str, position: datetime.datetime + self, visit_type: str, position: int ) -> None: """Set the current queue position of the recurrent visit scheduler for `visit_type`. diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information import copy -from datetime import datetime, timedelta +from datetime import datetime import random from typing import Dict, List, Optional, Tuple @@ -12,7 +12,6 @@ from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import LastVisitStatus, OriginVisitStats -from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" @@ -62,20 +61,19 @@ def next_visit_queue_position( - queue_position_per_visit_type: Dict, visit_stats: Dict -) -> datetime: + queue_position_per_visit_type: Dict[str, int], visit_stats: Dict +) -> int: """Compute the next visit queue position for the given visit_stats. This takes the visit_stats next_position_offset value and compute a corresponding - interval in days (with a random fudge factor of -/+ 10% range to avoid scheduling + interval in seconds (with a random fudge factor of -/+ 10% range to avoid scheduling burst for hosters). Then computes out of this visit interval and the current visit stats's position in the queue a new position.
As an implementation detail, if the visit stats does not have a queue position yet, this fallbacks to use the current global position (for the same visit type as the visit stats) to compute the new position in the queue. If there is no global state - yet for the visit type, this starts up using the ``utcnow`` function as default - value. + yet for the visit type, this starts from position 0 by default. Args: queue_position_per_visit_type: The global state of the queue per visit type @@ -87,12 +85,13 @@ """ days = from_position_offset_to_days(visit_stats["next_position_offset"]) random_fudge_factor = random.uniform(-0.1, 0.1) - visit_interval = timedelta(days=days * (1 + random_fudge_factor)) + visit_interval = int(days * 24 * 3600 * (1 + random_fudge_factor)) # Use the current queue position per visit type as starting position if none is # already set default_queue_position = queue_position_per_visit_type.get( - visit_stats["visit_type"], utcnow() + visit_stats["visit_type"], 0 ) + current_position = ( visit_stats["next_visit_queue_position"] if visit_stats.get("next_visit_queue_position") @@ -161,9 +160,10 @@ - When no snapshot is provided, the visit is considered as failed. - - Finally, the `next_visit_queue_position` (time at which some new objects - are expected to be added for the origin), and `next_position_offset` (duration - that we expect to wait between visits of this origin) are updated. + - Finally, the `next_visit_queue_position` (position in the global per-visit + type queue at which some new objects are expected to be added for the origin), + and `next_position_offset` (duration that we expect to wait between visits of + this origin) are updated. - When visits fails at least {DISABLE_ORIGIN_THRESHOLD} times in a row, the origins are disabled in the scheduler table.
It's up to the lister to activate diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -222,7 +222,7 @@ type=Optional[bytes], validator=type_validator(), default=None ) next_visit_queue_position = attr.ib( - type=Optional[datetime.datetime], validator=type_validator(), default=None + type=Optional[int], validator=type_validator(), default=None ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) @@ -236,10 +236,6 @@ def check_last_visit(self, attribute, value): check_timestamptz(value) - @next_visit_queue_position.validator - def check_next_visit_queue_position(self, attribute, value): - check_timestamptz(value) - @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -11,7 +11,7 @@ comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (30, now(), 'Work In Progress'); + values (31, now(), 'Work In Progress'); create table task_type ( type text primary key, @@ -173,9 +173,9 @@ last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, - -- position in the global queue, the "time" at which we expect the origin to have new + -- position in the global (per visit type) queue at which we expect the origin to have new -- objects - next_visit_queue_position timestamptz, + next_visit_queue_position bigint, -- duration that we expect to wait between visits of this origin next_position_offset int not null default 4, successive_visits int not null default 1, @@ -192,13 +192,13 @@ comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; -comment on column
origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; +comment on column origin_visit_stats.next_visit_queue_position is 'Position in the global per origin-type queue at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; comment on column origin_visit_stats.successive_visits is 'number of successive visits with the same status'; create table visit_scheduler_queue_position ( visit_type text not null, - position timestamptz not null, + position bigint not null, primary key (visit_type) ); diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -4,7 +4,6 @@ # See top-level LICENSE file for more information import datetime -from datetime import timedelta import functools from itertools import permutations from typing import List @@ -857,20 +856,17 @@ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor - date_now = utcnow() - - mock_now = mocker.patch("swh.scheduler.journal_client.utcnow") - mock_now.return_value = date_now - actual_position = next_visit_queue_position( {}, {"next_position_offset": next_position_offset, "visit_type": "svn",} ) - assert actual_position == date_now + timedelta( - days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + assert actual_position == int( + 24 + * 3600 + * from_position_offset_to_days(next_position_offset) + * (1 + fudge_factor) ) - assert mock_now.called assert mock_random.called @@ -883,15 +879,16 @@ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor - date_now = utcnow() - actual_position = next_visit_queue_position( - {"git": date_now}, + {"git": 0}, 
{"next_position_offset": next_position_offset, "visit_type": "git",}, ) - assert actual_position == date_now + timedelta( - days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + assert actual_position == int( + 24 + * 3600 + * from_position_offset_to_days(next_position_offset) + * (1 + fudge_factor) ) assert mock_random.called @@ -906,19 +903,20 @@ mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor - date_now = utcnow() - actual_position = next_visit_queue_position( {}, { "next_position_offset": next_position_offset, "visit_type": "hg", - "next_visit_queue_position": date_now, + "next_visit_queue_position": 0, }, ) - assert actual_position == date_now + timedelta( - days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) + assert actual_position == int( + 24 + * 3600 + * from_position_offset_to_days(next_position_offset) + * (1 + fudge_factor) ) assert mock_random.called diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1076,8 +1076,8 @@ swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position - actual_state = swh_scheduler.visit_scheduler_queue_position_get() - assert actual_state == {} + current_state = swh_scheduler.visit_scheduler_queue_position_get() + assert current_state == {} # nor any visit statuses actual_visit_stats = swh_scheduler.origin_visit_stats_get( @@ -1093,8 +1093,8 @@ assert len(next_visits) == len(origins) # Now the global state got updated - actual_state = swh_scheduler.visit_scheduler_queue_position_get() - assert actual_state[visit_type] is not None + current_state = swh_scheduler.visit_scheduler_queue_position_get() + assert current_state[visit_type] is not None actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in next_visits @@ 
-1120,10 +1120,9 @@ swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position - actual_state = swh_scheduler.visit_scheduler_queue_position_get() - assert actual_state == {} + current_state = swh_scheduler.visit_scheduler_queue_position_get() + assert current_state == {} - date_now = utcnow() # Simulate some of those origins have associated visit stats (some with an # existing queue position and some without any) visit_stats = ( @@ -1133,8 +1132,7 @@ visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), - next_visit_queue_position=date_now - + timedelta(days=random.uniform(-10, 1)), + next_visit_queue_position=int(24 * 3600 * random.uniform(-10, 1)), ) for origin in origins[:100] ] @@ -1144,8 +1142,9 @@ visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), - next_visit_queue_position=date_now - + timedelta(days=random.uniform(1, 10)), # definitely > now() + next_visit_queue_position=int( + 24 * 3600 * random.uniform(1, 10) + ), # definitely > 0 ) for origin in origins[100:150] ] @@ -1173,8 +1172,8 @@ ) assert len(actual_visit_stats) == len(origins) - actual_state = swh_scheduler.visit_scheduler_queue_position_get() - assert actual_state == { + current_state = swh_scheduler.visit_scheduler_queue_position_get() + assert current_state == { visit_type: max( s.next_visit_queue_position for s in actual_visit_stats @@ -1356,7 +1355,7 @@ if visit_type in visit_types: continue visit_types.add(visit_type) - position = utcnow() + position = 42 swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) expected_result[visit_type] = position