diff --git a/PKG-INFO b/PKG-INFO index 9cdd3b7..2e6338d 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.40 +Version: 0.0.41 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running tests manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set up your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document, everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker use an in-memory broker/result backend by default, but you can choose to use a real broker by setting the `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests .....................................
---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index e3c44ce..f403adf 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1 @@ -swh.core >= 0.0.48 +swh.core >= 0.0.51 diff --git a/setup.py b/setup.py index 65d2792..15163f0 100755 --- a/setup.py +++ b/setup.py @@ -1,69 +1,69 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, 'README.md'), encoding='utf-8') as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = 'requirements-%s.txt' % name else: reqf = 'requirements.txt' requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.scheduler', description='Software Heritage Scheduler', long_description=long_description, long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DSCH/', packages=find_packages(), scripts=['bin/swh-worker-control'], setup_requires=['vcversioner'], install_requires=parse_requirements() + parse_requirements('swh'), extras_require={'testing': parse_requirements('test')}, vcversioner={}, include_package_data=True, entry_points=''' [console_scripts] - swh-scheduler=swh.scheduler.cli:cli + swh-scheduler=swh.scheduler.cli:main ''', classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', 'Funding': 'https://www.softwareheritage.org/donate', 'Source': 'https://forge.softwareheritage.org/source/swh-scheduler', }, ) diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 9cdd3b7..2e6338d 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.40 +Version: 0.0.41 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-scheduler ============= Job scheduler 
for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running test manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document everything should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker uses an in memory broker/result backend by default, but you can choose to use a true broker by setting `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 4a3dbbc..c54e63a 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,54 +1,55 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt bin/swh-worker-control swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-data.sql swh/scheduler/sql/updater/10-swh-init.sql swh/scheduler/sql/updater/30-swh-schema.sql swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py +swh/scheduler/tests/updater/conftest.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py 
swh/scheduler/updater/ghtorrent/cli.py swh/scheduler/updater/ghtorrent/fake.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/entry_points.txt b/swh.scheduler.egg-info/entry_points.txt index 6d27dbf..352ced3 100644 --- a/swh.scheduler.egg-info/entry_points.txt +++ b/swh.scheduler.egg-info/entry_points.txt @@ -1,4 +1,4 @@ [console_scripts] - swh-scheduler=swh.scheduler.cli:cli + swh-scheduler=swh.scheduler.cli:main \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index c7d5f70..d7082c4 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,15 +1,15 @@ arrow celery>=4 Click elasticsearch>5.4 flask kombu psycopg2 vcversioner -swh.core>=0.0.48 +swh.core>=0.0.51 [testing] hypothesis pytest<4 pytest-postgresql celery>=4 diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py index aae1885..8b96133 100644 --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,58 +1,68 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Percentage of tasks with priority to schedule PRIORITY_SLOT = 0.6 +DEFAULT_CONFIG_PATH = 'backend/scheduler' +DEFAULT_CONFIG = { + 'scheduler': ('dict', { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-dev', + }, + }) +} + def compute_nb_tasks_from(num_tasks): """Compute and returns the tuple, number of tasks without priority, number of tasks with priority. Args: num_tasks (int): Returns: tuple number of tasks without priority (int), number of tasks with priority (int) """ if not num_tasks: return None, None return (int((1 - PRIORITY_SLOT) * num_tasks), int(PRIORITY_SLOT * num_tasks)) def get_scheduler(cls, args={}): """ Get a scheduler object of class `scheduler_class` with arguments `scheduler_args`. Args: scheduler (dict): dictionary with keys: cls (str): scheduler's class, either 'local' or 'remote' args (dict): dictionary with keys, default to empty. Returns: an instance of swh.scheduler, either local or remote: local: swh.scheduler.backend.SchedulerBackend remote: swh.scheduler.api.client.RemoteScheduler Raises: ValueError if passed an unknown storage class. 
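A minimal usage sketch of the `get_scheduler` factory above. The `db` key mirrors the new module-level `DEFAULT_CONFIG`; the remote `url` argument is inferred from the old listener CLI removed further down in this diff, and the address is a placeholder, so treat both call sites as illustrative rather than canonical.
```
# Minimal sketch (not part of this change): obtaining a scheduler instance.
from swh.scheduler import get_scheduler

# Local backend: the 'db' argument matches the new DEFAULT_CONFIG above.
scheduler = get_scheduler('local',
                          args={'db': 'dbname=softwareheritage-scheduler-dev'})

# Remote backend: the 'url' argument is assumed from the listener CLI removed
# further down in this diff; the address itself is a placeholder.
remote = get_scheduler('remote', args={'url': 'http://localhost:5008/'})
```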
""" if cls == 'remote': from .api.client import RemoteScheduler as SchedulerBackend elif cls == 'local': from .backend import SchedulerBackend else: raise ValueError('Unknown swh.scheduler class `%s`' % cls) return SchedulerBackend(**args) diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 1229cfd..bd48a93 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,148 +1,192 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from flask import request +from flask import request, Flask from swh.core import config -from swh.scheduler import get_scheduler as get_scheduler_from -from swh.core.api import (SWHServerAPIApp, decode_request, +from swh.core.api import (decode_request, error_handler, encode_data_server as encode_data) -DEFAULT_CONFIG_PATH = 'backend/scheduler' -DEFAULT_CONFIG = { - 'scheduler': ('dict', { - 'cls': 'local', - 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', - }, - }) -} +from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter +from swh.scheduler import get_scheduler as get_scheduler_from +from swh.scheduler import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH -app = SWHServerAPIApp(__name__) +app = Flask(__name__) scheduler = None @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def get_sched(): global scheduler if not scheduler: scheduler = get_scheduler_from(**app.config['scheduler']) return scheduler +def has_no_empty_params(rule): + return len(rule.defaults or ()) >= len(rule.arguments or ()) + + @app.route('/') +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def index(): return 'SWH Scheduler API server' -@app.route('/close_connection', methods=['POST']) +@app.route('/close_connection', methods=['GET', 'POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def close_connection(): - return encode_data(get_sched().close_connection()) + return get_sched().close_connection() @app.route('/set_status_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def set_status_tasks(): - return encode_data(get_sched().set_status_tasks(**decode_request(request))) + return get_sched().set_status_tasks(**decode_request(request)) @app.route('/create_task_type', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def create_task_type(): - return encode_data(get_sched().create_task_type(**decode_request(request))) + return get_sched().create_task_type(**decode_request(request)) @app.route('/get_task_type', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def get_task_type(): - return encode_data(get_sched().get_task_type(**decode_request(request))) + return get_sched().get_task_type(**decode_request(request)) -@app.route('/get_task_types', methods=['POST']) +@app.route('/get_task_types', methods=['GET', 'POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def get_task_types(): - return encode_data(get_sched().get_task_types(**decode_request(request))) + return get_sched().get_task_types(**decode_request(request)) @app.route('/create_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def create_tasks(): - return encode_data(get_sched().create_tasks(**decode_request(request))) + return 
get_sched().create_tasks(**decode_request(request)) @app.route('/disable_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def disable_tasks(): - return encode_data(get_sched().disable_tasks(**decode_request(request))) + return get_sched().disable_tasks(**decode_request(request)) @app.route('/get_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def get_tasks(): - return encode_data(get_sched().get_tasks(**decode_request(request))) + return get_sched().get_tasks(**decode_request(request)) @app.route('/search_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def search_tasks(): - return encode_data(get_sched().search_tasks(**decode_request(request))) + return get_sched().search_tasks(**decode_request(request)) @app.route('/peek_ready_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def peek_ready_tasks(): - return encode_data(get_sched().peek_ready_tasks(**decode_request(request))) + return get_sched().peek_ready_tasks(**decode_request(request)) @app.route('/grab_ready_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def grab_ready_tasks(): - return encode_data(get_sched().grab_ready_tasks(**decode_request(request))) + return get_sched().grab_ready_tasks(**decode_request(request)) @app.route('/schedule_task_run', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def schedule_task_run(): - return encode_data(get_sched().schedule_task_run( - **decode_request(request))) + return get_sched().schedule_task_run(**decode_request(request)) @app.route('/mass_schedule_task_runs', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def mass_schedule_task_runs(): - return encode_data( - get_sched().mass_schedule_task_runs(**decode_request(request))) + return get_sched().mass_schedule_task_runs(**decode_request(request)) @app.route('/start_task_run', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def start_task_run(): - return encode_data(get_sched().start_task_run(**decode_request(request))) + return get_sched().start_task_run(**decode_request(request)) @app.route('/end_task_run', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def end_task_run(): - return encode_data(get_sched().end_task_run(**decode_request(request))) + return get_sched().end_task_run(**decode_request(request)) @app.route('/filter_task_to_archive', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def filter_task_to_archive(): - return encode_data( - get_sched().filter_task_to_archive(**decode_request(request))) + return get_sched().filter_task_to_archive(**decode_request(request)) @app.route('/delete_archived_tasks', methods=['POST']) +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) def delete_archived_tasks(): - return encode_data( - get_sched().delete_archived_tasks(**decode_request(request))) + return get_sched().delete_archived_tasks(**decode_request(request)) + + +@app.route("/site-map") +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) +def site_map(): + links = [] + sched = get_sched() + for rule in app.url_map.iter_rules(): + if has_no_empty_params(rule) and hasattr(sched, rule.endpoint): + links.append(dict( + rule=rule.rule, + description=getattr(sched, rule.endpoint).__doc__)) + # links is now a list of url, endpoint tuples + return links def run_from_webserver(environ, start_response, 
config_path=DEFAULT_CONFIG_PATH): """Run the WSGI app from the webserver, loading the configuration.""" cfg = config.load_named_config(config_path, DEFAULT_CONFIG) app.config.update(cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) if __name__ == '__main__': print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 09171c2..0770c00 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,590 +1,456 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import binascii -import datetime -from functools import wraps import json -import tempfile import logging from arrow import Arrow, utcnow -import psycopg2 +import psycopg2.pool import psycopg2.extras from psycopg2.extensions import AsIs -from swh.core.config import SWHConfig +from swh.core.db import BaseDb +from swh.core.db.common import db_transaction, db_transaction_generator logger = logging.getLogger(__name__) def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) -def autocommit(fn): - @wraps(fn) - def wrapped(self, *args, **kwargs): - autocommit = False - if 'cursor' not in kwargs or not kwargs['cursor']: - autocommit = True - kwargs['cursor'] = self.cursor() +def format_query(query, keys): + """Format a query with the given keys""" - try: - ret = fn(self, *args, **kwargs) - except Exception: - if autocommit: - self.rollback() - raise + query_keys = ', '.join(keys) + placeholders = ', '.join(['%s'] * len(keys)) - if autocommit: - self.commit() + return query.format(keys=query_keys, placeholders=placeholders) - return ret - return wrapped - - -class DbBackend: - """Mixin intended to be used within scheduling db backend classes - - cf. swh.scheduler.backend.SchedulerBackend, and - swh.scheduler.updater.backend.SchedulerUpdaterBackend +class SchedulerBackend: + """Backend for the Software Heritage scheduling database. 
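Stepping back to the API server changes above: the handlers now return plain Python values and delegate serialization to the `negotiate` formatters from `swh.core.api`. A hedged sketch of what that is expected to enable for the endpoints that now also accept GET; the port and the Accept-driven negotiation are assumptions, not something this diff states.
```
# Assumed client-side behaviour: ask the scheduler API for JSON explicitly.
import requests

resp = requests.get('http://localhost:5008/get_task_types',  # placeholder URL
                    headers={'Accept': 'application/json'})
resp.raise_for_status()
for task_type in resp.json():
    print(task_type['type'], '-', task_type['description'])
```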
""" - def reconnect(self): - if not self.db or self.db.closed: - self.db = psycopg2.connect( - dsn=self.db_conn_dsn, - cursor_factory=psycopg2.extras.RealDictCursor, - ) - - def cursor(self): - """Return a fresh cursor on the database, with auto-reconnection in - case of failure + def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ - cur = None - - # Get a fresh cursor and reconnect at most three times - tries = 0 - while True: - tries += 1 - try: - cur = self.db.cursor() - cur.execute('select 1') - break - except psycopg2.OperationalError: - if tries < 3: - self.reconnect() - else: - raise - - return cur - - def commit(self): - """Commit a transaction""" - self.db.commit() - - def rollback(self): - """Rollback a transaction""" - self.db.rollback() - - def close_connection(self): - """Close db connection""" - if self.db and not self.db.closed: - self.db.close() - - def _format_query(self, query, keys): - """Format a query with the given keys""" - - query_keys = ', '.join(keys) - placeholders = ', '.join(['%s'] * len(keys)) - - return query.format(keys=query_keys, placeholders=placeholders) - - def _format_multiquery(self, query, keys, values): - """Format a query with placeholders generated for multiple values""" - query_keys = ', '.join(keys) - placeholders = '), ('.join( - [', '.join(['%s'] * len(keys))] * len(values) - ) - ret_values = sum([[value[key] for key in keys] - for value in values], []) - - return ( - query.format(keys=query_keys, placeholders=placeholders), - ret_values, - ) + Args: + db_conn: either a libpq connection string, or a psycopg2 connection - def copy_to(self, items, tblname, columns, default_columns={}, - cursor=None, item_cb=None): - def escape(data): - if data is None: - return '' - if isinstance(data, bytes): - return '\\x%s' % binascii.hexlify(data).decode('ascii') - elif isinstance(data, str): - return '"%s"' % data.replace('"', '""') - elif isinstance(data, (datetime.datetime, Arrow)): - # We escape twice to make sure the string generated by - # isoformat gets escaped - return escape(data.isoformat()) - elif isinstance(data, dict): - return escape(json.dumps(data)) - elif isinstance(data, list): - return escape("{%s}" % ','.join(escape(d) for d in data)) - elif isinstance(data, psycopg2.extras.Range): - # We escape twice here too, so that we make sure - # everything gets passed to copy properly - return escape( - '%s%s,%s%s' % ( - '[' if data.lower_inc else '(', - '-infinity' if data.lower_inf else escape(data.lower), - 'infinity' if data.upper_inf else escape(data.upper), - ']' if data.upper_inc else ')', - ) - ) - else: - # We don't escape here to make sure we pass literals properly - return str(data) - with tempfile.TemporaryFile('w+') as f: - for d in items: - if item_cb is not None: - item_cb(d) - line = [] - for k in columns: - v = d.get(k) - if not v: - v = default_columns.get(k) - v = escape(v) - line.append(v) - f.write(','.join(line)) - f.write('\n') - f.seek(0) - cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( - tblname, ', '.join(columns)), f) - - -class SchedulerBackend(SWHConfig, DbBackend): - """Backend for the Software Heritage scheduling database. 
+ """ + if isinstance(db, psycopg2.extensions.connection): + self._pool = None + self._db = BaseDb(db) + else: + self._pool = psycopg2.pool.ThreadedConnectionPool( + min_pool_conns, max_pool_conns, db, + cursor_factory=psycopg2.extras.RealDictCursor, + ) + self._db = None - """ - CONFIG_BASE_FILENAME = 'scheduler' - DEFAULT_CONFIG = { - 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), - } - - def __init__(self, **override_config): - super().__init__() - self.config = self.parse_config_file(global_config=False) - self.config.update(override_config) - self.db = None - self.db_conn_dsn = self.config['scheduling_db'] - self.reconnect() - logger.debug('SchedulerBackend config=%s' % self.config) + def get_db(self): + if self._db: + return self._db + return BaseDb.from_pool(self._pool) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay', ] - @autocommit - def create_task_type(self, task_type, cursor=None): + @db_transaction() + def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] - query = self._format_query( + query = format_query( """insert into task_type ({keys}) values ({placeholders})""", keys) - cursor.execute(query, [task_type[key] for key in keys]) + cur.execute(query, [task_type[key] for key in keys]) - @autocommit - def get_task_type(self, task_type_name, cursor=None): + @db_transaction() + def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" - query = self._format_query( + query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) - cursor.execute(query, (task_type_name,)) + cur.execute(query, (task_type_name,)) + return cur.fetchone() - ret = cursor.fetchone() - - return ret - - @autocommit - def get_task_types(self, cursor=None): - query = self._format_query( + @db_transaction() + def get_task_types(self, db=None, cur=None): + """Retrieve all registered task types""" + query = format_query( "select {keys} from task_type", self.task_type_keys, ) - cursor.execute(query) - ret = cursor.fetchall() - return ret + cur.execute(query) + return cur.fetchall() task_create_keys = [ 'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority' ] task_keys = task_create_keys + ['id', 'current_interval', 'status'] - @autocommit - def create_tasks(self, tasks, policy='recurring', cursor=None): + @db_transaction() + def create_tasks(self, tasks, policy='recurring', db=None, cur=None): """Create new tasks. 
Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ - cursor.execute('select swh_scheduler_mktemp_task()') - self.copy_to(tasks, 'tmp_task', self.task_create_keys, - default_columns={ - 'policy': policy, - 'status': 'next_run_not_scheduled' - }, - cursor=cursor) - query = self._format_query( + cur.execute('select swh_scheduler_mktemp_task()') + db.copy_to(tasks, 'tmp_task', self.task_create_keys, + default_values={ + 'policy': policy, + 'status': 'next_run_not_scheduled' + }, + cur=cur) + query = format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) - cursor.execute(query) - return cursor.fetchall() + cur.execute(query) + return cur.fetchall() - @autocommit - def set_status_tasks(self, task_ids, - status='disabled', next_run=None, cursor=None): + @db_transaction() + def set_status_tasks(self, task_ids, status='disabled', next_run=None, + db=None, cur=None): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ if not task_ids: return query = ["UPDATE task SET status = %s"] args = [status] if next_run: query.append(', next_run = %s') args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) - cursor.execute(''.join(query), args) + cur.execute(''.join(query), args) - @autocommit - def disable_tasks(self, task_ids, cursor=None): + @db_transaction() + def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" - return self.set_status_tasks(task_ids) + return self.set_status_tasks(task_ids, db=db, cur=cur) - @autocommit + @db_transaction() def search_tasks(self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, - limit=None, cursor=None): + limit=None, db=None, cur=None): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append('id = %s') else: where.append('id in %s') task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append('type = %s') else: where.append('type in %s') task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append('status = %s') else: where.append('status in %s') status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append('priority = %s') else: priority = tuple(priority) where.append('priority in %s') args.append(priority) if policy: where.append('policy = %s') args.append(policy) if before: where.append('next_run <= %s') args.append(before) if after: where.append('next_run >= %s') args.append(after) query = 'select * from task where ' + ' and '.join(where) if limit: query += ' limit %s :: bigint' args.append(limit) - cursor.execute(query, args) - return cursor.fetchall() + cur.execute(query, args) + return cur.fetchall() - @autocommit - def get_tasks(self, task_ids, cursor=None): + @db_transaction() + def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" - query = self._format_query('select {keys} from task where id in %s', - self.task_keys) - cursor.execute(query, (tuple(task_ids),)) - return cursor.fetchall() + query = format_query('select {keys} from task where 
id in %s', + self.task_keys) + cur.execute(query, (tuple(task_ids),)) + return cur.fetchall() - @autocommit + @db_transaction() def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, - cursor=None): + db=None, cur=None): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() - cursor.execute( + cur.execute( '''select * from swh_scheduler_peek_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) - logger.debug('PEEK %s => %s' % (task_type, cursor.rowcount)) - return cursor.fetchall() + logger.debug('PEEK %s => %s' % (task_type, cur.rowcount)) + return cur.fetchall() - @autocommit + @db_transaction() def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, - num_tasks_priority=None, cursor=None): + num_tasks_priority=None, db=None, cur=None): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() - cursor.execute( + cur.execute( '''select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)''', (task_type, timestamp, num_tasks, num_tasks_priority) ) - logger.debug('GRAB %s => %s' % (task_type, cursor.rowcount)) - return cursor.fetchall() + logger.debug('GRAB %s => %s' % (task_type, cur.rowcount)) + return cur.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] - @autocommit + @db_transaction() def schedule_task_run(self, task_id, backend_id, metadata=None, - timestamp=None, cursor=None): + timestamp=None, db=None, cur=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() - cursor.execute( + cur.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) - return cursor.fetchone() + return cur.fetchone() - @autocommit - def mass_schedule_task_runs(self, task_runs, cursor=None): + @db_transaction() + def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. 
Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ - cursor.execute('select swh_scheduler_mktemp_task_run()') - self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, - cursor=cursor) - cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') + cur.execute('select swh_scheduler_mktemp_task_run()') + db.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, + cur=cur) + cur.execute('select swh_scheduler_schedule_task_run_from_temp()') - @autocommit + @db_transaction() def start_task_run(self, backend_id, metadata=None, timestamp=None, - cursor=None): + db=None, cur=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() - cursor.execute( + cur.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) - return cursor.fetchone() + return cur.fetchone() - @autocommit + @db_transaction() def end_task_run(self, backend_id, status, metadata=None, timestamp=None, - result=None, cursor=None): + result=None, db=None, cur=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() - cursor.execute( + cur.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) + return cur.fetchone() - return cursor.fetchone() - - @autocommit + @db_transaction_generator() def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1, - cursor=None): + db=None, cur=None): """Returns the list of task/task_run prior to a given date to archive. """ last_task_run_id = None while True: row = None - cursor.execute( + cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit) ) - for row in cursor: + for row in cur: # nested type index does not accept bare values # transform it as a dict to comply with this row['arguments']['args'] = { i: v for i, v in enumerate(row['arguments']['args']) } kwargs = row['arguments']['kwargs'] row['arguments']['kwargs'] = json.dumps(kwargs) yield row if not row: break _id = row.get('task_id') _task_run_id = row.get('task_run_id') if last_id == _id and last_task_run_id == _task_run_id: break last_id = _id last_task_run_id = _task_run_id - @autocommit - def delete_archived_tasks(self, task_ids, cursor=None): + @db_transaction() + def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. 
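For context on how the archival endpoints below chain together, a sketch follows; the time window and page size are arbitrary, and only the method names and the `task_id`/`task_run_id` row shape come from the code in this file.
```
# Sketch only: page through archivable rows, then delete the processed ones.
import datetime
from arrow import utcnow
from swh.scheduler import get_scheduler

backend = get_scheduler('local',
                        args={'db': 'dbname=softwareheritage-scheduler-dev'})

before = utcnow() - datetime.timedelta(days=30)   # arbitrary window
after = before - datetime.timedelta(days=30)

to_delete = []
for row in backend.filter_task_to_archive(after, before, limit=1000):
    to_delete.append({'task_id': row['task_id'],
                      'task_run_id': row['task_run_id']})

backend.delete_archived_tasks(to_delete)
```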
""" _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id['task_id']) _task_run_ids.append(task_id['task_run_id']) - cursor.execute( + cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids)) diff --git a/swh/scheduler/backend_es.py b/swh/scheduler/backend_es.py index fccfaf6..3c24bd9 100644 --- a/swh/scheduler/backend_es.py +++ b/swh/scheduler/backend_es.py @@ -1,168 +1,166 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Elastic Search backend """ - +from copy import deepcopy from swh.core import utils -from swh.core.config import SWHConfig from elasticsearch import Elasticsearch from elasticsearch import helpers -class SWHElasticSearchClient(SWHConfig): - CONFIG_BASE_FILENAME = 'backend/elastic' - - DEFAULT_CONFIG = { - 'storage_nodes': ('[dict]', [{'host': 'localhost', 'port': 9200}]), - 'index_name_prefix': ('str', 'swh-tasks'), - 'client_options': ('dict', { +DEFAULT_CONFIG = { + 'elastic_search': { + 'storage_nodes': {'host': 'localhost', 'port': 9200}, + 'index_name_prefix': 'swh-tasks', + 'client_options': { 'sniff_on_start': False, 'sniff_on_connection_fail': True, 'http_compress': False, - }) - } + }, + }, +} - def __init__(self, **config): - if config: - self.config = config - else: - self.config = self.parse_config_file() - options = self.config['client_options'] +class SWHElasticSearchClient: + def __init__(self, **config): + self.config = deepcopy(DEFAULT_CONFIG) + self.config.update(config) + es_conf = self.config['elastic_search'] + options = es_conf.get('client_options', {}) self.storage = Elasticsearch( # nodes to use by default - self.config['storage_nodes'], + es_conf['storage_nodes'], # auto detect cluster's status sniff_on_start=options['sniff_on_start'], sniff_on_connection_fail=options['sniff_on_connection_fail'], sniffer_timeout=60, # compression or not http_compress=options['http_compress']) - self.index_name_prefix = self.config['index_name_prefix'] + self.index_name_prefix = es_conf['index_name_prefix'] # document's index type (cf. ../../data/elastic-template.json) self.doc_type = 'task' def compute_index_name(self, year, month): """Given a year, month, compute the index's name. """ return '%s-%s-%s' % ( self.index_name_prefix, year, '%02d' % month) def index(self, data): """Index given data to elasticsearch. The field 'ended' in data is used to compute the index to index data to. """ date = data['ended'] index_name = self.compute_index_name(date.year, date.month) return self.storage.index(index=index_name, doc_type=self.doc_type, body=data) def mget(self, index_name, doc_ids, chunk_size=500, source=True, log=None): """Retrieve document's full content according to their ids as per source's setup. The `source` permits to retrieve only what's of interest to us, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. 
doc_ids (generator): Generator of ids to retrieve chunk_size (int): Number of documents chunk to send for retrieval source (bool/[str]): Source of information to return Yields: document indexed as per source's setup """ if isinstance(source, list): source = {'_source': ','.join(source)} else: source = {'_source': str(source).lower()} for ids in utils.grouper(doc_ids, n=1000): res = self.storage.mget(body={'ids': list(ids)}, index=index_name, doc_type=self.doc_type, params=source) if not res: if log: log.error('Error during retrieval of data, skipping!') continue for doc in res['docs']: found = doc.get('found') if not found: msg = 'Doc id %s not found, not indexed yet' % doc['_id'] if log: log.warning(msg) continue yield doc['_source'] def _streaming_bulk(self, index_name, doc_stream, chunk_size=500, log=None): """Bulk index data and returns the successful indexed data's identifier. Args: index_name (str): Name of the concerned index. doc_stream (generator): Generator of documents to index chunk_size (int): Number of documents chunk to send for indexation Yields: document id indexed """ actions = ({'_index': index_name, '_op_type': 'index', '_type': self.doc_type, '_source': data} for data in doc_stream) for ok, result in helpers.streaming_bulk(client=self.storage, actions=actions, chunk_size=chunk_size, raise_on_error=False, raise_on_exception=False): if not ok: if log: log.error('Error during %s indexation. Skipping.' % result) continue yield result['index']['_id'] def streaming_bulk(self, index_name, doc_stream, chunk_size=500, source=True, log=None): """Bulk index data and returns the successful indexed data as per source's setup. the `source` permits to retrieve only what's of interest to us, e.g: - source=True ; gives back the original indexed data - source=False ; returns without the original _source field - source=['task_id'] ; returns only task_id in the _source field Args: index_name (str): Name of the concerned index. 
doc_stream (generator): Document generator to index chunk_size (int): Number of documents chunk to send source (bool, [str]): the information to return """ indexed_ids = self._streaming_bulk( index_name, doc_stream, chunk_size=chunk_size, log=log) yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size, source=source, log=log) diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index 489f4e9..8ae64a0 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,264 +1,287 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import importlib import logging import os import urllib.parse from celery import Celery from celery.signals import setup_logging, celeryd_after_setup from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue from kombu.five import monotonic as _monotonic import requests from swh.scheduler.task import Task -from swh.core.config import load_named_config +from swh.core.config import load_named_config, merge_configs from swh.core.logger import JournalHandler DEFAULT_CONFIG_NAME = 'worker' CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE' CONFIG_NAME_TEMPLATE = 'worker/%s' DEFAULT_CONFIG = { 'task_broker': ('str', 'amqp://guest@localhost//'), 'result_backend': ('str', 'rpc://'), 'task_modules': ('list[str]', []), 'task_queues': ('list[str]', []), 'task_soft_time_limit': ('int', 0), } +logger = logging.getLogger(__name__) + @setup_logging.connect def setup_log_handler(loglevel=None, logfile=None, format=None, - colorize=None, **kwargs): + colorize=None, log_console=True, **kwargs): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG if isinstance(loglevel, str): loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) root_logger = logging.getLogger('') root_logger.setLevel(logging.INFO) - if loglevel == logging.DEBUG: + if loglevel <= logging.DEBUG: + log_console = True + if log_console: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) logging.getLogger('celery').setLevel(logging.INFO) # Silence amqp heartbeat_tick messages logger = logging.getLogger('amqp') logger.addFilter(lambda record: not record.msg.startswith( 'heartbeat_tick')) logger.setLevel(logging.DEBUG) # Silence useless "Starting new HTTP connection" messages logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('swh').setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task logging.getLogger('celery.task').setLevel(loglevel) return loglevel @celeryd_after_setup.connect def setup_queues_and_tasks(sender, instance, **kwargs): """Signal called on worker start. This automatically registers swh.scheduler.task.Task subclasses as available celery tasks. 
This also subscribes the worker to the "implicit" per-task queues defined for these task classes. """ + logger.info('Setup Queues & Tasks for %s', sender) + + instance.app.conf['worker_name'] = sender + for module_name in itertools.chain( # celery worker -I flag instance.app.conf['include'], # set from the celery / swh worker instance configuration file instance.app.conf['imports'], ): module = importlib.import_module(module_name) for name in dir(module): obj = getattr(module, name) if ( isinstance(obj, type) and issubclass(obj, Task) and obj != Task # Don't register the abstract class itself ): class_name = '%s.%s' % (module_name, name) register_task_class(instance.app, class_name, obj) for task_name in instance.app.tasks: if task_name.startswith('swh.'): instance.app.amqp.queues.select_add(task_name) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {'monotonic': _monotonic()} def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" if name is not None and name.startswith('swh.'): return {'queue': name} def get_queue_stats(app, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = app.connection().info() if conn_info['transport'] == 'memory': # We're running in a test environment, without RabbitMQ. return None url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( hostname=conn_info['hostname'], port=conn_info['port'] + 10000, vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), queue=urllib.parse.quote(queue_name, safe=''), ) credentials = (conn_info['userid'], conn_info['password']) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError('Got error %s when reading queue stats: %s' % ( r.status_code, r.json())) return r.json() def get_queue_length(app, queue_name): """Shortcut to get a queue's length""" stats = get_queue_stats(app, queue_name) if stats: return stats.get('messages') def register_task_class(app, name, cls): """Register a class-based task under the given name""" if name in app.tasks: return task_instance = cls() task_instance.name = name app.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) -if INSTANCE_NAME: - CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME -else: - CONFIG_NAME = DEFAULT_CONFIG_NAME - -# Load the Celery config -CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) +CONFIG_NAME = os.environ.get('SWH_CONFIG_FILENAME') +CONFIG = {} +if CONFIG_NAME: + # load the celery config from the main config file given as + # SWH_CONFIG_FILENAME environment variable. + # This is expected to have a [celery] section in which we have the + # celery specific configuration. 
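In other words, the worker configuration can now live in the single file named by `SWH_CONFIG_FILENAME`, under a `celery` key. A sketch of the expected shape, using the defaults from `DEFAULT_CONFIG` above (module and queue names are examples only); this is the dict that ends up being handed to `build_app()` further down in this diff.
```
# Expected shape of the 'celery' section (values other than the defaults
# listed in DEFAULT_CONFIG above are illustrative).
celery_config = {
    'task_broker': 'amqp://guest@localhost//',
    'result_backend': 'rpc://',
    'task_modules': ['swh.scheduler.tests.tasks'],      # example module
    'task_queues': ['swh.scheduler.tests.tasks.ping'],  # example queue name
    'task_soft_time_limit': 0,
}
```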
+ CONFIG = load_named_config(CONFIG_NAME).get('celery') + +if not CONFIG: + # otherwise, back to compat config loading mechanism + if INSTANCE_NAME: + CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME + else: + CONFIG_NAME = DEFAULT_CONFIG_NAME + + # Load the Celery config + CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] -for queue in CONFIG['task_queues']: - CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue)) - CELERY_DEFAULT_CONFIG = dict( # Timezone configuration: all in UTC enable_utc=True, timezone='UTC', # Imported modules - imports=CONFIG['task_modules'], + imports=CONFIG.get('task_modules', []), # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with task_serializer='msgpack', # Result serialization format result_serializer='msgpack', # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. task_acks_late=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=['msgpack', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, - # Task hard time limit in seconds. The worker processing the task will be - # killed and replaced with a new one when this is exceeded. - # task_time_limit=3600, - # Task soft time limit in seconds. - # The SoftTimeLimitExceeded exception will be raised when this is exceeded. - # The task can catch this to e.g. clean up before the hard time limit - # comes. 
- task_soft_time_limit=CONFIG['task_soft_time_limit'], # Task routing task_routes=route_for_task, # Task queues this worker will consume from task_queues=CELERY_QUEUES, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, ) -# Instantiate the Celery app -app = Celery(broker=CONFIG['task_broker'], - backend=CONFIG['result_backend'], - task_cls='swh.scheduler.task:SWHTask') -app.add_defaults(CELERY_DEFAULT_CONFIG) + +def build_app(config=None): + config = merge_configs( + {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, + config or {}) + + config['task_queues'] = [Queue(queue, Exchange(queue), routing_key=queue) + for queue in config.get('task_queues', ())] + logger.debug('Creating a Celery app with %s', config) + + # Instantiate the Celery app + app = Celery(broker=config['task_broker'], + backend=config['result_backend'], + task_cls='swh.scheduler.task:SWHTask') + app.add_defaults(CELERY_DEFAULT_CONFIG) + app.add_defaults(config) + return app + + +app = build_app(CONFIG) # XXX for BW compat Celery.get_queue_length = get_queue_length diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index 0a8b17b..dca4a7f 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,207 +1,208 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import click import datetime import logging +import time import socket +import sys + +import click from arrow import utcnow from kombu import Queue -from celery.events import EventReceiver -from swh.scheduler import get_scheduler -from .config import setup_log_handler, app as main_app +import celery +from celery.events import EventReceiver class ReliableEventReceiver(EventReceiver): def __init__(self, channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix='celeryev', accept=None): super(ReliableEventReceiver, self).__init__( channel, handlers, routing_key, node_id, app, queue_prefix, accept) self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, durable=True) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self._receive], no_ack=False, accept=self.accept)] def _receive(self, bodies, message): logging.debug('## event-receiver: bodies: %s' % bodies) logging.debug('## event-receiver: message: %s' % message) if not isinstance(bodies, list): # celery<4 returned body as element bodies = [bodies] for body in bodies: type, body = self.event_from_message(body) self.process(type, body, message) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') logging.debug('## event-receiver: type: %s' % type) logging.debug('## event-receiver: event: %s' % event) logging.debug('## event-receiver: handler: %s' % handler) handler and handler(event, message) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): logger = logging.getLogger('swh.scheduler.listener') actions = { 'last_send': utcnow() - 
2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): logger.debug('Try perform pending actions') if actions['queue'] and ( len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or utcnow() - actions['last_send'] > ACTION_SEND_DELAY): perform_actions(actions) def perform_actions(actions, backend=backend): logger.info('Perform %s pending actions' % len(actions['queue'])) action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] - cursor = backend.cursor() + db = backend.get_db() + cursor = db.cursor(None) for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) - kwargs['cursor'] = cursor + kwargs['cur'] = cursor function(*args, **kwargs) - backend.commit() + db.conn.commit() for message in messages: - message.ack() + if not message.acknowledged: + message.ack() + else: + logger.info('message already acknowledged: %s', message) actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): - message.ack() + logger.info('event: %s, message:%s', event, message) + if not message.acknowledged: + message.ack() + else: + logger.info('message already acknowledged: %s', message) try_perform_actions() def task_started(event, message): logger.debug('task_started: event: %s' % event) logger.debug('task_started: message: %s' % message) queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def task_succeeded(event, message): logger.debug('task_succeeded: event: %s' % event) logger.debug('task_succeeded: message: %s' % message) result = event['result'] logger.debug('task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': status = 'eventful' if result.get('eventful') else 'uneventful' except Exception: status = 'eventful' if result else 'uneventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, 'result': result, }, 'message': message, }) def task_failed(event, message): logger.debug('task_failed: event: %s' % event) logger.debug('task_failed: message: %s' % message) queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) recv = ReliableEventReceiver( - main_app.connection(), - app=main_app, + celery.current_app.connection(), + app=celery.current_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, node_id='listener-%s' % socket.gethostname(), ) - recv.capture(limit=None, timeout=None, wakeup=True) + errors = 0 + while True: + try: + recv.capture(limit=None, timeout=None, wakeup=True) + errors = 0 + except KeyboardInterrupt: + logger.exception('Keyboard interrupt, exiting') + break + except Exception: + logger.exception('Unexpected exception') + if errors < 5: + time.sleep(errors) + errors += 1 + else: + logger.error('Too many consecutive errors, exiting') + sys.exit(1) @click.command() -@click.option('--cls', '-c', default='local', - help="Scheduler's class, default to 'local'") -@click.option( - '--database', '-d', help='Scheduling database DSN') -@click.option('--url', '-u', - help="(Optional) Scheduler's 
url access") -@click.option('--log-level', '-l', default='INFO', - type=click.Choice(logging._nameToLevel.keys()), - help='Log level (default to INFO)') -def main(cls, database, url, log_level): - setup_log_handler(loglevel=log_level, colorize=False, - format='[%(levelname)s] %(name)s -- %(message)s') - # logging.basicConfig(level=level) - - scheduler = None - override_config = {} - if cls == 'local': - if database: - override_config = {'scheduling_db': database} - scheduler = get_scheduler(cls, args=override_config) - elif cls == 'remote': - if url: - override_config = {'url': url} - scheduler = get_scheduler(cls, args=override_config) - - if not scheduler: - raise ValueError('Scheduler class (local/remote) must be instantiated') - - event_monitor(main_app, backend=scheduler) +@click.pass_context +def main(ctx): + click.echo("Deprecated! Use 'swh-scheduler listener' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__': main() diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index b6586e4..d4bdafc 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,115 +1,115 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow +import logging from swh.scheduler import get_scheduler, compute_nb_tasks_from -from .config import app as main_app +logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args: backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': arrow.utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks = [] while True: - cursor = backend.cursor() task_types = {} pending_tasks = [] - for task_type in backend.get_task_types(cursor=cursor): + for task_type in backend.get_task_types(): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] backend_name = task_type['backend_name'] if max_queue_length: try: queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). 
num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) - pending_tasks.extend( - backend.grab_ready_tasks( + grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, - num_tasks_priority=num_tasks_priority, - cursor=cursor)) + num_tasks_priority=num_tasks_priority) + if grabbed_tasks: + pending_tasks.extend(grabbed_tasks) + logger.info('Grabbed %s tasks %s', + len(grabbed_tasks), task_type_name) if not pending_tasks: return all_backend_tasks backend_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] backend_name = task_types[task['type']]['backend_name'] celery_result = app.send_task( backend_name, args=args, kwargs=kwargs, ) - data = { 'task': task['id'], 'backend_id': celery_result.id, 'scheduled': arrow.utcnow(), } backend_tasks.append(data) + logger.debug('Sent %s celery tasks', len(backend_tasks)) - backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) - - backend.commit() - + backend.mass_schedule_task_runs(backend_tasks) all_backend_tasks.extend(backend_tasks) def main(): + from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == '__main__': main() diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py index d622b52..61a828f 100644 --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -1,634 +1,679 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import arrow import click import csv import itertools import json import locale import logging import time from swh.core import utils, config from . import compute_nb_tasks_from from .backend_es import SWHElasticSearchClient -from . import get_scheduler +from . import get_scheduler, DEFAULT_CONFIG locale.setlocale(locale.LC_ALL, '') ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] class DateTimeType(click.ParamType): name = 'time and date' def convert(self, value, param, ctx): if not isinstance(value, arrow.Arrow): value = arrow.get(value) return value DATETIME = DateTimeType() CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) def pretty_print_list(list, indent): """Pretty-print a list""" return ''.join('%s%s\n' % (' ' * indent, item) for item in list) def pretty_print_dict(dict, indent): """Pretty-print a list""" return ''.join('%s%s: %s\n' % (' ' * indent, click.style(key, bold=True), value) for key, value in dict.items()) def pretty_print_task(task, full=False): """Pretty-print a task If 'full' is True, also print the status and priority fields. 
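As a side note on the `DateTimeType` parameter type declared earlier in this file: it accepts either a date string or an `arrow.Arrow` instance and always hands back an `Arrow`. A self-contained check of that behaviour (the class is repeated so the snippet runs on its own; the sample date is arbitrary):

```
import arrow
import click

class DateTimeType(click.ParamType):
    name = 'time and date'

    def convert(self, value, param, ctx):
        if not isinstance(value, arrow.Arrow):
            value = arrow.get(value)
        return value

DATETIME = DateTimeType()
parsed = DATETIME.convert('2018-11-20T13:00:00+00:00', None, None)
assert isinstance(parsed, arrow.Arrow)
assert DATETIME.convert(parsed, None, None) is parsed
```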
""" next_run = arrow.get(task['next_run']) lines = [ '%s %s\n' % (click.style('Task', bold=True), task['id']), click.style(' Next run: ', bold=True), "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()), '\n', click.style(' Interval: ', bold=True), str(task['current_interval']), '\n', click.style(' Type: ', bold=True), task['type'] or '', '\n', click.style(' Policy: ', bold=True), task['policy'] or '', '\n', ] if full: lines += [ click.style(' Status: ', bold=True), task['status'] or '', '\n', click.style(' Priority: ', bold=True), task['priority'] or '', '\n', ] lines += [ click.style(' Args:\n', bold=True), pretty_print_list(task['arguments']['args'], indent=4), click.style(' Keyword args:\n', bold=True), pretty_print_dict(task['arguments']['kwargs'], indent=4), ] return ''.join(lines) @click.group(context_settings=CONTEXT_SETTINGS) -@click.option('--cls', '-c', default='local', - type=click.Choice(['local', 'remote']), - help="Scheduler's class, default to 'local'") -@click.option('--database', '-d', - help="Scheduling database DSN (if cls is 'local')") -@click.option('--url', '-u', - help="Scheduler's url access (if cls is 'remote')") +@click.option('--config-file', '-C', default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.") +@click.option('--database', '-d', default=None, + help="Scheduling database DSN (imply cls is 'local')") +@click.option('--url', '-u', default=None, + help="Scheduler's url access (imply cls is 'remote')") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") +@click.option('--no-stdout', is_flag=True, default=False, + help="Do NOT output logs on the console") @click.pass_context -def cli(ctx, cls, database, url, log_level): +def cli(ctx, config_file, database, url, log_level, no_stdout): """Software Heritage Scheduler CLI interface Default to use the the local scheduler instance (plugged to the main scheduler db). """ from swh.scheduler.celery_backend.config import setup_log_handler log_level = setup_log_handler( loglevel=log_level, colorize=False, - format='[%(levelname)s] %(name)s -- %(message)s') + format='[%(levelname)s] %(name)s -- %(message)s', + log_console=not no_stdout) ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None - override_config = {} + conf = config.read(config_file, DEFAULT_CONFIG) + if 'scheduler' not in conf: + raise ValueError("missing 'scheduler' configuration") + + if database: + conf['scheduler']['cls'] = 'local' + conf['scheduler']['args']['db'] = database + elif url: + conf['scheduler']['cls'] = 'remote' + conf['scheduler']['args'] = {'url': url} + sched_conf = conf['scheduler'] try: - if cls == 'local' and database: - override_config = {'scheduling_db': database} - elif cls == 'remote' and url: - override_config = {'url': url} - logger.debug('Instanciating scheduler %s with %s' % ( - cls, override_config)) - scheduler = get_scheduler(cls, args=override_config) - except Exception: + logger.debug('Instanciating scheduler with %s' % ( + sched_conf)) + scheduler = get_scheduler(**sched_conf) + except ValueError: # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. 
pass ctx.obj['scheduler'] = scheduler - ctx.obj['config'] = {'cls': cls, 'args': override_config} + ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level @cli.group('task') @click.pass_context def task(ctx): """Manipulate tasks.""" pass @task.command('schedule') @click.option('--columns', '-c', multiple=True, default=['type', 'args', 'kwargs', 'next_run'], type=click.Choice([ 'type', 'args', 'kwargs', 'policy', 'next_run']), help='columns present in the CSV file') @click.option('--delimiter', '-d', default=',') @click.argument('file', type=click.File(encoding='utf-8')) @click.pass_context def schedule_tasks(ctx, columns, delimiter, file): """Schedule tasks from a CSV input file. The following columns are expected, and can be set through the -c option: - type: the type of the task to be scheduled (mandatory) - args: the arguments passed to the task (JSON list, defaults to an empty list) - kwargs: the keyword arguments passed to the task (JSON object, defaults to an empty dict) - next_run: the date at which the task should run (datetime, defaults to now) The CSV can be read either from a named file, or from stdin (use - as filename). Use sample: cat scheduling-task.txt | \ python3 -m swh.scheduler.cli \ --database 'service=swh-scheduler-dev' \ task schedule \ --columns type --columns kwargs --columns policy \ --delimiter ';' - """ tasks = [] now = arrow.utcnow() scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') reader = csv.reader(file, delimiter=delimiter) for line in reader: task = dict(zip(columns, line)) args = json.loads(task.pop('args', '[]')) kwargs = json.loads(task.pop('kwargs', '{}')) task['arguments'] = { 'args': args, 'kwargs': kwargs, } task['next_run'] = DATETIME.convert(task.get('next_run', now), None, None) tasks.append(task) created = scheduler.create_tasks(tasks) output = [ 'Created %d tasks\n' % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager('\n'.join(output)) @task.command('add') @click.argument('type', nargs=1, required=True) @click.argument('options', nargs=-1) @click.option('--policy', '-p', default='recurring', type=click.Choice(['recurring', 'oneshot'])) @click.option('--priority', '-P', default=None, type=click.Choice(['low', 'normal', 'high'])) @click.option('--next-run', '-n', default=None) @click.pass_context def schedule_task(ctx, type, options, policy, priority, next_run): """Schedule one task from arguments. Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task add swh-lister-pypi swh-scheduler --database 'service=swh-scheduler' \ task add swh-lister-debian --policy=oneshot distribution=stretch Note: if the priority is not given, the task won't have the priority set, which is considered as the lowest priority level. 
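For reference, the OPTIONS arguments mentioned in this help text are split just below into positional args (no `=`) and keyword args (`key=value`); a standalone restatement of that parsing with made-up values:

```
options = ('https://example.org/repo.git', 'distribution=stretch')
args = [x for x in options if '=' not in x]
kw = dict(x.split('=', 1) for x in options if '=' in x)
assert args == ['https://example.org/repo.git']
assert kw == {'distribution': 'stretch'}
```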
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') now = arrow.utcnow() args = [x for x in options if '=' not in x] kw = dict(x.split('=', 1) for x in options if '=' in x) task = {'type': type, 'policy': policy, 'priority': priority, 'arguments': { 'args': args, 'kwargs': kw, }, 'next_run': DATETIME.convert(next_run or now, None, None), } created = scheduler.create_tasks([task]) output = [ 'Created %d tasks\n' % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo('\n'.join(output)) @task.command('list-pending') @click.argument('task-types', required=True, nargs=-1) @click.option('--limit', '-l', required=False, type=click.INT, help='The maximum number of tasks to fetch') @click.option('--before', '-b', required=False, type=DATETIME, help='List all jobs supposed to run before the given date') @click.pass_context def list_pending_tasks(ctx, task_types, limit, before): """List the tasks that are going to be run. You can override the number of tasks to fetch """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) output = [] for task_type in task_types: pending = scheduler.peek_ready_tasks( task_type, timestamp=before, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) output.append('Found %d %s tasks\n' % ( len(pending), task_type)) for task in pending: output.append(pretty_print_task(task)) click.echo('\n'.join(output)) @task.command('list') @click.option('--task-id', '-i', default=None, multiple=True, metavar='ID', help='List only tasks whose id is ID.') @click.option('--task-type', '-t', default=None, multiple=True, metavar='TYPE', help='List only tasks of type TYPE') @click.option('--limit', '-l', required=False, type=click.INT, help='The maximum number of tasks to fetch.') @click.option('--status', '-s', multiple=True, metavar='STATUS', default=None, help='List tasks whose status is STATUS.') @click.option('--policy', '-p', default=None, type=click.Choice(['recurring', 'oneshot']), help='List tasks whose policy is POLICY.') @click.option('--priority', '-P', default=None, multiple=True, type=click.Choice(['all', 'low', 'normal', 'high']), help='List tasks whose priority is PRIORITY.') @click.option('--before', '-b', required=False, type=DATETIME, metavar='DATETIME', help='Limit to tasks supposed to run before the given date.') @click.option('--after', '-a', required=False, type=DATETIME, metavar='DATETIME', help='Limit to tasks supposed to run after the given date.') @click.pass_context def list_tasks(ctx, task_id, task_type, limit, status, policy, priority, before, after): """List tasks. 
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') if not task_type: task_type = [x['type'] for x in scheduler.get_task_types()] # if task_id is not given, default value for status is # 'next_run_not_scheduled' # if task_id is given, default status is 'all' if task_id is None and status is None: status = ['next_run_not_scheduled'] if status and 'all' in status: status = None if priority and 'all' in priority: priority = None output = [] tasks = scheduler.search_tasks( task_id=task_id, task_type=task_type, status=status, priority=priority, policy=policy, before=before, after=after, limit=limit) output.append('Found %d tasks\n' % ( len(tasks))) for task in tasks: output.append(pretty_print_task(task, full=True)) click.echo('\n'.join(output)) @task.command('respawn') @click.argument('task-ids', required=True, nargs=-1) @click.option('--next-run', '-n', required=False, type=DATETIME, metavar='DATETIME', default=None, help='Re spawn the selected tasks at this date') @click.pass_context def respawn_tasks(ctx, task_ids, next_run): """Respawn tasks. Respawn tasks given by their ids (see the 'task list' command to find task ids) at the given date (immediately by default). Eg. swh-scheduler task respawn 1 3 12 """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') if next_run is None: next_run = arrow.utcnow() output = [] scheduler.set_status_tasks( task_ids, status='next_run_not_scheduled', next_run=next_run) - output.append('Respawn tasks %s\n' % ( - task_ids)) + output.append('Respawn tasks %s\n' % (task_ids,)) click.echo('\n'.join(output)) @task.command('archive') @click.option('--before', '-b', default=None, help='''Task whose ended date is anterior will be archived. Default to current month's first day.''') @click.option('--after', '-a', default=None, help='''Task whose ended date is after the specified date will be archived. Default to prior month's first day.''') @click.option('--batch-index', default=1000, type=click.INT, help='Batch size of tasks to read from db to archive') @click.option('--bulk-index', default=200, type=click.INT, help='Batch size of tasks to bulk index') @click.option('--batch-clean', default=1000, type=click.INT, help='Batch size of task to clean after archival') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--verbose', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--cleanup/--no-cleanup', is_flag=True, default=True, help='Clean up archived tasks (default)') @click.option('--start-from', type=click.INT, default=-1, help='(Optional) default task id to start from. Default is -1.') @click.pass_context def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, dry_run, verbose, cleanup, start_from): """Archive task/task_run whose (task_type is 'oneshot' and task_status is 'completed') or (task_type is 'recurring' and task_status is 'disabled'). With --dry-run flag set (default), only list those. 
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) log = logging.getLogger('swh.scheduler.cli.archive') logging.getLogger('urllib3').setLevel(logging.WARN) logging.getLogger('elasticsearch').setLevel(logging.WARN) if dry_run: log.info('**DRY-RUN** (only reading db)') if not cleanup: log.info('**NO CLEANUP**') now = arrow.utcnow() # Default to archive tasks from a rolling month starting the week # prior to the current one if not before: before = now.shift(weeks=-1).format('YYYY-MM-DD') if not after: after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( not dry_run, not dry_run and cleanup, after, before)) def group_by_index_name(data, es_client=es_client): """Given a data record, determine the index's name through its ending date. This varies greatly depending on the task_run's status. """ date = data.get('started') if not date: date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) def index_data(before, last_id, batch_index): tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): log.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue yield from es_client.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], chunk_size=bulk_index, log=log) gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: for task_ids in utils.grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @cli.command('runner') @click.option('--period', '-p', default=0, help=('Period (in s) at witch pending tasks are checked and ' 'executed. Set to 0 (default) for a one shot.')) @click.pass_context def runner(ctx, period): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks - from swh.scheduler.celery_backend.config import app + from swh.scheduler.celery_backend.config import build_app + + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] logger.debug('Scheduler %s' % scheduler) try: while True: - logger.info('Run ready tasks') + logger.debug('Run ready tasks') try: - run_ready_tasks(scheduler, app) + ntasks = len(run_ready_tasks(scheduler, app)) + if ntasks: + logger.info('Scheduled %s tasks', ntasks) except Exception: - scheduler.rollback() logger.exception('Unexpected error in run_ready_tasks()') if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command('listener') @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. 
This service is responsible for listening at task lifecycle events and handle their workflow status in the database.""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') - from swh.scheduler.celery_backend.listener import ( - event_monitor, main_app) - event_monitor(main_app, backend=scheduler) + from swh.scheduler.celery_backend.config import build_app + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() + + from swh.scheduler.celery_backend.listener import event_monitor + event_monitor(app, backend=scheduler) @cli.command('api-server') -@click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the scheduler server api") @click.option('--port', default=5008, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=None, help=("Indicates if the server should run in debug mode. " "Defaults to True if log-level is DEBUG, False otherwise.") ) @click.pass_context -def api_server(ctx, config_path, host, port, debug): +def api_server(ctx, host, port, debug): """Starts a swh-scheduler API HTTP server. """ - if ctx.obj['config']['cls'] == 'remote': + if ctx.obj['config']['scheduler']['cls'] == 'remote': click.echo("The API server can only be started with a 'local' " "configuration", err=True) ctx.exit(1) - from swh.scheduler.api.server import app, DEFAULT_CONFIG - conf = config.read(config_path, DEFAULT_CONFIG) - if ctx.obj['config']['args']: - conf['scheduler']['args'].update(ctx.obj['config']['args']) - app.config.update(conf) + from swh.scheduler.api import server + server.app.scheduler = ctx.obj['scheduler'] + server.app.config.update(ctx.obj['config']) if debug is None: debug = ctx.obj['loglevel'] <= logging.DEBUG - - app.run(host, port=port, debug=bool(debug)) + server.app.run(host, port=port, debug=bool(debug)) @cli.group('task-type') @click.pass_context def task_type(ctx): """Manipulate task types.""" pass @task_type.command('list') @click.option('--verbose', '-v', is_flag=True, default=False) @click.option('--task_type', '-t', multiple=True, default=None, help='List task types of given type') @click.option('--task_name', '-n', multiple=True, default=None, help='List task types of given backend task name') @click.pass_context def list_task_types(ctx, verbose, task_type, task_name): click.echo("Known task types:") if verbose: tmpl = click.style('{type}: ', bold=True) + '''{backend_name} {description} interval: {default_interval} [{min_interval}, {max_interval}] backoff_factor: {backoff_factor} max_queue_length: {max_queue_length} num_retries: {num_retries} retry_delay: {retry_delay} ''' else: tmpl = '{type}:\n {description}' - for tasktype in ctx.obj['scheduler'].get_task_types(): + for tasktype in sorted(ctx.obj['scheduler'].get_task_types(), + key=lambda x: x['type']): if task_type and tasktype['type'] not in task_type: continue if task_name and tasktype['backend_name'] not in task_name: continue click.echo(tmpl.format(**tasktype)) @task_type.command('add') @click.argument('type', required=1) @click.argument('task-name', required=1) @click.argument('description', required=1) @click.option('--default-interval', '-i', default='90 days', help='Default interval ("90 days" by default)') @click.option('--min-interval', default=None, help='Minimum interval (default interval if not set)') @click.option('--max-interval', '-i', default=None, help='Maximal interval (default interval if not set)') 
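The `task-type add` command being defined here ends up passing a plain dict to `scheduler.create_task_type()`. A hedged equivalent done directly from Python (the DSN and every value below are made-up examples, not taken from this patch):

```
import datetime

from swh.scheduler import get_scheduler

scheduler = get_scheduler('local', {'db': 'service=swh-scheduler-dev'})
scheduler.create_task_type({
    'type': 'origin-update-git',
    'description': 'Update an origin of type git',
    'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
    'default_interval': datetime.timedelta(days=64),
    'min_interval': datetime.timedelta(hours=12),
    'max_interval': datetime.timedelta(days=64),
    'backoff_factor': 2,
    'max_queue_length': 5000,
    'num_retries': None,
    'retry_delay': None,
})
```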
@click.option('--backoff-factor', '-f', type=float, default=1, help='Backoff factor') @click.pass_context def add_task_type(ctx, type, task_name, description, default_interval, min_interval, max_interval, backoff_factor): """Create a new task type """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') task_type = dict( type=type, backend_name=task_name, description=description, default_interval=default_interval, min_interval=min_interval, max_interval=max_interval, backoff_factor=backoff_factor, max_queue_length=None, num_retries=None, retry_delay=None, ) scheduler.create_task_type(task_type) click.echo('OK') +@cli.command('updater') +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +@click.pass_context +def updater(ctx, verbose): + """Insert tasks in the scheduler from the scheduler-updater's events + + """ + from swh.scheduler.updater.writer import UpdaterWriter + UpdaterWriter(**ctx.obj['config']).run() + + +@cli.command('ghtorrent') +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +@click.pass_context +def ghtorrent(ctx, verbose): + """Consume events from ghtorrent and write them to cache. + + """ + from swh.scheduler.updater.ghtorrent import GHTorrentConsumer + from swh.scheduler.updater.backend import SchedulerUpdaterBackend + + ght_config = ctx.obj['config'].get('ghtorrent', {}) + back_config = ctx.obj['config'].get('scheduler_updater', {}) + backend = SchedulerUpdaterBackend(**back_config) + GHTorrentConsumer(backend, **ght_config).run() + + +def main(): + return cli(auto_envvar_prefix='SWH_SCHEDULER') + + if __name__ == '__main__': - cli() + main() diff --git a/swh/scheduler/sql/40-swh-data.sql b/swh/scheduler/sql/40-swh-data.sql index 59279d6..a0322c6 100644 --- a/swh/scheduler/sql/40-swh-data.sql +++ b/swh/scheduler/sql/40-swh-data.sql @@ -1,286 +1,286 @@ insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-loader-mount-dump-and-load-svn-repository', 'Loading svn repositories from svn dump', 'swh.loader.svn.tasks.MountAndLoadSvnRepository', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-svn', 'Create dump of a remote svn repository, mount it and load it', 'swh.loader.svn.tasks.DumpMountAndLoadSvnRepository', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-loading', 'Loading deposit archive into swh through swh-loader-tar', 'swh.deposit.loader.tasks.LoadDepositArchiveTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-checks', 'Pre-checking deposit step before loading into swh archive', 'swh.deposit.loader.tasks.ChecksDepositTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-vault-cooking', 'Cook a Vault bundle', 'swh.vault.cooking_tasks.SWHCookingTask', '1 day', '1 day', '1 day', 1, 10000); insert into task_type( type, 
description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-hg', 'Loading mercurial repository swh-loader-mercurial', 'swh.loader.mercurial.tasks.LoadMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-load-archive-hg', 'Loading archive mercurial repository swh-loader-mercurial', 'swh.loader.mercurial.tasks.LoadArchiveMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-git', 'Update an origin of type git', 'swh.loader.git.tasks.UpdateGitRepository', '64 days', '12:00:00', '64 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-github-incremental', 'Incrementally list GitHub', 'swh.lister.github.tasks.IncrementalGitHubLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-github-full', 'Full update of GitHub repos list', 'swh.lister.github.tasks.FullGitHubRelister', '90 days', '90 days', '90 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-debian', 'List a Debian distribution', 'swh.lister.debian.tasks.DebianListerTask', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-incremental', 'Incrementally list a Gitlab instance', 'swh.lister.gitlab.tasks.IncrementalGitLabLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-full', 'Full update of a Gitlab instance''s repos list', 'swh.lister.gitlab.tasks.FullGitLabRelister', '90 days', '90 days', '90 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-pypi', 'Full pypi lister', 'swh.lister.pypi.tasks.PyPIListerTask', '1 days', '1 days', '1 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-pypi', 'Load Pypi origin', 'swh.loader.pypi.tasks.LoadPyPI', '64 days', '12:00:00', '64 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_mimetype', 'Mimetype indexer task', 'swh.indexer.tasks.ContentMimetype', '1 day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_range_mimetype', 'Mimetype Range indexer task', 'swh.indexer.tasks.ContentRangeMimetype', '1 day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_fossology_license', 'Fossology license indexer task', 'swh.indexer.tasks.ContentFossologyLicense', '1 
day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_range_fossology_license', 'Fossology license range indexer task', 'swh.indexer.tasks.ContentRangeFossologyLicense', '1 day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_origin_head', 'Origin Head indexer task', 'swh.indexer.tasks.OriginHead', '1 day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_revision_metadata', 'Revision Metadata indexer task', 'swh.indexer.tasks.RevisionMetadata', '1 day', '12:00:00', '1 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'indexer_origin_metadata', 'Origin Metadata indexer task', 'swh.indexer.tasks.OriginMetadata', '1 day', '12:00:00', '1 days', 2, - 5000); + 20000); diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index b141a01..ca65742 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,68 +1,96 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from celery import current_app import celery.app.task from celery.utils.log import get_task_logger +from swh.core.statsd import Statsd + class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ + _statsd = None _log = None + @property + def statsd(self): + if self._statsd: + return self._statsd + worker_name = current_app.conf.get('worker_name') + if worker_name: + self._statsd = Statsd(constant_tags={ + 'task': self.name, + 'worker': worker_name, + }) + return self._statsd + else: + return Statsd(constant_tags={ + 'task': self.name, + 'worker': 'unknown worker', + }) + + def __call__(self, *args, **kwargs): + self.statsd.increment('swh_task_called_count') + with self.statsd.timed('swh_task_duration_seconds'): + return super().__call__(*args, **kwargs) + def on_failure(self, exc, task_id, args, kwargs, einfo): + self.statsd.increment('swh_task_failure_count') self.send_event('task-result-exception') def on_success(self, retval, task_id, args, kwargs): + self.statsd.increment('swh_task_success_count') self.send_event('task-result', result=retval) @property def log(self): if self._log is None: self._log = get_task_logger(self.name) return self._log def run(self, *args, **kwargs): self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs) ret = super().run(*args, **kwargs) self.log.debug('%s: OK => %s', self.name, ret) return ret class Task(SWHTask): """a schedulable task (abstract class) DEPRECATED! Please use SWHTask as base for decorated functions instead. Sub-classes must implement the run_task() method. Current implementation is based on Celery. 
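The deprecation note above points to `SWHTask` as the base class for decorated task functions; a minimal sketch of that pattern, which also picks up the statsd counters added in this hunk (the application, broker and task names are made up):

```
from celery import Celery

from swh.scheduler.task import SWHTask

app = Celery('swh.example', broker='memory://')

@app.task(base=SWHTask, name='swh.example.ping', bind=True)
def ping(self, **kw):
    # Each call increments swh_task_called_count, times
    # swh_task_duration_seconds, and ends with either
    # swh_task_success_count or swh_task_failure_count.
    self.log.debug('ping: %s', kw)
    return 'OK'
```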
See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ abstract = True def run(self, *args, **kwargs): """This method is called by the celery worker when a task is received. Should not be overridden as we need our special events to be sent for the reccurrent scheduler. Override run_task instead.""" return self.run_task(*args, **kwargs) def run_task(self, *args, **kwargs): """Perform the task. Must return a json-serializable value as it is passed back to the task scheduler using a celery event. """ raise NotImplementedError('tasks must implement the run_task() method') diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 0394053..d761f9a 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,84 +1,93 @@ import os import pytest import glob from datetime import timedelta from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR +# make sure we are not fooled by CELERY_ config environment vars +for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: + os.environ.pop(var) + +import swh.scheduler.celery_backend.config # noqa +# this import is needed here to enforce creation of the celery current app +# BEFORE the swh_app fixture is called, otherwise the Celery app instance from +# celery_backend.config becomes the celery.current_app + + DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/celery_tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): return [ 'swh.scheduler.tests.tasks', ] @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'msgpack', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application # with the test application. 
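A hedged example of a test built on these fixtures (`swh_app` is redefined just below, `celery_session_worker` comes from celery's pytest plugin; the task name matches the `celery_includes` above, but the 'OK' return value is an assumption):

```
def test_ping_roundtrip(swh_app, celery_session_worker):
    result = swh_app.send_task('swh.scheduler.tests.tasks.ping')
    assert result.get(timeout=10) == 'OK'
```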
@pytest.fixture(scope='session') def swh_app(celery_session_app): - import swh.scheduler.celery_backend.config swh.scheduler.celery_backend.config.app = celery_session_app yield celery_session_app @pytest.fixture def swh_scheduler(request, postgresql_proc, postgresql): scheduler_config = { - 'scheduling_db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( + 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( host=postgresql_proc.host, port=postgresql_proc.port, user='postgres', dbname='tests') } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() scheduler = get_scheduler('local', scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type({ 'type': 'swh-test-{}'.format(taskname), 'description': 'The {} testing task'.format(taskname), 'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), }) return scheduler diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index 5e0c21d..5b52588 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,36 +1,53 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +import requests from swh.core.tests.server_testing import ServerTestFixture from swh.scheduler import get_scheduler from swh.scheduler.api.server import app from swh.scheduler.tests.test_scheduler import CommonSchedulerTest class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, unittest.TestCase): """Test the remote scheduler API. This class doesn't define any tests as we want identical functionality between local and remote scheduler. All the tests are therefore defined in CommonSchedulerTest. """ def setUp(self): self.config = { 'scheduler': { 'cls': 'local', 'args': { - 'scheduling_db': 'dbname=%s' % self.TEST_DB_NAME, + 'db': 'dbname=%s' % self.TEST_DB_NAME, } } } self.app = app # this will setup the local scheduler... 
super().setUp() # accessible through a remote scheduler accessible on the # given port self.backend = get_scheduler('remote', {'url': self.url()}) + + def test_site_map(self): + sitemap = requests.get(self.url() + 'site-map') + assert sitemap.headers['Content-Type'] == 'application/json' + sitemap = sitemap.json() + + rules = set(x['rule'] for x in sitemap) + # we expect at least these rules + expected_rules = set('/'+rule for rule in ( + 'set_status_tasks', 'create_task_type', + 'get_task_type', 'get_task_types', 'create_tasks', 'disable_tasks', + 'get_tasks', 'search_tasks', 'peek_ready_tasks', + 'grab_ready_tasks', 'schedule_task_run', 'mass_schedule_task_runs', + 'start_task_run', 'end_task_run', 'filter_task_to_archive', + 'delete_archived_tasks')) + assert rules.issuperset(expected_rules), expected_rules - rules diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index cd3fc8c..b66147c 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,474 +1,484 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os import random import unittest import uuid from collections import defaultdict import psycopg2 from arrow import utcnow import pytest from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler import get_scheduler from . import SQL_DIR +TASK_TYPES = { + 'git': { + 'type': 'update-git', + 'description': 'Update a git repository', + 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + }, + 'hg': { + 'type': 'update-hg', + 'description': 'Update a mercurial repository', + 'backend_name': 'swh.loader.mercurial.tasks.UpdateHgRepository', + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + }, +} + +TEMPLATES = { + 'git': { + 'type': 'update-git', + 'arguments': { + 'args': [], + 'kwargs': {}, + }, + 'next_run': None, + }, + 'hg': { + 'type': 'update-hg', + 'arguments': { + 'args': [], + 'kwargs': {}, + }, + 'next_run': None, + 'policy': 'oneshot', + } +} + + @pytest.mark.db class CommonSchedulerTest(SingleDbTestFixture): TEST_DB_NAME = 'softwareheritage-scheduler-test' TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') - def setUp(self): - super().setUp() - - tt = { - 'type': 'update-git', - 'description': 'Update a git repository', - 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', - 'default_interval': datetime.timedelta(days=64), - 'min_interval': datetime.timedelta(hours=12), - 'max_interval': datetime.timedelta(days=64), - 'backoff_factor': 2, - 'max_queue_length': None, - 'num_retries': 7, - 'retry_delay': datetime.timedelta(hours=2), - } - tt2 = tt.copy() - tt2['type'] = 'update-hg' - tt2['description'] = 'Update a mercurial repository' - tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository' - tt2['max_queue_length'] = 42 - tt2['num_retries'] = None - 
tt2['retry_delay'] = None - - self.task_types = { - tt['type']: tt, - tt2['type']: tt2, - } - - self.task1_template = t1_template = { - 'type': tt['type'], - 'arguments': { - 'args': [], - 'kwargs': {}, - }, - 'next_run': None, - } - self.task2_template = t2_template = copy.deepcopy(t1_template) - t2_template['type'] = tt2['type'] - t2_template['policy'] = 'oneshot' - def tearDown(self): - self.backend.close_connection() self.empty_tables() super().tearDown() def empty_tables(self, whitelist=["priority_ratio"]): query = """SELECT table_name FROM information_schema.tables WHERE table_schema = %%s and table_name not in (%s) """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) self.cursor.execute(query, ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def test_add_task_type(self): - tt, tt2 = self.task_types.values() + tt = TASK_TYPES['git'] self.backend.create_task_type(tt) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) with self.assertRaisesRegex(psycopg2.IntegrityError, r'\(type\)=\(%s\)' % tt['type']): self.backend.create_task_type(tt) + + tt2 = TASK_TYPES['hg'] self.backend.create_task_type(tt2) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) def test_get_task_types(self): - tt, tt2 = self.task_types.values() + tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg'] self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertCountEqual([tt2, tt], self.backend.get_task_types()) @staticmethod def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret['next_run'] = next_run if priority: ret['priority'] = priority if args: ret['arguments']['args'] = list(args) if kwargs: ret['arguments']['kwargs'] = kwargs return ret def _pop_priority(self, priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None def _tasks_from_template(self, template, max_timestamp, num, num_priority=0, priorities=None): if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = self._pop_priority(priorities) tasks.append(self._task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, 'argument-%03d' % i, **{'kwarg%03d' % i: 'bogus-kwarg'} )) return tasks def _create_task_types(self): - for tt in self.task_types.values(): + for tt in TASK_TYPES.values(): self.backend.create_task_type(tt) def test_create_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() num_tasks_priority = 100 - tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) + tasks_1 = self._tasks_from_template( + TEMPLATES['git'], utcnow(), 100) tasks_2 = self._tasks_from_template( - self.task2_template, utcnow(), 100, - num_tasks_priority, priorities=priority_ratio) + TEMPLATES['hg'], utcnow(), 100, + num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids ret = self.backend.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results self.assertEqual(set_ret, 
set_ret1) self.assertEqual(len(ret), len(ret1)) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) - task_type = self.task_types[orig_task['type']] + task_type = TASK_TYPES[orig_task['type'].split('-')[-1]] self.assertNotIn(task['id'], ids) self.assertEqual(task['status'], 'next_run_not_scheduled') self.assertEqual(task['current_interval'], task_type['default_interval']) self.assertEqual(task['policy'], orig_task.get('policy', 'recurring')) priority = task.get('priority') if priority: actual_priorities[priority] += 1 self.assertEqual(task['retries_left'], task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] self.assertEqual(task, orig_task) self.assertEqual(dict(actual_priorities), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) def test_peek_ready_tasks_no_priority(self): self._create_task_types() t = utcnow() - task_type = self.task1_template['type'] - tasks = self._tasks_from_template(self.task1_template, t, 100) + task_type = TEMPLATES['git']['type'] + tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) random.shuffle(tasks) self.backend.create_tasks(tasks) ready_tasks = self.backend.peek_ready_tasks(task_type) self.assertEqual(len(ready_tasks), len(tasks)) for i in range(len(ready_tasks) - 1): self.assertLessEqual(ready_tasks[i]['next_run'], ready_tasks[i+1]['next_run']) # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=limit) self.assertEqual(len(ready_tasks_limited), limit) self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) # Limit by timestamp max_ts = tasks[limit-1]['next_run'] ready_tasks_timestamped = self.backend.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: self.assertLessEqual(ready_task['next_run'], max_ts) # Make sure we get proper behavior for the first ready tasks self.assertCountEqual( ready_tasks[:len(ready_tasks_timestamped)], ready_tasks_timestamped, ) # Limit by both ready_tasks_both = self.backend.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) self.assertLessEqual(len(ready_tasks_both), limit//3) for ready_task in ready_tasks_both: self.assertLessEqual(ready_task['next_run'], max_ts) self.assertIn(ready_task, ready_tasks[:limit//3]) def _priority_ratio(self): self.cursor.execute('select id, ratio from priority_ratio') priority_ratio = {} for row in self.cursor.fetchall(): priority_ratio[row[0]] = row[1] return priority_ratio def test_peek_ready_tasks_mixed_priorities(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() - task_type = self.task1_template['type'] + task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( - self.task1_template, t, + TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) # take all available tasks ready_tasks = self.backend.peek_ready_tasks( task_type) self.assertEqual(len(ready_tasks), len(tasks)) self.assertEqual(num_tasks_priority + num_tasks_no_priority, len(ready_tasks)) count_tasks_per_priority = 
defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 self.assertEqual(dict(count_tasks_per_priority), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] self.assertTrue( actual_prio == expected_count or actual_prio == expected_count + 1) self.assertEqual(count_tasks_per_priority[None], num_tasks) def test_grab_ready_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() - task_type = self.task1_template['type'] + task_type = TEMPLATES['git']['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( - self.task1_template, t, + TEMPLATES['git'], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) first_ready_tasks = self.backend.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) grabbed_tasks = self.backend.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): self.assertEqual(peeked['status'], 'next_run_not_scheduled') del peeked['status'] self.assertEqual(grabbed['status'], 'next_run_scheduled') del grabbed['status'] self.assertEqual(peeked, grabbed) self.assertEqual(peeked['priority'], grabbed['priority']) def test_get_tasks(self): self._create_task_types() t = utcnow() - tasks = self._tasks_from_template(self.task1_template, t, 100) + tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) tasks = self.backend.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = tasks[:length] tasks[:length] = [] ret = self.backend.get_tasks(task['id'] for task in cur_tasks) self.assertCountEqual(ret, cur_tasks) def test_filter_task_to_archive(self): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types() _time = utcnow() - recurring = self._tasks_from_template(self.task1_template, _time, 12) - oneshots = self._tasks_from_template(self.task2_template, _time, 12) + recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) + oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 
'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) self.backend.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled self.backend.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update self.backend.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') tasks_to_archive = list(self.backend.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) self.assertEqual(len(tasks_to_archive), total_tasks_filtered) actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 self.assertEqual(actual_filtered_per_status, status_per_policy) def test_delete_archived_tasks(self): self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( - self.task1_template, _time, 12) + TEMPLATES['git'], _time, 12) oneshots = self._tasks_from_template( - self.task2_template, _time, 12) + TEMPLATES['hg'], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) self.backend.delete_archived_tasks(_tasks) self.cursor.execute('select count(*) from task') tasks_count = self.cursor.fetchone() self.cursor.execute('select count(*) from task_run') tasks_run_count = self.cursor.fetchone() self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() - self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME} + self.config = {'db': 'dbname=' + self.TEST_DB_NAME} self.backend = get_scheduler('local', self.config) diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py new file mode 100644 index 0000000..9cafcb5 --- /dev/null +++ b/swh/scheduler/tests/updater/conftest.py @@ -0,0 +1,33 @@ +import pytest +import glob +import os + +from swh.core.utils import numfile_sortkey as sortkey +from swh.scheduler.updater.backend import SchedulerUpdaterBackend +from swh.scheduler.tests import SQL_DIR +import swh.scheduler.tests.conftest # noqa + + +DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql') + + +@pytest.fixture +def 
swh_scheduler_updater(request, postgresql_proc, postgresql): + config = { + 'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user='postgres', + dbname='tests') + } + + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + + backend = SchedulerUpdaterBackend(**config) + return backend diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index baa8205..7caa85a 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,65 +1,36 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os -import unittest - -from arrow import utcnow from hypothesis import given from hypothesis.strategies import sets -import pytest -from swh.core.tests.db_testing import SingleDbTestFixture -from swh.scheduler.tests import SQL_DIR -from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent from . import from_regex -@pytest.mark.db -class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): - TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' - TEST_DB_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') - - def setUp(self): - super().setUp() - config = { - 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, - 'cache_read_limit': 1000, - } - self.backend = SchedulerUpdaterBackend(**config) - - def _empty_tables(self): - self.cursor.execute( - """SELECT table_name FROM information_schema.tables - WHERE table_schema = %s""", ('public', )) - tables = set(table for (table,) in self.cursor.fetchall()) - for table in tables: - self.cursor.execute('truncate table %s cascade' % table) - self.conn.commit() - - def tearDown(self): - self.backend.close_connection() - self._empty_tables() - super().tearDown() - - @given(sets( - from_regex( - r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), - min_size=10, max_size=15)) - def test_cache_read(self, urls): - def gen_events(urls): - for url in urls: - yield SWHEvent({ - 'url': url, - 'type': 'create', - 'origin_type': 'git', - }) - - self.backend.cache_put(gen_events(urls)) - r = self.backend.cache_read(timestamp=utcnow()) - - self.assertNotEqual(r, []) +@given(urls=sets( + from_regex( + r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), + min_size=10, max_size=15)) +def test_cache_read(urls, swh_scheduler_updater): + # beware that the fixture is only called once for all the tests + # generated by hypothesis, so the db is not cleared between calls. 
+ # see the end of + # https://hypothesis.works/articles/hypothesis-pytest-fixtures/ + def gen_events(urls): + for url in urls: + yield SWHEvent({ + 'url': url, + 'type': 'create', + 'origin_type': 'git', + }) + known_urls = set(e['url'] for e in + swh_scheduler_updater.cache_read(limit=1000000)) + swh_scheduler_updater.cache_put(gen_events(urls)) + new_urls = {u.strip() for u in urls} - known_urls + all_urls = set(e['url'] for e in + swh_scheduler_updater.cache_read(limit=1000000)) + assert (all_urls - known_urls) == new_urls diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index 1a2f2ac..93656a0 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,195 +1,199 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from itertools import chain from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists, sampled_from, text, tuples from swh.scheduler.updater.consumer import UpdaterConsumer from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from . import UpdaterTestUtil, from_regex class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): - def __init__(self, backend_class=FakeSchedulerUpdaterBackend): - super().__init__(backend_class=backend_class) + def __init__(self, backend): + super().__init__(backend) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerRaise() + self.updater = FakeUpdaterConsumerRaise( + FakeSchedulerUpdaterBackend()) def test_running_raise(self): """Raising during run should finish fine. 
""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerNoEvent() + self.updater = FakeUpdaterConsumerNoEvent( + FakeSchedulerUpdaterBackend()) def test_running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): - def __init__(self, messages): - super().__init__() + def __init__(self, backend, messages): + super().__init__(backend) self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], 'last_seen': event['created_at'], 'origin_type': event['origin_type'], } return SWHEvent(e) class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): @settings(suppress_health_check=[HealthCheck.too_slow]) @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(text(), # event type from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text(), # origin type sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) def test_running(self, events, uninteresting_events, incomplete_events): """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) - updater = FakeUpdaterConsumer(list(chain( - ready_events, ready_incomplete_events, - ready_uninteresting_events))) + updater = FakeUpdaterConsumer( + FakeSchedulerUpdaterBackend(), + list(chain( + ready_events, ready_incomplete_events, + ready_uninteresting_events))) self.assertEqual(updater.count, 0) 
self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index b87e345..92cc89d 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,164 +1,174 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from hypothesis import given from hypothesis.strategies import sampled_from from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, GHTorrentConsumer, events) +from swh.scheduler.updater.backend import SchedulerUpdaterBackend from . import UpdaterTestUtil, from_regex def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): - self.fake_config = { - 'conn': { - 'url': 'amqp://u:p@https://somewhere:9807', + config = { + 'ghtorrent': { + 'rabbitmq': { + 'conn': { + 'url': 'amqp://u:p@https://somewhere:9807', + }, + 'prefetch_read': 17, + }, + 'batch_cache_write': 42, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + }, }, - 'debug': True, - 'batch_cache_write': 10, - 'rabbitmq_prefetch_read': 100, } - self.consumer = GHTorrentConsumer(self.fake_config, - _connection_class=FakeConnection) + GHTorrentConsumer.connection_class = FakeConnection + with patch.object( + SchedulerUpdaterBackend, '__init__', return_value=None): + self.consumer = GHTorrentConsumer(**config) - def test_init(self): + @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend') + def test_init(self, mock_backend): # given # check init is ok - self.assertEqual(self.consumer.debug, - self.fake_config['debug']) - self.assertEqual(self.consumer.batch, - self.fake_config['batch_cache_write']) - self.assertEqual(self.consumer.prefetch_read, - self.fake_config['rabbitmq_prefetch_read']) - self.assertEqual(self.consumer.config, self.fake_config) + self.assertEqual(self.consumer.batch, 42) + self.assertEqual(self.consumer.prefetch_read, 17) def test_has_events(self): 
self.assertTrue(self.consumer.has_events()) def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, - self.fake_config['conn']['url']) + 'amqp://u:p@https://somewhere:9807') self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) def test_convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'cnt': 1, 'origin_type': 'git', } self.assertEqual(event, expected_event) @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(INTERESTING_EVENT_KEYS)) def test_convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) + logger = self.consumer.log + del self.consumer.log # prevent gazillions of warnings actual_converted_event = self.consumer.convert_event(input_event) - + self.consumer.log = logger self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') def test_consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ self._make_event('PushEvent', 'user/some-repo', 'git'), self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, - limit=self.fake_config['rabbitmq_prefetch_read'] + limit=17 ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index 1d932a5..77dee54 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,158 +1,158 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from glob import glob import pytest from swh.core.utils import numfile_sortkey as sortkey from swh.core.tests.db_testing import DbTestFixture from swh.scheduler.tests import SQL_DIR from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.writer import UpdaterWriter from . 
import UpdaterTestUtil @pytest.mark.db class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql') TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') @classmethod def setUpClass(cls): cls.add_db(cls.TEST_SCHED_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)]) cls.add_db(cls.TEST_SCHED_UPDATER_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)]) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-test', + 'db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { - 'scheduling_updater_db': - 'dbname=softwareheritage-scheduler-updater-test', - 'cache_read_limit': 5, + 'cls': 'local', + 'args': { + 'db': + 'dbname=softwareheritage-scheduler-updater-test', + 'cache_read_limit': 5, + }, + }, + 'updater_writer': { + 'pause': 0.1, + 'verbose': False, }, - 'pause': 0.1, - 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend - def tearDown(self): - self.scheduler_backend.close_connection() - self.scheduler_updater_backend.close_connection() - super().tearDown() - def test_run_ko(self): """Only git tasks are supported for now, other types are dismissed. 
""" ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEqual(t['type'], 'origin-update-git') self.assertEqual(t['priority'], 'normal') self.assertEqual(t['policy'], 'oneshot') self.assertEqual(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length) diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py index 8193996..4ee85fa 100644 --- a/swh/scheduler/updater/backend.py +++ b/swh/scheduler/updater/backend.py @@ -1,82 +1,109 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow -from swh.core.config import SWHConfig -from swh.scheduler.backend import DbBackend, autocommit +import psycopg2.pool +import psycopg2.extras +from swh.core.db import BaseDb +from swh.core.db.common import db_transaction, db_transaction_generator +from swh.scheduler.backend import format_query -class SchedulerUpdaterBackend(SWHConfig, DbBackend): + +class SchedulerUpdaterBackend: CONFIG_BASE_FILENAME = 'backend/scheduler-updater' - DEFAULT_CONFIG = { - 'scheduling_updater_db': ( - 'str', 'dbname=softwareheritage-scheduler-updater-dev'), - 'cache_read_limit': ('int', 1000), - } - - def __init__(self, **override_config): - super().__init__() - if override_config: - self.config = override_config +# 'cache_read_limit': ('int', 1000), + + def __init__(self, db, cache_read_limit=1000, + min_pool_conns=1, max_pool_conns=10): + """ + Args: + db_conn: either a libpq connection string, or a psycopg2 connection + + """ + if isinstance(db, psycopg2.extensions.connection): + self._pool = None + self._db = BaseDb(db) else: 
- self.config = self.parse_config_file(global_config=False) - self.db = None - self.db_conn_dsn = self.config['scheduling_updater_db'] - self.limit = self.config['cache_read_limit'] - self.reconnect() + self._pool = psycopg2.pool.ThreadedConnectionPool( + min_pool_conns, max_pool_conns, db, + cursor_factory=psycopg2.extras.RealDictCursor, + ) + self._db = None + self.limit = cache_read_limit + + def get_db(self): + if self._db: + return self._db + return BaseDb.from_pool(self._pool) cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type'] - @autocommit - def cache_put(self, events, timestamp=None, cursor=None): + @db_transaction() + def cache_put(self, events, timestamp=None, db=None, cur=None): """Write new events in the backend. """ - if timestamp is None: - timestamp = utcnow() - - def prepare_events(events): - for e in events: - event = e.get() - seen = event['last_seen'] - if seen is None: - event['last_seen'] = timestamp - yield event - - cursor.execute('select swh_mktemp_cache()') - self.copy_to(prepare_events(events), - 'tmp_cache', self.cache_put_keys, cursor=cursor) - cursor.execute('select swh_cache_put()') + cur.execute('select swh_mktemp_cache()') + db.copy_to(prepare_events(events, timestamp), + 'tmp_cache', self.cache_put_keys, cur=cur) + cur.execute('select swh_cache_put()') cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen', 'last_seen'] - @autocommit - def cache_read(self, timestamp=None, limit=None, cursor=None): + @db_transaction_generator() + def cache_read(self, timestamp=None, limit=None, db=None, cur=None): """Read events from the cache prior to timestamp. + Note that limit=None does not mean 'no limit' but use the default + limit (see cache_read_limit constructor argument). + """ if not timestamp: timestamp = utcnow() if not limit: limit = self.limit - q = self._format_query('select {keys} from swh_cache_read(%s, %s)', - self.cache_read_keys) - cursor.execute(q, (timestamp, limit)) - for r in cursor.fetchall(): - r['id'] = r['id'].tobytes() - yield r + q = format_query('select {keys} from swh_cache_read(%s, %s)', + self.cache_read_keys) + cur.execute(q, (timestamp, limit)) + yield from cur.fetchall() - @autocommit - def cache_remove(self, entries, cursor=None): + @db_transaction() + def cache_remove(self, entries, db=None, cur=None): """Clean events from the cache """ q = 'delete from cache where url in (%s)' % ( ', '.join(("'%s'" % e for e in entries)), ) - cursor.execute(q) + cur.execute(q) + + +def prepare_events(events, timestamp=None): + if timestamp is None: + timestamp = utcnow() + outevents = [] + urls = [] + for e in events: + event = e.get() + url = event['url'].strip() + if event['last_seen'] is None: + event['last_seen'] = timestamp + event['url'] = url + + if url in urls: + idx = urls.index(url) + urls.append(urls.pop(idx)) + pevent = outevents.pop(idx) + event['cnt'] += pevent['cnt'] + event['last_seen'] = max( + event['last_seen'], pevent['last_seen']) + else: + urls.append(url) + outevents.append(event) + return outevents diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py index e0465f2..a41404b 100644 --- a/swh/scheduler/updater/consumer.py +++ b/swh/scheduler/updater/consumer.py @@ -1,140 +1,138 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from abc import ABCMeta, abstractmethod -from 
swh.scheduler.updater.backend import SchedulerUpdaterBackend - class UpdaterConsumer(metaclass=ABCMeta): """Event consumer """ - def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend, - log_class='swh.scheduler.updater.consumer.UpdaterConsumer'): + def __init__(self, backend, batch_cache_write=1000): super().__init__() self._reset_cache() - self.backend = backend_class() - self.batch = batch + self.backend = backend + self.batch = int(batch_cache_write) logging.basicConfig(level=logging.DEBUG) - self.log = logging.getLogger(log_class) + self.log = logging.getLogger('%s.%s' % ( + self.__class__.__module__, self.__class__.__name__)) def _reset_cache(self): """Reset internal cache. """ self.count = 0 self.seen_events = set() self.events = [] def is_interesting(self, event): """Determine if an event is interesting or not. Args: event (SWHEvent): SWH event """ return event.is_interesting() @abstractmethod def convert_event(self, event): """Parse an event into an SWHEvent. """ pass def process_event(self, event): """Process converted and interesting event. Args: event (SWHEvent): Event to process if deemed interesting """ try: if event.url in self.seen_events: event.cnt += 1 else: self.events.append(event) self.seen_events.add(event.url) self.count += 1 finally: if self.count >= self.batch: if self.events: self.backend.cache_put(self.events) self._reset_cache() def _flush(self): """Flush remaining internal cache if any. """ if self.events: self.backend.cache_put(self.events) self._reset_cache() @abstractmethod def has_events(self): """Determine if there remains events to consume. Returns boolean value, true for remaining events, False otherwise """ pass @abstractmethod def consume_events(self): """The main entry point to consume events. This should either yield or return message for consumption. """ pass @abstractmethod def open_connection(self): """Open a connection to the remote system we are supposed to consume from. """ pass @abstractmethod def close_connection(self): """Close opened connection to the remote system. """ pass def run(self): """The main entry point to consume events. 
""" try: self.open_connection() while self.has_events(): for _event in self.consume_events(): event = self.convert_event(_event) if not event: self.log.warning( 'Incomplete event dropped %s' % _event) continue if not self.is_interesting(event): continue if self.debug: self.log.debug('Event: %s' % event) try: self.process_event(event) except Exception: self.log.exception( 'Problem when processing event %s' % _event) continue except Exception as e: self.log.error('Error raised during consumption: %s' % e) raise e finally: self.close_connection() self._flush() diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py index 2b9e5fc..8ece752 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -1,146 +1,143 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from kombu import Connection, Exchange, Queue from kombu.common import collect_replies -from swh.core.config import SWHConfig +from swh.core.config import merge_configs + from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer +from swh.scheduler.updater.backend import SchedulerUpdaterBackend events = { # ghtorrent events related to github events (interesting) 'evt': [ 'commitcomment', 'create', 'delete', 'deployment', 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', 'gist', 'gollum', 'issuecomment', 'issues', 'member', 'membership', 'pagebuild', 'public', 'pullrequest', 'pullrequestreviewcomment', 'push', 'release', 'repository', 'status', 'teamadd', 'watch' ], # ghtorrent events related to mongodb insert (not interesting) 'ent': [ 'commit_comments', 'commits', 'followers', 'forks', 'geo_cache', 'issue_comments', 'issue_events', 'issues', 'org_members', 'pull_request_comments', 'pull_requests', 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' ] } +INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] -class RabbitMQConn(SWHConfig): - """RabbitMQ Connection class +DEFAULT_CONFIG = { + 'ghtorrent': { + 'batch_cache_write': 1000, + 'rabbitmq': { + 'prefetch_read': 100, + 'conn': { + 'url': 'amqp://guest:guest@localhost:5672', + 'exchange_name': 'ght-streams', + 'routing_key': 'something', + 'queue_name': 'fake-events', + }, + }, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + 'cache_read_limit': 1000, + }, + }, +} - """ - CONFIG_BASE_FILENAME = 'backend/ghtorrent' - - DEFAULT_CONFIG = { - 'conn': ('dict', { - 'url': 'amqp://guest:guest@localhost:5672', - 'exchange_name': 'ght-streams', - 'routing_key': 'something', - 'queue_name': 'fake-events' - }) - } - ADDITIONAL_CONFIG = {} +class GHTorrentConsumer(UpdaterConsumer): + """GHTorrent events consumer + + """ + connection_class = Connection def __init__(self, **config): - super().__init__() - if config and set(config.keys()) - {'log_class'} != set(): - self.config = config - else: - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - - self.conn_string = self.config['conn']['url'] - self.exchange = Exchange(self.config['conn']['exchange_name'], - 'topic', durable=True) - self.routing_key = self.config['conn']['routing_key'] - self.queue = Queue(self.config['conn']['queue_name'], - 
exchange=self.exchange, - routing_key=self.routing_key, + self.config = merge_configs(DEFAULT_CONFIG, config) + + ght_config = self.config['ghtorrent'] + rmq_config = ght_config['rabbitmq'] + self.prefetch_read = int(rmq_config.get('prefetch_read', 100)) + + exchange = Exchange( + rmq_config['conn']['exchange_name'], + 'topic', durable=True) + routing_key = rmq_config['conn']['routing_key'] + self.queue = Queue(rmq_config['conn']['queue_name'], + exchange=exchange, + routing_key=routing_key, auto_delete=True) + if self.config['scheduler_updater']['cls'] != 'local': + raise ValueError( + 'The scheduler_updater can only be a cls=local for now') + backend = SchedulerUpdaterBackend( + **self.config['scheduler_updater']['args']) -INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] - - -class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): - """GHTorrent events consumer - - """ - ADDITIONAL_CONFIG = { - 'debug': ('bool', False), - 'batch_cache_write': ('int', 1000), - 'rabbitmq_prefetch_read': ('int', 100), - } - - def __init__(self, config=None, _connection_class=Connection): - if config is None: - super().__init__( - log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer') - else: - self.config = config - self._connection_class = _connection_class - self.debug = self.config['debug'] - self.batch = self.config['batch_cache_write'] - self.prefetch_read = self.config['rabbitmq_prefetch_read'] + super().__init__(backend, ght_config.get('batch_cache_write', 1000)) def has_events(self): """Always has events """ return True def convert_event(self, event): """Given ghtorrent event, convert it to a SWHEvent instance. """ if isinstance(event, str): event = json.loads(event) - for k in INTERESTING_EVENT_KEYS: if k not in event: if hasattr(self, 'log'): - self.log.warn( + self.log.warning( 'Event should have the \'%s\' entry defined' % k) return None _type = event['type'].lower().rstrip('Event') _repo_name = 'https://github.com/%s' % event['repo']['name'] return SWHEvent({ 'type': _type, 'url': _repo_name, 'last_seen': event['created_at'], 'origin_type': 'git', }) def open_connection(self): """Open rabbitmq connection """ - self.conn = self._connection_class(self.config['conn']['url']) + self.conn = self.connection_class( + self.config['ghtorrent']['rabbitmq']['conn']['url']) self.conn.connect() self.channel = self.conn.channel() def close_connection(self): """Close rabbitmq connection """ self.channel.close() self.conn.release() def consume_events(self): """Consume and yield queue messages """ yield from collect_replies( self.conn, self.channel, self.queue, no_ack=False, limit=self.prefetch_read) diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py index db05060..f94290b 100644 --- a/swh/scheduler/updater/ghtorrent/cli.py +++ b/swh/scheduler/updater/ghtorrent/cli.py @@ -1,28 +1,23 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -import logging - -from swh.scheduler.updater.ghtorrent import GHTorrentConsumer @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): +@click.pass_context +def main(ctx, verbose): """Consume events from ghtorrent and write them to cache. 
""" - log = logging.getLogger('swh.scheduler.updater.ghtorrent.cli') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - GHTorrentConsumer().run() + click.echo("Deprecated! Use 'swh-scheduler updater' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__': main() diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py index 829f31c..99bab3a 100644 --- a/swh/scheduler/updater/writer.py +++ b/swh/scheduler/updater/writer.py @@ -1,122 +1,96 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging import time from arrow import utcnow -from swh.core.config import SWHConfig from swh.core import utils from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.scheduler.updater.backend import SchedulerUpdaterBackend -class UpdaterWriter(SWHConfig): +class UpdaterWriter: """Updater writer in charge of updating the scheduler db with latest prioritized oneshot tasks In effect, this: - reads the events from scheduler updater's db - converts those events into priority oneshot tasks - dumps them into the scheduler db """ - CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer' - DEFAULT_CONFIG = { - # access to the scheduler backend - 'scheduler': ('dict', { - 'cls': 'local', - 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', - }, - }), - # access to the scheduler updater cache - 'scheduler_updater': ('dict', { - 'scheduling_updater_db': - 'dbname=softwareheritage-scheduler-updater-dev', - 'cache_read_limit': 1000, - }), - # waiting time between db reads - 'pause': ('int', 10), - # verbose or not - 'verbose': ('bool', False), - } def __init__(self, **config): - if config: - self.config = config - else: - self.config = self.parse_config_file() - + self.config = config + if self.config['scheduler_updater']['cls'] != 'local': + raise ValueError( + 'The scheduler_updater can only be a cls=local for now') self.scheduler_updater_backend = SchedulerUpdaterBackend( - **self.config['scheduler_updater']) + **self.config['scheduler_updater']['args']) self.scheduler_backend = get_scheduler(**self.config['scheduler']) - self.pause = self.config['pause'] + self.pause = self.config.get('updater_writer', {}).get('pause', 10) self.log = logging.getLogger( 'swh.scheduler.updater.writer.UpdaterWriter') - self.log.setLevel( - logging.DEBUG if self.config['verbose'] else logging.INFO) def convert_to_oneshot_task(self, event): """Given an event, convert it into oneshot task with priority Args: event (dict): The event to convert to task """ if event['origin_type'] == 'git': return create_oneshot_task_dict( 'origin-update-git', event['url'], priority='normal') self.log.warning('Type %s is not supported for now, only git' % ( event['origin_type'], )) return None def write_event_to_scheduler(self, events): """Write events to the scheduler and yield ids when done""" # convert events to oneshot tasks oneshot_tasks = filter(lambda e: e is not None, map(self.convert_to_oneshot_task, events)) # write event to scheduler self.scheduler_backend.create_tasks(list(oneshot_tasks)) for e in events: yield e['url'] def run(self): """First retrieve events from cache (including origin_type, cnt), then convert them into oneshot tasks with priority, then 
write them to the scheduler db, at last remove them from cache. """ while True: timestamp = utcnow() events = list(self.scheduler_updater_backend.cache_read(timestamp)) if not events: break for urls in utils.grouper(self.write_event_to_scheduler(events), n=100): self.scheduler_updater_backend.cache_remove(urls) time.sleep(self.pause) @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): - log = logging.getLogger('swh.scheduler.updater.writer') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - UpdaterWriter().run() +@click.pass_context +def main(ctx, verbose): + click.echo("Deprecated! Use 'swh-scheduler updater' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__': main() diff --git a/version.txt b/version.txt index 574f7df..b9371a2 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.40-0-g53136b7 \ No newline at end of file +v0.0.41-0-gdb25694 \ No newline at end of file
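
A note on the configuration rename carried through this patch: the `scheduling_db` / `scheduling_updater_db` keys become a plain `db` key, and `SchedulerUpdaterBackend` now takes its settings as constructor arguments instead of parsing a config file. The sketch below is not part of the patch; it is a rough illustration, based on the constructors and tests shown above, of how the renamed settings are expected to be used. The connection strings and the example URL are placeholders.

```python
# Illustrative only -- not part of this changeset. Connection strings are
# placeholders; point them at your local test databases.
from swh.scheduler import get_scheduler
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent

# Scheduler backend: the local backend now expects a 'db' key
# (previously 'scheduling_db').
scheduler = get_scheduler('local', {'db': 'dbname=softwareheritage-scheduler-dev'})

# Updater cache backend: settings are passed directly to the constructor
# (previously a 'scheduling_updater_db' entry read from a config file).
updater = SchedulerUpdaterBackend(
    db='dbname=softwareheritage-scheduler-updater-dev',
    cache_read_limit=1000,
)

# Write one event to the cache, then read it back.
updater.cache_put([
    SWHEvent({
        'url': 'https://somewhere.org/example/repo',
        'type': 'create',
        'origin_type': 'git',
    }),
])
cached = list(updater.cache_read(limit=10))
```

The same `cls`/`args` layout appears in the new `DEFAULT_CONFIG` of the ghtorrent consumer and in `UpdaterWriterTest.setUp`, so a YAML configuration for the updater follows the `scheduler_updater: {cls: local, args: {db: ..., cache_read_limit: ...}}` shape shown there.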