diff --git a/sql/updates/30.sql b/sql/updates/30.sql new file mode 100644 --- /dev/null +++ b/sql/updates/30.sql @@ -0,0 +1,79 @@ +-- SWH DB schema upgrade +-- from_version: 29 +-- to_version: 30 +-- description: merge a bunch of fields in origin_visit_stats + +insert into dbversion (version, release, description) + values (30, now(), 'Work In Progress'); + + +create type last_visit_status as enum ('successful', 'failed', 'not_found'); +comment on type last_visit_status is 'Record of the status of the last visit of an origin'; + +alter table origin_visit_stats + add column last_successful timestamptz, + add column last_visit timestamptz, + add column last_visit_status last_visit_status; + +comment on column origin_visit_stats.last_successful is 'Date of the last successful visit, at which we recorded the `last_snapshot`'; +comment on column origin_visit_stats.last_visit is 'Date of the last visit overall'; +comment on column origin_visit_stats.last_visit_status is 'Status of the last visit'; + +update origin_visit_stats + set last_successful = greatest(last_eventful, last_uneventful), + last_visit = greatest(last_eventful, last_uneventful, last_failed, last_notfound); + +update origin_visit_stats + set last_visit_status = + case + when last_visit = last_failed then 'failed'::last_visit_status + when last_visit = last_notfound then 'not_found'::last_visit_status + else 'successful'::last_visit_status + end + where last_visit is not null; + + +create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now()) + returns setof scheduler_metrics + language sql +as $$ + insert into scheduler_metrics ( + lister_id, visit_type, last_update, + origins_known, origins_enabled, + origins_never_visited, origins_with_pending_changes + ) + select + lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update, + count(*) as origins_known, + count(*) filter (where enabled) as origins_enabled, + count(*) filter (where + enabled and last_snapshot 
is NULL + ) as origins_never_visited, + count(*) filter (where + enabled and lo.last_update > last_successful + ) as origins_with_pending_changes + from listed_origins lo + left join origin_visit_stats ovs using (url, visit_type) + where + -- update only for the requested lister + update_metrics.lister_id = lo.lister_id + -- or for all listers if the function argument is null + or update_metrics.lister_id is null + group by (lister_id, visit_type) + on conflict (lister_id, visit_type) do update + set + last_update = EXCLUDED.last_update, + origins_known = EXCLUDED.origins_known, + origins_enabled = EXCLUDED.origins_enabled, + origins_never_visited = EXCLUDED.origins_never_visited, + origins_with_pending_changes = EXCLUDED.origins_with_pending_changes + returning * +$$; + +comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id'; + +alter table origin_visit_stats + drop column last_eventful, + drop column last_uneventful, + drop column last_failed, + drop column last_notfound; diff --git a/swh/scheduler/api/serializers.py b/swh/scheduler/api/serializers.py --- a/swh/scheduler/api/serializers.py +++ b/swh/scheduler/api/serializers.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -18,11 +18,17 @@ return d +def _encode_enum(obj): + return obj.value + + ENCODERS: List[Tuple[type, str, Callable]] = [ (model.BaseSchedulerModel, "scheduler_model", _encode_model_object), + (model.LastVisitStatus, "last_visit_status", _encode_enum), ] DECODERS: Dict[str, Callable] = { - "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d) + "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d), + "last_visit_status": model.LastVisitStatus, } diff 
--git a/swh/scheduler/backend.py b/swh/scheduler/backend.py --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -11,6 +11,7 @@ import attr from psycopg2.errors import CardinalityViolation +from psycopg2.extensions import AsIs import psycopg2.extras import psycopg2.pool @@ -20,12 +21,23 @@ from .exc import SchedulerException, StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList -from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics +from .model import ( + LastVisitStatus, + ListedOrigin, + Lister, + OriginVisitStats, + SchedulerMetrics, +) logger = logging.getLogger(__name__) +def adapt_LastVisitStatus(v: LastVisitStatus): + return AsIs(f"'{v.value}'::last_visit_status") + + psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) +psycopg2.extensions.register_adapter(LastVisitStatus, adapt_LastVisitStatus) psycopg2.extras.register_uuid() @@ -333,7 +345,7 @@ timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), - notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), + not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), db=None, cur=None, ) -> List[ListedOrigin]: @@ -364,10 +376,7 @@ """origin_visit_stats.last_scheduled IS NULL OR origin_visit_stats.last_scheduled < GREATEST( %s - %s, - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful, - origin_visit_stats.last_failed, - origin_visit_stats.last_notfound + origin_visit_stats.last_visit ) """ ) @@ -377,20 +386,20 @@ if failed_cooldown: # Don't retry failed origins too often where_clauses.append( - "origin_visit_stats.last_failed is null " - "or origin_visit_stats.last_failed < %s - %s" + "origin_visit_stats.last_visit_status is distinct from 'failed' " + "or origin_visit_stats.last_visit < %s - %s" ) 
query_args.append(timestamp) query_args.append(failed_cooldown) - if notfound_cooldown: + if not_found_cooldown: # Don't retry not found origins too often where_clauses.append( - "origin_visit_stats.last_notfound is null " - "or origin_visit_stats.last_notfound < %s - %s" + "origin_visit_stats.last_visit_status is distinct from 'not_found' " + "or origin_visit_stats.last_visit < %s - %s" ) query_args.append(timestamp) - query_args.append(notfound_cooldown) + query_args.append(not_found_cooldown) if policy == "oldest_scheduled_first": order_by = "origin_visit_stats.last_scheduled NULLS FIRST" @@ -410,24 +419,13 @@ # ignore origins we have visited after the known last update where_clauses.append("listed_origins.last_update IS NOT NULL") where_clauses.append( - """ - listed_origins.last_update - > GREATEST( - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful - ) - """ + "listed_origins.last_update > origin_visit_stats.last_successful" ) # order by decreasing visit lag - order_by = """\ - listed_origins.last_update - - GREATEST( - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful - ) - DESC - """ + order_by = ( + "listed_origins.last_update - origin_visit_stats.last_successful DESC" + ) elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") order_by = "origin_visit_stats.next_visit_queue_position nulls first" diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -397,7 +397,7 @@ timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), - notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), + not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), ) -> List[ListedOrigin]: """Get at most the `count` next origins that need 
to be visited with the `visit_type` loader according to the given scheduling `policy`. @@ -415,8 +415,8 @@ the same origin again failed_cooldown: the minimal interval before which we can reschedule a failed origin - notfound_cooldown: the minimal interval before which we can reschedule a - notfound origin + not_found_cooldown: the minimal interval before which we can reschedule a + not_found origin """ ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -10,7 +10,7 @@ import attr from swh.scheduler.interface import SchedulerInterface -from swh.scheduler.model import OriginVisitStats +from swh.scheduler.model import LastVisitStatus, OriginVisitStats from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" @@ -28,11 +28,13 @@ return max(filtered_dates) -def update_next_position_offset(visit_stats: Dict, increment: int) -> None: - """Update the next position offset according to existing value and the increment. The - resulting value must be a positive integer. +def update_next_position_offset(visit_stats: Dict, eventful: Optional[bool]) -> None: + """Update the next position offset according to the existing value and the eventfulness + of the visit. The resulting value must be a positive integer. 
""" + increment = -2 if eventful else 1 + visit_stats["next_position_offset"] = max( 0, visit_stats["next_position_offset"] + increment ) @@ -102,39 +104,67 @@ return current_position + visit_interval +def get_last_status( + incoming_visit_status: Dict, known_visit_stats: Dict +) -> Tuple[LastVisitStatus, Optional[bool]]: + """Determine the `last_visit_status` and eventfulness of an origin according to + the received visit_status object, and the state of the origin_visit_stats in db + """ + + status = incoming_visit_status["status"] + if status in ("not_found", "failed"): + return LastVisitStatus(status), None + + assert status in ("full", "partial") + + if incoming_visit_status["snapshot"] is None: + return LastVisitStatus.failed, None + + if incoming_visit_status["snapshot"] != known_visit_stats.get("last_snapshot"): + return LastVisitStatus.successful, True + + return LastVisitStatus.successful, False + + def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from origin_visit_status journal topic to update "origin_visit_stats" information on (origin, visit_type). The goal is to compute visit stats information - per origin and visit_type: last_eventful, last_uneventful, last_failed, - last_notfound, last_snapshot, ... + per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... Details: - - This journal consumes origin visit status information for final visit status - ("full", "partial", "failed", "not_found"). It drops the information on non - final visit statuses ("ongoing", "created"). + - This journal consumes origin visit status information for final visit + status (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops + the information of non final visit statuses (`"ongoing"`, + `"created"`). - - The snapshot is used to determine the "eventful/uneventful" nature of the - origin visit status. 
+ - This journal client only considers messages that arrive in + chronological order. Messages that arrive out of order (i.e. with a + date field smaller than the latest recorded visit of the origin) are + ignored. This is a tradeoff between correctness and simplicity of + implementation [1]_. - - When no snapshot is provided, the visit is considered as failed so the - last_failed column is updated. + - The snapshot is used to determine the eventful or uneventful nature of + the origin visit. - - As there is no time guarantee when reading message from the topic, the code - tries to keep the data in the most timely ordered as possible. + - When no snapshot is provided, the visit is considered as failed. - - Compared to what is already stored in the origin_visit_stats table, only most - recent information is kept. - - - This updates the `next_visit_queue_position` (time at which some new objects + - Finally, the `next_visit_queue_position` (time at which some new objects are expected to be added for the origin), and `next_position_offset` (duration - that we expect to wait between visits of this origin). + that we expect to wait between visits of this origin) are updated. This is a worker function to be used with `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. + .. [1] Ignoring out of order messages makes the initialization of the + origin_visit_stats table (from a full journal) less deterministic: only the + `last_visit`, `last_visit_status` and `last_successful` fields are guaranteed + to be exact, the `next_position_offset` field is a best effort estimate + (which should converge once the client has run for a while on in-order + messages). 
+ """ assert set(messages) <= { msg_type @@ -178,72 +208,34 @@ visit_stats_d = origin_visit_stats[pk] - if msg_dict["status"] == "not_found": - visit_stats_d["last_notfound"] = max_date( - msg_dict["date"], visit_stats_d.get("last_notfound") - ) - update_next_position_offset(visit_stats_d, 1) # visit less often - elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: - visit_stats_d["last_failed"] = max_date( - msg_dict["date"], visit_stats_d.get("last_failed") + if ( + visit_stats_d.get("last_visit") + and msg_dict["date"] <= visit_stats_d["last_visit"] + ): + # message received out of order, ignore + continue + + # Compare incoming message to known status of the origin, to determine + # eventfulness + last_visit_status, eventful = get_last_status(msg_dict, visit_stats_d) + + # Update the position offset according to the visit status, + # if we had already visited this origin before. + + if visit_stats_d.get("last_visit"): + update_next_position_offset(visit_stats_d, eventful) + + # Record current visit date as highest known date (we've rejected out of order + # messages earlier). 
+ visit_stats_d["last_visit"] = msg_dict["date"] + visit_stats_d["last_visit_status"] = last_visit_status + + # Record last successful visit date + if last_visit_status == LastVisitStatus.successful: + visit_stats_d["last_successful"] = max_date( + msg_dict["date"], visit_stats_d.get("last_successful") ) - update_next_position_offset(visit_stats_d, 1) # visit less often - else: # visit with snapshot, something happened - if visit_stats_d["last_snapshot"] is None: - # first time visit with snapshot, we keep relevant information - visit_stats_d["last_eventful"] = msg_dict["date"] - visit_stats_d["last_snapshot"] = msg_dict["snapshot"] - else: - # last_snapshot is set, so an eventful visit should have previously been - # recorded - assert visit_stats_d["last_eventful"] is not None - latest_recorded_visit_date = max_date( - visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] - ) - current_status_date = msg_dict["date"] - previous_snapshot = visit_stats_d["last_snapshot"] - if msg_dict["snapshot"] != previous_snapshot: - if ( - latest_recorded_visit_date - and current_status_date < latest_recorded_visit_date - ): - # out of order message so ignored - continue - # new eventful visit (new snapshot) - visit_stats_d["last_eventful"] = current_status_date - visit_stats_d["last_snapshot"] = msg_dict["snapshot"] - # Visit this origin more often in the future - update_next_position_offset(visit_stats_d, -2) - else: - # same snapshot as before - if ( - latest_recorded_visit_date - and current_status_date < latest_recorded_visit_date - ): - # we receive an old message which is an earlier "eventful" event - # than what we had, we consider the last_eventful event as - # actually an uneventful event. 
- # The last uneventful visit remains the most recent: - # max, previously computed - visit_stats_d["last_uneventful"] = latest_recorded_visit_date - # The eventful visit remains the oldest one: min - visit_stats_d["last_eventful"] = min( - visit_stats_d["last_eventful"], current_status_date - ) - # Visit this origin less often in the future - update_next_position_offset(visit_stats_d, 1) - elif ( - latest_recorded_visit_date - and current_status_date == latest_recorded_visit_date - ): - # A duplicated message must be ignored to avoid - # populating the last_uneventful message - continue - else: - # uneventful event - visit_stats_d["last_uneventful"] = current_status_date - # Visit this origin less often in the future - update_next_position_offset(visit_stats_d, 1) + visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Update the next visit queue position (which will be used solely for origin # without any last_update, cf. the dedicated scheduling policy diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -4,7 +4,8 @@ # See top-level LICENSE file for more information import datetime -from typing import Any, Dict, List, Optional, Tuple +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple, Union from uuid import UUID import attr @@ -177,6 +178,20 @@ } +class LastVisitStatus(Enum): + successful = "successful" + failed = "failed" + not_found = "not_found" + + +def convert_last_visit_status( + s: Union[None, str, LastVisitStatus] +) -> Optional[LastVisitStatus]: + if not isinstance(s, str): + return s + return LastVisitStatus(s) + + @attr.s(frozen=True, slots=True) class OriginVisitStats(BaseSchedulerModel): """Represents an aggregated origin visits view. 
@@ -188,15 +203,17 @@ visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) - last_eventful = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_successful = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None ) - last_uneventful = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_visit = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None ) - last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator()) - last_notfound = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_visit_status = attr.ib( + type=Optional[LastVisitStatus], + validator=type_validator(), + default=None, + converter=convert_last_visit_status, ) last_scheduled = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, @@ -209,20 +226,12 @@ ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) - @last_eventful.validator - def check_last_eventful(self, attribute, value): - check_timestamptz(value) - - @last_uneventful.validator - def check_last_uneventful(self, attribute, value): - check_timestamptz(value) - - @last_failed.validator - def check_last_failed(self, attribute, value): + @last_successful.validator + def check_last_successful(self, attribute, value): check_timestamptz(value) - @last_notfound.validator - def check_last_notfound(self, attribute, value): + @last_visit.validator + def check_last_visit(self, attribute, value): check_timestamptz(value) @next_visit_queue_position.validator diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -11,7 +11,7 @@ comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (29, now(), 'Work In Progress'); + 
values (30, now(), 'Work In Progress'); create table task_type ( type text primary key, @@ -160,14 +160,15 @@ comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; +create type last_visit_status as enum ('successful', 'failed', 'not_found'); +comment on type last_visit_status is 'Record of the status of the last visit of an origin'; create table origin_visit_stats ( url text not null, visit_type text not null, - last_eventful timestamptz, - last_uneventful timestamptz, - last_failed timestamptz, - last_notfound timestamptz, + last_successful timestamptz, + last_visit timestamptz, + last_visit_status last_visit_status, -- visit scheduling information last_scheduled timestamptz, -- last snapshot resulting from an eventful visit @@ -184,10 +185,9 @@ comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive'; comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; -comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; -comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event'; -comment on column origin_visit_stats.last_failed is 'Date of the last failed event'; -comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event'; +comment on column origin_visit_stats.last_successful is 'Date of the last successful visit, at which we recorded the `last_snapshot`'; +comment on column origin_visit_stats.last_visit is 'Date of the last visit overall'; +comment on column origin_visit_stats.last_visit_status is 'Status of the last visit'; comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; diff --git a/swh/scheduler/sql/40-func.sql 
b/swh/scheduler/sql/40-func.sql --- a/swh/scheduler/sql/40-func.sql +++ b/swh/scheduler/sql/40-func.sql @@ -378,7 +378,7 @@ enabled and last_snapshot is NULL ) as origins_never_visited, count(*) filter (where - enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful) + enabled and lo.last_update > last_successful ) as origins_with_pending_changes from listed_origins lo left join origin_visit_stats ovs using (url, visit_type) diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -20,7 +20,7 @@ next_visit_queue_position, process_journal_objects, ) -from swh.scheduler.model import ListedOrigin, OriginVisitStats +from swh.scheduler.model import LastVisitStatus, ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow @@ -132,8 +132,8 @@ def assert_visit_stats_ok( - actual_visit_stats: List[OriginVisitStats], - expected_visit_stats: List[OriginVisitStats], + actual_visit_stats: OriginVisitStats, + expected_visit_stats: OriginVisitStats, ignore_fields: List[str] = ["next_visit_queue_position"], ): """Utility test function to ensure visits stats read from the backend are in the right @@ -141,17 +141,14 @@ dedicated tests so it's not tested in tests that are calling this function. 
""" - assert len(actual_visit_stats) == len(expected_visit_stats) - fields = attr.fields_dict(OriginVisitStats) defaults = {field: fields[field].default for field in ignore_fields} - for visit_stats in actual_visit_stats: - visit_stats = attr.evolve(visit_stats, **defaults) - assert visit_stats in expected_visit_stats + actual_visit_stats = attr.evolve(actual_visit_stats, **defaults) + assert actual_visit_stats == expected_visit_stats -def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): +def test_journal_client_origin_visit_status_from_journal_last_not_found(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, @@ -167,19 +164,14 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=visit_status["date"], - last_snapshot=None, - next_position_offset=5, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_visit=visit_status["date"], + last_visit_status=LastVisitStatus.not_found, + next_position_offset=4, + ), ) visit_statuses = [ @@ -207,19 +199,14 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=DATE3, - last_snapshot=None, - next_position_offset=7, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_visit=DATE3, + last_visit_status=LastVisitStatus.not_found, + next_position_offset=6, + ), ) @@ -265,19 +252,14 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="bar", - 
visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=DATE3, - last_notfound=None, - last_snapshot=None, - next_position_offset=7, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="bar", + visit_type="git", + last_visit=DATE3, + last_visit_status=LastVisitStatus.failed, + next_position_offset=6, + ), ) @@ -307,23 +289,18 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="bar", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=DATE2, - last_notfound=None, - last_snapshot=None, - next_position_offset=6, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="bar", + visit_type="git", + last_visit=DATE2, + last_visit_status=LastVisitStatus.failed, + next_position_offset=5, + ), ) -def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): +def test_journal_client_origin_visit_status_from_journal_last_successful(swh_scheduler): visit_statuses = [ { "origin": "bar", @@ -365,19 +342,16 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE3, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), - next_position_offset=0, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE3, + last_visit=DATE3, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_position_offset=0, + ), ) @@ -397,10 +371,9 @@ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=DATE3, - last_failed=DATE2, - last_notfound=DATE1, + 
last_successful=DATE2, + last_visit=DATE3, + last_visit_status=LastVisitStatus.failed, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=4, @@ -416,19 +389,17 @@ [(visit_status["origin"], visit_status["type"])] ) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=visit_status["date"], # most recent date but uneventful - last_failed=DATE2, - last_notfound=DATE1, - last_snapshot=visit_status["snapshot"], - next_position_offset=5, # uneventful so visit less often - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_visit=DATE3 + ONE_DAY, + last_successful=DATE3 + ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=5, + ), ) @@ -481,22 +452,25 @@ ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1 + ONE_DAY, - last_uneventful=DATE1 + 3 * ONE_DAY, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=5, # uneventful, visit origin less often in future - ) - ], + visit_stats, + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE1 + 3 * ONE_DAY, + last_visit=DATE1 + 3 * ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], ) + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. 
What matters is consistency of the final + # dates (last_visit and last_successful). + assert 4 <= visit_stats.next_position_offset <= 5 + VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} @@ -547,26 +521,26 @@ ) actual_visit_stats = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) + visit_stats = actual_visit_stats[0] assert_visit_stats_ok( - actual_visit_stats, - [ - OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1 + 2 * ONE_DAY, - last_uneventful=DATE1 + 3 * ONE_DAY, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - ], - ignore_fields=[ - "next_visit_queue_position", - "next_position_offset", # depending on the permutations, the value differs - ], + visit_stats, + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_successful=DATE1 + 3 * ONE_DAY, + last_visit=DATE1 + 3 * ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], ) + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. What matters is consistency of the final + # dates (last_visit and last_successful). 
+ assert 2 <= visit_stats.next_position_offset <= 5 + VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} @@ -667,10 +641,9 @@ assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] - assert ovs.last_eventful == DATE1 + 5 * ONE_DAY - assert ovs.last_uneventful is None - assert ovs.last_failed is None - assert ovs.last_notfound is None + assert ovs.last_successful == DATE1 + 5 * ONE_DAY + assert ovs.last_visit == DATE1 + 5 * ONE_DAY + assert ovs.last_visit_status == LastVisitStatus.successful assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) @@ -699,18 +672,15 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE1, + last_visit=DATE1, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), ) @@ -746,19 +716,16 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=5, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE2, + last_visit=DATE2, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=4, + ), ) @@ -809,21 
+776,24 @@ actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [("cavabarder", "hg")] ) + visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=6, # 2 uneventful visits, whatever the permutation - ) - ], - ) + visit_stats, + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_successful=DATE1 + 2 * ONE_YEAR, + last_visit=DATE1 + 2 * ONE_YEAR, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], + ) + + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. What matters is consistency of the final + # dates (last_visit and last_successful). 
+ assert 4 <= visit_stats.next_position_offset <= 6 @pytest.mark.parametrize( diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -19,7 +19,12 @@ from swh.model.hashutil import hash_to_bytes from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface -from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics +from swh.scheduler.model import ( + LastVisitStatus, + ListedOrigin, + OriginVisitStats, + SchedulerMetrics, +) from swh.scheduler.utils import utcnow from .common import ( @@ -842,10 +847,8 @@ url=origin.url, visit_type=origin.visit_type, last_snapshot=None, - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=None, + last_visit=None, last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) @@ -871,7 +874,7 @@ expected=expected, ) - @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "notfound")) + @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "not_found")) @pytest.mark.parametrize("cooldown", (7, 15)) def test_grab_next_visits_cooldowns( self, swh_scheduler, listed_origins_by_type, which_cooldown, cooldown, @@ -886,18 +889,24 @@ expected=expected, ) - # Mark all the visits as `{which_cooldown}` (scheduled, failed or notfound) on - # the `after` timestamp - ovs_args = {"last_failed": None, "last_notfound": None, "last_scheduled": None} - ovs_args[f"last_{which_cooldown}"] = after + # Mark all the visits as scheduled, failed or notfound on the `after` timestamp + ovs_args = { + "last_visit": None, + "last_visit_status": None, + "last_scheduled": None, + } + if which_cooldown == "scheduled": + ovs_args["last_scheduled"] = after + else: + ovs_args["last_visit"] = after + ovs_args["last_visit_status"] 
= LastVisitStatus(which_cooldown) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, - last_eventful=None, - last_uneventful=None, + last_successful=None, **ovs_args, ) for i, origin in enumerate(origins) @@ -908,7 +917,7 @@ cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, - "notfound_cooldown": None, + "not_found_cooldown": None, } cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td @@ -985,10 +994,8 @@ url=origin.url, visit_type=origin.visit_type, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - last_eventful=visit_date, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=visit_date, + last_visit=visit_date, ) for origin in visited_origins ] @@ -1099,10 +1106,8 @@ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), next_visit_queue_position=date_now + timedelta(days=random.uniform(-10, 1)), ) @@ -1112,10 +1117,8 @@ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), next_visit_queue_position=date_now + timedelta(days=random.uniform(1, 10)), # definitely > now() ) @@ -1125,10 +1128,8 @@ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), ) for origin in origins[150:] ] @@ -1176,10 +1177,8 @@ OriginVisitStats( url=f"https://example.com/origin-{i:03d}", visit_type="git", - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), ) for i in range( page_size + 1 @@ -1201,10 +1200,8 @@ visit_stats 
= OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=eventful_date, + last_visit=eventful_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) swh_scheduler.origin_visit_stats_upsert([visit_stats]) @@ -1212,14 +1209,9 @@ assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] - uneventful_date = utcnow() + new_visit_date = utcnow() visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=None, - last_uneventful=uneventful_date, - last_failed=None, - last_notfound=None, + url=url, visit_type="git", last_successful=None, last_visit=new_visit_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) @@ -1228,38 +1220,12 @@ expected_visit_stats = OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=uneventful_date, - last_failed=None, - last_notfound=None, + last_successful=eventful_date, + last_visit=new_visit_date, ) assert uneventful_visits == [expected_visit_stats] - failed_date = utcnow() - visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=failed_date, - last_notfound=None, - ) - swh_scheduler.origin_visit_stats_upsert([visit_stats]) - - failed_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) - - expected_visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=eventful_date, - last_uneventful=uneventful_date, - last_failed=failed_date, - last_notfound=None, - ) - - assert failed_visits == [expected_visit_stats] - def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/666/test" @@ -1267,10 +1233,7 @@ visit_stats = OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=None, - 
last_failed=None, - last_notfound=None, + last_successful=eventful_date, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) @@ -1284,19 +1247,13 @@ OriginVisitStats( url="foo", visit_type="git", - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), OriginVisitStats( url="bar", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, + last_visit=utcnow(), last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), ), ] @@ -1318,20 +1275,14 @@ OriginVisitStats( url="foo", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, - last_snapshot=None, + last_successful=None, + last_visit=utcnow(), ), OriginVisitStats( url="foo", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, - last_snapshot=None, + last_successful=utcnow(), + last_visit=None, ), ] ) @@ -1387,10 +1338,7 @@ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), @@ -1419,10 +1367,7 @@ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=visited_origin.last_update - timedelta(days=1), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=visited_origin.last_update - timedelta(days=1), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ),