diff --git a/PKG-INFO b/PKG-INFO index 55ef8e3..364054a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,37 +1,37 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.18.2 +Version: 0.19.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/requirements-test.txt b/requirements-test.txt index fc6c024..726caf0 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,10 +1,11 @@ pytest pytest-mock celery >= 4.3 hypothesis >= 3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests +types-Deprecated diff --git a/requirements.txt b/requirements.txt index 87ed09c..1b8e3a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,17 +1,17 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html attrs attrs-strict celery >= 4.3, != 5.0.3 -click +click < 8.0 elasticsearch > 5.4 flask humanize pika >= 1.1.0 psycopg2 pyyaml requests setuptools typing-extensions diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 55ef8e3..364054a 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,37 +1,37 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.18.2 +Version: 0.19.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index d76e928..cf90d4a 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,136 +1,137 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini requirements-journal.txt requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql sql/updates/12.sql sql/updates/13.sql sql/updates/14.sql sql/updates/15.sql sql/updates/16.sql sql/updates/17.sql sql/updates/18.sql sql/updates/19.sql sql/updates/20.sql sql/updates/23.sql sql/updates/24.sql sql/updates/25.sql sql/updates/26.sql sql/updates/27.sql sql/updates/28.sql sql/updates/29.sql sql/updates/30-bis.sql sql/updates/30.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/elasticsearch_memory.py swh/scheduler/exc.py swh/scheduler/interface.py swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py -swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/pika_listener.py +swh/scheduler/celery_backend/recurrent_visits.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/journal.py swh/scheduler/cli/origin.py swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py swh/scheduler/simulator/__init__.py swh/scheduler/simulator/common.py swh/scheduler/simulator/origin_scheduler.py swh/scheduler/simulator/origins.py swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_journal.py swh/scheduler/tests/test_cli_origin.py swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py swh/scheduler/tests/test_config.py swh/scheduler/tests/test_init.py swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py +swh/scheduler/tests/test_recurrent_visits.py 
swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/es/__init__.py swh/scheduler/tests/es/conftest.py swh/scheduler/tests/es/test_backend_es.py swh/scheduler/tests/es/test_cli_task.py swh/scheduler/tests/es/test_elasticsearch_memory.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index a772f88..0fb232c 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,37 +1,38 @@ attrs attrs-strict celery!=5.0.3,>=4.3 -click +click<8.0 elasticsearch>5.4 flask humanize pika>=1.1.0 psycopg2 pyyaml requests setuptools typing-extensions swh.core[db,http]>=0.14.0 swh.storage>=0.11.1 [journal] swh.journal [simulator] plotille simpy<4,>=3 [testing] pytest pytest-mock celery>=4.3 hypothesis>=3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests +types-Deprecated swh.journal plotille simpy<4,>=3 diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 19b48a0..309957a 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,1105 +1,1102 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import attr from psycopg2.errors import CardinalityViolation from psycopg2.extensions import AsIs import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow from .exc import SchedulerException, StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList from .model import ( LastVisitStatus, ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics, ) logger = logging.getLogger(__name__) def adapt_LastVisitStatus(v: LastVisitStatus): return AsIs(f"'{v.value}'::last_visit_status") psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(LastVisitStatus, adapt_LastVisitStatus) psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. """ def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. 
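# Aside: a quick illustration of the format_query helper defined above (the key
# list here is a made-up subset, not the full task_type_keys):
keys = ["type", "description", "backend_name"]
format_query("insert into task_type ({keys}) values ({placeholders})", keys)
# -> 'insert into task_type (type, description, backend_name) values (%s, %s, %s)'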
Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query("select {keys} from task_type", self.task_type_keys,) cur.execute(query) return cur.fetchall() @db_transaction() def get_listers(self, db=None, cur=None) -> List[Lister]: """Retrieve information about all listers from the database. """ select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers """ cur.execute(query) return [Lister(**ret) for ret in cur.fetchall()] @db_transaction() def get_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers where (name, instance_name) = (%s, %s) """ cur.execute(query, (name, instance_name)) ret = cur.fetchone() if not ret: return None return Lister(**ret) @db_transaction() def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) insert_cols, insert_meta = ( ", ".join(tup) for tup in Lister.insert_columns_and_metavars() ) query = f""" with added as ( insert into listers ({insert_cols}) values ({insert_meta}) on conflict do nothing returning {select_cols} ) select {select_cols} from added union all select {select_cols} from listers where (name, instance_name) = (%(name)s, %(instance_name)s); """ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) return Lister(**cur.fetchone()) @db_transaction() def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. 
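# Aside: a hedged sketch of how a caller might handle the StaleData
# optimistic-locking failure documented above; this retry helper is
# hypothetical, not part of the module, and assumes the Lister model's
# current_state field.
import attr
from swh.scheduler.exc import StaleData

def update_lister_state(backend, lister, new_state, max_attempts=3):
    for _ in range(max_attempts):
        try:
            return backend.update_lister(attr.evolve(lister, current_state=new_state))
        except StaleData:
            # A concurrent writer bumped `updated`; reload and retry.
            lister = backend.get_lister(lister.name, lister.instance_name)
    raise StaleData("lister update kept conflicting; giving up")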
""" select_cols = ", ".join(Lister.select_columns()) set_vars = ", ".join( f"{col} = {meta}" for col, meta in zip(*Lister.insert_columns_and_metavars()) ) query = f"""update listers set {set_vars} where id=%(id)s and updated=%(updated)s returning {select_cols}""" cur.execute(query, attr.asdict(lister)) updated = cur.fetchone() if not updated: raise StaleData("Stale data; Lister state not updated") return Lister(**updated) @db_transaction() def record_listed_origins( self, listed_origins: Iterable[ListedOrigin], db=None, cur=None ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ pk_cols = ListedOrigin.primary_key_columns() select_cols = ListedOrigin.select_columns() insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} RETURNING {", ".join(select_cols)} """ ret = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=(attr.asdict(origin) for origin in listed_origins), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=True, ) return [ListedOrigin(**d) for d in ret] @db_transaction() def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, db=None, cur=None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. """ query_filters: List[str] = [] query_params: List[Union[int, str, UUID, Tuple[UUID, str]]] = [] if lister_id: query_filters.append("lister_id = %s") query_params.append(lister_id) if url is not None: query_filters.append("url = %s") query_params.append(url) if page_token is not None: query_filters.append("(lister_id, url) > %s") # the typeshed annotation for tuple() is too strict. 
query_params.append(tuple(page_token)) # type: ignore query_params.append(limit) select_cols = ", ".join(ListedOrigin.select_columns()) if query_filters: where_clause = "where %s" % (" and ".join(query_filters)) else: where_clause = "" query = f"""SELECT {select_cols} from listed_origins {where_clause} ORDER BY lister_id, url LIMIT %s""" cur.execute(query, tuple(query_params)) origins = [ListedOrigin(**d) for d in cur] if len(origins) == limit: page_token = (str(origins[-1].lister_id), origins[-1].url) else: page_token = None return PaginatedListedOriginList(origins, page_token) @db_transaction() def grab_next_visits( self, visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, db=None, cur=None, ) -> List[ListedOrigin]: if timestamp is None: timestamp = utcnow() origin_select_cols = ", ".join(ListedOrigin.select_columns()) query_args: List[Any] = [] where_clauses = [] # list of (name, query) handled as CTEs before the main query common_table_expressions: List[Tuple[str, str]] = [] # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled" if enabled else "not enabled") # Only schedule visits of the given type where_clauses.append("visit_type = %s") query_args.append(visit_type) if scheduled_cooldown: # Don't re-schedule visits if they're already scheduled but we haven't # recorded a result yet, unless they've been scheduled more than a week # ago (it probably means we've lost them in flight somewhere). where_clauses.append( """origin_visit_stats.last_scheduled IS NULL OR origin_visit_stats.last_scheduled < GREATEST( - %s - %s, + %s, origin_visit_stats.last_visit ) """ ) - query_args.append(timestamp) - query_args.append(scheduled_cooldown) + query_args.append(timestamp - scheduled_cooldown) if failed_cooldown: # Don't retry failed origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'failed' " - "or origin_visit_stats.last_visit < %s - %s" + "or origin_visit_stats.last_visit < %s" ) - query_args.append(timestamp) - query_args.append(failed_cooldown) + query_args.append(timestamp - failed_cooldown) if not_found_cooldown: # Don't retry not found origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'not_found' " - "or origin_visit_stats.last_visit < %s - %s" + "or origin_visit_stats.last_visit < %s" ) - query_args.append(timestamp) - query_args.append(not_found_cooldown) + query_args.append(timestamp - not_found_cooldown) if policy == "oldest_scheduled_first": order_by = "origin_visit_stats.last_scheduled NULLS FIRST" elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") # order by increasing last_update (oldest first) where_clauses.append("listed_origins.last_update IS NOT NULL") order_by = "listed_origins.last_update" elif policy == "already_visited_order_by_lag": # TODO: store "visit lag" in a materialized view? 
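# Aside on the cooldown refactor above: the interval arithmetic moved out of SQL
# ("%s - %s") into Python, so each WHERE clause now receives a single precomputed
# cutoff. Illustrative only; the day counts match the keyword defaults above.
import datetime
from swh.scheduler.utils import utcnow

timestamp = utcnow()
scheduled_cutoff = timestamp - datetime.timedelta(days=7)    # scheduled_cooldown
failed_cutoff = timestamp - datetime.timedelta(days=14)      # failed_cooldown
not_found_cutoff = timestamp - datetime.timedelta(days=31)   # not_found_cooldown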
# visited origins have a NOT NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") # ignore origins we have visited after the known last update where_clauses.append("listed_origins.last_update IS NOT NULL") where_clauses.append( "listed_origins.last_update > origin_visit_stats.last_successful" ) # order by decreasing visit lag order_by = ( "listed_origins.last_update - origin_visit_stats.last_successful DESC" ) elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") order_by = ", ".join( [ # By default, sort using the queue position. If the queue # position is null, then the origin has never been visited, # which we want to handle first "origin_visit_stats.next_visit_queue_position nulls first", # Schedule unknown origins in the order we've seen them "listed_origins.first_seen", ] ) # fmt: off # This policy requires updating the global queue position for this # visit type common_table_expressions.append(("update_queue_position", """ INSERT INTO visit_scheduler_queue_position(visit_type, position) SELECT visit_type, COALESCE(MAX(next_visit_queue_position), now()) FROM selected_origins GROUP BY visit_type ON CONFLICT(visit_type) DO UPDATE SET position=GREATEST( visit_scheduler_queue_position.position, EXCLUDED.position ) """)) # fmt: on else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") if tablesample: table = "listed_origins tablesample SYSTEM (%s)" query_args.insert(0, tablesample) else: table = "listed_origins" if lister_uuid: where_clauses.append("lister_id = %s") query_args.append(lister_uuid) # fmt: off common_table_expressions.insert(0, ("selected_origins", f""" SELECT {origin_select_cols}, next_visit_queue_position FROM {table} LEFT JOIN origin_visit_stats USING (url, visit_type) WHERE ({") AND (".join(where_clauses)}) ORDER BY {order_by} LIMIT %s """)) # fmt: on query_args.append(count) # fmt: off common_table_expressions.append(("update_stats", """ INSERT INTO origin_visit_stats (url, visit_type, last_scheduled) SELECT url, visit_type, %s FROM selected_origins ON CONFLICT (url, visit_type) DO UPDATE SET last_scheduled = GREATEST( origin_visit_stats.last_scheduled, EXCLUDED.last_scheduled ) """)) # fmt: on query_args.append(timestamp) formatted_ctes = ",\n".join( f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions ) query = f""" WITH {formatted_ctes} SELECT {origin_select_cols} FROM selected_origins """ cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. 
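# Aside: a hedged usage sketch for create_tasks following the argument shape
# documented above; "scheduler" stands for a backend instance, and the task
# type and URL are made-up examples.
from swh.scheduler.utils import utcnow

created = scheduler.create_tasks(
    [
        {
            "type": "load-git",
            "arguments": {
                "args": [],
                "kwargs": {"url": "https://example.org/repo.git"},
            },
            "next_run": utcnow(),
        }
    ],
    policy="oneshot",
)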
""" cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, db=None, cur=None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ if not task_ids: return query = ["UPDATE task SET status = %s"] args: List[Any] = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_no_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: 
timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s", task_type, cur.rowcount) return cur.fetchall() @db_transaction() def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s", task_type, cur.rowcount) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. 
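# Aside: the task-run methods above chain into one lifecycle. A hedged sketch
# ("backend" is a SchedulerBackend instance; the ids are made up):
run = backend.schedule_task_run(task_id=42, backend_id="celery-uuid-1234")
backend.start_task_run("celery-uuid-1234", metadata={"worker": "worker01"})
backend.end_task_run("celery-uuid-1234", status="eventful", metadata={"visits": 1})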
        Args:
            backend_id (str): the identifier of the job in the backend
            status (str): how the task ended; one of: 'eventful', 'uneventful',
                'failed'
            metadata (dict): metadata to add to the task_run entry
            timestamp (datetime.datetime): the instant the event occurred

        Returns:
            the updated task_run entry

        """
        if metadata is None:
            metadata = {}
        if timestamp is None:
            timestamp = utcnow()

        cur.execute(
            "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)",
            (backend_id, status, metadata, timestamp),
        )
        return cur.fetchone()

    @db_transaction()
    def filter_task_to_archive(
        self,
        after_ts: str,
        before_ts: str,
        limit: int = 10,
        page_token: Optional[str] = None,
        db=None,
        cur=None,
    ) -> Dict[str, Any]:
        """Compute the tasks to archive within the datetime interval
        [after_ts, before_ts[. The method returns a paginated result.

        Returns:
            dict with the following keys:
              - **next_page_token**: opaque token to be used as `page_token` to
                retrieve the next page of results. If absent, there are no more
                pages to gather.
              - **tasks**: list of task dictionaries with the following keys:

                    **id** (str): origin task id
                    **started** (Optional[datetime]): started date
                    **scheduled** (datetime): scheduled date
                    **arguments** (json dict): task's arguments
                    ...

        """
        assert not page_token or isinstance(page_token, str)
        last_id = -1 if page_token is None else int(page_token)
        tasks = []
        cur.execute(
            "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
            (after_ts, before_ts, last_id, limit + 1),
        )
        for row in cur:
            task = dict(row)
            # the nested type index does not accept bare values, so transform
            # the args list into a dict to comply with it
            task["arguments"]["args"] = {
                i: v for i, v in enumerate(task["arguments"]["args"])
            }
            kwargs = task["arguments"]["kwargs"]
            task["arguments"]["kwargs"] = json.dumps(kwargs)
            tasks.append(task)

        if len(tasks) >= limit + 1:  # more data remains; add pagination information
            result = {
                "tasks": tasks[:limit],
                "next_page_token": str(tasks[-1]["task_id"]),
            }
        else:
            result = {"tasks": tasks}

        return result

    @db_transaction()
    def delete_archived_tasks(self, task_ids, db=None, cur=None):
        """Delete archived tasks as much as possible. Only the task_ids whose
        associated task_run entries have been completely cleaned up will be
        deleted.
""" _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: pk_cols = OriginVisitStats.primary_key_columns() insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join( f"{col} = coalesce(EXCLUDED.{col}, ovi.{col})" for col in upsert_cols ) query = f""" INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} """ try: psycopg2.extras.execute_values( cur=cur, sql=query, argslist=( attr.asdict(visit_stats) for visit_stats in origin_visit_stats ), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=False, ) except CardinalityViolation as e: raise SchedulerException(repr(e)) @db_transaction() def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]], db=None, cur=None ) -> List[OriginVisitStats]: if not ids: return [] primary_keys = tuple((origin, visit_type) for (origin, visit_type) in ids) query = format_query( """ SELECT {keys} FROM (VALUES %s) as stats(url, visit_type) INNER JOIN origin_visit_stats USING (url, visit_type) """, OriginVisitStats.select_columns(), ) rows = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=primary_keys, fetch=True ) return [OriginVisitStats(**row) for row in rows] @db_transaction() def visit_scheduler_queue_position_get( self, db=None, cur=None, ) -> Dict[str, datetime.datetime]: cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") return {row["visit_type"]: row["position"] for row in cur} @db_transaction() def visit_scheduler_queue_position_set( self, visit_type: str, position: datetime.datetime, db=None, cur=None, ) -> None: query = """ INSERT INTO visit_scheduler_queue_position(visit_type, position) VALUES(%s, %s) ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position """ cur.execute(query, (visit_type, position)) @db_transaction() def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. 
Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ query = format_query( "SELECT {keys} FROM update_metrics(%s, %s)", SchedulerMetrics.select_columns(), ) cur.execute(query, (lister_id, timestamp)) return [SchedulerMetrics(**row) for row in cur.fetchall()] @db_transaction() def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ where_filters = [] where_args = [] if lister_id: where_filters.append("lister_id = %s") where_args.append(str(lister_id)) if visit_type: where_filters.append("visit_type = %s") where_args.append(visit_type) where_clause = "" if where_filters: where_clause = f"where {' and '.join(where_filters)}" query = format_query( "SELECT {keys} FROM scheduler_metrics %s" % where_clause, SchedulerMetrics.select_columns(), ) cur.execute(query, tuple(where_args)) return [SchedulerMetrics(**row) for row in cur.fetchall()] diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py deleted file mode 100644 index 9306fa0..0000000 --- a/swh/scheduler/celery_backend/listener.py +++ /dev/null @@ -1,220 +0,0 @@ -# Copyright (C) 2015-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import logging -import sys -import time - -import celery -from celery.events import EventReceiver -import click -from kombu import Queue - -from swh.core.statsd import statsd -from swh.scheduler.utils import utcnow - - -class ReliableEventReceiver(EventReceiver): - def __init__( - self, - channel, - handlers=None, - routing_key="#", - node_id=None, - app=None, - queue_prefix="celeryev", - accept=None, - ): - super(ReliableEventReceiver, self).__init__( - channel, handlers, routing_key, node_id, app, queue_prefix, accept - ) - - self.queue = Queue( - ".".join([self.queue_prefix, self.node_id]), - exchange=self.exchange, - routing_key=self.routing_key, - auto_delete=False, - durable=True, - ) - - def get_consumers(self, consumer, channel): - return [ - consumer( - queues=[self.queue], - callbacks=[self._receive], - no_ack=False, - accept=self.accept, - ) - ] - - def _receive(self, bodies, message): - if not isinstance(bodies, list): # celery<4 returned body as element - bodies = [bodies] - for body in bodies: - type, body = self.event_from_message(body) - self.process(type, body, message) - - def process(self, type, event, message): - """Process the received event by dispatching it to the appropriate - handler.""" - handler = self.handlers.get(type) or self.handlers.get("*") - if handler: - handler(event, message) - statsd.increment( - "swh_scheduler_listener_handled_event_total", tags={"event_type": type} - ) - - -ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) -ACTION_QUEUE_MAX_LENGTH = 1000 - - -def event_monitor(app, backend): - logger = logging.getLogger("swh.scheduler.listener") - actions = { - "last_send": utcnow() - 2 * ACTION_SEND_DELAY, - "queue": [], - } - - def try_perform_actions(actions=actions): - logger.debug("Try perform pending actions") - if actions["queue"] 
and ( - len(actions["queue"]) > ACTION_QUEUE_MAX_LENGTH - or utcnow() - actions["last_send"] > ACTION_SEND_DELAY - ): - perform_actions(actions) - - def perform_actions(actions, backend=backend): - logger.info("Perform %s pending actions" % len(actions["queue"])) - action_map = { - "start_task_run": backend.start_task_run, - "end_task_run": backend.end_task_run, - } - - messages = [] - db = backend.get_db() - try: - cursor = db.cursor(None) - for action in actions["queue"]: - messages.append(action["message"]) - function = action_map[action["action"]] - args = action.get("args", ()) - kwargs = action.get("kwargs", {}) - kwargs["cur"] = cursor - function(*args, **kwargs) - - except Exception: - db.conn.rollback() - else: - db.conn.commit() - finally: - backend.put_db(db) - - for message in messages: - if not message.acknowledged: - message.ack() - actions["queue"] = [] - actions["last_send"] = utcnow() - - def queue_action(action, actions=actions): - actions["queue"].append(action) - try_perform_actions() - - def catchall_event(event, message): - logger.debug("event: %s %s", event["type"], event.get("name", "N/A")) - if not message.acknowledged: - message.ack() - try_perform_actions() - - def task_started(event, message): - logger.debug("task_started: %s %s", event["type"], event.get("name", "N/A")) - - queue_action( - { - "action": "start_task_run", - "args": [event["uuid"]], - "kwargs": { - "timestamp": utcnow(), - "metadata": {"worker": event["hostname"],}, - }, - "message": message, - } - ) - - def task_succeeded(event, message): - logger.debug("task_succeeded: event: %s" % event) - logger.debug(" message: %s" % message) - result = event["result"] - - logger.debug("task_succeeded: result: %s" % result) - try: - status = result.get("status") - if status == "success": - status = "eventful" if result.get("eventful") else "uneventful" - except Exception: - status = "eventful" if result else "uneventful" - - queue_action( - { - "action": "end_task_run", - "args": [event["uuid"]], - "kwargs": {"timestamp": utcnow(), "status": status, "result": result,}, - "message": message, - } - ) - - def task_failed(event, message): - logger.debug("task_failed: event: %s" % event) - logger.debug(" message: %s" % message) - - queue_action( - { - "action": "end_task_run", - "args": [event["uuid"]], - "kwargs": {"timestamp": utcnow(), "status": "failed",}, - "message": message, - } - ) - - recv = ReliableEventReceiver( - celery.current_app.connection(), - app=celery.current_app, - handlers={ - "task-started": task_started, - "task-result": task_succeeded, - "task-failed": task_failed, - "*": catchall_event, - }, - node_id="listener", - ) - - errors = 0 - while True: - try: - recv.capture(limit=None, timeout=None, wakeup=True) - errors = 0 - except KeyboardInterrupt: - logger.exception("Keyboard interrupt, exiting") - break - except Exception: - logger.exception("Unexpected exception") - if errors < 5: - time.sleep(errors) - errors += 1 - else: - logger.error("Too many consecutive errors, exiting") - sys.exit(1) - - -@click.command() -@click.pass_context -def main(ctx): - click.echo("Deprecated! 
Use 'swh-scheduler listener' instead.", err=True)
-    ctx.exit(1)
-
-
-if __name__ == "__main__":
-    main()

diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
index 7df7bec..d895d5d 100644
--- a/swh/scheduler/celery_backend/pika_listener.py
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -1,98 +1,107 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

+"""This is the scheduler listener. It is in charge of listening for rabbitmq events
+(task results) and flushing the "oneshot" tasks' statuses to the scheduler backend.
+This is the final step after a task completes.
+
+The scheduler runner :mod:`swh.scheduler.celery_backend.runner` is the module in charge
+of pushing tasks into the queue.
+
+"""
+
import json
import logging
import sys

import pika

from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.utils import utcnow

logger = logging.getLogger(__name__)


def get_listener(broker_url, queue_name, scheduler_backend):
    connection = pika.BlockingConnection(pika.URLParameters(broker_url))
    channel = connection.channel()

    channel.queue_declare(queue=queue_name, durable=True)

    exchange = "celeryev"
    routing_key = "#"
    channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)

    channel.basic_qos(prefetch_count=1000)

    channel.basic_consume(
        queue=queue_name, on_message_callback=get_on_message(scheduler_backend),
    )

    return channel


def get_on_message(scheduler_backend):
    def on_message(channel, method_frame, properties, body):
        try:
            events = json.loads(body)
        except Exception:
            logger.warning("Could not parse body %r", body)
            events = []
        if not isinstance(events, list):
            events = [events]
        for event in events:
            logger.debug("Received event %r", event)
            process_event(event, scheduler_backend)

        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    return on_message


def process_event(event, scheduler_backend):
    uuid = event.get("uuid")
    if not uuid:
        return

    event_type = event["type"]
    statsd.increment(
        "swh_scheduler_listener_handled_event_total", tags={"event_type": event_type}
    )

    if event_type == "task-started":
        scheduler_backend.start_task_run(
            uuid, timestamp=utcnow(), metadata={"worker": event.get("hostname")},
        )
    elif event_type == "task-result":
        result = event["result"]

        status = None

        if isinstance(result, dict) and "status" in result:
            status = result["status"]
            if status == "success":
                status = "eventful" if result.get("eventful") else "uneventful"

        if status is None:
            status = "eventful" if result else "uneventful"

        scheduler_backend.end_task_run(
            uuid, timestamp=utcnow(), status=status, result=result
        )
    elif event_type == "task-failed":
        scheduler_backend.end_task_run(uuid, timestamp=utcnow(), status="failed")


if __name__ == "__main__":
    url = sys.argv[1]
    logging.basicConfig(level=logging.DEBUG)
    scheduler_backend = get_scheduler("local", args={"db": "service=swh-scheduler"})
    channel = get_listener(url, "celeryev.test", scheduler_backend)
    logger.info("Start consuming")
    channel.start_consuming()

diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
new file mode 100644
index 0000000..04d2045
--- /dev/null
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This module schedules the recurrent visits for listed origins in Celery.
+
+For "oneshot" (save code now, lister) tasks, check the
+:mod:`swh.scheduler.celery_backend.runner` and
+:mod:`swh.scheduler.celery_backend.pika_listener` modules.
+
+"""
+
+from __future__ import annotations
+
+from itertools import chain
+import logging
+from queue import Empty, Queue
+import random
+from threading import Thread
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Tuple
+
+from kombu.utils.uuid import uuid
+
+from swh.scheduler.celery_backend.config import get_available_slots
+
+if TYPE_CHECKING:
+    from ..interface import SchedulerInterface
+    from ..model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+_VCS_POLICY_RATIOS = {
+    "already_visited_order_by_lag": 0.49,
+    "never_visited_oldest_update_first": 0.49,
+    "origins_without_last_update": 0.02,
+}
+
+# Default policy ratios; this configuration lives in the module for now
+POLICY_RATIO: Dict[str, Dict[str, float]] = {
+    "default": {
+        "already_visited_order_by_lag": 0.5,
+        "never_visited_oldest_update_first": 0.5,
+    },
+    "git": _VCS_POLICY_RATIOS,
+    "hg": _VCS_POLICY_RATIOS,
+    "svn": _VCS_POLICY_RATIOS,
+    "cvs": _VCS_POLICY_RATIOS,
+    "bzr": _VCS_POLICY_RATIOS,
+}
+
+
+MIN_SLOTS_RATIO = 0.05
+"""Proportion of slots that need to be available (with respect to max_queue_length)
+for `grab_next_visits` to trigger"""
+
+QUEUE_FULL_BACKOFF = 60
+"""Backoff time (in seconds) if fewer than `MIN_SLOTS_RATIO` of the queue's slots
+are available."""
+
+NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+"""Backoff time (in seconds) if no origins have been scheduled in the current
+iteration"""
+
+BACKOFF_SPLAY = 5.0
+"""Amplitude of the fuzziness between backoffs"""
+
+TERMINATE = object()
+"""Termination request received from command queue (singleton used for identity
+comparison)"""
+
+
+def grab_next_visits_policy_ratio(
+    scheduler: SchedulerInterface, visit_type: str, num_visits: int
+) -> List[ListedOrigin]:
+    """Get the next `num_visits` for the given `visit_type` using the corresponding
+    set of scheduling policies.
+
+    The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies
+    used to pull the next tasks, and what proportion of the available num_visits
+    they take.
+
+    This function logs a message if the ratio of retrieved origins deviates from
+    the requested ratio by more than 5%.
+
+    Returns:
+        at most `num_visits` `ListedOrigin` objects
+    """
+    policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"])
+
+    fetched_origins: Dict[str, List[ListedOrigin]] = {}
+
+    for policy, ratio in policy_ratio.items():
+        num_tasks_to_send = int(num_visits * ratio)
+        fetched_origins[policy] = scheduler.grab_next_visits(
+            visit_type, num_tasks_to_send, policy=policy
+        )
+
+    all_origins: List[ListedOrigin] = list(
+        chain.from_iterable(fetched_origins.values())
+    )
+    if not all_origins:
+        return []
+
+    # Check whether the ratios of origins fetched are skewed with respect to the
+    # ones we requested
+    fetched_origin_ratios = {
+        policy: len(origins) / len(all_origins)
+        for policy, origins in fetched_origins.items()
+    }
+
+    for policy, expected_ratio in policy_ratio.items():
+        # more than 5% of relative skew with respect to the request
+        if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
+            logger.info(
+                "Skewed fetch for visit type %s with policy %s: fetched %s, "
+                "requested %s",
+                visit_type,
+                policy,
+                fetched_origin_ratios[policy],
+                expected_ratio,
+            )
+
+    return all_origins
+
+
+def splay():
+    """Return a random short interval by which to vary the backoffs for the visit
+    scheduling threads"""
+    return random.uniform(0, BACKOFF_SPLAY)
+
+
+def send_visits_for_visit_type(
+    scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict,
+) -> float:
+    """Schedule the next batch of visits for the given `visit_type`.
+
+    First, we determine the number of available slots by introspecting the
+    RabbitMQ queue.
+
+    If fewer than `MIN_SLOTS_RATIO` of the queue's slots are available, we wait
+    for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries when there aren't many jobs to queue.
+
+    Once more than `MIN_SLOTS_RATIO` of the slots are available, we run
+    `grab_next_visits_policy_ratio` to retrieve the next set of origin visits
+    to schedule, and we send them to celery.
+
+    If the last scheduling attempt didn't return any origins, we sleep for
+    `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+    `grab_next_visits` queries too often if there's nothing left to schedule.
+
+    Returns:
+        the earliest `time.monotonic` value at which to run the next iteration
+        of the loop.
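# Aside: a quick arithmetic check of the ratio split implemented above. With 100
# available slots for the "git" visit type, the per-policy grabs request roughly
# 49 + 49 + 2 origins (int() truncates, so a slot or two may be lost to rounding).
num_visits = 100
for policy, ratio in POLICY_RATIO["git"].items():
    num_tasks_to_send = int(num_visits * ratio)  # ~49, ~49, ~2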
+ + """ + queue_name = task_type["backend_name"] + max_queue_length = task_type.get("max_queue_length") or 0 + min_available_slots = max_queue_length * MIN_SLOTS_RATIO + + current_iteration_start = time.monotonic() + + # Check queue level + available_slots = get_available_slots(app, queue_name, max_queue_length) + logger.debug( + "%s available slots for visit type %s in queue %s", + available_slots, + visit_type, + queue_name, + ) + if available_slots < min_available_slots: + return current_iteration_start + QUEUE_FULL_BACKOFF + + origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots) + + if not origins: + logger.debug("No origins to visit for type %s", visit_type) + return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF + + # Try to smooth the ingestion load, origins pulled by different + # scheduling policies have different resource usage patterns + random.shuffle(origins) + + for origin in origins: + task_dict = origin.as_task_dict() + app.send_task( + queue_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + logger.info( + "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, + ) + + # When everything worked, we can try to schedule origins again ASAP. + return time.monotonic() + + +def visit_scheduler_thread( + config: Dict, + visit_type: str, + command_queue: Queue[object], + exc_queue: Queue[Tuple[str, BaseException]], +): + """Target function for the visit sending thread, which initializes local + connections and handles exceptions by sending them back to the main thread.""" + + from swh.scheduler import get_scheduler + from swh.scheduler.celery_backend.config import build_app + + try: + # We need to reinitialize these connections because they're not generally + # thread-safe + app = build_app(config.get("celery")) + scheduler = get_scheduler(**config["scheduler"]) + task_type = scheduler.get_task_type(f"load-{visit_type}") + + if task_type is None: + raise ValueError(f"Unknown task type: load-{visit_type}") + + next_iteration = time.monotonic() + + while True: + # vary the next iteration time a little bit + next_iteration = next_iteration + splay() + while time.monotonic() < next_iteration: + # Wait for next iteration to start. Listen for termination message. 
+                try:
+                    msg = command_queue.get(block=True, timeout=1)
+                except Empty:
+                    continue
+
+                if msg is TERMINATE:
+                    return
+                else:
+                    logger.warning(
+                        "Received unexpected message %s in command queue", msg
+                    )
+
+            next_iteration = send_visits_for_visit_type(
+                scheduler, app, visit_type, task_type
+            )
+
+    except BaseException as e:
+        exc_queue.put((visit_type, e))
+
+
+VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
+"""Dict storing the visit scheduler threads and their command queues"""
+
+
+def spawn_visit_scheduler_thread(
+    threads: VisitSchedulerThreads,
+    exc_queue: Queue[Tuple[str, BaseException]],
+    config: Dict[str, Any],
+    visit_type: str,
+):
+    """Spawn a new thread to schedule the visits of type `visit_type`."""
+    command_queue: Queue[object] = Queue()
+    thread = Thread(
+        target=visit_scheduler_thread,
+        kwargs={
+            "config": config,
+            "visit_type": visit_type,
+            "command_queue": command_queue,
+            "exc_queue": exc_queue,
+        },
+    )
+    threads[visit_type] = (thread, command_queue)
+    thread.start()
+
+
+def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
+    """Terminate all visit scheduler threads"""
+    logger.info("Termination requested...")
+    for _, command_queue in threads.values():
+        command_queue.put(TERMINATE)
+
+    loops = 0
+    while threads and loops < 10:
+        logger.info(
+            "Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
+        )
+        loops += 1
+        for visit_type, (thread, _) in list(threads.items()):
+            thread.join(timeout=1)
+            if not thread.is_alive():
+                logger.debug("Thread %s terminated", visit_type)
+                del threads[visit_type]
+
+    if threads:
+        logger.warning(
+            "Could not reap the following threads after 10 attempts: %s",
+            ", ".join(sorted(threads)),
+        )
+
+    return list(sorted(threads))

diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index 52ce42a..53b2e25 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,169 +1,180 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

+"""This is the first scheduler runner. It is in charge of scheduling "oneshot" tasks
+(e.g. save code now, indexer, vault, deposit, ...). To do this, it reads tasks out of
+the scheduler backend and pushes them to their associated rabbitmq queues.
+
+The scheduler listener :mod:`swh.scheduler.celery_backend.pika_listener` is the module
+in charge of finalizing the task results.
+
+"""
+
import logging
from typing import Dict, List, Tuple

+from deprecated import deprecated
from kombu.utils.uuid import uuid

from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.celery_backend.config import get_available_slots
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import utcnow

logger = logging.getLogger(__name__)

# Max batch size for tasks
MAX_NUM_TASKS = 10000


def run_ready_tasks(
    backend: SchedulerInterface,
    app,
    task_types: List[Dict] = [],
    with_priority: bool = False,
) -> List[Dict]:
    """Schedule tasks ready to be scheduled.

    This looks up tasks per task type and mass-schedules them accordingly (sends
    messages to rabbitmq and marks the equivalent tasks as scheduled in the
    scheduler backend).
    If tasks (per task type) with priority exist, they will get redirected to a
    dedicated high-priority queue (standard queue name prefixed with
    `save_code_now:`).

    Args:
        backend: scheduler backend to interact with (read/update tasks)
        app (App): Celery application to send tasks to
        task_types: The list of task type dicts to iterate over. By default,
            empty. When empty, the full list of task types referenced in the
            scheduler will be used.
        with_priority: If True, only tasks with priority set will be fetched
            and scheduled. By default, False.

    Returns:
        A list of dictionaries::

            {
                'task': the scheduler's task id,
                'backend_id': Celery's task id,
                'scheduled': utcnow()
            }

        The result can be used to block-wait for the tasks' results::

            backend_tasks = run_ready_tasks(self.scheduler, app)
            for task in backend_tasks:
                AsyncResult(id=task['backend_id']).get()

    """
    all_backend_tasks: List[Dict] = []
    while True:
        if not task_types:
            task_types = backend.get_task_types()
        task_types_d = {}
        pending_tasks = []
        for task_type in task_types:
            task_type_name = task_type["type"]
            task_types_d[task_type_name] = task_type
            max_queue_length = task_type["max_queue_length"]
            if max_queue_length is None:
                max_queue_length = 0
            backend_name = task_type["backend_name"]
            if with_priority:
                # grab max_queue_length (or 10) potential tasks with any priority for
                # the same type (limit the result to avoid too long running queries)
                grabbed_priority_tasks = backend.grab_ready_priority_tasks(
                    task_type_name, num_tasks=max_queue_length or 10
                )
                if grabbed_priority_tasks:
                    pending_tasks.extend(grabbed_priority_tasks)
                    logger.info(
                        "Grabbed %s tasks %s (priority)",
                        len(grabbed_priority_tasks),
                        task_type_name,
                    )
                    statsd.increment(
                        "swh_scheduler_runner_scheduled_task_total",
                        len(grabbed_priority_tasks),
                        tags={"task_type": task_type_name},
                    )
            else:
                num_tasks = get_available_slots(app, backend_name, max_queue_length)
                # only pull tasks if the buffer is at least 1/5th empty (= 80%
                # full), to help PostgreSQL use properly indexed queries.
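# Aside: a worked example of the four-fifths heuristic in the comment above
# (the queue length is made up; MAX_NUM_TASKS = 10000 is the module constant
# shown earlier).
max_queue_length = 1000
threshold = min(10000, max_queue_length) // 5  # == 200
# Tasks are grabbed only when RabbitMQ reports more than 200 free slots, i.e.
# when the queue is less than 80% full.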
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py index 0becbe8..861651e 100644 --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -1,137 +1,226 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import time +from typing import List, Tuple import click from . import cli
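The `from __future__ import annotations` addition is presumably what keeps annotations like `Queue[Tuple[str, BaseException]]` (used further down) safe on Python 3.7: with postponed evaluation the subscript is never executed at runtime, whereas plain `queue.Queue` only became subscriptable in Python 3.9. A small demonstration of the difference:

```python
from __future__ import annotations

from queue import Queue

def consume(q: Queue[str]) -> None:  # fine everywhere: the annotation stays unevaluated
    print(q.get())

# Evaluating the subscript eagerly is what breaks on older interpreters:
try:
    Queue[str]  # TypeError on Python < 3.9; a generic alias on 3.9+
except TypeError as exc:
    print("eager subscript failed:", exc)
```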
@cli.command("start-runner") @click.option( "--period", "-p", default=0, help=( "Period (in s) at which pending tasks are checked and " "executed. Set to 0 (default) for a one-shot run." ), ) @click.option( "--task-type", "task_type_names", multiple=True, default=[], help=( "Task types to schedule. If not provided, this iterates over every " "task type referenced in the scheduler backend." ), ) @click.option( "--with-priority/--without-priority", is_flag=True, default=False, help=( "Determine if those tasks should be the ones with priority or not. " "By default, this deals with tasks without any priority." ), ) @click.pass_context def runner(ctx, period, task_type_names, with_priority): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and scheduling them.""" from swh.scheduler.celery_backend.config import build_app from swh.scheduler.celery_backend.runner import run_ready_tasks config = ctx.obj["config"] app = build_app(config.get("celery")) app.set_current() logger = logging.getLogger(__name__ + ".runner") scheduler = ctx.obj["scheduler"] - logger.debug("Scheduler %s" % scheduler) + logger.debug("Scheduler %s", scheduler) task_types = [] for task_type_name in task_type_names: task_type = scheduler.get_task_type(task_type_name) if not task_type: raise ValueError(f"Unknown {task_type_name}") task_types.append(task_type) try: while True: logger.debug("Run ready tasks") try: ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) if ntasks: logger.info("Scheduled %s tasks", ntasks) except Exception: logger.exception("Unexpected error in run_ready_tasks()") if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command("start-listener") @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. This service is responsible for listening to task lifecycle events and handling their workflow status in the database.""" scheduler_backend = ctx.obj["scheduler"] if not scheduler_backend: raise ValueError("Scheduler class (local/remote) must be instantiated") broker = ( ctx.obj["config"] .get("celery", {}) .get("task_broker", "amqp://guest@localhost/%2f") ) from swh.scheduler.celery_backend.pika_listener import get_listener listener = get_listener(broker, "celeryev.listener", scheduler_backend) try: listener.start_consuming() finally: listener.stop_consuming()
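An easily missed detail in the default broker URL above: the trailing `%2f` is the URL-encoded form of `/`, RabbitMQ's default vhost, not a literal path component. A quick standard-library check:

```python
from urllib.parse import quote, unquote

# The %2f in amqp://guest@localhost/%2f decodes to the "/" vhost.
assert unquote("%2f") == "/"
assert quote("/", safe="") == "%2F"  # percent-encoding decodes case-insensitively
```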
+ + """ + from queue import Queue + + from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + logger, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + ) + + config = ctx.obj["config"] + scheduler = ctx.obj["scheduler"] + + if not visit_types: + visit_types = [] + # Figure out which visit types exist in the scheduler + all_task_types = scheduler.get_task_types() + for task_type in all_task_types: + if not task_type["type"].startswith("load-"): + # only consider loading tasks as recurring ones, the rest is dismissed + continue + # get visit type name from task type + visit_types.append(task_type["type"][5:]) + else: + # Check that the passed visit types exist in the scheduler + for visit_type in visit_types: + task_type_name = f"load-{visit_type}" + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task type: {task_type_name}") + + exc_queue: Queue[Tuple[str, BaseException]] = Queue() + threads: VisitSchedulerThreads = {} + + try: + # Spawn initial threads + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + # Handle exceptions from child threads + while True: + visit_type, exc_info = exc_queue.get(block=True) + + logger.exception( + "Thread %s died with exception; respawning", + visit_type, + exc_info=exc_info, + ) + + dead_thread = threads[visit_type][0] + dead_thread.join(timeout=1) + + if dead_thread.is_alive(): + logger.warn( + "The thread for %s is still alive after sending an exception?! " + "Respawning anyway.", + visit_type, + ) + + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + except SystemExit: + remaining_threads = terminate_visit_scheduler_threads(threads) + if remaining_threads: + ctx.exit(1) + ctx.exit(0) + + @cli.command("rpc-serve") @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") @click.option("--port", default=5008, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=None, help=( "Indicates if the server should run in debug mode. " "Defaults to True if log-level is DEBUG, False otherwise." ), ) @click.pass_context def rpc_server(ctx, host, port, debug): """Starts a swh-scheduler API HTTP server. 
""" if ctx.obj["config"]["scheduler"]["cls"] == "remote": click.echo( "The API server can only be started with a 'local' " "configuration", err=True, ) ctx.exit(1) from swh.scheduler.api import server server.app.config.update(ctx.obj["config"]) if debug is None: debug = ctx.obj["log_level"] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 775b38a..4872516 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,62 +1,72 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os from typing import Dict, List +from unittest.mock import patch import pytest from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" return swh_scheduler.get_or_create_lister(**LISTERS[0]) @pytest.fixture def visit_types() -> List[str]: """Possible visit types in `ListedOrigin`s""" return ["git", "svn"] @pytest.fixture def listed_origins_by_type( stored_lister: Lister, visit_types: List[str] ) -> Dict[str, List[ListedOrigin]]: """A fixed list of `ListedOrigin`s, for each `visit_type`.""" count_per_type = 1000 assert stored_lister.id return { visit_type: [ ListedOrigin( lister_id=stored_lister.id, url=f"https://{visit_type}.example.com/{i:04d}", visit_type=visit_type, last_update=datetime( 2020, 6, 15, 16, 0, 0, j * count_per_type + i, tzinfo=timezone.utc ), ) for i in range(count_per_type) ] for j, visit_type in enumerate(visit_types) } @pytest.fixture def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: """Return a (fixed) set of listed origins""" return sum(listed_origins_by_type.values(), []) + + +@pytest.fixture +def storage(swh_storage): + """An instance of in-memory storage that gets injected + into the CLI functions.""" + with patch("swh.storage.get_storage") as get_storage_mock: + get_storage_mock.return_value = swh_storage + yield swh_storage diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py index 044a1d8..62b83fb 100644 --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -1,856 +1,847 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from itertools import islice import logging import random import re import tempfile from unittest.mock import patch from click.testing import CliRunner import pytest from swh.core.api.classes import stream_results from swh.model.model import Origin from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict, utcnow CLI_CONFIG = """ scheduler: cls: foo args: {} """ def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with 
patch( "swh.scheduler.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = ["-C" + config_fd.name,] + args result = runner.invoke(cli, args, obj={"log_level": logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_schedule_tasks(swh_scheduler): csv_data = ( b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" ) with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(csv_data) csv_fd.seek(0) result = invoke( swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name] ) expected = r""" Created 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg1', 'arg2'\] Keyword args: key: 'value' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg3', 'arg4'\] Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_tasks_columns(swh_scheduler): with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n') csv_fd.seek(0) result = invoke( swh_scheduler, False, [ "task", "schedule", "-c", "type", "-c", "policy", "-c", "args", "-c", "kwargs", "-d", ";", csv_fd.name, ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_task(swh_scheduler): result = invoke( swh_scheduler, False, ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value",], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_none(swh_scheduler): result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 1 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() 
assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter(swh_scheduler): task = create_task_dict("swh-test-multiping", "oneshot", key="value") swh_scheduler.create_tasks([task]) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter_2(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-multiping", "oneshot", key="value"), create_task_dict("swh-test-ping", "oneshot", key="value2"), ] ) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # Fails because "task list-pending --limit 3" only returns 2 tasks, because # of how compute_nb_tasks_from works. @pytest.mark.xfail def test_list_pending_tasks_limit(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-ping", "oneshot", key="value%d" % i) for i in range(10) ] ) result = invoke( swh_scheduler, False, ["task", "list-pending", "swh-test-ping", "--limit", "3",] ) expected = r""" Found 2 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value0' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3) task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: tomorrow \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke(swh_scheduler, False, ["task", "list",]) expected = r""" Found 2 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled 
Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2",]) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id_2(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"] ) expected = r""" Found 2 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_type(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"] ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_limit(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, ["task", "list", "--limit", "2",]) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def 
test_list_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_after(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--after", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def _fill_storage_with_origins(storage, nb_origins): origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)] storage.origin_add(origins) return origins -@pytest.fixture -def storage(swh_storage): - """An instance of in-memory storage that gets injected - into the CLI functions.""" - with patch("swh.storage.get_storage") as get_storage_mock: - get_storage_mock.return_value = swh_storage - yield swh_storage - - @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a divisor of nb_origins.""" _fill_storage_with_origins(storage, 90) result = invoke( swh_scheduler, False, ["task", "schedule_origins", "--dry-run", "swh-test-ping",], ) # Check the output expected = r""" Scheduled 3 tasks \(30 origins\). Scheduled 6 tasks \(60 origins\). Scheduled 9 tasks \(90 origins\). Done. 
""".lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check scheduled tasks tasks = swh_scheduler.search_tasks() assert len(tasks) == 0 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins): # check there are not too many tasks assert len(tasks) <= max_tasks # check tasks are not too large assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks) # check the tasks are exhaustive assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len( expected_origins ) assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == { origin.url for origin in expected_origins } @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" origins = _fill_storage_with_origins(storage, 70) result = invoke( swh_scheduler, False, ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20",], ) # Check the output expected = r""" Scheduled 3 tasks \(60 origins\). Scheduled 4 tasks \(70 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 4, 20, origins) assert all(task["arguments"]["kwargs"] == {} for task in tasks) def test_task_schedule_origins_kwargs(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" origins = _fill_storage_with_origins(storage, 30) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", 'key1="value1"', 'key2="value2"', ], ) # Check the output expected = r""" Scheduled 2 tasks \(30 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all( task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) def test_task_schedule_origins_with_limit(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" _fill_storage_with_origins(storage, 50) limit = 20 expected_origins = list(islice(stream_results(storage.origin_list), limit)) nb_origins = len(expected_origins) assert nb_origins == limit max_task_size = 5 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 # made the numbers go round result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--limit", limit, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. 
""".lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_task_schedule_origins_with_page_token(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" nb_total_origins = 50 origins = _fill_storage_with_origins(storage, nb_total_origins) # prepare page_token and origins result expectancy page_result = storage.origin_list(limit=10) assert len(page_result.results) == 10 page_token = page_result.next_page_token assert page_token is not None # remove the first 10 origins listed as we won't see those in tasks expected_origins = [o for o in origins if o not in page_result.results] nb_origins = len(expected_origins) assert nb_origins == nb_total_origins - len(page_result.results) max_task_size = 10 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--page-token", page_token, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_cli_task_runner_unknown_task_types(swh_scheduler, storage): """When passing at least one unknown task type, the runner should fail.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] known_task_type = random.choice(task_type_names) unknown_task_type = "unknown-task-type" assert unknown_task_type not in task_type_names with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "start-runner", "--task-type", known_task_type, "--task-type", unknown_task_type, ], ) @pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"]) def test_cli_task_runner_with_known_tasks( swh_scheduler, storage, caplog, flag_priority ): """Trigger runner with known tasks runs smoothly.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] task_type_name = random.choice(task_type_names) task_type_name2 = random.choice(task_type_names) # The runner will just iterate over the following known tasks and do noop. We are # just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", flag_priority, "--task-type", task_type_name, "--task-type", task_type_name2, ], ) assert result.exit_code == 0, result.output def test_cli_task_runner_no_task(swh_scheduler, storage): """Trigger runner with no parameter should run as before.""" # The runner will just iterate over the existing tasks from the scheduler and do # noop. We are just checking the runner does not explode here. 
result = invoke(swh_scheduler, False, ["start-runner",],) assert result.exit_code == 0, result.output diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py new file mode 100644 index 0000000..d118fbf --- /dev/null +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -0,0 +1,180 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import timedelta +import logging +from queue import Queue +from unittest.mock import MagicMock + +import pytest + +from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + send_visits_for_visit_type, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + visit_scheduler_thread, +) + +from .test_cli import invoke + +TEST_MAX_QUEUE = 10000 +MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" + + +def _compute_backend_name(visit_type: str) -> str: + "Build a dummy reproducible backend name" + return f"swh.loader.{visit_type}.tasks" + + +@pytest.fixture +def swh_scheduler(swh_scheduler): + """Override default fixture of the scheduler to install some more task types.""" + for visit_type in ["git", "hg", "svn"]: + task_type = f"load-{visit_type}" + swh_scheduler.create_task_type( + { + "type": task_type, + "max_queue_length": TEST_MAX_QUEUE, + "description": "The {} testing task".format(task_type), + "backend_name": _compute_backend_name(visit_type), + "default_interval": timedelta(days=1), + "min_interval": timedelta(hours=6), + "max_interval": timedelta(days=12), + } + ) + return swh_scheduler + + +def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): + """When passed an unknown visit type, the recurrent visit scheduler should refuse + to start.""" + + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"], + ) + + +def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): + """When passing no visit types, the recurrent visit scheduler should start.""" + + spawn_visit_scheduler_thread = mocker.patch( + f"{MODULE_NAME}.spawn_visit_scheduler_thread" + ) + spawn_visit_scheduler_thread.side_effect = SystemExit + # The actual scheduling threads won't spawn, they'll immediately terminate. 
This + # only exercises the logic to pull task types out of the database + + result = invoke(swh_scheduler, False, ["schedule-recurrent"]) + assert result.exit_code == 0, result.output + + +def test_recurrent_visit_scheduling( + swh_scheduler, caplog, listed_origins_by_type, mocker, +): + """Scheduling known tasks is ok.""" + + caplog.set_level(logging.DEBUG, MODULE_NAME) + nb_origins = 1000 + + mock_celery_app = MagicMock() + mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") + mock_available_slots.return_value = nb_origins # Slots available in queue + + # Make sure the scheduler is properly configured in terms of visit/task types + all_task_types = { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + visit_types = list(listed_origins_by_type.keys()) + assert len(visit_types) > 0 + + task_types = [] + origins = [] + for visit_type, _origins in listed_origins_by_type.items(): + origins.extend(swh_scheduler.record_listed_origins(_origins)) + task_type_name = f"load-{visit_type}" + assert task_type_name in all_task_types.keys() + task_type = all_task_types[task_type_name] + task_type["visit_type"] = visit_type + # we'll limit the orchestrator to the origin types we know + task_types.append(task_type) + + for visit_type in ["git", "svn"]: + task_type = f"load-{visit_type}" + send_visits_for_visit_type( + swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type] + ) + + assert mock_available_slots.called, "The available slots function should be called" + + records = [record.message for record in caplog.records] + + # Mapping over the dict ratio/policies entries can change overall order, so let's + # check the set of records + expected_records = set() + for task_type in task_types: + visit_type = task_type["visit_type"] + queue_name = task_type["backend_name"] + msg = ( + f"{nb_origins} available slots for visit type {visit_type} " + f"in queue {queue_name}" + ) + expected_records.add(msg) + + for expected_record in expected_records: + assert expected_record in set(records) + + +@pytest.fixture +def scheduler_config(swh_scheduler_config): + return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} + + +def test_visit_scheduler_thread_unknown_task( + swh_scheduler, scheduler_config, +): + """Starting a thread with an unknown task type reports the error""" + + unknown_visit_type = "unknown" + command_queue = Queue() + exc_queue = Queue() + + visit_scheduler_thread( + scheduler_config, unknown_visit_type, command_queue, exc_queue + ) + + assert command_queue.empty() is True + assert exc_queue.empty() is False + assert len(exc_queue.queue) == 1 + result = exc_queue.queue.pop() + assert result[0] == unknown_visit_type + assert isinstance(result[1], ValueError) + + +def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker): + """Spawning and terminating threads runs smoothly""" + + threads: VisitSchedulerThreads = {} + exc_queue = Queue() + mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") + mock_build_app.return_value = MagicMock() + + assert len(threads) == 0 + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) + + # This actually only checks the spawning and terminating logic is sound + + assert len(threads) == len(visit_types) + + actual_threads = terminate_visit_scheduler_threads(threads) + assert not len(actual_threads) + + assert mock_build_app.called
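Finally, the `(visit_type, exception)` tuples that `visit_scheduler_thread` pushes onto `exc_queue` are what drive the respawn loop of the `schedule-recurrent` command; the protocol can be exercised on its own with a hedged stand-in (the always-failing `worker` below is hypothetical):

```python
from queue import Queue
from threading import Thread
from typing import Tuple

exc_queue: "Queue[Tuple[str, BaseException]]" = Queue()

def worker(visit_type: str) -> None:
    # Stand-in for visit_scheduler_thread: report failures instead of raising.
    try:
        raise RuntimeError(f"simulated crash for {visit_type}")
    except BaseException as exc:
        exc_queue.put((visit_type, exc))

Thread(target=worker, args=("git",)).start()
visit_type, exc = exc_queue.get(block=True, timeout=2)
assert visit_type == "git" and isinstance(exc, RuntimeError)
# schedule-recurrent would now log the exception and respawn the thread.
```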