diff --git a/sql/updates/29.sql b/sql/updates/29.sql index fef95f7..b097b24 100644 --- a/sql/updates/29.sql +++ b/sql/updates/29.sql @@ -1,27 +1,31 @@ -- SWH DB schema upgrade -- from_version: 28 -- to_version: 29 -- description: insert into dbversion (version, release, description) values (29, now(), 'Work In Progress'); alter table origin_visit_stats add column next_visit_queue_position timestamptz; alter table origin_visit_stats add column next_position_offset int not null default 4; +alter table origin_visit_stats +add column successive_visits int not null default 1; + comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; +comment on column origin_visit_stats.successive_visits is 'number of successive visits with the same status'; create table visit_scheduler_queue_position ( visit_type text not null, position timestamptz not null, primary key (visit_type) ); comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler'; comment on column visit_scheduler_queue_position.visit_type is 'Visit type'; comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type'; diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py index b3889be..d35f183 100644 --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -1,255 +1,263 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import random from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import LastVisitStatus, OriginVisitStats from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" def max_date(*dates: Optional[datetime]) -> datetime: """Return the max date of the given (possibly None) dates. At least one date must not be None. """ filtered_dates = [d for d in dates if d is not None] if not filtered_dates: raise ValueError("At least one date should be a valid datetime") return max(filtered_dates) def from_position_offset_to_days(position_offset: int) -> int: """Convert a position offset to an interval in days. - index 0 and 1: interval 1 day - index 2, 3 and 4: interval 2 days - index 5 and up: interval `4^(n-4)` days, i.e. 4, 16, 64, 256, 1024, ... days Args: position_offset: The actual position offset for a given visit stats Returns: The offset as an interval in number of days """ assert position_offset >= 0 if position_offset < 2: result = 1 elif position_offset < 5: result = 2 else: result = 4 ** (position_offset - 4) return result def next_visit_queue_position( queue_position_per_visit_type: Dict, visit_stats: Dict ) -> datetime: """Compute the next visit queue position for the given visit_stats. This takes the visit_stats next_position_offset value and computes a corresponding interval in days (with a random fudge factor in the -/+ 10% range, to avoid scheduling bursts for hosters). It then adds this visit interval to the current queue position of the visit stats to obtain the new position.
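For instance, with a `next_position_offset` of 6, `from_position_offset_to_days(6)` yields a base interval of 16 days, so the new queue position lands between 14.4 and 17.6 days after the current position, depending on the fudge factor.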
As an implementation detail, if the visit stats does not have a queue position yet, this falls back to using the current global position (for the same visit type as the visit stats) to compute the new position in the queue. If there is no global state yet for the visit type, this starts up using the ``utcnow`` function as the default value. Args: queue_position_per_visit_type: The global state of the queue per visit type visit_stats: The actual visit information to compute the next position for Returns: The actual next visit queue position for that visit stats """ days = from_position_offset_to_days(visit_stats["next_position_offset"]) random_fudge_factor = random.uniform(-0.1, 0.1) visit_interval = timedelta(days=days * (1 + random_fudge_factor)) # Use the current queue position per visit type as starting position if none is # already set default_queue_position = queue_position_per_visit_type.get( visit_stats["visit_type"], utcnow() ) current_position = ( visit_stats["next_visit_queue_position"] if visit_stats.get("next_visit_queue_position") else default_queue_position ) return current_position + visit_interval def get_last_status( incoming_visit_status: Dict, known_visit_stats: Dict ) -> Tuple[LastVisitStatus, Optional[bool]]: """Determine the `last_visit_status` and eventfulness of an origin according to the received visit_status object, and the state of the origin_visit_stats in the db. Note that by the time this function is called, out-of-order messages have already been discarded, which is why the implementation is rather simple. Args: incoming_visit_status: Incoming visit status read out of the journal known_visit_stats: Visit stats already registered in the backend Returns: A tuple of (LastVisitStatus, Optional[bool]). LastVisitStatus represents the outcome of the visit. Optional[bool] represents whether the snapshot is fresher than before (True/False) or None if there is no snapshot at all. """ status = incoming_visit_status["status"] if status in ("not_found", "failed"): return LastVisitStatus(status), None assert status in ("full", "partial") if incoming_visit_status["snapshot"] is None: return LastVisitStatus.failed, None if incoming_visit_status["snapshot"] != known_visit_stats.get("last_snapshot"): return LastVisitStatus.successful, True return LastVisitStatus.successful, False def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from the origin_visit_status journal topic to update "origin_visit_stats" information on (origin, visit_type). The goal is to compute visit stats information per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... Details: - This journal client consumes origin visit status information for final visit statuses (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops the information of non-final visit statuses (`"ongoing"`, `"created"`). - This journal client only considers messages that arrive in chronological order. Messages that arrive out of order (i.e. with a date field smaller than the latest recorded visit of the origin) are ignored. This is a tradeoff between correctness and simplicity of implementation [1]_. - The snapshot is used to determine the eventful or uneventful nature of the origin visit. - When no snapshot is provided, the visit is considered failed.
- Finally, the `next_visit_queue_position` (time at which some new objects are expected to be added for the origin) and `next_position_offset` (duration that we expect to wait between visits of this origin) are updated. This is a worker function to be used with `JournalClient.process(worker_fn)`, after partial application of `scheduler`. .. [1] Ignoring out-of-order messages makes the initialization of the origin_visit_status table (from a full journal) less deterministic: only the `last_visit`, `last_visit_status` and `last_successful` fields are guaranteed to be exact, while the `next_position_offset` field is a best-effort estimate (which should converge once the client has run for a while on in-order messages). """ assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } # Use the default values from the model object empty_object = { field.name: field.default if field.default != attr.NOTHING else None for field in attr.fields(OriginVisitStats) } # Retrieve the global queue state queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] pk = origin, visit_type if pk not in origin_visit_stats: origin_visit_stats[pk] = { **empty_object, "url": origin, "visit_type": visit_type, } visit_stats_d = origin_visit_stats[pk] if ( visit_stats_d.get("last_visit") and msg_dict["date"] <= visit_stats_d["last_visit"] ): # message received out of order, ignore continue # Compare incoming message to known status of the origin, to determine # eventfulness last_visit_status, eventful = get_last_status(msg_dict, visit_stats_d) # Update the position offset according to the visit status, # if we had already visited this origin before. if visit_stats_d.get("last_visit"): # Update the next position offset according to the existing value and the # eventfulness of the visit. increment = -2 if eventful else 1 visit_stats_d["next_position_offset"] = max( 0, visit_stats_d["next_position_offset"] + increment ) + # increment the counter when last_visit_status is the same + same_visit_status = last_visit_status == visit_stats_d["last_visit_status"] + else: + same_visit_status = False # Record current visit date as highest known date (we've rejected out of order # messages earlier). visit_stats_d["last_visit"] = msg_dict["date"] visit_stats_d["last_visit_status"] = last_visit_status # Record last successful visit date if last_visit_status == LastVisitStatus.successful: visit_stats_d["last_successful"] = max_date( msg_dict["date"], visit_stats_d.get("last_successful") ) visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Update the next visit queue position (which will be used solely for origins # without any last_update, cf.
the dedicated scheduling policy # "origins_without_last_update") visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( queue_position_per_visit_type, visit_stats_d ) + visit_stats_d["successive_visits"] = ( + visit_stats_d["successive_visits"] + 1 if same_visit_status else 1 + ) + scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py index b10cff9..6c08e75 100644 --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -1,271 +1,273 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum from typing import Any, Dict, List, Optional, Tuple, Union from uuid import UUID import attr import attr.converters from attrs_strict import type_validator def check_timestamptz(value) -> None: """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") @attr.s class BaseSchedulerModel: """Base class for database-backed objects. These database-backed objects are defined through attrs-based attributes that match the columns of the database 1:1. This is a (very) lightweight ORM. These attrs-based attributes have metadata specific to the functionality expected from these fields in the database: - `primary_key`: the column is a primary key; it should be filtered out when doing an `update` of the object - `auto_primary_key`: the column is a primary key, which is automatically handled by the database. It will not be inserted to. This must be matched with a database-side default value. - `auto_now_add`: the column is a timestamp that is set to the current time when the object is inserted, and never updated afterwards. This must be matched with a database-side default value. - `auto_now`: the column is a timestamp that is set to the current time when the object is inserted or updated. """ _pk_cols: Optional[Tuple[str, ...]] = None _select_cols: Optional[Tuple[str, ...]] = None _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None @classmethod def primary_key_columns(cls) -> Tuple[str, ...]: """Get the primary key columns for this object type""" if cls._pk_cols is None: columns: List[str] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_primary_key", "primary_key") ): columns.append(field.name) cls._pk_cols = tuple(sorted(columns)) return cls._pk_cols @classmethod def select_columns(cls) -> Tuple[str, ...]: """Get all the database columns needed for a `select` on this object type""" if cls._select_cols is None: columns: List[str] = [] for field in attr.fields(cls): columns.append(field.name) cls._select_cols = tuple(sorted(columns)) return cls._select_cols @classmethod def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: """Get the database columns and metavars needed for an `insert` or `update` on this object type. This implements support for the `auto_*` field metadata attributes. 
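For instance, a field declared with `metadata={"auto_now": True}` is rendered as the `now()` metavar, a regular field is rendered as a `%(field_name)s` placeholder named after the column, and fields flagged `auto_now_add` or `auto_primary_key` are skipped entirely.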
""" if cls._insert_cols_and_metavars is None: zipped_cols_and_metavars: List[Tuple[str, str]] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_now_add", "auto_primary_key") ): continue elif field.metadata.get("auto_now"): zipped_cols_and_metavars.append((field.name, "now()")) else: zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) zipped_cols_and_metavars.sort() cols, metavars = zip(*zipped_cols_and_metavars) cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars @attr.s class Lister(BaseSchedulerModel): name = attr.ib(type=str, validator=[type_validator()]) instance_name = attr.ib(type=str, validator=[type_validator()]) # Populated by database id = attr.ib( type=Optional[UUID], validator=type_validator(), default=None, metadata={"auto_primary_key": True}, ) current_state = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) created = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) updated = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) @attr.s class ListedOrigin(BaseSchedulerModel): """Basic information about a listed origin, output by a lister""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) extra_loader_arguments = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) enabled = attr.ib(type=bool, validator=[type_validator()], default=True) first_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) last_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) def as_task_dict(self): return { "type": f"load-{self.visit_type}", "arguments": { "args": [], "kwargs": {"url": self.url, **self.extra_loader_arguments}, }, } class LastVisitStatus(Enum): successful = "successful" failed = "failed" not_found = "not_found" def convert_last_visit_status( s: Union[None, str, LastVisitStatus] ) -> Optional[LastVisitStatus]: if not isinstance(s, str): return s return LastVisitStatus(s) @attr.s(frozen=True, slots=True) class OriginVisitStats(BaseSchedulerModel): """Represents an aggregated origin visits view. 
""" url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_successful = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) last_visit = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) last_visit_status = attr.ib( type=Optional[LastVisitStatus], validator=type_validator(), default=None, converter=convert_last_visit_status, ) last_scheduled = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) next_visit_queue_position = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) + successive_visits = attr.ib(type=int, validator=type_validator(), default=1) + @last_successful.validator def check_last_successful(self, attribute, value): check_timestamptz(value) @last_visit.validator def check_last_visit(self, attribute, value): check_timestamptz(value) @next_visit_queue_position.validator def check_next_visit_queue_position(self, attribute, value): check_timestamptz(value) @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): """Metrics for the scheduler, aggregated by (lister_id, visit_type)""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) origins_known = attr.ib(type=int, validator=[type_validator()], default=0) """Number of known (enabled or disabled) origins""" origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0) """Number of origins that were present in the latest listings""" origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0) """Number of enabled origins that have never been visited (according to the visit cache)""" origins_with_pending_changes = attr.ib( type=int, validator=[type_validator()], default=0 ) """Number of enabled origins with known activity (recorded by a lister) since our last visit""" diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql index 8f160be..399e453 100644 --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -1,227 +1,229 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) values (30, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column 
task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval, status task_status not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. 
' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Policy of the given task'; comment on column task.id is 'Task Identifier'; comment on column task.type is 'References task_type table'; comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.id is 'Task run identifier'; comment on column task_run.task is 'References task table'; comment on column task_run.scheduled is 'Scheduled run time for task'; comment on column task_run.started is 'Task starting time'; comment on column task_run.ended is 'Task ending time'; create table if not exists listers ( id uuid primary key default uuid_generate_v4(), name text not null, instance_name text not null, created timestamptz not null default now(), -- auto_now_add in the model current_state jsonb not null, updated timestamptz not null ); comment on table listers is 'Lister instances known to the origin visit scheduler'; comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. 
framagit, bitbucket, ...)'; comment on column listers.created is 'Timestamp at which the lister was originally created'; comment on column listers.current_state is 'Known current state of this lister'; comment on column listers.updated is 'Timestamp at which the lister state was last updated'; create table if not exists listed_origins ( -- Basic information lister_id uuid not null references listers(id), url text not null, visit_type text not null, extra_loader_arguments jsonb not null, -- Whether this origin still exists or not enabled boolean not null, -- time-based information first_seen timestamptz not null default now(), last_seen timestamptz not null, -- potentially provided by the lister last_update timestamptz, primary key (lister_id, url, visit_type) ); comment on table listed_origins is 'Origins known to the origin visit scheduler'; comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; comment on column listed_origins.url is 'URL of the origin listed'; comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; create type last_visit_status as enum ('successful', 'failed', 'not_found'); comment on type last_visit_status is 'Record of the status of the last visit of an origin'; create table origin_visit_stats ( url text not null, visit_type text not null, last_successful timestamptz, last_visit timestamptz, last_visit_status last_visit_status, -- visit scheduling information last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, -- position in the global queue, the "time" at which we expect the origin to have new -- objects next_visit_queue_position timestamptz, -- duration that we expect to wait between visits of this origin next_position_offset int not null default 4, + successive_visits int not null default 1, primary key (url, visit_type) ); comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive'; comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; comment on column origin_visit_stats.last_successful is 'Date of the last successful visit, at which we recorded the `last_snapshot`'; comment on column origin_visit_stats.last_visit is 'Date of the last visit overall'; comment on column origin_visit_stats.last_visit_status is 'Status of the last visit'; comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; +comment on column 
origin_visit_stats.successive_visits is 'number of successive visits with the same status'; create table visit_scheduler_queue_position ( visit_type text not null, position timestamptz not null, primary key (visit_type) ); comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler'; comment on column visit_scheduler_queue_position.visit_type is 'Visit type'; comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type'; create table scheduler_metrics ( lister_id uuid not null references listers(id), visit_type text not null, last_update timestamptz not null, origins_known int not null default 0, origins_enabled int not null default 0, origins_never_visited int not null default 0, origins_with_pending_changes int not null default 0, primary key (lister_id, visit_type) ); comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.'; comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated'; comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated'; comment on column scheduler_metrics.last_update is 'Last update of these metrics'; comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins'; comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing'; comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited'; comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit'; diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 1c06562..4f3a222 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,895 +1,923 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from datetime import timedelta import functools from itertools import permutations from typing import List from unittest.mock import Mock import attr import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import ( from_position_offset_to_days, max_date, next_visit_queue_position, process_journal_objects, ) from swh.scheduler.model import LastVisitStatus, ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) ONE_YEAR = datetime.timedelta(days=366) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( "dates,expected_max_date", [ ((DATE1,), DATE1), ((None, DATE2), DATE2), ((DATE1, None), DATE1), ((DATE1, DATE2), DATE2), ((DATE2, DATE1), DATE2), ((DATE1, DATE2, DATE3), DATE3), ((None, 
DATE2, DATE3), DATE3), ((None, None, DATE3), DATE3), ((DATE1, None, DATE3), DATE3), ], ) def test_max_date(dates, expected_max_date): assert max_date(*dates) == expected_max_date def test_max_date_raise(): with pytest.raises(ValueError, match="valid datetime"): max_date() with pytest.raises(ValueError, match="valid datetime"): max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. """ # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # All messages have been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_ignore_missing_type(swh_scheduler): """Ignore statuses with missing type key""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) date = utcnow() snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", "visit": 1, "status": "full", "date": date, "snapshot": snapshot, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # The message has been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def assert_visit_stats_ok( actual_visit_stats: OriginVisitStats, expected_visit_stats: OriginVisitStats, ignore_fields: List[str] = ["next_visit_queue_position"], ): """Utility test function to ensure visits stats read from the backend are in the right shape. The comparison on the next_visit_queue_position will be dealt with in dedicated tests so it's not tested in tests that are calling this function. 
""" fields = attr.fields_dict(OriginVisitStats) defaults = {field: fields[field].default for field in ignore_fields} actual_visit_stats = attr.evolve(actual_visit_stats, **defaults) assert actual_visit_stats == expected_visit_stats def test_journal_client_origin_visit_status_from_journal_last_not_found(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_visit=visit_status["date"], last_visit_status=LastVisitStatus.not_found, next_position_offset=4, + successive_visits=1, ), ) visit_statuses = [ { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_visit=DATE3, last_visit_status=LastVisitStatus.not_found, next_position_offset=6, + successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 2, "status": "full", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="bar", visit_type="git", last_visit=DATE3, last_visit_status=LastVisitStatus.failed, next_position_offset=6, + successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 2, "status": "failed", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "bar", "visit": 3, "status": "failed", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="bar", visit_type="git", last_visit=DATE2, last_visit_status=LastVisitStatus.failed, next_position_offset=5, + successive_visits=2, ), ) def test_journal_client_origin_visit_status_from_journal_last_successful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, { "origin": "foo", 
"visit": 2, "status": "partial", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, { "origin": "foo", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE3, last_visit=DATE3, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), next_position_offset=0, + successive_visits=3, ), ) def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_successful=DATE2, last_visit=DATE3, last_visit_status=LastVisitStatus.failed, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=4, + successive_visits=1, ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) + assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_visit=DATE3 + ONE_DAY, last_successful=DATE3 + ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=5, + successive_visits=1, ), ) VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="foo", visit_type="git", last_successful=DATE1 + 3 * ONE_DAY, last_visit=DATE1 + 3 * ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), - ignore_fields=["next_visit_queue_position", "next_position_offset"], + ignore_fields=[ + "next_visit_queue_position", + "next_position_offset", + "successive_visits", + ], ) # We ignore out of order messages, so the next_position_offset 
isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). assert 4 <= visit_stats.next_position_offset <= 5 + # same goes for successive_visits + assert 1 <= visit_stats.successive_visits <= 2 VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_visit_stats = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) visit_stats = actual_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="cavabarder", visit_type="hg", last_successful=DATE1 + 3 * ONE_DAY, last_visit=DATE1 + 3 * ONE_DAY, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ), - ignore_fields=["next_visit_queue_position", "next_position_offset"], + ignore_fields=[ + "next_visit_queue_position", + "next_position_offset", + "successive_visits", + ], ) # We ignore out of order messages, so the next_position_offset isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). 
assert 2 <= visit_stats.next_position_offset <= 5 + # same goes for successive_visits + assert 1 <= visit_stats.successive_visits <= 4 VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] assert ovs.last_successful == DATE1 + 5 * ONE_DAY assert ovs.last_visit == DATE1 + 5 * ONE_DAY assert ovs.last_visit_status == LastVisitStatus.successful assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) def test_journal_client_origin_visit_status_duplicated_messages(swh_scheduler): """A duplicated message must be ignored """ visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE1, last_visit=DATE1, last_visit_status=LastVisitStatus.successful, 
last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + successive_visits=1, ), ) def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): """An old message updates old information """ visit_status1 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } visit_status2 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status2]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( actual_origin_visit_stats[0], OriginVisitStats( url="foo", visit_type="git", last_successful=DATE2, last_visit=DATE2, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), next_position_offset=4, + successive_visits=1, ), ) VISIT_STATUSES_SAME_SNAPSHOT = [ {**ovs, "date": DATE1 + n * ONE_YEAR} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_SAME_SNAPSHOT, len(VISIT_STATUSES_SAME_SNAPSHOT)), ) def test_journal_client_origin_visit_statuses_same_snapshot_permutation( visit_statuses, swh_scheduler ): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [("cavabarder", "hg")] ) visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( visit_stats, OriginVisitStats( url="cavabarder", visit_type="hg", last_successful=DATE1 + 2 * ONE_YEAR, last_visit=DATE1 + 2 * ONE_YEAR, last_visit_status=LastVisitStatus.successful, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ), - ignore_fields=["next_visit_queue_position", "next_position_offset"], + ignore_fields=[ + "next_visit_queue_position", + "next_position_offset", + "successive_visits", + ], ) # We ignore out of order messages, so the next_position_offset isn't exact # depending on the permutation. What matters is consistency of the final # dates (last_visit and last_successful). 
assert 4 <= visit_stats.next_position_offset <= 6 + # same goes for successive_visits + assert 1 <= visit_stats.successive_visits <= 3 @pytest.mark.parametrize( "position_offset, interval", [ (0, 1), (1, 1), (2, 2), (3, 2), (4, 2), (5, 4), (6, 16), (7, 64), (8, 256), (9, 1024), (10, 4096), ], ) def test_journal_client_from_position_offset_to_days(position_offset, interval): assert from_position_offset_to_days(position_offset) == interval def test_journal_client_from_position_offset_to_days_only_positive_input(): with pytest.raises(AssertionError): from_position_offset_to_days(-1) @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),] ) def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() mock_now = mocker.patch("swh.scheduler.journal_client.utcnow") mock_now.return_value = date_now actual_position = next_visit_queue_position( {}, {"next_position_offset": next_position_offset, "visit_type": "svn",} ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_now.called assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),] ) def test_next_visit_queue_position_with_state( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {"git": date_now}, {"next_position_offset": next_position_offset, "visit_type": "git",}, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),] ) def test_next_visit_queue_position_with_next_visit_queue( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {}, { "next_position_offset": next_position_offset, "visit_type": "hg", "next_visit_queue_position": date_now, }, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called
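For reference, here is a minimal sketch of how `process_journal_objects` is typically wired to a journal client, following the `functools.partial` pattern used in the tests above. The scheduler and Kafka configuration values below (backend name, connection string, broker, group id) are placeholders and assumptions, not part of this change:

import functools

from swh.journal.client import get_journal_client
from swh.scheduler import get_scheduler
from swh.scheduler.journal_client import process_journal_objects

# Placeholder configuration; adapt to the actual deployment.
scheduler = get_scheduler(cls="local", db="dbname=softwareheritage-scheduler")
journal_client = get_journal_client(
    "kafka",
    brokers=["kafka.example.org:9092"],
    group_id="swh.scheduler.journal_client",
    object_types=["origin_visit_status"],
)

# Partially apply the scheduler, then let the client feed batches of
# origin_visit_status messages to the worker function.
worker_fn = functools.partial(process_journal_objects, scheduler=scheduler)
journal_client.process(worker_fn)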