diff --git a/PKG-INFO b/PKG-INFO
index afab4c3..84b01ba 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,28 +1,28 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.0.61
+Version: 0.0.62
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
-Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
+Project-URL: Funding, https://www.softwareheritage.org/donate
Description: swh-scheduler
        =============

        Job scheduler for the Software Heritage project.

        Task manager for asynchronous/delayed tasks, used for both recurrent
        (e.g., listing a forge, loading new stuff from a Git repository) and
        one-off activities (e.g., loading a specific version of a source
        package).
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index afab4c3..84b01ba 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,28 +1,28 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.0.61
+Version: 0.0.62
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
-Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
+Project-URL: Funding, https://www.softwareheritage.org/donate
Description: swh-scheduler
        =============

        Job scheduler for the Software Heritage project.

        Task manager for asynchronous/delayed tasks, used for both recurrent
        (e.g., listing a forge, loading new stuff from a Git repository) and
        one-off activities (e.g., loading a specific version of a source
        package).
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt
index d5dfd88..20837c6 100644
--- a/swh.scheduler.egg-info/SOURCES.txt
+++ b/swh.scheduler.egg-info/SOURCES.txt
@@ -1,65 +1,64 @@
MANIFEST.in
Makefile
README.md
requirements-swh.txt
requirements.txt
setup.py
version.txt
bin/swh-worker-control
swh/__init__.py
swh.scheduler.egg-info/PKG-INFO
swh.scheduler.egg-info/SOURCES.txt
swh.scheduler.egg-info/dependency_links.txt
swh.scheduler.egg-info/entry_points.txt
swh.scheduler.egg-info/requires.txt
swh.scheduler.egg-info/top_level.txt
swh/scheduler/__init__.py
swh/scheduler/backend.py
swh/scheduler/backend_es.py
swh/scheduler/cli_utils.py
swh/scheduler/py.typed
swh/scheduler/task.py
swh/scheduler/utils.py
swh/scheduler/api/__init__.py
swh/scheduler/api/client.py
swh/scheduler/api/server.py
-swh/scheduler/api/wsgi.py
swh/scheduler/celery_backend/__init__.py
swh/scheduler/celery_backend/config.py
swh/scheduler/celery_backend/listener.py
swh/scheduler/celery_backend/runner.py
swh/scheduler/cli/__init__.py
swh/scheduler/cli/admin.py
swh/scheduler/cli/task.py
swh/scheduler/cli/task_type.py
swh/scheduler/cli/utils.py
swh/scheduler/sql/30-swh-schema.sql
swh/scheduler/sql/40-swh-func.sql
swh/scheduler/sql/50-swh-data.sql
swh/scheduler/sql/60-swh-indexes.sql
swh/scheduler/sql/updater/10-swh-init.sql
swh/scheduler/sql/updater/30-swh-schema.sql
swh/scheduler/sql/updater/40-swh-func.sql
swh/scheduler/tests/__init__.py
swh/scheduler/tests/conftest.py
swh/scheduler/tests/tasks.py
swh/scheduler/tests/test_api_client.py
swh/scheduler/tests/test_celery_tasks.py
swh/scheduler/tests/test_cli.py
swh/scheduler/tests/test_scheduler.py
swh/scheduler/tests/test_server.py
swh/scheduler/tests/test_utils.py
swh/scheduler/tests/updater/__init__.py
swh/scheduler/tests/updater/conftest.py
swh/scheduler/tests/updater/test_backend.py
swh/scheduler/tests/updater/test_consumer.py
swh/scheduler/tests/updater/test_events.py
swh/scheduler/tests/updater/test_ghtorrent.py
swh/scheduler/tests/updater/test_writer.py
swh/scheduler/updater/__init__.py
swh/scheduler/updater/backend.py
swh/scheduler/updater/consumer.py
swh/scheduler/updater/events.py
swh/scheduler/updater/writer.py
swh/scheduler/updater/ghtorrent/__init__.py
swh/scheduler/updater/ghtorrent/cli.py
\ No newline at end of file
diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt
index 1949565..857239a 100644
--- a/swh.scheduler.egg-info/requires.txt
+++ b/swh.scheduler.egg-info/requires.txt
@@ -1,16 +1,16 @@
arrow
celery>=4
Click
elasticsearch>5.4
flask
psycopg2
pyyaml
vcversioner
swh.core[db,http]>=0.0.65
swh.storage>=0.0.129

[testing]
pytest<4
-pytest-postgresql
+pytest-postgresql>=2.1.0
celery>=4
hypothesis>=3.11.0
diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
index 7c07cc4..1d84894 100644
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -1,106 +1,109 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from swh.core.api
import RPCClient class RemoteScheduler(RPCClient): """Proxy to a remote scheduler API """ def close_connection(self): return self.post('close_connection', {}) def set_status_tasks(self, task_ids, status='disabled', next_run=None): return self.post('set_status_tasks', dict( task_ids=task_ids, status=status, next_run=next_run)) def create_task_type(self, task_type): return self.post('create_task_type', {'task_type': task_type}) def get_task_type(self, task_type_name): return self.post('get_task_type', {'task_type_name': task_type_name}) def get_task_types(self): return self.post('get_task_types', {}) def create_tasks(self, tasks): return self.post('create_tasks', {'tasks': tasks}) def disable_tasks(self, task_ids): return self.post('disable_tasks', {'task_ids': task_ids}) def get_tasks(self, task_ids): return self.post('get_tasks', {'task_ids': task_ids}) def get_task_runs(self, task_ids, limit=None): return self.post( 'get_task_runs', {'task_ids': task_ids, 'limit': limit}) def search_tasks(self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None): return self.post('search_tasks', dict( task_id=task_id, task_type=task_type, status=status, priority=priority, policy=policy, before=before, after=after, limit=limit)) def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('peek_ready_tasks', { 'task_type': task_type, 'timestamp': timestamp, 'num_tasks': num_tasks, 'num_tasks_priority': num_tasks_priority, }) def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None): return self.post('grab_ready_tasks', { 'task_type': task_type, 'timestamp': timestamp, 'num_tasks': num_tasks, 'num_tasks_priority': num_tasks_priority, }) def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): return self.post('schedule_task_run', { 'task_id': task_id, 'backend_id': backend_id, 'metadata': metadata, 'timestamp': timestamp, }) def mass_schedule_task_runs(self, task_runs): return self.post('mass_schedule_task_runs', {'task_runs': task_runs}) def start_task_run(self, backend_id, metadata=None, timestamp=None): return self.post('start_task_run', { 'backend_id': backend_id, 'metadata': metadata, 'timestamp': timestamp, }) def end_task_run(self, backend_id, status, metadata=None, timestamp=None): return self.post('end_task_run', { 'backend_id': backend_id, 'status': status, 'metadata': metadata, 'timestamp': timestamp, }) def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1): return self.post('filter_task_to_archive', { 'after_ts': after_ts, 'before_ts': before_ts, 'limit': limit, 'last_id': last_id, }) def delete_archived_tasks(self, task_ids): return self.post('delete_archived_tasks', {'task_ids': task_ids}) + + def get_priority_ratios(self): + return self.get('get_priority_ratios') diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 3cdb28d..8f243be 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,259 +1,266 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from flask import request, Flask from swh.core import config from swh.core.api import (decode_request, error_handler, encode_data_server as encode_data) from swh.core.api import negotiate, 
JSONFormatter, MsgpackFormatter from swh.scheduler import get_scheduler as get_scheduler_from app = Flask(__name__) scheduler = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def get_sched(): global scheduler if not scheduler: scheduler = get_scheduler_from(**app.config['scheduler']) return scheduler def has_no_empty_params(rule): return len(rule.defaults or ()) >= len(rule.arguments or ()) @app.route('/') @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def index(): return 'SWH Scheduler API server' @app.route('/close_connection', methods=['GET', 'POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def close_connection(): return get_sched().close_connection() @app.route('/set_status_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def set_status_tasks(): return get_sched().set_status_tasks(**decode_request(request)) @app.route('/create_task_type', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def create_task_type(): return get_sched().create_task_type(**decode_request(request)) @app.route('/get_task_type', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_task_type(): return get_sched().get_task_type(**decode_request(request)) @app.route('/get_task_types', methods=['GET', 'POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_task_types(): return get_sched().get_task_types(**decode_request(request)) @app.route('/create_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def create_tasks(): return get_sched().create_tasks(**decode_request(request)) @app.route('/disable_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def disable_tasks(): return get_sched().disable_tasks(**decode_request(request)) @app.route('/get_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_tasks(): return get_sched().get_tasks(**decode_request(request)) @app.route('/get_task_runs', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def get_task_runs(): return get_sched().get_task_runs(**decode_request(request)) @app.route('/search_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def search_tasks(): return get_sched().search_tasks(**decode_request(request)) @app.route('/peek_ready_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def peek_ready_tasks(): return get_sched().peek_ready_tasks(**decode_request(request)) @app.route('/grab_ready_tasks', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def grab_ready_tasks(): return get_sched().grab_ready_tasks(**decode_request(request)) @app.route('/schedule_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def schedule_task_run(): return get_sched().schedule_task_run(**decode_request(request)) @app.route('/mass_schedule_task_runs', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def mass_schedule_task_runs(): return get_sched().mass_schedule_task_runs(**decode_request(request)) @app.route('/start_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def start_task_run(): return get_sched().start_task_run(**decode_request(request)) @app.route('/end_task_run', methods=['POST']) @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def end_task_run(): return 
    get_sched().end_task_run(**decode_request(request))


@app.route('/filter_task_to_archive', methods=['POST'])
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def filter_task_to_archive():
    return get_sched().filter_task_to_archive(**decode_request(request))


@app.route('/delete_archived_tasks', methods=['POST'])
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def delete_archived_tasks():
    return get_sched().delete_archived_tasks(**decode_request(request))


+@app.route('/get_priority_ratios', methods=['GET', 'POST'])
+@negotiate(MsgpackFormatter)
+@negotiate(JSONFormatter)
+def get_priority_ratios():
+    return get_sched().get_priority_ratios(**decode_request(request))
+
+
@app.route("/site-map")
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def site_map():
    links = []
    sched = get_sched()
    for rule in app.url_map.iter_rules():
        if has_no_empty_params(rule) and hasattr(sched, rule.endpoint):
            links.append(dict(
                rule=rule.rule,
                description=getattr(sched, rule.endpoint).__doc__))
    # links is now a list of {rule, description} dicts
    return links


def load_and_check_config(config_file, type='local'):
    """Check that the minimal configuration needed to run the API is set,
       or raise an explanatory error.

    Args:
        config_file (str): Path to the configuration file to load
        type (str): configuration type. For the 'local' type, additional
            checks are performed.

    Raises:
        Error if the setup is not as expected

    Returns:
        configuration as a dict

    """
    if not config_file:
        raise EnvironmentError('Configuration file must be defined')

    if not os.path.exists(config_file):
        raise FileNotFoundError('Configuration file %s does not exist' % (
            config_file, ))

    cfg = config.read(config_file)

    vcfg = cfg.get('scheduler')
    if not vcfg:
        raise KeyError("Missing 'scheduler' configuration")

    if type == 'local':
        cls = vcfg.get('cls')
        if cls != 'local':
            raise ValueError(
                "The scheduler backend can only be started with a 'local' "
                "configuration")

        args = vcfg.get('args')
        if not args:
            raise KeyError(
                "Invalid configuration; missing 'args' config entry")

        db = args.get('db')
        if not db:
            raise KeyError(
                "Invalid configuration; missing 'db' config entry")

    return cfg


api_cfg = None


def make_app_from_configfile():
    """Run the WSGI app from the webserver, loading the configuration from
       a configuration file.

       SWH_CONFIG_FILENAME environment variable defines the
       configuration path to load.
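For reference, a minimal configuration that satisfies these checks can be sketched as follows; the database DSN is an illustrative placeholder, not a value taken from this changeset:

# Sketch only: exercises load_and_check_config() with a throwaway YAML file.
import os
import tempfile

import yaml  # pyyaml is already among the package requirements

from swh.scheduler.api.server import load_and_check_config

cfg = {
    'scheduler': {
        'cls': 'local',
        'args': {'db': 'dbname=softwareheritage-scheduler-dev'},  # placeholder
    },
}

fd, config_path = tempfile.mkstemp(suffix='.yml')
with os.fdopen(fd, 'w') as f:
    yaml.safe_dump(cfg, f)

try:
    loaded = load_and_check_config(config_path)
    assert loaded['scheduler']['cls'] == 'local'
finally:
    os.unlink(config_path)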
""" global api_cfg if not api_cfg: config_file = os.environ.get('SWH_CONFIG_FILENAME') api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == '__main__': print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/api/wsgi.py b/swh/scheduler/api/wsgi.py deleted file mode 100644 index 02c4901..0000000 --- a/swh/scheduler/api/wsgi.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from .server import make_app_from_configfile - -application = make_app_from_configfile() diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 4a042d1..f57ca46 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,488 +1,493 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from arrow import Arrow, utcnow import psycopg2.pool import psycopg2.extras from psycopg2.extensions import AsIs from swh.core.db import BaseDb from swh.core.db.common import db_transaction, db_transaction_generator logger = logging.getLogger(__name__) def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) def format_query(query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. """ def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay', ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. 
Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders})""", keys) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query( "select {keys} from task_type", self.task_type_keys, ) cur.execute(query) return cur.fetchall() task_create_keys = [ 'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority' ] task_keys = task_create_keys + ['id', 'current_interval'] @db_transaction() def create_tasks(self, tasks, policy='recurring', db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute('select swh_scheduler_mktemp_task()') db.copy_to(tasks, 'tmp_task', self.task_create_keys, default_values={ 'policy': policy, 'status': 'next_run_not_scheduled' }, cur=cur) query = format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks(self, task_ids, status='disabled', next_run=None, db=None, cur=None): """Set the tasks' status whose ids are listed. If given, also set the next_run date. 
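To make these building blocks concrete, here is a sketch (not part of this changeset) that registers a task type and creates a one-off task against a local backend; the DSN, type name, backend name and URL are hypothetical placeholders:

# Illustrative sketch: all names and values below are made up.
import datetime

from arrow import utcnow

from swh.scheduler.backend import SchedulerBackend

scheduler = SchedulerBackend('dbname=softwareheritage-scheduler-dev')

scheduler.create_task_type({
    'type': 'update-example',
    'description': 'Update an example repository',
    'backend_name': 'swh.loader.example.tasks.UpdateExampleRepository',
    'default_interval': datetime.timedelta(days=64),
    'min_interval': datetime.timedelta(hours=12),
    'max_interval': datetime.timedelta(days=64),
    'backoff_factor': 2,
})

tasks = scheduler.create_tasks([{
    'type': 'update-example',
    'arguments': {'args': [], 'kwargs': {'url': 'https://example.org/repo'}},
    'next_run': utcnow(),
    'policy': 'oneshot',
}])
# each returned row additionally carries 'id', 'status',
# 'current_interval' and 'retries_left'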
""" if not task_ids: return query = ["UPDATE task SET status = %s"] args = [status] if next_run: query.append(', next_run = %s') args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute(''.join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks(self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append('id = %s') else: where.append('id in %s') task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append('type = %s') else: where.append('type in %s') task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append('status = %s') else: where.append('status in %s') status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append('priority = %s') else: priority = tuple(priority) where.append('priority in %s') args.append(priority) if policy: where.append('policy = %s') args.append(policy) if before: where.append('next_run <= %s') args.append(before) if after: where.append('next_run >= %s') args.append(after) query = 'select * from task' if where: query += ' where ' + ' and '.join(where) if limit: query += ' limit %s :: bigint' args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query('select {keys} from task where id in %s', self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( '''select * from swh_scheduler_peek_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) logger.debug('PEEK %s => %s' % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( '''select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) logger.debug('GRAB %s => %s' % (task_type, cur.rowcount)) return cur.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] @db_transaction() def 
schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute('select swh_scheduler_mktemp_task_run()') db.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, cur=cur) cur.execute('select swh_scheduler_schedule_task_run_from_temp()') @db_transaction() def start_task_run(self, backend_id, metadata=None, timestamp=None, db=None, cur=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cur.fetchone() @db_transaction() def end_task_run(self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) return cur.fetchone() @db_transaction_generator() def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1, db=None, cur=None): """Returns the list of task/task_run prior to a given date to archive. 
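The three run-related methods above form one lifecycle; a sketch, reusing `scheduler` and `tasks` from the previous example and a made-up backend job id:

import uuid

backend_id = str(uuid.uuid4())  # identifier of the job in the celery backend

# 1. the runner marks the task as sent to the queue
scheduler.schedule_task_run(tasks[0]['id'], backend_id)

# 2. a worker picks the job up
scheduler.start_task_run(backend_id, metadata={'worker': 'worker01'})

# 3. the worker reports how it went: 'eventful', 'uneventful' or 'failed'
scheduler.end_task_run(backend_id, status='eventful')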
""" last_task_run_id = None while True: row = None cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit) ) for row in cur: # nested type index does not accept bare values # transform it as a dict to comply with this row['arguments']['args'] = { i: v for i, v in enumerate(row['arguments']['args']) } kwargs = row['arguments']['kwargs'] row['arguments']['kwargs'] = json.dumps(kwargs) yield row if not row: break _id = row.get('task_id') _task_run_id = row.get('task_run_id') if last_id == _id and last_task_run_id == _task_run_id: break last_id = _id last_task_run_id = _task_run_id @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id['task_id']) _task_run_ids.append(task_id['task_run_id']) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids)) task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status', ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append('task = %s') else: where.append('task in %s') task_ids = tuple(task_ids) args.append(task_ids) else: return () query = 'select * from task_run where ' + ' and '.join(where) if limit: query += ' limit %s :: bigint' args.append(limit) cur.execute(query, args) return cur.fetchall() + + @db_transaction() + def get_priority_ratios(self, db=None, cur=None): + cur.execute('select id, ratio from priority_ratio') + return {row['id']: row['ratio'] for row in cur.fetchall()} diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index e273471..4580dee 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,284 +1,293 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import pkg_resources import urllib.parse from celery import Celery from celery.signals import setup_logging, celeryd_after_setup from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue from kombu.five import monotonic as _monotonic import requests from typing import Any, Dict from swh.scheduler import CONFIG as SWH_CONFIG from swh.core.config import load_named_config, merge_configs -from swh.core.logger import JournalHandler + +try: + from swh.core.logger import JournalHandler +except ImportError: + JournalHandler = None # type: ignore + DEFAULT_CONFIG_NAME = 'worker' CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } logger = logging.getLogger(__name__) @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, log_console=None, log_journal=None, **kwargs): """Setup logging according to Software Heritage preferences. 
We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG if isinstance(loglevel, str): loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) log_target = os.environ.get('SWH_LOG_TARGET', 'console') if log_target == 'console': log_console = True elif log_target == 'journal': log_journal = True # this looks for log levels *higher* than DEBUG if loglevel <= logging.DEBUG and log_console is None: log_console = True if log_console: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) if log_journal: - systemd_journal = JournalHandler() - systemd_journal.setLevel(logging.DEBUG) - systemd_journal.setFormatter(formatter) - root_logger.addHandler(systemd_journal) + if not JournalHandler: + root_logger.warning('JournalHandler is not available, skipping. ' + 'Please install swh-core[logging].') + else: + systemd_journal = JournalHandler() + systemd_journal.setLevel(logging.DEBUG) + systemd_journal.setFormatter(formatter) + root_logger.addHandler(systemd_journal) logging.getLogger('celery').setLevel(logging.INFO) # Silence amqp heartbeat_tick messages logger = logging.getLogger('amqp') logger.addFilter(lambda record: not record.msg.startswith( 'heartbeat_tick')) logger.setLevel(loglevel) # Silence useless "Starting new HTTP connection" messages logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task logging.getLogger('celery.task').setLevel(loglevel) return loglevel @celeryd_after_setup.connect def setup_queues_and_tasks(sender, instance, **kwargs): """Signal called on worker start. This automatically registers swh.scheduler.task.Task subclasses as available celery tasks. This also subscribes the worker to the "implicit" per-task queues defined for these task classes. """ logger.info('Setup Queues & Tasks for %s', sender) instance.app.conf['worker_name'] = sender @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" if name is not None and name.startswith('swh.'): return {'queue': name} def get_queue_stats(app, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - Consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = app.connection().info() if conn_info['transport'] == 'memory': # We're running in a test environment, without RabbitMQ. 
return None url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( hostname=conn_info['hostname'], port=conn_info['port'] + 10000, vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), queue=urllib.parse.quote(queue_name, safe=''), ) credentials = (conn_info['userid'], conn_info['password']) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError('Got error %s when reading queue stats: %s' % ( r.status_code, r.json())) return r.json() def get_queue_length(app, queue_name): """Shortcut to get a queue's length""" stats = get_queue_stats(app, queue_name) if stats: return stats.get('messages') def register_task_class(app, name, cls): """Register a class-based task under the given name""" if name in app.tasks: return task_instance = cls() task_instance.name = name app.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) CONFIG_NAME = os.environ.get('SWH_CONFIG_FILENAME') CONFIG = {} # type: Dict[str, Any] if CONFIG_NAME: # load the celery config from the main config file given as # SWH_CONFIG_FILENAME environment variable. # This is expected to have a [celery] section in which we have the # celery specific configuration. SWH_CONFIG.clear() SWH_CONFIG.update(load_named_config(CONFIG_NAME)) CONFIG = SWH_CONFIG.get('celery', {}) if not CONFIG: # otherwise, back to compat config loading mechanism if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) CONFIG.setdefault('task_modules', []) # load tasks modules declared as plugin entry points for entrypoint in pkg_resources.iter_entry_points('swh.workers'): worker_registrer_fn = entrypoint.load() # The registry function is expected to return a dict which the 'tasks' key # is a string (or a list of strings) with the name of the python module in # which celery tasks are defined. task_modules = worker_registrer_fn().get('task_modules', []) CONFIG['task_modules'].extend(task_modules) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] CELERY_DEFAULT_CONFIG = dict( # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules imports=CONFIG.get('task_modules', []), # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with task_serializer='msgpack', # Result serialization format result_serializer='msgpack', # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. task_acks_late=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=['msgpack', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. 
# result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, # Task routing task_routes=route_for_task, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, ) def build_app(config=None): config = merge_configs( {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, config or {}) config['task_queues'] = CELERY_QUEUES + [ Queue(queue, Exchange(queue), routing_key=queue) for queue in config.get('task_queues', ())] logger.debug('Creating a Celery app with %s', config) # Instantiate the Celery app app = Celery(broker=config['task_broker'], task_cls='swh.scheduler.task:SWHTask') app.add_defaults(CELERY_DEFAULT_CONFIG) app.add_defaults(config) return app app = build_app(CONFIG) # XXX for BW compat Celery.get_queue_length = get_queue_length diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index e4e9e05..fbfb5d2 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,100 +1,105 @@ +# Copyright (C) 2016-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + import os import pytest import glob from datetime import timedelta import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR +from swh.scheduler.tests.tasks import register_test_tasks + # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) -import swh.scheduler.celery_backend.config # noqa -# this import is needed here to enforce creation of the celery current app -# BEFORE the swh_app fixture is called, otherwise the Celery app instance from -# celery_backend.config becomes the celery.current_app - # test_cli tests depends on a en/C locale, so ensure it os.environ['LC_ALL'] = 'C.UTF-8' DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): task_modules = [ 'swh.scheduler.tests.tasks', ] for entrypoint in pkg_resources.iter_entry_points('swh.workers'): task_modules.extend(entrypoint.load()().get('task_modules', [])) return task_modules @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'json', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application -# with the test application. 
+# with the test application (and also register test tasks) @pytest.fixture(scope='session') def swh_app(celery_session_app): - swh.scheduler.celery_backend.config.app = celery_session_app - yield celery_session_app + from swh.scheduler.celery_backend.config import app + register_test_tasks(celery_session_app) + app = celery_session_app # noqa + yield app @pytest.fixture -def swh_scheduler(request, postgresql_proc, postgresql): +def swh_scheduler(postgresql): scheduler_config = { - 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( - host=postgresql_proc.host, - port=postgresql_proc.port, - user='postgres', - dbname='tests') + 'db': postgresql.dsn, } - all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() scheduler = get_scheduler('local', scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler + + +# this alias is used to be able to easily instantiate a db-backed Scheduler +# eg. for the RPC client/server test suite. +swh_db_scheduler = swh_scheduler diff --git a/swh/scheduler/tests/tasks.py b/swh/scheduler/tests/tasks.py index b18df46..89f7476 100644 --- a/swh/scheduler/tests/tasks.py +++ b/swh/scheduler/tests/tasks.py @@ -1,31 +1,43 @@ -from celery import group, current_app as app - - -@app.task(name='swh.scheduler.tests.tasks.ping', bind=True) -def ping(self, **kw): - # check this is a SWHTask - assert hasattr(self, 'log') - assert not hasattr(self, 'run_task') - assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] - self.log.debug(self.name) - if kw: - return 'OK (kw=%s)' % kw - return 'OK' - - -@app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) -def multiping(self, n=10): - promise = group(ping.s(i=i) for i in range(n))() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) - promise.save() - return promise.id - - -@app.task(name='swh.scheduler.tests.tasks.error') -def not_implemented(): - raise NotImplementedError('Nope') - - -@app.task(name='swh.scheduler.tests.tasks.add') -def add(x, y): - return x + y +# Copyright (C) 2018-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from celery import group + + +def register_test_tasks(app): + """Register test tasks for the specific app passed as parameter. + + In the test context, app is the swh_app and not the runtime one. + + Args: + app: Celery app. 
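A sketch of what this refactoring gives the test suite: the test tasks can now be registered on any Celery app, after which they are addressable by their fully qualified names (the standalone app below is for illustration only):

from celery import Celery

from swh.scheduler.task import SWHTask
from swh.scheduler.tests.tasks import register_test_tasks

app = Celery(task_cls=SWHTask)
register_test_tasks(app)

assert 'swh.scheduler.tests.tasks.ping' in app.tasks
assert 'swh.scheduler.tests.tasks.add' in app.tasks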
Expects the tests application + (swh.scheduler.tests.conftest.swh_app) + + """ + @app.task(name='swh.scheduler.tests.tasks.ping', bind=True) + def ping(self, **kw): + # check this is a SWHTask + assert hasattr(self, 'log') + assert not hasattr(self, 'run_task') + assert 'SWHTask' in [x.__name__ for x in self.__class__.__mro__] + self.log.debug(self.name) + if kw: + return 'OK (kw=%s)' % kw + return 'OK' + + @app.task(name='swh.scheduler.tests.tasks.multiping', bind=True) + def multiping(self, n=10): + promise = group(ping.s(i=i) for i in range(n))() + self.log.debug('%s OK (spawned %s subtasks)' % (self.name, n)) + promise.save() + return promise.id + + @app.task(name='swh.scheduler.tests.tasks.error') + def not_implemented(): + raise NotImplementedError('Nope') + + @app.task(name='swh.scheduler.tests.tasks.add') + def add(x, y): + return x + y diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index e3c194b..92ccbff 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,54 +1,48 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest -import requests - -from swh.core.api.tests.server_testing import ServerTestFixture - -from swh.scheduler import get_scheduler -from swh.scheduler.api.server import app -from swh.scheduler.tests.test_scheduler import CommonSchedulerTest - - -class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, - unittest.TestCase): - """Test the remote scheduler API. - - This class doesn't define any tests as we want identical - functionality between local and remote scheduler. All the tests are - therefore defined in CommonSchedulerTest. - """ - - def setUp(self): - self.config = { - 'scheduler': { - 'cls': 'local', - 'args': { - 'db': 'dbname=%s' % self.TEST_DB_NAME, - } - } - } - self.app = app # this will setup the local scheduler... 
- super().setUp() - # accessible through a remote scheduler accessible on the - # given port - self.backend = get_scheduler('remote', {'url': self.url()}) - - def test_site_map(self): - sitemap = requests.get(self.url() + 'site-map') - assert sitemap.headers['Content-Type'] == 'application/json' - sitemap = sitemap.json() - - rules = set(x['rule'] for x in sitemap) - # we expect at least these rules - expected_rules = set('/'+rule for rule in ( - 'set_status_tasks', 'create_task_type', - 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', - 'get_tasks', 'search_tasks', 'peek_ready_tasks', - 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', - 'start_task_run', 'end_task_run', 'filter_task_to_archive', - 'delete_archived_tasks')) - assert rules.issuperset(expected_rules), expected_rules - rules +import pytest +from flask import url_for + +import swh.scheduler.api.server as server +from swh.scheduler.api.client import RemoteScheduler +from swh.scheduler.tests.test_scheduler import TestScheduler # noqa + +# tests are executed using imported class (TestScheduler) using overloaded +# swh_scheduler fixture below + + +# the Flask app used as server in these tests +@pytest.fixture +def app(swh_db_scheduler): + server.scheduler = swh_db_scheduler + yield server.app + + +# the RPCClient class used as client used in these tests +@pytest.fixture +def swh_rpc_client_class(): + return RemoteScheduler + + +@pytest.fixture +def swh_scheduler(swh_rpc_client, app): + yield swh_rpc_client + + +def test_site_map(flask_app_client): + sitemap = flask_app_client.get(url_for('site_map')) + assert sitemap.headers['Content-Type'] == 'application/json' + + rules = set(x['rule'] for x in sitemap.json) + # we expect at least these rules + expected_rules = set('/'+rule for rule in ( + 'set_status_tasks', 'create_task_type', + 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', + 'get_tasks', 'search_tasks', 'get_task_runs', 'peek_ready_tasks', + 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', + 'start_task_run', 'end_task_run', 'filter_task_to_archive', + 'delete_archived_tasks', 'get_priority_ratios')) + assert rules == expected_rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 712bf5d..3172dcb 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,637 +1,596 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime -import os import random -import unittest import uuid from collections import defaultdict import psycopg2 from arrow import utcnow import pytest -from swh.core.db.tests.db_testing import SingleDbTestFixture -from swh.scheduler import get_scheduler - -from . 
import SQL_DIR - TASK_TYPES = { 'git': { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, 'hg': { 'type': 'update-hg', 'description': 'Update a mercurial repository', 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), }, } TEMPLATES = { 'git': { 'type': 'update-git', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, }, 'hg': { 'type': 'update-hg', 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, 'policy': 'oneshot', } } def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} @pytest.mark.db -class CommonSchedulerTest(SingleDbTestFixture): - TEST_DB_NAME = 'softwareheritage-scheduler-test' - TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') - - def tearDown(self): - self.empty_tables() - super().tearDown() - - def empty_tables(self, whitelist=["priority_ratio"]): - query = """SELECT table_name FROM information_schema.tables - WHERE table_schema = %%s and - table_name not in (%s) - """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) - self.cursor.execute(query, ('public', )) - - tables = set(table for (table,) in self.cursor.fetchall()) - - for table in tables: - self.cursor.execute('truncate table %s cascade' % table) - self.conn.commit() +class TestScheduler: + def test_get_priority_ratios(self, swh_scheduler): + assert swh_scheduler.get_priority_ratios() == { + 'high': 0.5, + 'normal': 0.3, + 'low': 0.2, + } - def test_add_task_type(self): + def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES['git'] - self.backend.create_task_type(tt) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - with self.assertRaisesRegex(psycopg2.IntegrityError, - r'\(type\)=\(%s\)' % tt['type']): - self.backend.create_task_type(tt) + swh_scheduler.create_task_type(tt) + assert tt == swh_scheduler.get_task_type(tt['type']) + with pytest.raises(psycopg2.IntegrityError, + match=r'\(type\)=\(%s\)' % tt['type']): + swh_scheduler.create_task_type(tt) tt2 = TASK_TYPES['hg'] - self.backend.create_task_type(tt2) - self.assertEqual(tt, self.backend.get_task_type(tt['type'])) - self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) + swh_scheduler.create_task_type(tt2) + assert tt == swh_scheduler.get_task_type(tt['type']) + assert tt2 == swh_scheduler.get_task_type(tt2['type']) - def test_get_task_types(self): + def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] - self.backend.create_task_type(tt) - self.backend.create_task_type(tt2) - self.assertCountEqual([tt2, tt], self.backend.get_task_types()) - - @staticmethod - def _task_from_template(template, next_run, priority, *args, **kwargs): - ret = copy.deepcopy(template) - ret['next_run'] = next_run - if priority: - ret['priority'] = priority - if args: - ret['arguments']['args'] = list(args) - if kwargs: - ret['arguments']['kwargs'] = kwargs - return ret - - def _pop_priority(self, priorities): - if not priorities: - return None 
- for priority, remains in priorities.items(): - if remains > 0: - priorities[priority] = remains - 1 - return priority - return None - - def _tasks_from_template(self, template, max_timestamp, num, - num_priority=0, priorities=None): - if num_priority and priorities: - priorities = { - priority: ratio * num_priority - for priority, ratio in priorities.items() - } - - tasks = [] - for i in range(num + num_priority): - priority = self._pop_priority(priorities) - tasks.append(self._task_from_template( - template, - max_timestamp - datetime.timedelta(microseconds=i), - priority, - 'argument-%03d' % i, - **{'kwarg%03d' % i: 'bogus-kwarg'} - )) - return tasks - - def _create_task_types(self): - for tt in TASK_TYPES.values(): - self.backend.create_task_type(tt) - - def test_create_tasks(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + swh_scheduler.create_task_type(tt) + swh_scheduler.create_task_type(tt2) + actual_task_types = swh_scheduler.get_task_types() + assert tt in actual_task_types + assert tt2 in actual_task_types + + def test_create_tasks(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = self._tasks_from_template( TEMPLATES['git'], utcnow(), 100) tasks_2 = self._tasks_from_template( TEMPLATES['hg'], utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids - ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) + ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids - ret = self.backend.create_tasks(tasks) + ret = swh_scheduler.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results - self.assertEqual(set_ret, set_ret1) - self.assertEqual(len(ret), len(ret1)) + assert set_ret == set_ret1 + assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] - self.assertNotIn(task['id'], ids) - self.assertEqual(task['status'], 'next_run_not_scheduled') - self.assertEqual(task['current_interval'], - task_type['default_interval']) - self.assertEqual(task['policy'], orig_task.get('policy', - 'recurring')) + assert task['id'] not in ids + assert task['status'] == 'next_run_not_scheduled' + assert task['current_interval'] == task_type['default_interval'] + assert task['policy'] == orig_task.get('policy', 'recurring') priority = task.get('priority') if priority: actual_priorities[priority] += 1 - self.assertEqual(task['retries_left'], - task_type['num_retries'] or 0) + assert task['retries_left'] == (task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] - self.assertEqual(task, orig_task) + assert task == orig_task - self.assertEqual(dict(actual_priorities), { + assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() - }) + } - def test_peek_ready_tasks_no_priority(self): - self._create_task_types() + def test_peek_ready_tasks_no_priority(self, swh_scheduler): + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] tasks = 
self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) - ready_tasks = self.backend.peek_ready_tasks(task_type) - self.assertEqual(len(ready_tasks), len(tasks)) + ready_tasks = swh_scheduler.peek_ready_tasks(task_type) + assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): - self.assertLessEqual(ready_tasks[i]['next_run'], - ready_tasks[i+1]['next_run']) + assert ready_tasks[i]['next_run'] <= ready_tasks[i+1]['next_run'] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) - ready_tasks_limited = self.backend.peek_ready_tasks( + ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=limit) - self.assertEqual(len(ready_tasks_limited), limit) - self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) + assert len(ready_tasks_limited) == limit + assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit-1]['next_run'] - ready_tasks_timestamped = self.backend.peek_ready_tasks( + ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: - self.assertLessEqual(ready_task['next_run'], max_ts) + assert ready_task['next_run'] <= max_ts # Make sure we get proper behavior for the first ready tasks - self.assertCountEqual( - ready_tasks[:len(ready_tasks_timestamped)], - ready_tasks_timestamped, - ) + assert ready_tasks[:len(ready_tasks_timestamped)] \ + == ready_tasks_timestamped # Limit by both - ready_tasks_both = self.backend.peek_ready_tasks( + ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) - self.assertLessEqual(len(ready_tasks_both), limit//3) + assert len(ready_tasks_both) <= limit//3 for ready_task in ready_tasks_both: - self.assertLessEqual(ready_task['next_run'], max_ts) - self.assertIn(ready_task, ready_tasks[:limit//3]) - - def _priority_ratio(self): - self.cursor.execute('select id, ratio from priority_ratio') - priority_ratio = {} - for row in self.cursor.fetchall(): - priority_ratio[row[0]] = row[1] - return priority_ratio - - def test_peek_ready_tasks_mixed_priorities(self): - priority_ratio = self._priority_ratio() - self._create_task_types() + assert ready_task['next_run'] <= max_ts + assert ready_task in ready_tasks[:limit//3] + + def _priority_ratio(self, swh_scheduler): + return swh_scheduler.get_priority_ratios() + + def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): + priority_ratio = self._priority_ratio(swh_scheduler) + self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) - self.backend.create_tasks(tasks) + swh_scheduler.create_tasks(tasks) # take all available tasks - ready_tasks = self.backend.peek_ready_tasks( + ready_tasks = swh_scheduler.peek_ready_tasks( task_type) - self.assertEqual(len(ready_tasks), len(tasks)) - self.assertEqual(num_tasks_priority + num_tasks_no_priority, - len(ready_tasks)) + assert len(ready_tasks) == len(tasks) + assert num_tasks_priority + num_tasks_no_priority \ + == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: 
                 count_tasks_per_priority[priority] += 1

-        self.assertEqual(dict(count_tasks_per_priority), {
+        assert dict(count_tasks_per_priority) == {
             priority: int(ratio * num_tasks_priority)
             for priority, ratio in priority_ratio.items()
-        })
+        }

         # Only get some ready tasks
         num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2)
         num_tasks_priority = random.randrange(5, num_tasks_priority//2)
-        ready_tasks_limited = self.backend.peek_ready_tasks(
+        ready_tasks_limited = swh_scheduler.peek_ready_tasks(
             task_type, num_tasks=num_tasks,
             num_tasks_priority=num_tasks_priority)

         count_tasks_per_priority = defaultdict(int)
         for task in ready_tasks_limited:
             priority = task.get('priority')
             count_tasks_per_priority[priority] += 1

         import math
         for priority, ratio in priority_ratio.items():
             expected_count = math.ceil(ratio * num_tasks_priority)
             actual_prio = count_tasks_per_priority[priority]
-            self.assertTrue(
-                actual_prio == expected_count or
-                actual_prio == expected_count + 1)
+            assert (actual_prio == expected_count or
+                    actual_prio == expected_count + 1)

-        self.assertEqual(count_tasks_per_priority[None], num_tasks)
+        assert count_tasks_per_priority[None] == num_tasks

-    def test_grab_ready_tasks(self):
-        priority_ratio = self._priority_ratio()
-        self._create_task_types()
+    def test_grab_ready_tasks(self, swh_scheduler):
+        priority_ratio = self._priority_ratio(swh_scheduler)
+        self._create_task_types(swh_scheduler)
         t = utcnow()
         task_type = TEMPLATES['git']['type']
         num_tasks_priority = 100
         num_tasks_no_priority = 100

         # Create tasks with and without priorities
         tasks = self._tasks_from_template(
             TEMPLATES['git'], t,
             num=num_tasks_no_priority, num_priority=num_tasks_priority,
             priorities=priority_ratio)

         random.shuffle(tasks)
-        self.backend.create_tasks(tasks)
+        swh_scheduler.create_tasks(tasks)

-        first_ready_tasks = self.backend.peek_ready_tasks(
+        first_ready_tasks = swh_scheduler.peek_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10)
-        grabbed_tasks = self.backend.grab_ready_tasks(
+        grabbed_tasks = swh_scheduler.grab_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10)

         for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
-            self.assertEqual(peeked['status'], 'next_run_not_scheduled')
+            assert peeked['status'] == 'next_run_not_scheduled'
             del peeked['status']
-            self.assertEqual(grabbed['status'], 'next_run_scheduled')
+            assert grabbed['status'] == 'next_run_scheduled'
             del grabbed['status']
-            self.assertEqual(peeked, grabbed)
-            self.assertEqual(peeked['priority'], grabbed['priority'])
+            assert peeked == grabbed
+            assert peeked['priority'] == grabbed['priority']

-    def test_get_tasks(self):
-        self._create_task_types()
+    def test_get_tasks(self, swh_scheduler):
+        self._create_task_types(swh_scheduler)
         t = utcnow()
         tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
-        tasks = self.backend.create_tasks(tasks)
+        tasks = swh_scheduler.create_tasks(tasks)
         random.shuffle(tasks)
         while len(tasks) > 1:
             length = random.randrange(1, len(tasks))
-            cur_tasks = tasks[:length]
+            cur_tasks = sorted(tasks[:length], key=lambda x: x['id'])
             tasks[:length] = []
-            ret = self.backend.get_tasks(task['id'] for task in cur_tasks)
-            self.assertCountEqual(ret, cur_tasks)
+            ret = swh_scheduler.get_tasks(task['id'] for task in cur_tasks)
+            # result is not guaranteed to be sorted
+            ret.sort(key=lambda x: x['id'])
+            assert ret == cur_tasks

-    def test_search_tasks(self):
+    def test_search_tasks(self, swh_scheduler):
         def make_real_dicts(l):
             """RealDictRow is not a real dict."""
             return [dict(d.items()) for d in l]
-        self._create_task_types()
+        self._create_task_types(swh_scheduler)
         t = utcnow()
         tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
-        tasks = self.backend.create_tasks(tasks)
-        self.assertCountEqual(
-            make_real_dicts(self.backend.search_tasks()),
-            make_real_dicts(tasks))
+        tasks = swh_scheduler.create_tasks(tasks)
+        assert make_real_dicts(swh_scheduler.search_tasks()) \
+            == make_real_dicts(tasks)

-    def test_filter_task_to_archive(self):
+    def test_filter_task_to_archive(self, swh_scheduler):
         """Filtering should only list disabled recurring or completed
        oneshot tasks

         """
-        self._create_task_types()
+        self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12)
         oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12)
         total_tasks = len(recurring) + len(oneshots)

         # simulate scheduling tasks
-        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
-        self.backend.mass_schedule_task_runs(backend_tasks)
+        swh_scheduler.mass_schedule_task_runs(backend_tasks)

         # simulate the tasks being done
         _tasks = []
         for task in backend_tasks:
-            t = self.backend.end_task_run(
+            t = swh_scheduler.end_task_run(
                 task['backend_id'], status='eventful')
             _tasks.append(t)

         # Randomly update each task's status per policy
         status_per_policy = {'recurring': 0, 'oneshot': 0}
         status_choice = {
             # policy: [tuple (1-for-filtering, 'associated-status')]
             'recurring': [(1, 'disabled'),
                           (0, 'completed'),
                           (0, 'next_run_not_scheduled')],
             'oneshot': [(0, 'next_run_not_scheduled'),
                         (1, 'disabled'),
                         (1, 'completed')]
         }

         tasks_to_update = defaultdict(list)
         _task_ids = defaultdict(list)
         # randomly 'disable' recurring tasks or 'complete' oneshot tasks
         for task in pending_tasks:
             policy = task['policy']
             _task_ids[policy].append(task['id'])
             status = random.choice(status_choice[policy])
             if status[0] != 1:
                 continue
             # elected for filtering
             status_per_policy[policy] += status[0]
             tasks_to_update[policy].append(task['id'])

-        self.backend.disable_tasks(tasks_to_update['recurring'])
+        swh_scheduler.disable_tasks(tasks_to_update['recurring'])
         # hack: change the status to something other than completed/disabled
-        self.backend.set_status_tasks(
+        swh_scheduler.set_status_tasks(
             _task_ids['oneshot'], status='next_run_not_scheduled')
         # complete the tasks to update
-        self.backend.set_status_tasks(
+        swh_scheduler.set_status_tasks(
             tasks_to_update['oneshot'], status='completed')

         total_tasks_filtered = (status_per_policy['recurring'] +
                                 status_per_policy['oneshot'])

         # retrieve tasks to archive
         after = _time.shift(days=-1).format('YYYY-MM-DD')
         before = utcnow().shift(days=1).format('YYYY-MM-DD')
-        tasks_to_archive = list(self.backend.filter_task_to_archive(
+        tasks_to_archive = list(swh_scheduler.filter_task_to_archive(
             after_ts=after, before_ts=before, limit=total_tasks))

-        self.assertEqual(len(tasks_to_archive), total_tasks_filtered)
+        assert len(tasks_to_archive) == total_tasks_filtered

         actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
         for task in tasks_to_archive:
             actual_filtered_per_status[task['task_policy']] += 1
-        self.assertEqual(actual_filtered_per_status, status_per_policy)
+        assert actual_filtered_per_status == status_per_policy

-    def test_delete_archived_tasks(self):
-        self._create_task_types()
+    def test_delete_archived_tasks(self, swh_scheduler):
+        self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = self._tasks_from_template(
             TEMPLATES['git'], _time, 12)
         oneshots = self._tasks_from_template(
             TEMPLATES['hg'], _time, 12)
         total_tasks = len(recurring) + len(oneshots)
-        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
-        self.backend.mass_schedule_task_runs(backend_tasks)
+        swh_scheduler.mass_schedule_task_runs(backend_tasks)

         _tasks = []
         percent = random.randint(0, 100)  # randomly elected removal boundary
         for task in backend_tasks:
-            t = self.backend.end_task_run(
+            t = swh_scheduler.end_task_run(
                 task['backend_id'], status='eventful')
             c = random.randint(0, 100)
             if c <= percent:
                 _tasks.append({'task_id': t['task'], 'task_run_id': t['id']})

-        self.backend.delete_archived_tasks(_tasks)
+        swh_scheduler.delete_archived_tasks(_tasks)

-        self.cursor.execute('select count(*) from task')
-        tasks_count = self.cursor.fetchone()
+        all_tasks = [task['id'] for task in swh_scheduler.search_tasks()]
+        tasks_count = len(all_tasks)
+        tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks))

-        self.cursor.execute('select count(*) from task_run')
-        tasks_run_count = self.cursor.fetchone()
+        assert tasks_count == total_tasks - len(_tasks)
+        assert tasks_run_count == total_tasks - len(_tasks)

-        self.assertEqual(tasks_count[0], total_tasks - len(_tasks))
-        self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks))
-
-    def test_get_task_runs_no_task(self):
+    def test_get_task_runs_no_task(self, swh_scheduler):
         '''No task exists in the scheduler's db, get_task_runs()
         should always return an empty list.

         '''
-        self.assertFalse(self.backend.get_task_runs(task_ids=()))
-        self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3)))
-        self.assertFalse(self.backend.get_task_runs(task_ids=(1, 2, 3),
-                                                    limit=10))
+        assert not swh_scheduler.get_task_runs(task_ids=())
+        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
+        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3),
+                                               limit=10)

-    def test_get_task_runs_no_task_executed(self):
+    def test_get_task_runs_no_task_executed(self, swh_scheduler):
         '''No task has been executed yet, get_task_runs() should
         always return an empty list.

         '''
-        self._create_task_types()
+        self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = self._tasks_from_template(
             TEMPLATES['git'], _time, 12)
         oneshots = self._tasks_from_template(
             TEMPLATES['hg'], _time, 12)
-        self.backend.create_tasks(recurring + oneshots)
+        swh_scheduler.create_tasks(recurring + oneshots)

-        self.assertFalse(self.backend.get_task_runs(
-            task_ids=()))
-        self.assertFalse(self.backend.get_task_runs(
-            task_ids=(1, 2, 3)))
-        self.assertFalse(self.backend.get_task_runs(
-            task_ids=(1, 2, 3), limit=10))
+        assert not swh_scheduler.get_task_runs(task_ids=())
+        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
+        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)

-    def test_get_task_runs_with_scheduled(self):
+    def test_get_task_runs_with_scheduled(self, swh_scheduler):
         '''Some tasks have been scheduled but not executed yet,
         get_task_runs() should not return an empty list.
         limit should behave as expected.
         '''
-        self._create_task_types()
+        self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = self._tasks_from_template(
             TEMPLATES['git'], _time, 12)
         oneshots = self._tasks_from_template(
             TEMPLATES['hg'], _time, 12)
         total_tasks = len(recurring) + len(oneshots)
-        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
-        self.backend.mass_schedule_task_runs(backend_tasks)
+        swh_scheduler.mass_schedule_task_runs(backend_tasks)

-        self.assertFalse(self.backend.get_task_runs(
-            task_ids=[total_tasks + 1]))
+        assert not swh_scheduler.get_task_runs(
+            task_ids=[total_tasks + 1])

         btask = backend_tasks[0]
-        runs = self.backend.get_task_runs(
+        runs = swh_scheduler.get_task_runs(
             task_ids=[btask['task']])
-        self.assertEqual(len(runs), 1)
+        assert len(runs) == 1
         run = runs[0]
-        self.assertEqual(subdict(run, excl=('id',)),
-                         {'task': btask['task'],
-                          'backend_id': btask['backend_id'],
-                          'scheduled': btask['scheduled'],
-                          'started': None,
-                          'ended': None,
-                          'metadata': None,
-                          'status': 'scheduled',
-                          })
-
-        runs = self.backend.get_task_runs(
+        assert subdict(run, excl=('id',)) == {
+            'task': btask['task'],
+            'backend_id': btask['backend_id'],
+            'scheduled': btask['scheduled'],
+            'started': None,
+            'ended': None,
+            'metadata': None,
+            'status': 'scheduled',
+        }
+
+        runs = swh_scheduler.get_task_runs(
             task_ids=[bt['task'] for bt in backend_tasks], limit=2)
-        self.assertEqual(len(runs), 2)
+        assert len(runs) == 2

-        runs = self.backend.get_task_runs(
+        runs = swh_scheduler.get_task_runs(
             task_ids=[bt['task'] for bt in backend_tasks])
-        self.assertEqual(len(runs), total_tasks)
+        assert len(runs) == total_tasks

         keys = ('task', 'backend_id', 'scheduled')
-        self.assertEqual(sorted([subdict(x, keys) for x in runs],
-                                key=lambda x: x['task']),
-                         backend_tasks)
+        assert sorted([subdict(x, keys) for x in runs],
+                      key=lambda x: x['task']) == backend_tasks

-    def test_get_task_runs_with_executed(self):
+    def test_get_task_runs_with_executed(self, swh_scheduler):
         '''Some tasks have been executed, get_task_runs() should
         not return an empty list. limit should behave as expected.
         '''
-        self._create_task_types()
+        self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = self._tasks_from_template(
             TEMPLATES['git'], _time, 12)
         oneshots = self._tasks_from_template(
             TEMPLATES['hg'], _time, 12)
-        pending_tasks = self.backend.create_tasks(recurring + oneshots)
+        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
-        self.backend.mass_schedule_task_runs(backend_tasks)
+        swh_scheduler.mass_schedule_task_runs(backend_tasks)

         btask = backend_tasks[0]
         ts = utcnow()
-        self.backend.start_task_run(btask['backend_id'],
-                                    metadata={'something': 'stupid'},
-                                    timestamp=ts)
-        runs = self.backend.get_task_runs(task_ids=[btask['task']])
-        self.assertEqual(len(runs), 1)
-        self.assertEqual(subdict(runs[0], excl=('id')), {
+        swh_scheduler.start_task_run(btask['backend_id'],
+                                     metadata={'something': 'stupid'},
+                                     timestamp=ts)
+        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
+        assert len(runs) == 1
+        assert subdict(runs[0], excl=('id')) == {
             'task': btask['task'],
             'backend_id': btask['backend_id'],
             'scheduled': btask['scheduled'],
             'started': ts,
             'ended': None,
             'metadata': {'something': 'stupid'},
             'status': 'started',
-            })
+        }

         ts2 = utcnow()
-        self.backend.end_task_run(btask['backend_id'],
-                                  metadata={'other': 'stuff'},
-                                  timestamp=ts2,
-                                  status='eventful')
-        runs = self.backend.get_task_runs(task_ids=[btask['task']])
-        self.assertEqual(len(runs), 1)
-        self.assertEqual(subdict(runs[0], excl=('id')), {
+        swh_scheduler.end_task_run(btask['backend_id'],
+                                   metadata={'other': 'stuff'},
+                                   timestamp=ts2,
+                                   status='eventful')
+        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
+        assert len(runs) == 1
+        assert subdict(runs[0], excl=('id')) == {
             'task': btask['task'],
             'backend_id': btask['backend_id'],
             'scheduled': btask['scheduled'],
             'started': ts,
             'ended': ts2,
             'metadata': {'something': 'stupid', 'other': 'stuff'},
             'status': 'eventful',
-            })
+        }
+
+    @staticmethod
+    def _task_from_template(template, next_run, priority, *args, **kwargs):
+        ret = copy.deepcopy(template)
+        ret['next_run'] = next_run
+        if priority:
+            ret['priority'] = priority
+        if args:
+            ret['arguments']['args'] = list(args)
+        if kwargs:
+            ret['arguments']['kwargs'] = kwargs
+        return ret
+
+    def _pop_priority(self, priorities):
+        if not priorities:
+            return None
+        for priority, remains in priorities.items():
+            if remains > 0:
+                priorities[priority] = remains - 1
+                return priority
+        return None
+
+    def _tasks_from_template(self, template, max_timestamp, num,
+                             num_priority=0, priorities=None):
+        if num_priority and priorities:
+            priorities = {
+                priority: ratio * num_priority
+                for priority, ratio in priorities.items()
+            }
+        tasks = []
+        for i in range(num + num_priority):
+            priority = self._pop_priority(priorities)
+            tasks.append(self._task_from_template(
+                template,
+                max_timestamp - datetime.timedelta(microseconds=i),
+                priority,
+                'argument-%03d' % i,
+                **{'kwarg%03d' % i: 'bogus-kwarg'}
+            ))
+        return tasks

-class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-        self.config = {'db': 'dbname=' + self.TEST_DB_NAME}
-        self.backend = get_scheduler('local', self.config)
+    def _create_task_types(self, scheduler):
+        for tt in TASK_TYPES.values():
+            scheduler.create_task_type(tt)
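
The _pop_priority and _tasks_from_template helpers kept at the bottom of the test class implement a simple quota scheme: each priority's ratio is scaled by the number of priority tasks to build a budget, which is then consumed one task at a time until every counter reaches zero. A minimal standalone sketch of that allocation logic (the ratio values below are invented for illustration; in the tests they come from swh_scheduler.get_priority_ratios()):

from collections import Counter


def pop_priority(priorities):
    # Mirror of the test helper: return the first priority that still
    # has budget left, decrementing it; None once everything is spent.
    if not priorities:
        return None
    for priority, remains in priorities.items():
        if remains > 0:
            priorities[priority] = remains - 1
            return priority
    return None


# Hypothetical ratios; the real ones live in the priority_ratio table.
ratios = {'high': 0.5, 'normal': 0.3, 'low': 0.2}
num_priority = 10
budget = {p: ratio * num_priority for p, ratio in ratios.items()}

allocation = Counter(pop_priority(budget) for _ in range(num_priority))
print(allocation)  # Counter({'high': 5, 'normal': 3, 'low': 2})
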
diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py
index 9cafcb5..87584c5 100644
--- a/swh/scheduler/tests/updater/conftest.py
+++ b/swh/scheduler/tests/updater/conftest.py
@@ -1,33 +1,68 @@
 import pytest
 import glob
 import os
+from arrow import utcnow  # XXX

 from swh.core.utils import numfile_sortkey as sortkey
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 from swh.scheduler.tests import SQL_DIR
 import swh.scheduler.tests.conftest  # noqa

 DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql')


 @pytest.fixture
-def swh_scheduler_updater(request, postgresql_proc, postgresql):
+def swh_scheduler_updater(postgresql):
     config = {
-        'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
-            host=postgresql_proc.host,
-            port=postgresql_proc.port,
-            user='postgres',
-            dbname='tests')
+        'db': postgresql.dsn,
     }

     all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)

     cursor = postgresql.cursor()
     for fname in all_dump_files:
         with open(fname) as fobj:
             cursor.execute(fobj.read())
     postgresql.commit()

     backend = SchedulerUpdaterBackend(**config)
     return backend
+
+
+def make_event(event_type, name, origin_type):
+    return {
+        'type': event_type,
+        'repo': {
+            'name': name,
+        },
+        'created_at': utcnow(),
+        'origin_type': origin_type,
+    }
+
+
+def make_simple_event(event_type, name, origin_type):
+    return {
+        'type': event_type,
+        'url': 'https://fakeurl/%s' % name,
+        'origin_type': origin_type,
+        'created_at': utcnow(),
+    }
+
+
+def make_events(events):
+    for event_type, repo_name, origin_type in events:
+        yield make_event(event_type, repo_name, origin_type)
+
+
+def make_incomplete_event(event_type, name, origin_type,
+                          missing_data_key):
+    event = make_event(event_type, name, origin_type)
+    del event[missing_data_key]
+    return event
+
+
+def make_incomplete_events(events):
+    for event_type, repo_name, origin_type, missing_data_key in events:
+        yield make_incomplete_event(event_type, repo_name,
+                                    origin_type, missing_data_key)
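
The conftest.py change above replaces a hand-assembled 'postgresql://user@host:port/dbname' string with the dsn attribute of the connection yielded by pytest-postgresql's postgresql fixture, which already points at a throwaway per-test database. A minimal sketch of the same pattern (the scheduler_config fixture and test names are hypothetical, not part of the codebase):

import pytest

from swh.scheduler import get_scheduler


@pytest.fixture
def scheduler_config(postgresql):
    # postgresql comes from pytest-postgresql and yields a psycopg2
    # connection; its .dsn already targets a dedicated test database.
    return {'db': postgresql.dsn}


def test_local_backend(scheduler_config):
    # 'local' plus a db connection string mirrors how the removed
    # LocalSchedulerTest built its backend.
    backend = get_scheduler('local', scheduler_config)
    assert backend is not None
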
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
index 2e084e9..825f4a2 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -1,153 +1,152 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
-import unittest
 from glob import glob

 import pytest
+from pytest_postgresql.factories import postgresql as pg_fixture_factory
+from os.path import join

 from swh.core.utils import numfile_sortkey as sortkey
-from swh.core.db.tests.db_testing import DbTestFixture
 from swh.scheduler.tests import SQL_DIR
 from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
 from swh.scheduler.updater.writer import UpdaterWriter
-from . import UpdaterTestUtil
-
-
-@pytest.mark.db
-class CommonSchedulerTest(DbTestFixture):
-    TEST_SCHED_DB = 'softwareheritage-scheduler-test'
-    TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql')
-
-    TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test'
-    TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql')
-
-    @classmethod
-    def setUpClass(cls):
-        cls.add_db(cls.TEST_SCHED_DB,
-                   [(sqlfn, 'psql') for sqlfn in
-                    sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)])
-        cls.add_db(cls.TEST_SCHED_UPDATER_DB,
-                   [(sqlfn, 'psql') for sqlfn in
-                    sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)])
-        super().setUpClass()
-
-    def tearDown(self):
-        self.reset_db_tables(self.TEST_SCHED_UPDATER_DB)
-        self.reset_db_tables(self.TEST_SCHED_DB,
-                             excluded=['task_type', 'priority_ratio'])
-        super().tearDown()
-
-
-class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest,
-                        unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-
-        config = {
-            'scheduler': {
-                'cls': 'local',
-                'args': {
-                    'db': 'dbname=softwareheritage-scheduler-test',
-                },
-            },
-            'scheduler_updater': {
-                'cls': 'local',
-                'args': {
-                    'db':
-                        'dbname=softwareheritage-scheduler-updater-test',
-                    'cache_read_limit': 5,
-                },
+from .conftest import make_simple_event
+
+
+pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler')
+pg_updater = pg_fixture_factory('postgresql_proc', 'updater')
+
+
+def pg_sched_fact(dbname, sqldir):
+    @pytest.fixture
+    def pg_scheduler_db(request):
+        pg = request.getfixturevalue('pg_%s' % dbname)
+        dump_files = sorted(glob(os.path.join(sqldir, '*.sql')),
+                            key=sortkey)
+        with pg.cursor() as cur:
+            for fname in dump_files:
+                with open(fname) as fobj:
+                    sql = fobj.read().replace('concurrently', '')
+                    cur.execute(sql)
+            pg.commit()
+        yield pg
+
+    return pg_scheduler_db
+
+
+scheduler_db = pg_sched_fact('scheduler', SQL_DIR)
+updater_db = pg_sched_fact('updater', join(SQL_DIR, 'updater'))
+
+
+@pytest.fixture
+def swh_updater_writer(scheduler_db, updater_db):
+    config = {
+        'scheduler': {
+            'cls': 'local',
+            'args': {
+                'db': scheduler_db.dsn,
+            },
-            'updater_writer': {
-                'pause': 0.1,
-                'verbose': False,
+        },
+        'scheduler_updater': {
+            'cls': 'local',
+            'args': {
+                'db': updater_db.dsn,
+                'cache_read_limit': 5,
             },
-        }
-        self.writer = UpdaterWriter(**config)
-        self.scheduler_backend = self.writer.scheduler_backend
-        self.scheduler_updater_backend = self.writer.scheduler_updater_backend
+        },
+        'updater_writer': {
+            'pause': 0.1,
+            'verbose': False,
+        },
+    }
+    return UpdaterWriter(**config)
+
+
+def test_run_ko(swh_updater_writer):
+    """Only git tasks are supported for now, other types are dismissed.
+
+    """
+    scheduler = swh_updater_writer.scheduler_backend
+    updater = swh_updater_writer.scheduler_updater_backend

-    def test_run_ko(self):
-        """Only git tasks are supported for now, other types are dismissed.
+    ready_events = [
+        SWHEvent(
+            make_simple_event(event_type, 'origin-%s' % i,
+                              'svn'))
+        for i, event_type in enumerate(LISTENED_EVENTS)
+    ]
-        """
-        ready_events = [
-            SWHEvent(
-                self._make_simple_event(event_type, 'origin-%s' % i,
-                                        'svn'))
-            for i, event_type in enumerate(LISTENED_EVENTS)
-        ]
+    updater.cache_put(ready_events)
+    list(updater.cache_read())
-        expected_length = len(ready_events)
+    r = scheduler.peek_ready_tasks('load-git')
-        self.scheduler_updater_backend.cache_put(ready_events)
-        data = list(self.scheduler_updater_backend.cache_read())
-        self.assertEqual(len(data), expected_length)
+    # first read on an empty scheduling db results in nothing
+    assert not r
-        r = self.scheduler_backend.peek_ready_tasks('load-git')
+    # Read from cache to scheduler db
+    swh_updater_writer.run()
-        # first read on an empty scheduling db results with nothing in it
-        self.assertEqual(len(r), 0)
+    r = scheduler.peek_ready_tasks('load-git')
-        # Read from cache to scheduler db
-        self.writer.run()
+    # reads after the writes are still empty since svn is not supported
+    assert not r
-        r = self.scheduler_backend.peek_ready_tasks('load-git')
-        # other reads after writes are still empty since it's not supported
-        self.assertEqual(len(r), 0)
+
+def test_run_ok(swh_updater_writer):
+    """Only git origins are supported for now
-    def test_run_ok(self):
-        """Only git origin are supported for now
+    """
+    scheduler = swh_updater_writer.scheduler_backend
+    updater = swh_updater_writer.scheduler_updater_backend
-        """
-        ready_events = [
-            SWHEvent(
-                self._make_simple_event(event_type, 'origin-%s' % i, 'git'))
-            for i, event_type in enumerate(LISTENED_EVENTS)
-        ]
+    ready_events = [
+        SWHEvent(
+            make_simple_event(event_type, 'origin-%s' % i, 'git'))
+        for i, event_type in enumerate(LISTENED_EVENTS)
+    ]
-        expected_length = len(ready_events)
+    expected_length = len(ready_events)
-        self.scheduler_updater_backend.cache_put(ready_events)
+    updater.cache_put(ready_events)
-        data = list(self.scheduler_updater_backend.cache_read())
-        self.assertEqual(len(data), expected_length)
+    data = list(updater.cache_read())
+    assert len(data) == expected_length
-        r = self.scheduler_backend.peek_ready_tasks('load-git')
+    r = scheduler.peek_ready_tasks('load-git')
-        # first read on an empty scheduling db results with nothing in it
-        self.assertEqual(len(r), 0)
+    # first read on an empty scheduling db results in nothing
+    assert not r
-        # Read from cache to scheduler db
-        self.writer.run()
+    # Read from cache to scheduler db
+    swh_updater_writer.run()
-        # now, we should have scheduling task ready
-        r = self.scheduler_backend.peek_ready_tasks('load-git')
+    # now, we should have scheduling tasks ready
+    r = scheduler.peek_ready_tasks('load-git')
-        self.assertEqual(len(r), expected_length)
+    assert len(r) == expected_length
-        # Check the task has been scheduled
-        for t in r:
-            self.assertEqual(t['type'], 'load-git')
-            self.assertEqual(t['priority'], 'normal')
-            self.assertEqual(t['policy'], 'oneshot')
-            self.assertEqual(t['status'], 'next_run_not_scheduled')
+    # Check the tasks have been scheduled
+    for t in r:
+        assert t['type'] == 'load-git'
+        assert t['priority'] == 'normal'
+        assert t['policy'] == 'oneshot'
+        assert t['status'] == 'next_run_not_scheduled'
-        # writer has nothing to do now
-        self.writer.run()
+    # writer has nothing to do now
+    swh_updater_writer.run()
-        # so no more data in cache
-        data = list(self.scheduler_updater_backend.cache_read())
+    # so no more data in cache
+    data = list(updater.cache_read())
-        self.assertEqual(len(data), 0)
+    assert not data
-        # provided, no runner is ran, still the same amount of scheduling tasks
-        r = self.scheduler_backend.peek_ready_tasks('load-git')
+    # provided no runner has run, the number of scheduled tasks stays the same
+    r = scheduler.peek_ready_tasks('load-git')
-        self.assertEqual(len(r), expected_length)
+    assert len(r) == expected_length
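
The rewritten test_writer.py drives two separate databases from one PostgreSQL process: pytest_postgresql.factories.postgresql (aliased to pg_fixture_factory above) is invoked once per database name, and each resulting fixture yields its own connection. A condensed sketch of that pattern (the test body is illustrative only):

from pytest_postgresql.factories import postgresql as pg_fixture_factory

# One connection fixture per database, both served by the same
# postgresql_proc process fixture started by pytest-postgresql.
pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler')
pg_updater = pg_fixture_factory('postgresql_proc', 'updater')


def test_two_databases(pg_scheduler, pg_updater):
    # Each fixture yields a psycopg2 connection; only the dbname part
    # of the DSN differs between the two.
    assert pg_scheduler.dsn != pg_updater.dsn
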
diff --git a/version.txt b/version.txt
index 5f7bd5b..ec85cf3 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.61-0-g5955c8d
\ No newline at end of file
+v0.0.62-0-g787c7a9
\ No newline at end of file
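
The version.txt content follows the 'git describe --tags' layout, presumably cached there for the vcversioner dependency: the latest tag, the number of commits since that tag (0 for a release made exactly at the tag), and a 'g'-prefixed abbreviated commit hash. An illustrative parse, not part of the codebase:

import re

m = re.match(r'^v(?P<tag>[0-9.]+)-(?P<commits>\d+)-g(?P<sha>[0-9a-f]+)$',
             'v0.0.62-0-g787c7a9')
assert m is not None
assert m.group('tag') == '0.0.62'   # latest tag, minus the leading 'v'
assert m.group('commits') == '0'    # commits on top of the tag
assert m.group('sha') == '787c7a9'  # abbreviated commit id
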