diff --git a/sql/updates/29.sql b/sql/updates/29.sql
new file mode 100644
index 0000000..2724b17
--- /dev/null
+++ b/sql/updates/29.sql
@@ -0,0 +1,16 @@
+-- SWH DB schema upgrade
+-- from_version: 28
+-- to_version: 29
+-- description: Add next_visit_queue_position and next_position_offset to origin_visit_stats
+
+insert into dbversion (version, release, description)
+  values (29, now(), 'Work In Progress');
+
+alter table origin_visit_stats
+add column next_visit_queue_position timestamptz;
+
+alter table origin_visit_stats
+add column next_position_offset int not null default 4;
+
+comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found';
+comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin';
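The migration introduces the two columns that the journal client below maintains. How `next_visit_queue_position` is eventually derived from `next_position_offset` is not part of this diff; the following is a hypothetical sketch of one plausible reading of the defaults and increments, assuming an exponential backoff in which the offset acts as an exponent over an assumed base interval:

```python
# Hypothetical illustration only: the scheduling policy that consumes these
# columns is not part of this diff. BASE_INTERVAL and the 2**offset formula
# are assumptions, not the project's actual implementation.
from datetime import datetime, timedelta, timezone

BASE_INTERVAL = timedelta(hours=1)  # assumed queue granularity


def next_queue_position(next_position_offset: int) -> datetime:
    """Place the origin 2**offset base intervals into the future."""
    return datetime.now(tz=timezone.utc) + BASE_INTERVAL * 2 ** next_position_offset


# With the default offset of 4, an origin would be revisited ~16 hours later;
# each uneventful/failed/not_found visit (+1) doubles the wait, and each
# eventful visit (-2) divides it by four.
print(next_queue_position(4))
```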
""" assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } # Use the default values from the model object empty_object = { field.name: field.default if field.default != attr.NOTHING else None for field in attr.fields(OriginVisitStats) } for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] pk = origin, visit_type if pk not in origin_visit_stats: origin_visit_stats[pk] = { **empty_object, "url": origin, "visit_type": visit_type, } visit_stats_d = origin_visit_stats[pk] if msg_dict["status"] == "not_found": visit_stats_d["last_notfound"] = max_date( msg_dict["date"], visit_stats_d.get("last_notfound") ) - elif msg_dict["status"] == "failed": - visit_stats_d["last_failed"] = max_date( - msg_dict["date"], visit_stats_d.get("last_failed") - ) - elif msg_dict["snapshot"] is None: + update_next_position_offset(visit_stats_d, 1) # visit less often + elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: visit_stats_d["last_failed"] = max_date( msg_dict["date"], visit_stats_d.get("last_failed") ) + update_next_position_offset(visit_stats_d, 1) # visit less often else: # visit with snapshot, something happened if visit_stats_d["last_snapshot"] is None: # first time visit with snapshot, we keep relevant information visit_stats_d["last_eventful"] = msg_dict["date"] visit_stats_d["last_snapshot"] = msg_dict["snapshot"] else: - # visit with snapshot already stored, last_eventful should already be - # stored + # last_snapshot is set, so an eventful visit should have previously been + # recorded assert visit_stats_d["last_eventful"] is not None latest_recorded_visit_date = max_date( visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] ) current_status_date = msg_dict["date"] previous_snapshot = visit_stats_d["last_snapshot"] if msg_dict["snapshot"] != previous_snapshot: if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # out of order message so ignored continue # new eventful visit (new snapshot) visit_stats_d["last_eventful"] = current_status_date visit_stats_d["last_snapshot"] = msg_dict["snapshot"] + # Visit this origin more often in the future + update_next_position_offset(visit_stats_d, -2) else: # same snapshot as before if ( latest_recorded_visit_date and current_status_date < latest_recorded_visit_date ): # we receive an old message which is an earlier "eventful" event # than what we had, we consider the last_eventful event as # actually an uneventful event. 
# The last uneventful visit remains the most recent: # max, previously computed visit_stats_d["last_uneventful"] = latest_recorded_visit_date # The eventful visit remains the oldest one: min visit_stats_d["last_eventful"] = min( visit_stats_d["last_eventful"], current_status_date ) + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) elif ( latest_recorded_visit_date and current_status_date == latest_recorded_visit_date ): # A duplicated message must be ignored to avoid # populating the last_uneventful message continue else: # uneventful event visit_stats_d["last_uneventful"] = current_status_date + # Visit this origin less often in the future + update_next_position_offset(visit_stats_d, 1) scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py index ea9e2c5..28aff38 100644 --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -1,254 +1,262 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, List, Optional, Tuple from uuid import UUID import attr import attr.converters from attrs_strict import type_validator def check_timestamptz(value) -> None: """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") @attr.s class BaseSchedulerModel: """Base class for database-backed objects. These database-backed objects are defined through attrs-based attributes that match the columns of the database 1:1. This is a (very) lightweight ORM. These attrs-based attributes have metadata specific to the functionality expected from these fields in the database: - `primary_key`: the column is a primary key; it should be filtered out when doing an `update` of the object - `auto_primary_key`: the column is a primary key, which is automatically handled by the database. It will not be inserted to. This must be matched with a database-side default value. - `auto_now_add`: the column is a timestamp that is set to the current time when the object is inserted, and never updated afterwards. This must be matched with a database-side default value. - `auto_now`: the column is a timestamp that is set to the current time when the object is inserted or updated. 
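A quick, standalone sanity check of the clamping behaviour described above; the helper is reproduced inline so the snippet runs without a swh.scheduler install:

```python
# Reproduction of update_next_position_offset from the patch above, for a
# self-contained demonstration of the clamping at zero.
def update_next_position_offset(visit_stats: dict, increment: int) -> None:
    visit_stats["next_position_offset"] = max(
        0, visit_stats["next_position_offset"] + increment
    )


stats = {"next_position_offset": 4}  # default offset
for _ in range(3):  # three eventful visits in a row (-2 each)
    update_next_position_offset(stats, -2)
assert stats["next_position_offset"] == 0  # clamped, never negative

for _ in range(2):  # two uneventful visits (+1 each)
    update_next_position_offset(stats, 1)
assert stats["next_position_offset"] == 2
```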
""" _pk_cols: Optional[Tuple[str, ...]] = None _select_cols: Optional[Tuple[str, ...]] = None _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None @classmethod def primary_key_columns(cls) -> Tuple[str, ...]: """Get the primary key columns for this object type""" if cls._pk_cols is None: columns: List[str] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_primary_key", "primary_key") ): columns.append(field.name) cls._pk_cols = tuple(sorted(columns)) return cls._pk_cols @classmethod def select_columns(cls) -> Tuple[str, ...]: """Get all the database columns needed for a `select` on this object type""" if cls._select_cols is None: columns: List[str] = [] for field in attr.fields(cls): columns.append(field.name) cls._select_cols = tuple(sorted(columns)) return cls._select_cols @classmethod def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: """Get the database columns and metavars needed for an `insert` or `update` on this object type. This implements support for the `auto_*` field metadata attributes. """ if cls._insert_cols_and_metavars is None: zipped_cols_and_metavars: List[Tuple[str, str]] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_now_add", "auto_primary_key") ): continue elif field.metadata.get("auto_now"): zipped_cols_and_metavars.append((field.name, "now()")) else: zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) zipped_cols_and_metavars.sort() cols, metavars = zip(*zipped_cols_and_metavars) cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars @attr.s class Lister(BaseSchedulerModel): name = attr.ib(type=str, validator=[type_validator()]) instance_name = attr.ib(type=str, validator=[type_validator()]) # Populated by database id = attr.ib( type=Optional[UUID], validator=type_validator(), default=None, metadata={"auto_primary_key": True}, ) current_state = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) created = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) updated = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) @attr.s class ListedOrigin(BaseSchedulerModel): """Basic information about a listed origin, output by a lister""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) extra_loader_arguments = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) enabled = attr.ib(type=bool, validator=[type_validator()], default=True) first_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) last_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) def as_task_dict(self): return { "type": f"load-{self.visit_type}", "arguments": { "args": [], "kwargs": {"url": self.url, **self.extra_loader_arguments}, }, } @attr.s(frozen=True, slots=True) class OriginVisitStats(BaseSchedulerModel): """Represents an aggregated origin visits view. 
""" url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_eventful = attr.ib( type=Optional[datetime.datetime], validator=type_validator() ) last_uneventful = attr.ib( type=Optional[datetime.datetime], validator=type_validator() ) last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator()) last_notfound = attr.ib( type=Optional[datetime.datetime], validator=type_validator() ) last_scheduled = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) + next_visit_queue_position = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None + ) + next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) @last_eventful.validator def check_last_eventful(self, attribute, value): check_timestamptz(value) @last_uneventful.validator def check_last_uneventful(self, attribute, value): check_timestamptz(value) @last_failed.validator def check_last_failed(self, attribute, value): check_timestamptz(value) @last_notfound.validator def check_last_notfound(self, attribute, value): check_timestamptz(value) + @next_visit_queue_position.validator + def check_next_visit_queue_position(self, attribute, value): + check_timestamptz(value) + @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): """Metrics for the scheduler, aggregated by (lister_id, visit_type)""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) origins_known = attr.ib(type=int, validator=[type_validator()], default=0) """Number of known (enabled or disabled) origins""" origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0) """Number of origins that were present in the latest listings""" origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0) """Number of enabled origins that have never been visited (according to the visit cache)""" origins_with_pending_changes = attr.ib( type=int, validator=[type_validator()], default=0 ) """Number of enabled origins with known activity (recorded by a lister) since our last visit""" diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql index b0a752f..87aec97 100644 --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -1,208 +1,214 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (28, now(), 'Work In Progress'); + values (29, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of 
schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval, status task_status not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. 
' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Policy of the given task'; comment on column task.id is 'Task Identifier'; comment on column task.type is 'References task_type table'; comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.id is 'Task run identifier'; comment on column task_run.task is 'References task table'; comment on column task_run.scheduled is 'Scheduled run time for task'; comment on column task_run.started is 'Task starting time'; comment on column task_run.ended is 'Task ending time'; create table if not exists listers ( id uuid primary key default uuid_generate_v4(), name text not null, instance_name text not null, created timestamptz not null default now(), -- auto_now_add in the model current_state jsonb not null, updated timestamptz not null ); comment on table listers is 'Lister instances known to the origin visit scheduler'; comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. 
framagit, bitbucket, ...)'; comment on column listers.created is 'Timestamp at which the lister was originally created'; comment on column listers.current_state is 'Known current state of this lister'; comment on column listers.updated is 'Timestamp at which the lister state was last updated'; create table if not exists listed_origins ( -- Basic information lister_id uuid not null references listers(id), url text not null, visit_type text not null, extra_loader_arguments jsonb not null, -- Whether this origin still exists or not enabled boolean not null, -- time-based information first_seen timestamptz not null default now(), last_seen timestamptz not null, -- potentially provided by the lister last_update timestamptz, primary key (lister_id, url, visit_type) ); comment on table listed_origins is 'Origins known to the origin visit scheduler'; comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; comment on column listed_origins.url is 'URL of the origin listed'; comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; create table origin_visit_stats ( url text not null, visit_type text not null, last_eventful timestamptz, last_uneventful timestamptz, last_failed timestamptz, last_notfound timestamptz, -- visit scheduling information last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, - + -- position in the global queue, the "time" at which we expect the origin to have new + -- objects + next_visit_queue_position timestamptz, + -- duration that we expect to wait between visits of this origin + next_position_offset int not null default 4, primary key (url, visit_type) ); comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event'; comment on column origin_visit_stats.last_failed is 'Date of the last failed event'; comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event'; comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; +comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; +comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; create table scheduler_metrics ( lister_id uuid not null references listers(id), visit_type text not null, last_update timestamptz not null, origins_known int not null default 0, origins_enabled int not null default 0, origins_never_visited int not null default 0, 
origins_with_pending_changes int not null default 0, primary key (lister_id, visit_type) ); comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.'; comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated'; comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated'; comment on column scheduler_metrics.last_update is 'Last update of these metrics'; comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins'; comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing'; comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited'; comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit'; diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 8169a7b..c1ca61d 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,750 +1,766 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools from itertools import permutations from unittest.mock import Mock import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import max_date, process_journal_objects from swh.scheduler.model import ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) ONE_YEAR = datetime.timedelta(days=366) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( "dates,expected_max_date", [ ((DATE1,), DATE1), ((None, DATE2), DATE2), ((DATE1, None), DATE1), ((DATE1, DATE2), DATE2), ((DATE2, DATE1), DATE2), ((DATE1, DATE2, DATE3), DATE3), ((None, DATE2, DATE3), DATE3), ((None, None, DATE3), DATE3), ((DATE1, None, DATE3), DATE3), ], ) def test_max_date(dates, expected_max_date): assert max_date(*dates) == expected_max_date def test_max_date_raise(): with pytest.raises(ValueError, match="valid datetime"): max_date() with pytest.raises(ValueError, match="valid datetime"): max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. 
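A minimal check of the new columns' database-side behaviour; a sketch assuming a PostgreSQL database loaded with this schema, psycopg2 installed, and a placeholder DSN:

```python
import psycopg2

with psycopg2.connect("dbname=swh-scheduler-test") as db:  # placeholder DSN
    with db.cursor() as cur:
        cur.execute(
            """
            insert into origin_visit_stats (url, visit_type)
            values ('https://example.org/repo.git', 'git')
            returning next_visit_queue_position, next_position_offset
            """
        )
        queue_position, offset = cur.fetchone()
        assert queue_position is None  # nullable, no database-side default
        assert offset == 4  # matches the model-side default
```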
""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # All messages have been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_ignore_missing_type(swh_scheduler): """Ignore statuses with missing type key""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) date = utcnow() snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", "visit": 1, "status": "full", "date": date, "snapshot": snapshot, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # The message has been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert actual_origin_visit_stats == [ OriginVisitStats( url="foo", visit_type="git", last_eventful=None, last_uneventful=None, last_failed=None, last_notfound=visit_status["date"], last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=5, ) ] visit_statuses = [ { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert actual_origin_visit_stats == [ OriginVisitStats( url="foo", visit_type="git", last_eventful=None, last_uneventful=None, last_failed=None, last_notfound=DATE3, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 2, "status": "full", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert actual_origin_visit_stats == [ OriginVisitStats( url="bar", visit_type="git", last_eventful=None, last_uneventful=None, last_failed=DATE3, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=7, ) ] def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 2, "status": "failed", "date": DATE1, "type": "git", 
"snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "bar", "visit": 3, "status": "failed", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert actual_origin_visit_stats == [ OriginVisitStats( url="bar", visit_type="git", last_eventful=None, last_uneventful=None, last_failed=DATE2, last_notfound=None, last_snapshot=None, + next_visit_queue_position=None, + next_position_offset=6, ) ] def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, { "origin": "foo", "visit": 2, "status": "partial", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, { "origin": "foo", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert actual_origin_visit_stats == [ OriginVisitStats( url="foo", visit_type="git", last_eventful=DATE3, last_uneventful=None, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_visit_queue_position=None, + next_position_offset=0, ) ] def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=DATE3, last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=4, ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) assert actual_origin_visit_stats == [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], last_eventful=DATE1, last_uneventful=visit_status["date"], # most recent date but uneventful last_failed=DATE2, last_notfound=DATE1, last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=5, # uneventful so visit less often ) ] VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": 
hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="foo", visit_type="git", last_eventful=DATE1 + ONE_DAY, last_uneventful=DATE1 + 3 * ONE_DAY, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=5, # uneventful, visit origin less often in the future ) assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ expected_visit_stats ] VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) (actual_visit_stats,) = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) assert actual_visit_stats.url == "cavabarder" assert actual_visit_stats.visit_type == "hg" assert actual_visit_stats.last_eventful == DATE1 + 2 * ONE_DAY assert actual_visit_stats.last_uneventful == DATE1 + 3 * ONE_DAY assert actual_visit_stats.last_failed is None assert actual_visit_stats.last_notfound is None assert actual_visit_stats.last_snapshot == hash_to_bytes( "aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd" ) VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, 
"status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] assert ovs.last_eventful == DATE1 + 5 * ONE_DAY assert ovs.last_uneventful is None assert ovs.last_failed is None assert ovs.last_notfound is None assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) def test_journal_client_origin_visit_status_duplicated_messages(swh_scheduler): """A duplicated message must be ignored """ visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) expected_visit_stats = OriginVisitStats( url="foo", visit_type="git", last_eventful=DATE1, last_uneventful=None, last_failed=None, last_notfound=None, last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), ) assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ expected_visit_stats ] def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): - """A duplicated message must be ignored + """An old message updates old information """ visit_status1 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } visit_status2 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status2]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ - expected_visit_stats + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1, + last_uneventful=DATE2, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + 
next_visit_queue_position=None, + next_position_offset=5, + ) ] VISIT_STATUSES_SAME_SNAPSHOT = [ {**ovs, "date": DATE1 + n * ONE_YEAR} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_SAME_SNAPSHOT, len(VISIT_STATUSES_SAME_SNAPSHOT)), ) def test_journal_client_origin_visit_statuses_same_snapshot_permutation( visit_statuses, swh_scheduler ): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) - expected_visit_stats = OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [ - expected_visit_stats + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1, + last_uneventful=DATE1 + 2 * ONE_YEAR, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_visit_queue_position=None, + next_position_offset=6, # 2 uneventful visits, whatever the permutation + ) ]
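The offsets asserted throughout these tests follow directly from the increments in journal_client.py (+1 per not_found, failed, or uneventful visit; -2 per eventful visit after the first recorded snapshot; floored at 0), starting from the default of 4:

```python
# Worked arithmetic behind the expected next_position_offset values above.
DEFAULT = 4

assert DEFAULT + 1 == 5              # one not_found visit    (last_notfound, first assert)
assert DEFAULT + 3 == 7              # three not_found visits (last_notfound, second assert)
assert DEFAULT + 3 == 7              # three failed visits    (last_failed)
assert DEFAULT + 2 == 6              # two failed visits      (last_failed2)
assert max(0, DEFAULT - 2 - 2) == 0  # two snapshot changes after the first
                                     # recorded snapshot      (last_eventful)
assert DEFAULT + 1 + 1 == 6          # two uneventful visits  (same_snapshot permutations)
```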