diff --git a/sql/updates/30.sql b/sql/updates/30.sql new file mode 100644 index 0000000..ed71236 --- /dev/null +++ b/sql/updates/30.sql @@ -0,0 +1,79 @@ +-- SWH DB schema upgrade +-- from_version: 29 +-- to_version: 30 +-- description: merge a bunch of fields in origin_visit_stats + +insert into dbversion (version, release, description) + values (30, now(), 'Work In Progress'); + + +create type last_visit_status as enum ('successful', 'failed', 'not_found'); +comment on type last_visit_status is 'Record of the status of the last visit of an origin'; + +alter table origin_visit_stats + add column last_successful timestamptz, + add column last_visit timestamptz, + add column last_visit_status last_visit_status; + +comment on column origin_visit_stats.last_successful is 'Date of the last successful visit, at which we recorded the `last_snapshot`'; +comment on column origin_visit_stats.last_visit is 'Date of the last visit overall'; +comment on column origin_visit_stats.last_visit_status is 'Status of the last visit'; + +update origin_visit_stats + set last_successful = greatest(last_eventful, last_uneventful), + last_visit = greatest(last_eventful, last_uneventful, last_failed, last_notfound); + +update origin_visit_stats + set last_visit_status = + case + when last_visit = last_failed then 'failed'::last_visit_status + when last_visit = last_notfound then 'not_found'::last_visit_status + else 'successful'::last_visit_status + end + where last_visit is not null; + + +create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now()) + returns setof scheduler_metrics + language sql +as $$ + insert into scheduler_metrics ( + lister_id, visit_type, last_update, + origins_known, origins_enabled, + origins_never_visited, origins_with_pending_changes + ) + select + lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update, + count(*) as origins_known, + count(*) filter (where enabled) as origins_enabled, + count(*) filter (where + enabled and last_snapshot is NULL + ) as origins_never_visited, + count(*) filter (where + enabled and lo.last_update > last_successful + ) as origins_with_pending_changes + from listed_origins lo + left join origin_visit_stats ovs using (url, visit_type) + where + -- update only for the requested lister + update_metrics.lister_id = lo.lister_id + -- or for all listers if the function argument is null + or update_metrics.lister_id is null + group by (lister_id, visit_type) + on conflict (lister_id, visit_type) do update + set + last_update = EXCLUDED.last_update, + origins_known = EXCLUDED.origins_known, + origins_enabled = EXCLUDED.origins_enabled, + origins_never_visited = EXCLUDED.origins_never_visited, + origins_with_pending_changes = EXCLUDED.origins_with_pending_changes + returning * +$$; + +comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id'; + +alter table origin_visit_stats + drop column last_eventful, + drop column last_uneventful, + drop column last_failed, + drop column last_notfound; diff --git a/swh/scheduler/api/serializers.py b/swh/scheduler/api/serializers.py index 354d301..c420c8c 100644 --- a/swh/scheduler/api/serializers.py +++ b/swh/scheduler/api/serializers.py @@ -1,28 +1,34 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Decoder and encoders for swh.scheduler.model objects.""" from typing import Callable, Dict, List, Tuple import attr import swh.scheduler.model as model def _encode_model_object(obj): d = attr.asdict(obj, recurse=False) d["__type__"] = type(obj).__name__ return d +def _encode_enum(obj): + return obj.value + + ENCODERS: List[Tuple[type, str, Callable]] = [ (model.BaseSchedulerModel, "scheduler_model", _encode_model_object), + (model.LastVisitStatus, "last_visit_status", _encode_enum), ] DECODERS: Dict[str, Callable] = { - "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d) + "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d), + "last_visit_status": model.LastVisitStatus, } diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index f605eb0..a6ac60f 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,1085 +1,1083 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import attr from psycopg2.errors import CardinalityViolation +from psycopg2.extensions import AsIs import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow from .exc import SchedulerException, StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList -from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics +from .model import ( + LastVisitStatus, + ListedOrigin, + Lister, + OriginVisitStats, + SchedulerMetrics, +) logger = logging.getLogger(__name__) +def adapt_LastVisitStatus(v: LastVisitStatus): + return AsIs(f"'{v.value}'::last_visit_status") + + psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) +psycopg2.extensions.register_adapter(LastVisitStatus, adapt_LastVisitStatus) psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. """ def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query("select {keys} from task_type", self.task_type_keys,) cur.execute(query) return cur.fetchall() @db_transaction() def get_listers(self, db=None, cur=None) -> List[Lister]: """Retrieve information about all listers from the database. """ select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers """ cur.execute(query) return [Lister(**ret) for ret in cur.fetchall()] @db_transaction() def get_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers where (name, instance_name) = (%s, %s) """ cur.execute(query, (name, instance_name)) ret = cur.fetchone() if not ret: return None return Lister(**ret) @db_transaction() def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) insert_cols, insert_meta = ( ", ".join(tup) for tup in Lister.insert_columns_and_metavars() ) query = f""" with added as ( insert into listers ({insert_cols}) values ({insert_meta}) on conflict do nothing returning {select_cols} ) select {select_cols} from added union all select {select_cols} from listers where (name, instance_name) = (%(name)s, %(instance_name)s); """ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) return Lister(**cur.fetchone()) @db_transaction() def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ select_cols = ", ".join(Lister.select_columns()) set_vars = ", ".join( f"{col} = {meta}" for col, meta in zip(*Lister.insert_columns_and_metavars()) ) query = f"""update listers set {set_vars} where id=%(id)s and updated=%(updated)s returning {select_cols}""" cur.execute(query, attr.asdict(lister)) updated = cur.fetchone() if not updated: raise StaleData("Stale data; Lister state not updated") return Lister(**updated) @db_transaction() def record_listed_origins( self, listed_origins: Iterable[ListedOrigin], db=None, cur=None ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ pk_cols = ListedOrigin.primary_key_columns() select_cols = ListedOrigin.select_columns() insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} RETURNING {", ".join(select_cols)} """ ret = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=(attr.asdict(origin) for origin in listed_origins), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=True, ) return [ListedOrigin(**d) for d in ret] @db_transaction() def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, db=None, cur=None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. """ query_filters: List[str] = [] query_params: List[Union[int, str, UUID, Tuple[UUID, str]]] = [] if lister_id: query_filters.append("lister_id = %s") query_params.append(lister_id) if url is not None: query_filters.append("url = %s") query_params.append(url) if page_token is not None: query_filters.append("(lister_id, url) > %s") # the typeshed annotation for tuple() is too strict. query_params.append(tuple(page_token)) # type: ignore query_params.append(limit) select_cols = ", ".join(ListedOrigin.select_columns()) if query_filters: where_clause = "where %s" % (" and ".join(query_filters)) else: where_clause = "" query = f"""SELECT {select_cols} from listed_origins {where_clause} ORDER BY lister_id, url LIMIT %s""" cur.execute(query, tuple(query_params)) origins = [ListedOrigin(**d) for d in cur] if len(origins) == limit: page_token = (str(origins[-1].lister_id), origins[-1].url) else: page_token = None return PaginatedListedOriginList(origins, page_token) @db_transaction() def grab_next_visits( self, visit_type: str, count: int, policy: str, timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), - notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), + not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), db=None, cur=None, ) -> List[ListedOrigin]: if timestamp is None: timestamp = utcnow() origin_select_cols = ", ".join(ListedOrigin.select_columns()) query_args: List[Any] = [] where_clauses = [] # list of (name, query) handled as CTEs before the main query common_table_expressions: List[Tuple[str, str]] = [] # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled") # Only schedule visits of the given type where_clauses.append("visit_type = %s") query_args.append(visit_type) if scheduled_cooldown: # Don't re-schedule visits if they're already scheduled but we haven't # recorded a result yet, unless they've been scheduled more than a week # ago (it probably means we've lost them in flight somewhere). where_clauses.append( """origin_visit_stats.last_scheduled IS NULL OR origin_visit_stats.last_scheduled < GREATEST( %s - %s, - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful, - origin_visit_stats.last_failed, - origin_visit_stats.last_notfound + origin_visit_stats.last_visit ) """ ) query_args.append(timestamp) query_args.append(scheduled_cooldown) if failed_cooldown: # Don't retry failed origins too often where_clauses.append( - "origin_visit_stats.last_failed is null " - "or origin_visit_stats.last_failed < %s - %s" + "origin_visit_stats.last_visit_status is distinct from 'failed' " + "or origin_visit_stats.last_visit < %s - %s" ) query_args.append(timestamp) query_args.append(failed_cooldown) - if notfound_cooldown: + if not_found_cooldown: # Don't retry not found origins too often where_clauses.append( - "origin_visit_stats.last_notfound is null " - "or origin_visit_stats.last_notfound < %s - %s" + "origin_visit_stats.last_visit_status is distinct from 'not_found' " + "or origin_visit_stats.last_visit < %s - %s" ) query_args.append(timestamp) - query_args.append(notfound_cooldown) + query_args.append(not_found_cooldown) if policy == "oldest_scheduled_first": order_by = "origin_visit_stats.last_scheduled NULLS FIRST" elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") # order by increasing last_update (oldest first) where_clauses.append("listed_origins.last_update IS NOT NULL") order_by = "listed_origins.last_update" elif policy == "already_visited_order_by_lag": # TODO: store "visit lag" in a materialized view? # visited origins have a NOT NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") # ignore origins we have visited after the known last update where_clauses.append("listed_origins.last_update IS NOT NULL") where_clauses.append( - """ - listed_origins.last_update - > GREATEST( - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful - ) - """ + "listed_origins.last_update > origin_visit_stats.last_successful" ) # order by decreasing visit lag - order_by = """\ - listed_origins.last_update - - GREATEST( - origin_visit_stats.last_eventful, - origin_visit_stats.last_uneventful - ) - DESC - """ + order_by = ( + "listed_origins.last_update - origin_visit_stats.last_successful DESC" + ) elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") order_by = "origin_visit_stats.next_visit_queue_position nulls first" # fmt: off # This policy requires updating the global queue position for this # visit type common_table_expressions.append(("update_queue_position", """ INSERT INTO visit_scheduler_queue_position(visit_type, position) SELECT visit_type, COALESCE(MAX(next_visit_queue_position), now()) FROM selected_origins GROUP BY visit_type ON CONFLICT(visit_type) DO UPDATE SET position=GREATEST( visit_scheduler_queue_position.position, EXCLUDED.position ) """)) # fmt: on else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") # fmt: off common_table_expressions.insert(0, ("selected_origins", f""" SELECT {origin_select_cols}, next_visit_queue_position FROM listed_origins LEFT JOIN origin_visit_stats USING (url, visit_type) WHERE ({") AND (".join(where_clauses)}) ORDER BY {order_by} LIMIT %s """)) # fmt: on query_args.append(count) # fmt: off common_table_expressions.append(("update_stats", """ INSERT INTO origin_visit_stats (url, visit_type, last_scheduled) SELECT url, visit_type, %s FROM selected_origins ON CONFLICT (url, visit_type) DO UPDATE SET last_scheduled = GREATEST( origin_visit_stats.last_scheduled, EXCLUDED.last_scheduled ) """)) # fmt: on query_args.append(timestamp) formatted_ctes = ",\n".join( f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions ) query = f""" WITH {formatted_ctes} SELECT {origin_select_cols} FROM selected_origins """ cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, db=None, cur=None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ if not task_ids: return query = ["UPDATE task SET status = %s"] args: List[Any] = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_no_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s", task_type, cur.rowcount) return cur.fetchall() @db_transaction() def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s", task_type, cur.rowcount) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)", (backend_id, status, metadata, timestamp), ) return cur.fetchone() @db_transaction() def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ assert not page_token or isinstance(page_token, str) last_id = -1 if page_token is None else int(page_token) tasks = [] cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit + 1), ) for row in cur: task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this task["arguments"]["args"] = { i: v for i, v in enumerate(task["arguments"]["args"]) } kwargs = task["arguments"]["kwargs"] task["arguments"]["kwargs"] = json.dumps(kwargs) tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { "tasks": tasks[:limit], "next_page_token": str(tasks[-1]["task_id"]), } else: result = {"tasks": tasks} return result @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: pk_cols = OriginVisitStats.primary_key_columns() insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join( f"{col} = coalesce(EXCLUDED.{col}, ovi.{col})" for col in upsert_cols ) query = f""" INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} """ try: psycopg2.extras.execute_values( cur=cur, sql=query, argslist=( attr.asdict(visit_stats) for visit_stats in origin_visit_stats ), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=False, ) except CardinalityViolation as e: raise SchedulerException(repr(e)) @db_transaction() def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]], db=None, cur=None ) -> List[OriginVisitStats]: if not ids: return [] primary_keys = tuple((origin, visit_type) for (origin, visit_type) in ids) query = format_query( """ SELECT {keys} FROM (VALUES %s) as stats(url, visit_type) INNER JOIN origin_visit_stats USING (url, visit_type) """, OriginVisitStats.select_columns(), ) rows = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=primary_keys, fetch=True ) return [OriginVisitStats(**row) for row in rows] @db_transaction() def visit_scheduler_queue_position_get( self, db=None, cur=None, ) -> Dict[str, datetime.datetime]: cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") return {row["visit_type"]: row["position"] for row in cur} @db_transaction() def visit_scheduler_queue_position_set( self, visit_type: str, position: datetime.datetime, db=None, cur=None, ) -> None: query = """ INSERT INTO visit_scheduler_queue_position(visit_type, position) VALUES(%s, %s) ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position """ cur.execute(query, (visit_type, position)) @db_transaction() def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ query = format_query( "SELECT {keys} FROM update_metrics(%s, %s)", SchedulerMetrics.select_columns(), ) cur.execute(query, (lister_id, timestamp)) return [SchedulerMetrics(**row) for row in cur.fetchall()] @db_transaction() def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ where_filters = [] where_args = [] if lister_id: where_filters.append("lister_id = %s") where_args.append(str(lister_id)) if visit_type: where_filters.append("visit_type = %s") where_args.append(visit_type) where_clause = "" if where_filters: where_clause = f"where {' and '.join(where_filters)}" query = format_query( "SELECT {keys} FROM scheduler_metrics %s" % where_clause, SchedulerMetrics.select_columns(), ) cur.execute(query, tuple(where_args)) return [SchedulerMetrics(**row) for row in cur.fetchall()] diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py index e2ff2ed..73f0a0b 100644 --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -1,489 +1,489 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult from swh.scheduler.model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics ListedOriginPageToken = Tuple[str, str] class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]): """A list of listed origins, with a continuation token""" def __init__( self, results: List[ListedOrigin], next_page_token: Union[None, ListedOriginPageToken, List[str]], ): parsed_next_page_token: Optional[Tuple[str, str]] = None if next_page_token is not None: if len(next_page_token) != 2: raise TypeError("Expected Tuple[str, str] or list of size 2.") parsed_next_page_token = tuple(next_page_token) # type: ignore super().__init__(results, parsed_next_page_token) @runtime_checkable class SchedulerInterface(Protocol): @remote_api_endpoint("task_type/create") def create_task_type(self, task_type): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ ... @remote_api_endpoint("task_type/get") def get_task_type(self, task_type_name): """Retrieve the task type with id task_type_name""" ... @remote_api_endpoint("task_type/get_all") def get_task_types(self): """Retrieve all registered task types""" ... @remote_api_endpoint("task/create") def create_tasks(self, tasks, policy="recurring"): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ ... @remote_api_endpoint("task/set_status") def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ ... @remote_api_endpoint("task/disable") def disable_tasks(self, task_ids): """Disable the tasks whose ids are listed.""" ... @remote_api_endpoint("task/search") def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, ): """Search tasks from selected criterions""" ... @remote_api_endpoint("task/get") def get_tasks(self, task_ids): """Retrieve the info of tasks whose ids are listed.""" ... @remote_api_endpoint("task/peek_ready") def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch the list of tasks (with no priority) to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: the list of tasks which would be scheduled """ ... @remote_api_endpoint("task/grab_ready") def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch and schedule the list of tasks (with no priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: the list of scheduled tasks """ ... @remote_api_endpoint("task/peek_ready_with_priority") def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: a list of tasks """ ... @remote_api_endpoint("task/grab_ready_with_priority") def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch and schedule the list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: a list of tasks """ ... @remote_api_endpoint("task_run/schedule_one") def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ ... @remote_api_endpoint("task_run/schedule") def mass_schedule_task_runs(self, task_runs): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ ... @remote_api_endpoint("task_run/start") def start_task_run(self, backend_id, metadata=None, timestamp=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... @remote_api_endpoint("task_run/end") def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... @remote_api_endpoint("task/filter_for_archive") def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ ... @remote_api_endpoint("task/delete_archived") def delete_archived_tasks(self, task_ids): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ ... @remote_api_endpoint("task_run/get") def get_task_runs(self, task_ids, limit=None): """Search task run for a task id""" ... @remote_api_endpoint("listers/get") def get_listers(self) -> List[Lister]: """Retrieve information about all listers from the database. """ ... @remote_api_endpoint("lister/get") def get_lister( self, name: str, instance_name: Optional[str] = None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ ... @remote_api_endpoint("lister/get_or_create") def get_or_create_lister( self, name: str, instance_name: Optional[str] = None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ ... @remote_api_endpoint("lister/update") def update_lister(self, lister: Lister) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ ... @remote_api_endpoint("origins/record") def record_listed_origins( self, listed_origins: Iterable[ListedOrigin] ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ ... @remote_api_endpoint("origins/get") def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. Use the `limit` and `page_token` arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object. """ ... @remote_api_endpoint("origins/grab_next") def grab_next_visits( self, visit_type: str, count: int, policy: str, timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), - notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), + not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin. Arguments: visit_type: type of visits to schedule count: number of visits to schedule policy: the scheduling policy used to select which visits to schedule timestamp: the mocked timestamp at which we're recording that the visits are being scheduled (defaults to the current time) scheduled_cooldown: the minimal interval before which we can schedule the same origin again failed_cooldown: the minimal interval before which we can reschedule a failed origin - notfound_cooldown: the minimal interval before which we can reschedule a - notfound origin + not_found_cooldown: the minimal interval before which we can reschedule a + not_found origin """ ... @remote_api_endpoint("visit_stats/upsert") def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats] ) -> None: """Create a new origin visit stats """ ... @remote_api_endpoint("visit_stats/get") def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]] ) -> List[OriginVisitStats]: """Retrieve the stats for an origin with a given visit type If some visit_stats are not found, they are filtered out of the result. So the output list may be of length inferior to the length of the input list. """ ... @remote_api_endpoint("visit_scheduler/get") def visit_scheduler_queue_position_get(self,) -> Dict[str, datetime.datetime]: """Retrieve all current queue positions for the recurrent visit scheduler. Returns Mapping of visit type to their current queue position """ ... @remote_api_endpoint("visit_scheduler/set") def visit_scheduler_queue_position_set( self, visit_type: str, position: datetime.datetime ) -> None: """Set the current queue position of the recurrent visit scheduler for `visit_type`. """ ... @remote_api_endpoint("scheduler_metrics/update") def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ ... @remote_api_endpoint("scheduler_metrics/get") def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py index 3c272cf..13ce184 100644 --- a/swh/scheduler/journal_client.py +++ b/swh/scheduler/journal_client.py @@ -1,257 +1,249 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import random from typing import Dict, List, Optional, Tuple import attr from swh.scheduler.interface import SchedulerInterface -from swh.scheduler.model import OriginVisitStats +from swh.scheduler.model import LastVisitStatus, OriginVisitStats from swh.scheduler.utils import utcnow msg_type = "origin_visit_status" def max_date(*dates: Optional[datetime]) -> datetime: """Return the max date of given (possibly None) dates At least one date must be not None. """ filtered_dates = [d for d in dates if d is not None] if not filtered_dates: raise ValueError("At least one date should be a valid datetime") return max(filtered_dates) -def update_next_position_offset(visit_stats: Dict, increment: int) -> None: - """Update the next position offset according to existing value and the increment. The - resulting value must be a positive integer. +def update_next_position_offset(visit_stats: Dict, eventful: Optional[bool]) -> None: + """Update the next position offset according to the existing value and the eventfulness + of the visit. The resulting value must be a positive integer. """ + increment = -2 if eventful else 1 + visit_stats["next_position_offset"] = max( 0, visit_stats["next_position_offset"] + increment ) def from_position_offset_to_days(position_offset: int) -> int: """Compute position offset to interval in days. - index 0 and 1: interval 1 day - index 2, 3 and 4: interval 2 days - index 5 and up: interval `4^(n-4)` days for n in (4, 16, 64, 256, 1024, ...) Args: position_offset: The actual position offset for a given visit stats Returns: The offset as an interval in number of days """ assert position_offset >= 0 if position_offset < 2: result = 1 elif position_offset < 5: result = 2 else: result = 4 ** (position_offset - 4) return result def next_visit_queue_position( queue_position_per_visit_type: Dict, visit_stats: Dict ) -> datetime: """Compute the next visit queue position for the given visit_stats. This takes the visit_stats next_position_offset value and compute a corresponding interval in days (with a random fudge factor of -/+ 10% range to avoid scheduling burst for hosters). Then computes out of this visit interval and the current visit stats's position in the queue a new position. As an implementation detail, if the visit stats does not have a queue position yet, this fallbacks to use the current global position (for the same visit type as the visit stats) to compute the new position in the queue. If there is no global state yet for the visit type, this starts up using the ``utcnow`` function as default value. Args: queue_position_per_visit_type: The global state of the queue per visit type visit_stats: The actual visit information to compute the next position for Returns: The actual next visit queue position for that visit stats """ days = from_position_offset_to_days(visit_stats["next_position_offset"]) random_fudge_factor = random.uniform(-0.1, 0.1) visit_interval = timedelta(days=days * (1 + random_fudge_factor)) # Use the current queue position per visit type as starting position if none is # already set default_queue_position = queue_position_per_visit_type.get( visit_stats["visit_type"], utcnow() ) current_position = ( visit_stats["next_visit_queue_position"] if visit_stats.get("next_visit_queue_position") else default_queue_position ) return current_position + visit_interval +def get_last_status( + incoming_visit_status: Dict, known_visit_stats: Dict +) -> Tuple[LastVisitStatus, Optional[bool]]: + """Determine the `last_visit_status` and eventfulness of an origin according to + the received visit_status object, and the state of the origin_visit_stats in db + """ + + status = incoming_visit_status["status"] + if status in ("not_found", "failed"): + return LastVisitStatus(status), None + + assert status in ("full", "partial") + + if incoming_visit_status["snapshot"] is None: + return LastVisitStatus.failed, None + + if incoming_visit_status["snapshot"] != known_visit_stats.get("last_snapshot"): + return LastVisitStatus.successful, True + + return LastVisitStatus.successful, False + + def process_journal_objects( messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface ) -> None: """Read messages from origin_visit_status journal topic to update "origin_visit_stats" information on (origin, visit_type). The goal is to compute visit stats information - per origin and visit_type: last_eventful, last_uneventful, last_failed, - last_notfound, last_snapshot, ... + per origin and visit_type: `last_successful`, `last_visit`, `last_visit_status`, ... Details: - - This journal consumes origin visit status information for final visit status - ("full", "partial", "failed", "not_found"). It drops the information on non - final visit statuses ("ongoing", "created"). + - This journal consumes origin visit status information for final visit + status (`"full"`, `"partial"`, `"failed"`, `"not_found"`). It drops + the information of non final visit statuses (`"ongoing"`, + `"created"`). - - The snapshot is used to determine the "eventful/uneventful" nature of the - origin visit status. + - This journal client only considers messages that arrive in + chronological order. Messages that arrive out of order (i.e. with a + date field smaller than the latest recorded visit of the origin) are + ignored. This is a tradeoff between correctness and simplicity of + implementation [1]_. - - When no snapshot is provided, the visit is considered as failed so the - last_failed column is updated. + - The snapshot is used to determine the eventful or uneventful nature of + the origin visit. - - As there is no time guarantee when reading message from the topic, the code - tries to keep the data in the most timely ordered as possible. + - When no snapshot is provided, the visit is considered as failed. - - Compared to what is already stored in the origin_visit_stats table, only most - recent information is kept. - - - This updates the `next_visit_queue_position` (time at which some new objects + - Finally, the `next_visit_queue_position` (time at which some new objects are expected to be added for the origin), and `next_position_offset` (duration - that we expect to wait between visits of this origin). + that we expect to wait between visits of this origin) are updated. This is a worker function to be used with `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`. + .. [1] Ignoring out of order messages makes the initialization of the + origin_visit_status table (from a full journal) less deterministic: only the + `last_visit`, `last_visit_state` and `last_successful` fields are guaranteed + to be exact, the `next_position_offset` field is a best effort estimate + (which should converge once the client has run for a while on in-order + messages). + """ assert set(messages) <= { msg_type }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" assert msg_type in messages, f"Expected {msg_type} messages" interesting_messages = [ msg for msg in messages[msg_type] if "type" in msg and msg["status"] not in ("created", "ongoing") ] if not interesting_messages: return origin_visit_stats: Dict[Tuple[str, str], Dict] = { (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) for visit_stats in scheduler.origin_visit_stats_get( list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) ) } # Use the default values from the model object empty_object = { field.name: field.default if field.default != attr.NOTHING else None for field in attr.fields(OriginVisitStats) } # Retrieve the global queue state queue_position_per_visit_type = scheduler.visit_scheduler_queue_position_get() for msg_dict in interesting_messages: origin = msg_dict["origin"] visit_type = msg_dict["type"] pk = origin, visit_type if pk not in origin_visit_stats: origin_visit_stats[pk] = { **empty_object, "url": origin, "visit_type": visit_type, } visit_stats_d = origin_visit_stats[pk] - if msg_dict["status"] == "not_found": - visit_stats_d["last_notfound"] = max_date( - msg_dict["date"], visit_stats_d.get("last_notfound") - ) - update_next_position_offset(visit_stats_d, 1) # visit less often - elif msg_dict["status"] == "failed" or msg_dict["snapshot"] is None: - visit_stats_d["last_failed"] = max_date( - msg_dict["date"], visit_stats_d.get("last_failed") + if ( + visit_stats_d.get("last_visit") + and msg_dict["date"] <= visit_stats_d["last_visit"] + ): + # message received out of order, ignore + continue + + # Compare incoming message to known status of the origin, to determine + # eventfulness + last_visit_status, eventful = get_last_status(msg_dict, visit_stats_d) + + # Update the position offset according to the visit status, + # if we had already visited this origin before. + + if visit_stats_d.get("last_visit"): + update_next_position_offset(visit_stats_d, eventful) + + # Record current visit date as highest known date (we've rejected out of order + # messages earlier). + visit_stats_d["last_visit"] = msg_dict["date"] + visit_stats_d["last_visit_status"] = last_visit_status + + # Record last successful visit date + if last_visit_status == LastVisitStatus.successful: + visit_stats_d["last_successful"] = max_date( + msg_dict["date"], visit_stats_d.get("last_successful") ) - update_next_position_offset(visit_stats_d, 1) # visit less often - else: # visit with snapshot, something happened - if visit_stats_d["last_snapshot"] is None: - # first time visit with snapshot, we keep relevant information - visit_stats_d["last_eventful"] = msg_dict["date"] - visit_stats_d["last_snapshot"] = msg_dict["snapshot"] - else: - # last_snapshot is set, so an eventful visit should have previously been - # recorded - assert visit_stats_d["last_eventful"] is not None - latest_recorded_visit_date = max_date( - visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] - ) - current_status_date = msg_dict["date"] - previous_snapshot = visit_stats_d["last_snapshot"] - if msg_dict["snapshot"] != previous_snapshot: - if ( - latest_recorded_visit_date - and current_status_date < latest_recorded_visit_date - ): - # out of order message so ignored - continue - # new eventful visit (new snapshot) - visit_stats_d["last_eventful"] = current_status_date - visit_stats_d["last_snapshot"] = msg_dict["snapshot"] - # Visit this origin more often in the future - update_next_position_offset(visit_stats_d, -2) - else: - # same snapshot as before - if ( - latest_recorded_visit_date - and current_status_date < latest_recorded_visit_date - ): - # we receive an old message which is an earlier "eventful" event - # than what we had, we consider the last_eventful event as - # actually an uneventful event. - # The last uneventful visit remains the most recent: - # max, previously computed - visit_stats_d["last_uneventful"] = latest_recorded_visit_date - # The eventful visit remains the oldest one: min - visit_stats_d["last_eventful"] = min( - visit_stats_d["last_eventful"], current_status_date - ) - # Visit this origin less often in the future - update_next_position_offset(visit_stats_d, 1) - elif ( - latest_recorded_visit_date - and current_status_date == latest_recorded_visit_date - ): - # A duplicated message must be ignored to avoid - # populating the last_uneventful message - continue - else: - # uneventful event - visit_stats_d["last_uneventful"] = current_status_date - # Visit this origin less often in the future - update_next_position_offset(visit_stats_d, 1) + visit_stats_d["last_snapshot"] = msg_dict["snapshot"] # Update the next visit queue position (which will be used solely for origin # without any last_update, cf. the dedicated scheduling policy # "origins_without_last_update") visit_stats_d["next_visit_queue_position"] = next_visit_queue_position( queue_position_per_visit_type, visit_stats_d ) scheduler.origin_visit_stats_upsert( OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() ) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py index 28aff38..b10cff9 100644 --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -1,262 +1,271 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -from typing import Any, Dict, List, Optional, Tuple +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple, Union from uuid import UUID import attr import attr.converters from attrs_strict import type_validator def check_timestamptz(value) -> None: """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") @attr.s class BaseSchedulerModel: """Base class for database-backed objects. These database-backed objects are defined through attrs-based attributes that match the columns of the database 1:1. This is a (very) lightweight ORM. These attrs-based attributes have metadata specific to the functionality expected from these fields in the database: - `primary_key`: the column is a primary key; it should be filtered out when doing an `update` of the object - `auto_primary_key`: the column is a primary key, which is automatically handled by the database. It will not be inserted to. This must be matched with a database-side default value. - `auto_now_add`: the column is a timestamp that is set to the current time when the object is inserted, and never updated afterwards. This must be matched with a database-side default value. - `auto_now`: the column is a timestamp that is set to the current time when the object is inserted or updated. """ _pk_cols: Optional[Tuple[str, ...]] = None _select_cols: Optional[Tuple[str, ...]] = None _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None @classmethod def primary_key_columns(cls) -> Tuple[str, ...]: """Get the primary key columns for this object type""" if cls._pk_cols is None: columns: List[str] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_primary_key", "primary_key") ): columns.append(field.name) cls._pk_cols = tuple(sorted(columns)) return cls._pk_cols @classmethod def select_columns(cls) -> Tuple[str, ...]: """Get all the database columns needed for a `select` on this object type""" if cls._select_cols is None: columns: List[str] = [] for field in attr.fields(cls): columns.append(field.name) cls._select_cols = tuple(sorted(columns)) return cls._select_cols @classmethod def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: """Get the database columns and metavars needed for an `insert` or `update` on this object type. This implements support for the `auto_*` field metadata attributes. """ if cls._insert_cols_and_metavars is None: zipped_cols_and_metavars: List[Tuple[str, str]] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_now_add", "auto_primary_key") ): continue elif field.metadata.get("auto_now"): zipped_cols_and_metavars.append((field.name, "now()")) else: zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) zipped_cols_and_metavars.sort() cols, metavars = zip(*zipped_cols_and_metavars) cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars @attr.s class Lister(BaseSchedulerModel): name = attr.ib(type=str, validator=[type_validator()]) instance_name = attr.ib(type=str, validator=[type_validator()]) # Populated by database id = attr.ib( type=Optional[UUID], validator=type_validator(), default=None, metadata={"auto_primary_key": True}, ) current_state = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) created = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) updated = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) @attr.s class ListedOrigin(BaseSchedulerModel): """Basic information about a listed origin, output by a lister""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) extra_loader_arguments = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) enabled = attr.ib(type=bool, validator=[type_validator()], default=True) first_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) last_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) def as_task_dict(self): return { "type": f"load-{self.visit_type}", "arguments": { "args": [], "kwargs": {"url": self.url, **self.extra_loader_arguments}, }, } +class LastVisitStatus(Enum): + successful = "successful" + failed = "failed" + not_found = "not_found" + + +def convert_last_visit_status( + s: Union[None, str, LastVisitStatus] +) -> Optional[LastVisitStatus]: + if not isinstance(s, str): + return s + return LastVisitStatus(s) + + @attr.s(frozen=True, slots=True) class OriginVisitStats(BaseSchedulerModel): """Represents an aggregated origin visits view. """ url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) - last_eventful = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_successful = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None ) - last_uneventful = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_visit = attr.ib( + type=Optional[datetime.datetime], validator=type_validator(), default=None ) - last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator()) - last_notfound = attr.ib( - type=Optional[datetime.datetime], validator=type_validator() + last_visit_status = attr.ib( + type=Optional[LastVisitStatus], + validator=type_validator(), + default=None, + converter=convert_last_visit_status, ) last_scheduled = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) next_visit_queue_position = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) - @last_eventful.validator - def check_last_eventful(self, attribute, value): - check_timestamptz(value) - - @last_uneventful.validator - def check_last_uneventful(self, attribute, value): - check_timestamptz(value) - - @last_failed.validator - def check_last_failed(self, attribute, value): + @last_successful.validator + def check_last_successful(self, attribute, value): check_timestamptz(value) - @last_notfound.validator - def check_last_notfound(self, attribute, value): + @last_visit.validator + def check_last_visit(self, attribute, value): check_timestamptz(value) @next_visit_queue_position.validator def check_next_visit_queue_position(self, attribute, value): check_timestamptz(value) @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): """Metrics for the scheduler, aggregated by (lister_id, visit_type)""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) origins_known = attr.ib(type=int, validator=[type_validator()], default=0) """Number of known (enabled or disabled) origins""" origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0) """Number of origins that were present in the latest listings""" origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0) """Number of enabled origins that have never been visited (according to the visit cache)""" origins_with_pending_changes = attr.ib( type=int, validator=[type_validator()], default=0 ) """Number of enabled origins with known activity (recorded by a lister) since our last visit""" diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql index 0e75155..8f160be 100644 --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -1,227 +1,227 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (29, now(), 'Work In Progress'); + values (30, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval, status task_status not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Policy of the given task'; comment on column task.id is 'Task Identifier'; comment on column task.type is 'References task_type table'; comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.id is 'Task run identifier'; comment on column task_run.task is 'References task table'; comment on column task_run.scheduled is 'Scheduled run time for task'; comment on column task_run.started is 'Task starting time'; comment on column task_run.ended is 'Task ending time'; create table if not exists listers ( id uuid primary key default uuid_generate_v4(), name text not null, instance_name text not null, created timestamptz not null default now(), -- auto_now_add in the model current_state jsonb not null, updated timestamptz not null ); comment on table listers is 'Lister instances known to the origin visit scheduler'; comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)'; comment on column listers.created is 'Timestamp at which the lister was originally created'; comment on column listers.current_state is 'Known current state of this lister'; comment on column listers.updated is 'Timestamp at which the lister state was last updated'; create table if not exists listed_origins ( -- Basic information lister_id uuid not null references listers(id), url text not null, visit_type text not null, extra_loader_arguments jsonb not null, -- Whether this origin still exists or not enabled boolean not null, -- time-based information first_seen timestamptz not null default now(), last_seen timestamptz not null, -- potentially provided by the lister last_update timestamptz, primary key (lister_id, url, visit_type) ); comment on table listed_origins is 'Origins known to the origin visit scheduler'; comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; comment on column listed_origins.url is 'URL of the origin listed'; comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; +create type last_visit_status as enum ('successful', 'failed', 'not_found'); +comment on type last_visit_status is 'Record of the status of the last visit of an origin'; create table origin_visit_stats ( url text not null, visit_type text not null, - last_eventful timestamptz, - last_uneventful timestamptz, - last_failed timestamptz, - last_notfound timestamptz, + last_successful timestamptz, + last_visit timestamptz, + last_visit_status last_visit_status, -- visit scheduling information last_scheduled timestamptz, -- last snapshot resulting from an eventful visit last_snapshot bytea, -- position in the global queue, the "time" at which we expect the origin to have new -- objects next_visit_queue_position timestamptz, -- duration that we expect to wait between visits of this origin next_position_offset int not null default 4, primary key (url, visit_type) ); comment on table origin_visit_stats is 'Aggregated information on visits for each origin in the archive'; comment on column origin_visit_stats.url is 'Origin URL'; comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; -comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; -comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event'; -comment on column origin_visit_stats.last_failed is 'Date of the last failed event'; -comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event'; +comment on column origin_visit_stats.last_successful is 'Date of the last successful visit, at which we recorded the `last_snapshot`'; +comment on column origin_visit_stats.last_visit is 'Date of the last visit overall'; +comment on column origin_visit_stats.last_visit_status is 'Status of the last visit'; comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; comment on column origin_visit_stats.next_visit_queue_position is 'Time at which some new objects are expected to be found'; comment on column origin_visit_stats.next_position_offset is 'Duration that we expect to wait between visits of this origin'; create table visit_scheduler_queue_position ( visit_type text not null, position timestamptz not null, primary key (visit_type) ); comment on table visit_scheduler_queue_position is 'Current queue position for the recurrent visit scheduler'; comment on column visit_scheduler_queue_position.visit_type is 'Visit type'; comment on column visit_scheduler_queue_position.position is 'Current position for the runner of this visit type'; create table scheduler_metrics ( lister_id uuid not null references listers(id), visit_type text not null, last_update timestamptz not null, origins_known int not null default 0, origins_enabled int not null default 0, origins_never_visited int not null default 0, origins_with_pending_changes int not null default 0, primary key (lister_id, visit_type) ); comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.'; comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated'; comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated'; comment on column scheduler_metrics.last_update is 'Last update of these metrics'; comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins'; comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing'; comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited'; comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit'; diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql index edbc2a8..3090819 100644 --- a/swh/scheduler/sql/40-func.sql +++ b/swh/scheduler/sql/40-func.sql @@ -1,401 +1,401 @@ create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task alter column retries_left drop not null, drop column id; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin -- update the default values in one go -- this is separated from the insert/select to avoid too much -- juggling update tmp_task t set current_interval = tt.default_interval, retries_left = coalesce(retries_left, tt.num_retries, 0) from task_type tt where tt.type=t.type; insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority) select type, arguments, next_run, status, current_interval, policy, retries_left, priority from tmp_task t where not exists(select 1 from task where type = t.type and md5(arguments::text) = md5(t.arguments::text) and arguments = t.arguments and policy = t.policy and priority is not distinct from t.priority and status = t.status); return query select distinct t.* from tmp_task tt inner join task t on ( tt.type = t.type and md5(tt.arguments::text) = md5(t.arguments::text) and tt.arguments = t.arguments and tt.policy = t.policy and tt.priority is not distinct from t.priority and tt.status = t.status ); end; $$; comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and type = task_type and status = 'next_run_not_scheduled' and priority is null order by next_run limit num_tasks; $$; comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint) is 'Retrieve tasks without priority'; create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority) returns numeric language sql stable as $$ select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric $$; comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority) is 'Given a priority task and a total number, compute the number of tasks to read'; create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL, task_priority task_priority default 'normal') returns setof task language sql stable as $$ select * from task t where t.next_run <= ts and t.type = task_type and t.status = 'next_run_not_scheduled' and t.priority = task_priority order by t.next_run limit num_tasks_priority; $$; comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority) is 'Retrieve tasks with a given priority'; create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from swh_scheduler_peek_no_priority_tasks(task_type, ts, num_tasks) ) next_tasks where task.id = next_tasks.id returning task.*; $$; comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint) is 'Grab (no priority) tasks ready for scheduling and change their status'; create or replace function swh_scheduler_peek_any_ready_priority_tasks ( task_type text, ts timestamptz default now(), num_tasks bigint default NULL ) returns setof task language sql stable as $$ select * from task t where t.next_run <= ts and t.type = task_type and t.status = 'next_run_not_scheduled' and t.priority is not null order by t.next_run limit num_tasks; $$; comment on function swh_scheduler_peek_any_ready_priority_tasks(text, timestamptz, bigint) is 'List tasks with any priority ready for scheduling'; create or replace function swh_scheduler_grab_any_ready_priority_tasks ( task_type text, ts timestamptz default now(), num_tasks bigint default NULL ) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from swh_scheduler_peek_any_ready_priority_tasks( task_type, ts, num_tasks ) ) next_tasks where task.id = next_tasks.id returning task.*; $$; comment on function swh_scheduler_grab_any_ready_priority_tasks (text, timestamptz, bigint) is 'Grab any priority tasks ready for scheduling and change their status'; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled, status) values (task_id, backend_id, metadata, ts, 'scheduled') returning *; $$; create or replace function swh_scheduler_mktemp_task_run () returns void language sql as $$ create temporary table tmp_task_run ( like task_run excluding indexes ) on commit drop; alter table tmp_task_run drop column id, drop column status; $$; comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling'; create or replace function swh_scheduler_schedule_task_run_from_temp () returns void language plpgsql as $$ begin insert into task_run (task, backend_id, metadata, scheduled, status) select task, backend_id, metadata, scheduled, 'scheduled' from tmp_task_run; return; end; $$; create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set started = ts, status = 'started', metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, status task_run_status, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set ended = ts, status = swh_scheduler_end_task_run.status, metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create type task_record as ( task_id bigint, task_policy task_policy, task_status task_status, task_run_id bigint, arguments jsonb, type text, backend_id text, metadata jsonb, scheduled timestamptz, started timestamptz, ended timestamptz, task_run_status task_run_status ); create or replace function swh_scheduler_task_to_archive( ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, lim bigint default 10) returns setof task_record language sql stable as $$ select t.id as task_id, t.policy as task_policy, t.status as task_status, tr.id as task_run_id, t.arguments, t.type, tr.backend_id, tr.metadata, tr.scheduled, tr.started, tr.ended, tr.status as task_run_status from task_run tr inner join task t on tr.task=t.id where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or (t.policy = 'recurring' and t.status = 'disabled')) and ((ts_after <= tr.started and tr.started < ts_before) or (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and t.id >= last_id order by tr.task, tr.started limit lim; $$; comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function'; create or replace function swh_scheduler_delete_archived_tasks( task_ids bigint[], task_run_ids bigint[]) returns void language sql as $$ -- clean up task_run_ids delete from task_run where id in (select * from unnest(task_run_ids)); -- clean up only tasks whose associated task_run are all cleaned up. -- Remaining tasks will stay there and will be cleaned up when -- remaining data have been indexed delete from task where id in (select t.id from task t left outer join task_run tr on t.id=tr.task where t.id in (select * from unnest(task_ids)) and tr.task is null); $$; comment on function swh_scheduler_delete_archived_tasks(bigint[], bigint[]) is 'Clean up archived tasks function'; create or replace function swh_scheduler_update_task_on_task_end () returns trigger language plpgsql as $$ declare cur_task task%rowtype; cur_task_type task_type%rowtype; adjustment_factor float; new_interval interval; begin select * from task where id = new.task into cur_task; select * from task_type where type = cur_task.type into cur_task_type; case when new.status = 'permfailed' then update task set status = 'disabled' where id = cur_task.id; when new.status in ('eventful', 'uneventful') then case when cur_task.policy = 'oneshot' then update task set status = 'completed' where id = cur_task.id; when cur_task.policy = 'recurring' then if new.status = 'uneventful' then adjustment_factor := 1/cur_task_type.backoff_factor; else adjustment_factor := 1/cur_task_type.backoff_factor; end if; new_interval := greatest( cur_task_type.min_interval, least( cur_task_type.max_interval, adjustment_factor * cur_task.current_interval)); update task set status = 'next_run_not_scheduled', next_run = new.ended + new_interval, current_interval = new_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; else -- new.status in 'failed', 'lost' if cur_task.retries_left > 0 then update task set status = 'next_run_not_scheduled', next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'), retries_left = cur_task.retries_left - 1 where id = cur_task.id; else -- no retries left case when cur_task.policy = 'oneshot' then update task set status = 'disabled' where id = cur_task.id; when cur_task.policy = 'recurring' then update task set status = 'next_run_not_scheduled', next_run = new.ended + cur_task.current_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; end if; -- retries end case; return null; end; $$; create trigger update_task_on_task_end after update of status on task_run for each row when (new.status NOT IN ('scheduled', 'started')) execute procedure swh_scheduler_update_task_on_task_end (); create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now()) returns setof scheduler_metrics language sql as $$ insert into scheduler_metrics ( lister_id, visit_type, last_update, origins_known, origins_enabled, origins_never_visited, origins_with_pending_changes ) select lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update, count(*) as origins_known, count(*) filter (where enabled) as origins_enabled, count(*) filter (where enabled and last_snapshot is NULL ) as origins_never_visited, count(*) filter (where - enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful) + enabled and lo.last_update > last_successful ) as origins_with_pending_changes from listed_origins lo left join origin_visit_stats ovs using (url, visit_type) where -- update only for the requested lister update_metrics.lister_id = lo.lister_id -- or for all listers if the function argument is null or update_metrics.lister_id is null group by (lister_id, visit_type) on conflict (lister_id, visit_type) do update set last_update = EXCLUDED.last_update, origins_known = EXCLUDED.origins_known, origins_enabled = EXCLUDED.origins_enabled, origins_never_visited = EXCLUDED.origins_never_visited, origins_with_pending_changes = EXCLUDED.origins_with_pending_changes returning * $$; comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id'; diff --git a/swh/scheduler/tests/test_journal_client.py b/swh/scheduler/tests/test_journal_client.py index 03a1ac5..1c06562 100644 --- a/swh/scheduler/tests/test_journal_client.py +++ b/swh/scheduler/tests/test_journal_client.py @@ -1,925 +1,895 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from datetime import timedelta import functools from itertools import permutations from typing import List from unittest.mock import Mock import attr import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.journal_client import ( from_position_offset_to_days, max_date, next_visit_queue_position, process_journal_objects, ) -from swh.scheduler.model import ListedOrigin, OriginVisitStats +from swh.scheduler.model import LastVisitStatus, ListedOrigin, OriginVisitStats from swh.scheduler.utils import utcnow def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) with pytest.raises(AssertionError, match="Got unexpected origin_visit"): process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) with pytest.raises(AssertionError, match="Expected origin_visit_status"): process_fn({}) ONE_DAY = datetime.timedelta(days=1) ONE_YEAR = datetime.timedelta(days=366) DATE3 = utcnow() DATE2 = DATE3 - ONE_DAY DATE1 = DATE2 - ONE_DAY assert DATE1 < DATE2 < DATE3 @pytest.mark.parametrize( "dates,expected_max_date", [ ((DATE1,), DATE1), ((None, DATE2), DATE2), ((DATE1, None), DATE1), ((DATE1, DATE2), DATE2), ((DATE2, DATE1), DATE2), ((DATE1, DATE2, DATE3), DATE3), ((None, DATE2, DATE3), DATE3), ((None, None, DATE3), DATE3), ((DATE1, None, DATE3), DATE3), ], ) def test_max_date(dates, expected_max_date): assert max_date(*dates) == expected_max_date def test_max_date_raise(): with pytest.raises(ValueError, match="valid datetime"): max_date() with pytest.raises(ValueError, match="valid datetime"): max_date(None) with pytest.raises(ValueError, match="valid datetime"): max_date(None, None) def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): """Only final statuses (full, partial) are important, the rest remain ignored. """ # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) visit_statuses = [ { "origin": "foo", "visit": 1, "status": "created", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "ongoing", "date": utcnow(), "type": "svn", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # All messages have been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def test_journal_client_ignore_missing_type(swh_scheduler): """Ignore statuses with missing type key""" # Trace method calls on the swh_scheduler swh_scheduler = Mock(wraps=swh_scheduler) date = utcnow() snapshot = hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd") visit_statuses = [ { "origin": "foo", "visit": 1, "status": "full", "date": date, "snapshot": snapshot, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) # The message has been ignored: no stats have been upserted swh_scheduler.origin_visit_stats_upsert.assert_not_called() def assert_visit_stats_ok( - actual_visit_stats: List[OriginVisitStats], - expected_visit_stats: List[OriginVisitStats], + actual_visit_stats: OriginVisitStats, + expected_visit_stats: OriginVisitStats, ignore_fields: List[str] = ["next_visit_queue_position"], ): """Utility test function to ensure visits stats read from the backend are in the right shape. The comparison on the next_visit_queue_position will be dealt with in dedicated tests so it's not tested in tests that are calling this function. """ - assert len(actual_visit_stats) == len(expected_visit_stats) - fields = attr.fields_dict(OriginVisitStats) defaults = {field: fields[field].default for field in ignore_fields} - for visit_stats in actual_visit_stats: - visit_stats = attr.evolve(visit_stats, **defaults) - assert visit_stats in expected_visit_stats + actual_visit_stats = attr.evolve(actual_visit_stats, **defaults) + assert actual_visit_stats == expected_visit_stats -def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): +def test_journal_client_origin_visit_status_from_journal_last_not_found(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "not_found", "date": DATE1, "type": "git", "snapshot": None, } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=visit_status["date"], - last_snapshot=None, - next_position_offset=5, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_visit=visit_status["date"], + last_visit_status=LastVisitStatus.not_found, + next_position_offset=4, + ), ) visit_statuses = [ { "origin": "foo", "visit": 3, "status": "not_found", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "foo", "visit": 4, "status": "not_found", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=DATE3, - last_snapshot=None, - next_position_offset=7, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_visit=DATE3, + last_visit_status=LastVisitStatus.not_found, + next_position_offset=6, + ), ) def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): visit_statuses = [ { "origin": "foo", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 2, "status": "full", "date": DATE2, "type": "git", "snapshot": None, }, { "origin": "bar", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="bar", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=DATE3, - last_notfound=None, - last_snapshot=None, - next_position_offset=7, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="bar", + visit_type="git", + last_visit=DATE3, + last_visit_status=LastVisitStatus.failed, + next_position_offset=6, + ), ) def test_journal_client_origin_visit_status_from_journal_last_failed2(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 2, "status": "failed", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "bar", "visit": 3, "status": "failed", "date": DATE2, "type": "git", "snapshot": None, }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="bar", - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=DATE2, - last_notfound=None, - last_snapshot=None, - next_position_offset=6, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="bar", + visit_type="git", + last_visit=DATE2, + last_visit_status=LastVisitStatus.failed, + next_position_offset=5, + ), ) -def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): +def test_journal_client_origin_visit_status_from_journal_last_successful(swh_scheduler): visit_statuses = [ { "origin": "bar", "visit": 1, "status": "partial", "date": utcnow(), "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), }, { "origin": "foo", "visit": 2, "status": "partial", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), }, { "origin": "foo", "visit": 3, "status": "full", "date": DATE3, "type": "git", "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), }, ] process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE3, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), - next_position_offset=0, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE3, + last_visit=DATE3, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + next_position_offset=0, + ), ) def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE3 + ONE_DAY, "type": "git", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), } # Let's insert some visit stats with some previous visit information swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visit_status["origin"], visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=DATE3, - last_failed=DATE2, - last_notfound=DATE1, + last_successful=DATE2, + last_visit=DATE3, + last_visit_status=LastVisitStatus.failed, last_snapshot=visit_status["snapshot"], next_visit_queue_position=None, next_position_offset=4, ) ] ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url=visit_status["origin"], - visit_type=visit_status["type"], - last_eventful=DATE1, - last_uneventful=visit_status["date"], # most recent date but uneventful - last_failed=DATE2, - last_notfound=DATE1, - last_snapshot=visit_status["snapshot"], - next_position_offset=5, # uneventful so visit less often - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_visit=DATE3 + ONE_DAY, + last_successful=DATE3 + ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=visit_status["snapshot"], + next_visit_queue_position=None, + next_position_offset=5, + ), ) VISIT_STATUSES = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "foo", "type": "git", "visit": 1, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "foo", "type": "git", "visit": 2, "status": "created", "snapshot": None, }, { "origin": "foo", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) ) def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1 + ONE_DAY, - last_uneventful=DATE1 + 3 * ONE_DAY, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=5, # uneventful, visit origin less often in future - ) - ], + visit_stats, + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE1 + 3 * ONE_DAY, + last_visit=DATE1 + 3 * ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], ) + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. What matters is consistency of the final + # dates (last_visit and last_successful). + assert 4 <= visit_stats.next_position_offset <= 5 + VISIT_STATUSES_1 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "partial", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) ) def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_visit_stats = swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) + visit_stats = actual_visit_stats[0] assert_visit_stats_ok( - actual_visit_stats, - [ - OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1 + 2 * ONE_DAY, - last_uneventful=DATE1 + 3 * ONE_DAY, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - ], - ignore_fields=[ - "next_visit_queue_position", - "next_position_offset", # depending on the permutations, the value differs - ], + visit_stats, + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_successful=DATE1 + 3 * ONE_DAY, + last_visit=DATE1 + 3 * ONE_DAY, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], ) + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. What matters is consistency of the final + # dates (last_visit and last_successful). + assert 2 <= visit_stats.next_position_offset <= 5 + VISIT_STATUSES_2 = [ {**ovs, "date": DATE1 + n * ONE_DAY} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), }, { "origin": "cavabarder", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), }, { "origin": "iciaussi", "type": "hg", "visit": 1, "status": "full", "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), }, { "origin": "iciaussi", "type": "hg", "visit": 2, "status": "full", "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), }, { "origin": "cavabarder", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), }, { "origin": "cavabarder", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), }, { "origin": "iciaussi", "type": "git", "visit": 1, "status": "full", "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), }, { "origin": "iciaussi", "type": "git", "visit": 2, "status": "full", "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), }, ] ) ] def test_journal_client_origin_visit_status_after_grab_next_visits( swh_scheduler, stored_lister ): """Ensure OriginVisitStat entries created in the db as a result of calling grab_next_visits() do not mess the OriginVisitStats upsert mechanism. """ listed_origins = [ ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) ] swh_scheduler.record_listed_origins(listed_origins) before = utcnow() swh_scheduler.grab_next_visits( visit_type="git", count=10, policy="oldest_scheduled_first" ) after = utcnow() assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None process_journal_objects( {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler ) for url in ("cavabarder", "iciaussi"): ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] assert before <= ovs.last_scheduled <= after ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] assert ovs.last_scheduled is None ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] - assert ovs.last_eventful == DATE1 + 5 * ONE_DAY - assert ovs.last_uneventful is None - assert ovs.last_failed is None - assert ovs.last_notfound is None + assert ovs.last_successful == DATE1 + 5 * ONE_DAY + assert ovs.last_visit == DATE1 + 5 * ONE_DAY + assert ovs.last_visit_status == LastVisitStatus.successful assert ovs.last_snapshot == hash_to_bytes( "5555555555555555555555555555555555555555" ) def test_journal_client_origin_visit_status_duplicated_messages(swh_scheduler): """A duplicated message must be ignored """ visit_status = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=None, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE1, + last_visit=DATE1, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), ) def test_journal_client_origin_visit_status_several_upsert(swh_scheduler): """An old message updates old information """ visit_status1 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE1, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } visit_status2 = { "origin": "foo", "visit": 1, "status": "full", "date": DATE2, "type": "git", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), } process_journal_objects( {"origin_visit_status": [visit_status2]}, scheduler=swh_scheduler ) process_journal_objects( {"origin_visit_status": [visit_status1]}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="foo", - visit_type="git", - last_eventful=DATE1, - last_uneventful=DATE2, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=5, - ) - ], + actual_origin_visit_stats[0], + OriginVisitStats( + url="foo", + visit_type="git", + last_successful=DATE2, + last_visit=DATE2, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + next_position_offset=4, + ), ) VISIT_STATUSES_SAME_SNAPSHOT = [ {**ovs, "date": DATE1 + n * ONE_YEAR} for n, ovs in enumerate( [ { "origin": "cavabarder", "type": "hg", "visit": 3, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, { "origin": "cavabarder", "type": "hg", "visit": 4, "status": "full", "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), }, ] ) ] @pytest.mark.parametrize( "visit_statuses", permutations(VISIT_STATUSES_SAME_SNAPSHOT, len(VISIT_STATUSES_SAME_SNAPSHOT)), ) def test_journal_client_origin_visit_statuses_same_snapshot_permutation( visit_statuses, swh_scheduler ): """Ensure out of order topic subscription ends up in the same final state """ process_journal_objects( {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler ) actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( [("cavabarder", "hg")] ) + visit_stats = actual_origin_visit_stats[0] assert_visit_stats_ok( - actual_origin_visit_stats, - [ - OriginVisitStats( - url="cavabarder", - visit_type="hg", - last_eventful=DATE1, - last_uneventful=DATE1 + 2 * ONE_YEAR, - last_failed=None, - last_notfound=None, - last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), - next_position_offset=6, # 2 uneventful visits, whatever the permutation - ) - ], - ) + visit_stats, + OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_successful=DATE1 + 2 * ONE_YEAR, + last_visit=DATE1 + 2 * ONE_YEAR, + last_visit_status=LastVisitStatus.successful, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ), + ignore_fields=["next_visit_queue_position", "next_position_offset"], + ) + + # We ignore out of order messages, so the next_position_offset isn't exact + # depending on the permutation. What matters is consistency of the final + # dates (last_visit and last_successful). + assert 4 <= visit_stats.next_position_offset <= 6 @pytest.mark.parametrize( "position_offset, interval", [ (0, 1), (1, 1), (2, 2), (3, 2), (4, 2), (5, 4), (6, 16), (7, 64), (8, 256), (9, 1024), (10, 4096), ], ) def test_journal_client_from_position_offset_to_days(position_offset, interval): assert from_position_offset_to_days(position_offset) == interval def test_journal_client_from_position_offset_to_days_only_positive_input(): with pytest.raises(AssertionError): from_position_offset_to_days(-1) @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.01, 1), (-0.01, 5), (0.1, 8), (-0.1, 10),] ) def test_next_visit_queue_position(mocker, fudge_factor, next_position_offset): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() mock_now = mocker.patch("swh.scheduler.journal_client.utcnow") mock_now.return_value = date_now actual_position = next_visit_queue_position( {}, {"next_position_offset": next_position_offset, "visit_type": "svn",} ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_now.called assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.02, 2), (-0.02, 3), (0, 7), (-0.09, 9),] ) def test_next_visit_queue_position_with_state( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {"git": date_now}, {"next_position_offset": next_position_offset, "visit_type": "git",}, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called @pytest.mark.parametrize( "fudge_factor,next_position_offset", [(0.03, 3), (-0.03, 4), (0.08, 7), (-0.08, 9),] ) def test_next_visit_queue_position_with_next_visit_queue( mocker, fudge_factor, next_position_offset ): mock_random = mocker.patch("swh.scheduler.journal_client.random.uniform") mock_random.return_value = fudge_factor date_now = utcnow() actual_position = next_visit_queue_position( {}, { "next_position_offset": next_position_offset, "visit_type": "hg", "next_visit_queue_position": date_now, }, ) assert actual_position == date_now + timedelta( days=from_position_offset_to_days(next_position_offset) * (1 + fudge_factor) ) assert mock_random.called diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 51bc5f3..096d22b 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,1505 +1,1450 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import copy import datetime from datetime import timedelta import inspect import random from typing import Any, Dict, List, Optional, Tuple import uuid import attr from psycopg2.extras import execute_values import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface -from swh.scheduler.model import ListedOrigin, OriginVisitStats, SchedulerMetrics +from swh.scheduler.model import ( + LastVisitStatus, + ListedOrigin, + OriginVisitStats, + SchedulerMetrics, +) from swh.scheduler.utils import utcnow from .common import ( LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template, tasks_with_priority_from_template, ) ONEDAY = timedelta(days=1) NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: return (m.lister_id, m.visit_type) def assert_metrics_equal(left, right): assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key) class TestScheduler: def test_interface(self, swh_scheduler): """Checks all methods of SchedulerInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SchedulerInterface,), {})() assert "create_task_type" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(swh_scheduler, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) tt2 = TASK_TYPES["hg"] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt["type"]) assert tt2 == swh_scheduler.get_task_type(tt2["type"]) def test_create_task_type_idempotence(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) num_git = 100 tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), num_git) tasks_2 = tasks_from_template( TEMPLATES["hg"], utcnow(), num_priorities=NUM_PRIORITY_TASKS ) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks) set_ret1 = set([t["id"] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t["id"] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task["type"].split("-")[-1]] assert task["id"] not in ids assert task["status"] == "next_run_not_scheduled" assert task["current_interval"] == task_type["default_interval"] assert task["policy"] == orig_task.get("policy", "recurring") priority = task.get("priority") actual_priorities[priority] += 1 assert task["retries_left"] == (task_type["num_retries"] or 0) ids.add(task["id"]) del task["id"] del task["status"] del task["current_interval"] del task["retries_left"] if "policy" not in orig_task: del task["policy"] if "priority" not in orig_task: del task["priority"] assert task == orig_task expected_priorities = NUM_PRIORITY_TASKS.copy() expected_priorities[None] += num_git assert dict(actual_priorities) == expected_priorities def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] tasks = tasks_from_template(TEMPLATES["git"], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks) // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit - 1]["next_run"] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts ) for ready_task in ready_tasks_timestamped: assert ready_task["next_run"] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit // 3 ) assert len(ready_tasks_both) <= limit // 3 for ready_task in ready_tasks_both: assert ready_task["next_run"] <= max_ts assert ready_task in ready_tasks[: limit // 3] def test_peek_ready_tasks_returns_only_no_priority_tasks(self, swh_scheduler): """Peek ready tasks only return standard tasks (no priority)""" self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num_priorities=NUM_PRIORITY_TASKS, ) count_priority = 0 for task in tasks: count_priority += 0 if task.get("priority") is None else 1 assert count_priority > 0, "Some created tasks should have some priority" random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available no priority tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) - count_priority # No read task should have any priority for task in ready_tasks: assert task.get("priority") is None def test_grab_ready_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num_priorities=NUM_PRIORITY_TASKS ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_tasks(task_type, num_tasks=50) first_ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed priority = grabbed["priority"] assert priority == peeked["priority"] assert priority is None def test_grab_ready_priority_tasks(self, swh_scheduler): """check the grab and peek priority tasks endpoint behave as expected""" self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] num_tasks = 100 # Create tasks with and without priorities tasks0 = tasks_with_priority_from_template( TEMPLATES["git"], t, num_tasks, "high", ) tasks1 = tasks_with_priority_from_template( TEMPLATES["hg"], t, num_tasks, "low", ) tasks2 = tasks_with_priority_from_template( TEMPLATES["hg"], t, num_tasks, "normal", ) tasks = tasks0 + tasks1 + tasks2 random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_priority_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_priority_tasks(task_type, num_tasks=50) ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed assert peeked["priority"] == grabbed["priority"] assert peeked["priority"] is not None def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x["id"]) tasks[:length] = [] ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x["id"]) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(lst): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in lst] self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks) def assert_filtered_task_ok( self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime ) -> None: """Ensure filtered tasks have the right expected properties (within the range, recurring disabled, etc..) """ started = task["started"] date = started if started is not None else task["scheduled"] assert after <= date and date <= before if task["task_policy"] == "oneshot": assert task["task_status"] in ["completed", "disabled"] if task["task_policy"] == "recurring": assert task["task_status"] in ["disabled"] def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") _tasks.append(t) # Randomly update task's status per policy status_per_policy = {"recurring": 0, "oneshot": 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] "recurring": [ (1, "disabled"), (0, "completed"), (0, "next_run_not_scheduled"), ], "oneshot": [ (0, "next_run_not_scheduled"), (1, "disabled"), (1, "completed"), ], } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task["policy"] _task_ids[policy].append(task["id"]) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task["id"]) swh_scheduler.disable_tasks(tasks_to_update["recurring"]) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( _task_ids["oneshot"], status="next_run_not_scheduled" ) # complete the tasks to update swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed") total_tasks_filtered = ( status_per_policy["recurring"] + status_per_policy["oneshot"] ) # no pagination scenario # retrieve tasks to archive after = _time - ONEDAY after_ts = after.strftime("%Y-%m-%d") before = utcnow() + ONEDAY before_ts = before.strftime("%Y-%m-%d") tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks ) tasks_to_archive = tasks_result["tasks"] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get("next_page_token") is None actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in tasks_to_archive: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario nb_tasks = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) == nb_tasks next_page_token = tasks_result["next_page_token"] assert next_page_token is not None all_tasks = tasks_to_archive2 while next_page_token is not None: # Retrieve paginated results tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks, page_token=next_page_token, ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) <= nb_tasks all_tasks.extend(tasks_to_archive2) next_page_token = tasks_result.get("next_page_token") actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in all_tasks: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") c = random.randint(0, 100) if c <= percent: _tasks.append({"task_id": t["task"], "task_run_id": t["id"]}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task["id"] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): """No task exist in the scheduler's db, get_task_runs() should always return an empty list. """ assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): """No task has been executed yet, get_task_runs() should always return an empty list. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): """Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } def test_get_or_create_lister(self, swh_scheduler): db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): assert lister.name == lister_args["name"] assert lister.instance_name == lister_args.get("instance_name", "") lister_get_again = swh_scheduler.get_or_create_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_lister(self, swh_scheduler): for lister_args in LISTERS: assert swh_scheduler.get_lister(**lister_args) is None db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): lister_get_again = swh_scheduler.get_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_listers(self, swh_scheduler): assert swh_scheduler.get_listers() == [] db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) assert swh_scheduler.get_listers() == db_listers def test_update_lister(self, swh_scheduler, stored_lister): lister = attr.evolve(stored_lister, current_state={"updated": "now"}) updated_lister = swh_scheduler.update_lister(lister) assert updated_lister.updated > lister.updated assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) def test_update_lister_stale(self, swh_scheduler, stored_lister): swh_scheduler.update_lister(stored_lister) with pytest.raises(StaleData) as exc: swh_scheduler.update_lister(stored_lister) assert "state not updated" in exc.value.args[0] def test_record_listed_origins(self, swh_scheduler, listed_origins): ret = swh_scheduler.record_listed_origins(listed_origins) assert set(returned.url for returned in ret) == set( origin.url for origin in listed_origins ) assert all(origin.first_seen == origin.last_seen for origin in ret) def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins): # First, insert `cutoff` origins cutoff = 100 assert cutoff < len(listed_origins) ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff]) assert len(ret) == cutoff # Then, insert all origins, including the `cutoff` first. ret = swh_scheduler.record_listed_origins(listed_origins) assert len(ret) == len(listed_origins) # Two different "first seen" values assert len(set(origin.first_seen for origin in ret)) == 2 # But a single "last seen" value assert len(set(origin.last_seen for origin in ret)) == 1 def test_get_listed_origins_exact(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) for i, origin in enumerate(listed_origins): ret = swh_scheduler.get_listed_origins( lister_id=origin.lister_id, url=origin.url ) assert ret.next_page_token is None assert len(ret.results) == 1 assert ret.results[0].lister_id == origin.lister_id assert ret.results[0].url == origin.url @pytest.mark.parametrize("num_origins,limit", [(20, 6), (5, 42), (20, 20)]) def test_get_listed_origins_limit( self, swh_scheduler, listed_origins, num_origins, limit ) -> None: added_origins = sorted( listed_origins[:num_origins], key=lambda o: (o.lister_id, o.url) ) swh_scheduler.record_listed_origins(added_origins) returned_origins: List[ListedOrigin] = [] call_count = 0 next_page_token: Optional[ListedOriginPageToken] = None while True: call_count += 1 ret = swh_scheduler.get_listed_origins( lister_id=listed_origins[0].lister_id, limit=limit, page_token=next_page_token, ) returned_origins.extend(ret.results) next_page_token = ret.next_page_token if next_page_token is None: break assert call_count == (num_origins // limit) + 1 assert len(returned_origins) == num_origins assert [(origin.lister_id, origin.url) for origin in returned_origins] == [ (origin.lister_id, origin.url) for origin in added_origins ] def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) assert ret.next_page_token is None assert len(ret.results) == len(listed_origins) def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type): """Basic origins setup for scheduling policy tests""" visit_type = next(iter(listed_origins_by_type)) origins = listed_origins_by_type[visit_type][:100] assert len(origins) > 0 recorded_origins = swh_scheduler.record_listed_origins(origins) return visit_type, recorded_origins def _check_grab_next_visit_basic( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Calls grab_next_visits with the passed policy, and check that: - all the origins returned are the expected ones (in the same order) - no extra origins are returned - the last_scheduled field has been set properly. Pass the extra keyword arguments to the calls to grab_next_visits. Returns a timestamp greater than all `last_scheduled` values for the grabbed visits. """ assert len(expected) != 0 before = utcnow() ret = swh_scheduler.grab_next_visits( visit_type=visit_type, # Request one more than expected to check that no extra origin is returned count=len(expected) + 1, policy=policy, **kwargs, ) after = utcnow() assert ret == expected visit_stats_list = swh_scheduler.origin_visit_stats_get( [(origin.url, origin.visit_type) for origin in expected] ) assert len(visit_stats_list) == len(expected) for visit_stats in visit_stats_list: # Check that last_scheduled got updated assert before <= visit_stats.last_scheduled <= after # They should not be scheduled again ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, **kwargs ) assert ret == [], "grab_next_visits returned already-scheduled origins" return after def _check_grab_next_visit( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Run the same check as _check_grab_next_visit_basic, but also checks the origin visits have been marked as scheduled, and are only re-scheduled a week later """ after = self._check_grab_next_visit_basic( swh_scheduler, visit_type, policy, expected, **kwargs ) # But a week, later, they should ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, timestamp=after + timedelta(days=7), ) # We need to sort them because their 'last_scheduled' field is updated to # exactly the same value, so the order is not deterministic assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after a week" def _prepare_oldest_scheduled_first_origins( self, swh_scheduler, listed_origins_by_type ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Give all origins but one a last_scheduled date base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, - last_eventful=None, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=None, + last_visit=None, last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve the origin with a NULL last_scheduled # as well as those with the oldest values (i.e. the last ones), in order. expected = [origins[0]] + origins[1:][::-1] return visit_type, origins, expected def test_grab_next_visits_oldest_scheduled_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) - @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "notfound")) + @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "not_found")) @pytest.mark.parametrize("cooldown", (7, 15)) def test_grab_next_visits_cooldowns( self, swh_scheduler, listed_origins_by_type, which_cooldown, cooldown, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) after = self._check_grab_next_visit_basic( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) - # Mark all the visits as `{which_cooldown}` (scheduled, failed or notfound) on - # the `after` timestamp - ovs_args = {"last_failed": None, "last_notfound": None, "last_scheduled": None} - ovs_args[f"last_{which_cooldown}"] = after + # Mark all the visits as scheduled, failed or notfound on the `after` timestamp + ovs_args = { + "last_visit": None, + "last_visit_status": None, + "last_scheduled": None, + } + if which_cooldown == "scheduled": + ovs_args["last_scheduled"] = after + else: + ovs_args["last_visit"] = after + ovs_args["last_visit_status"] = LastVisitStatus(which_cooldown) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, - last_eventful=None, - last_uneventful=None, + last_successful=None, **ovs_args, ) for i, origin in enumerate(origins) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) cooldown_td = timedelta(days=cooldown) cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, - "notfound_cooldown": None, + "not_found_cooldown": None, } cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td - timedelta(seconds=1), **cooldown_args, ) assert ret == [], f"{which_cooldown}_cooldown ignored" ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td + timedelta(seconds=1), **cooldown_args, ) assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after the configured cooldown" def test_grab_next_visits_never_visited_oldest_update_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # We expect to retrieve origins with the oldest update date, that is # origins at the end of our updated_origins list. expected_origins = sorted(updated_origins, key=lambda o: o.last_update) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="never_visited_oldest_update_first", expected=expected_origins, ) def test_grab_next_visits_already_visited_order_by_lag( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # Update the visit stats with a known visit at a controlled date for # half the origins. Pick the date in the middle of the # updated_origins' `last_update` range visit_date = updated_origins[len(updated_origins) // 2].last_update visited_origins = updated_origins[::2] visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - last_eventful=visit_date, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=visit_date, + last_visit=visit_date, ) for origin in visited_origins ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve visited origins with the largest lag, but only # those which haven't been visited since their last update expected_origins = sorted( [origin for origin in visited_origins if origin.last_update > visit_date], key=lambda o: visit_date - o.last_update, ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="already_visited_order_by_lag", expected=expected_origins, ) def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins_by_type): """Check that grab_next_visits works when there not enough origins in the database""" visit_type = next(iter(listed_origins_by_type)) # Only add 5 origins to the database origins = listed_origins_by_type[visit_type][:5] assert origins swh_scheduler.record_listed_origins(origins) ret = swh_scheduler.grab_next_visits( visit_type, len(origins) + 2, policy="oldest_scheduled_first" ) assert len(ret) == 5 def test_grab_next_visits_no_last_update_nor_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update (nor visit stats) """ visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position actual_state = swh_scheduler.visit_scheduler_queue_position_get() assert actual_state == {} # nor any visit statuses actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in origins ) assert len(actual_visit_stats) == 0 # Grab some new visits next_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) # we do have the one without any last update assert len(next_visits) == len(origins) # Now the global state got updated actual_state = swh_scheduler.visit_scheduler_queue_position_get() assert actual_state[visit_type] is not None actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in next_visits ) # Visit stats got algo created assert len(actual_visit_stats) == len(origins) def test_grab_next_visits_no_last_update_with_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update""" visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position actual_state = swh_scheduler.visit_scheduler_queue_position_get() assert actual_state == {} date_now = utcnow() # Simulate some of those origins have associated visit stats (some with an # existing queue position and some without any) visit_stats = ( [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), next_visit_queue_position=date_now + timedelta(days=random.uniform(-10, 1)), ) for origin in origins[:100] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), next_visit_queue_position=date_now + timedelta(days=random.uniform(1, 10)), # definitely > now() ) for origin in origins[100:150] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), ) for origin in origins[150:] ] ) swh_scheduler.origin_visit_stats_upsert(visit_stats) # Grab next visits actual_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) assert len(actual_visits) == len(origins) actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in actual_visits ) assert len(actual_visit_stats) == len(origins) actual_state = swh_scheduler.visit_scheduler_queue_position_get() assert actual_state == { visit_type: max( s.next_visit_queue_position for s in actual_visit_stats if s.next_visit_queue_position is not None ) } def test_grab_next_visits_unknown_policy(self, swh_scheduler): unknown_policy = "non_existing_policy" NUM_RESULTS = 5 with pytest.raises(UnknownPolicy, match=unknown_policy): swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy) def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt) def test_origin_visit_stats_get_empty(self, swh_scheduler) -> None: assert swh_scheduler.origin_visit_stats_get([]) == [] def test_origin_visit_stats_get_pagination(self, swh_scheduler) -> None: page_size = inspect.signature(execute_values).parameters["page_size"].default visit_stats = [ OriginVisitStats( url=f"https://example.com/origin-{i:03d}", visit_type="git", - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), + last_visit=utcnow(), ) for i in range( page_size + 1 ) # Ensure overflow of the psycopg2.extras.execute_values page_size ] swh_scheduler.origin_visit_stats_upsert(visit_stats) assert set( swh_scheduler.origin_visit_stats_get( [(ovs.url, ovs.visit_type) for ovs in visit_stats] ) ) == set(visit_stats) def test_origin_visit_stats_upsert(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/test" visit_stats = OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=eventful_date, + last_visit=eventful_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] - uneventful_date = utcnow() + new_visit_date = utcnow() visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=None, - last_uneventful=uneventful_date, - last_failed=None, - last_notfound=None, + url=url, visit_type="git", last_successful=None, last_visit=new_visit_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) expected_visit_stats = OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=uneventful_date, - last_failed=None, - last_notfound=None, + last_successful=eventful_date, + last_visit=new_visit_date, ) assert uneventful_visits == [expected_visit_stats] - failed_date = utcnow() - visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=None, - last_uneventful=None, - last_failed=failed_date, - last_notfound=None, - ) - swh_scheduler.origin_visit_stats_upsert([visit_stats]) - - failed_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) - - expected_visit_stats = OriginVisitStats( - url=url, - visit_type="git", - last_eventful=eventful_date, - last_uneventful=uneventful_date, - last_failed=failed_date, - last_notfound=None, - ) - - assert failed_visits == [expected_visit_stats] - def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/666/test" visit_stats = OriginVisitStats( url=url, visit_type="git", - last_eventful=eventful_date, - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=eventful_date, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] def test_origin_visit_stats_upsert_batch(self, swh_scheduler) -> None: """Batch upsert is ok""" visit_stats = [ OriginVisitStats( url="foo", visit_type="git", - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), OriginVisitStats( url="bar", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, + last_visit=utcnow(), last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), ), ] swh_scheduler.origin_visit_stats_upsert(visit_stats) for visit_stat in swh_scheduler.origin_visit_stats_get( [(vs.url, vs.visit_type) for vs in visit_stats] ): assert visit_stat is not None def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: """Batch upsert does not support altering multiple times the same origin-visit-status """ with pytest.raises(SchedulerException, match="CardinalityViolation"): swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url="foo", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, - last_snapshot=None, + last_successful=None, + last_visit=utcnow(), ), OriginVisitStats( url="foo", visit_type="git", - last_eventful=None, - last_uneventful=utcnow(), - last_notfound=None, - last_failed=None, - last_snapshot=None, + last_successful=utcnow(), + last_visit=None, ), ] ) def test_visit_scheduler_queue_position( self, swh_scheduler, listed_origins ) -> None: result = swh_scheduler.visit_scheduler_queue_position_get() assert result == {} expected_result = {} visit_types = set() for origin in listed_origins: visit_type = origin.visit_type if visit_type in visit_types: continue visit_types.add(visit_type) position = utcnow() swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) expected_result[visit_type] = position result = swh_scheduler.visit_scheduler_queue_position_get() assert result == expected_result def test_metrics_origins_known(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.update_metrics() assert sum(metric.origins_known for metric in ret) == len(listed_origins) def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) disabled_origin = attr.evolve(listed_origins[0], enabled=False) swh_scheduler.record_listed_origins([disabled_origin]) ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id) for metric in ret: if metric.visit_type == disabled_origin.visit_type: # We disabled one of these origins assert metric.origins_known - metric.origins_enabled == 1 else: # But these are still all enabled assert metric.origins_known == metric.origins_enabled def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin visited_origin = listed_origins[0] swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=utcnow(), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=utcnow(), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins assert metric.origins_known - metric.origins_never_visited == 1 else: # But none of these have been visited assert metric.origins_known == metric.origins_never_visited def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin, in the past with # respect to the "last update" time for the origin visited_origin = listed_origins[0] assert visited_origin.last_update is not None swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, - last_eventful=visited_origin.last_update - timedelta(days=1), - last_uneventful=None, - last_failed=None, - last_notfound=None, + last_successful=visited_origin.last_update - timedelta(days=1), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins, in the past assert metric.origins_with_pending_changes == 1 else: # But none of these have been visited assert metric.origins_with_pending_changes == 0 def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) fake_uuid = uuid.uuid4() assert all(fake_uuid != origin.lister_id for origin in listed_origins) ret = swh_scheduler.update_metrics(lister_id=fake_uuid) assert len(ret) == 0 def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) def test_update_metrics_twice(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = utcnow() ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) second_ts = ts + timedelta(seconds=1) ret = swh_scheduler.update_metrics(timestamp=second_ts) assert all(metric.last_update == second_ts for metric in ret) def test_get_metrics(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics() assert_metrics_equal(updated, retrieved) def test_get_metrics_by_lister(self, swh_scheduler, listed_origins): lister_id = listed_origins[0].lister_id assert lister_id is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(lister_id=lister_id) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.lister_id == lister_id], retrieved ) def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins): visit_type = listed_origins[0].visit_type assert visit_type is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(visit_type=visit_type) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.visit_type == visit_type], retrieved )