diff --git a/.gitignore b/.gitignore index 898ba80..700f993 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,11 @@ *.pyc *.sw? *~ .coverage .eggs/ __pycache__ *.egg-info/ +build/ +dist/ version.txt /.hypothesis/ diff --git a/MANIFEST.in b/MANIFEST.in index e7c46fc..d6f7f03 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,6 @@ +include README.md include Makefile include requirements.txt include requirements-swh.txt include version.txt +recursive-include swh/scheduler/sql *.sql diff --git a/PKG-INFO b/PKG-INFO index 2452bd9..783edcf 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,65 @@ -Metadata-Version: 1.0 +Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Description: UNKNOWN +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler +Description: swh-scheduler + ============= + + Job scheduler for the Software Heritage project. + + Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., + listing a forge, loading new stuff from a Git repository) and one-off + activities (e.g., loading a specific version of a source package). + + + # Tests + + ## Running test manually + + ### Test data + + To be able to run (unit) tests, you need to have the + [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] + in the parent directory. If you have set your environment following the + [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] + document everythong should be set up just fine. 
+ + Otherwise: + + ``` + ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata + ``` + + ### Required services + + Unit tests that require a running celery broker uses an in memory broker/result + backend by default, but you can choose to use a true broker by setting + `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. + + For example: + + ``` + $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests + + ..................................... + ---------------------------------------------------------------------- + Ran 37 tests in 15.578s + + OK + ``` + Platform: UNKNOWN +Classifier: Programming Language :: Python :: 3 +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) +Classifier: Operating System :: OS Independent +Classifier: Development Status :: 5 - Production/Stable +Description-Content-Type: text/markdown +Provides-Extra: testing diff --git a/README.md b/README.md new file mode 100644 index 0000000..01a2c41 --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +swh-scheduler +============= + +Job scheduler for the Software Heritage project. + +Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., +listing a forge, loading new stuff from a Git repository) and one-off +activities (e.g., loading a specific version of a source package). + + +# Tests + +## Running test manually + +### Test data + +To be able to run (unit) tests, you need to have the +[[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] +in the parent directory. If you have set your environment following the +[[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] +document everythong should be set up just fine. 
+ +Otherwise: + +``` +~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata +``` + +### Required services + +Unit tests that require a running celery broker uses an in memory broker/result +backend by default, but you can choose to use a true broker by setting +`CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. + +For example: + +``` +$ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests + +..................................... +---------------------------------------------------------------------- +Ran 37 tests in 15.578s + +OK +``` diff --git a/bin/swh-worker-control b/bin/swh-worker-control index 9785b37..15516f7 100755 --- a/bin/swh-worker-control +++ b/bin/swh-worker-control @@ -1,268 +1,268 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from fnmatch import fnmatch from operator import itemgetter import os import sys import click def list_remote_workers(inspect): ping_replies = inspect.ping() if not ping_replies: return {} workers = list(sorted(ping_replies)) ret = {} for worker_name in workers: if not worker_name.startswith('celery@'): print('Unsupported worker: %s' % worker_name, file=sys.stderr) continue type, host = worker_name[len('celery@'):].split('.', 1) worker = { 'name': worker_name, 'host': host, 'type': type, } ret[worker_name] = worker return ret def make_filters(filter_host, filter_type): """Parse the filters and create test functions""" def include(field, value): def filter(worker, field=field, value=value): return fnmatch(worker[field], value) return filter def exclude(field, value): def filter(worker, field=field, value=value): return not fnmatch(worker[field], value) return filter 
filters = [] for host in filter_host: if host.startswith('-'): filters.append(exclude('host', host[1:])) else: filters.append(include('host', host)) for type_ in filter_type: if type_.startswith('-'): filters.append(exclude('type', type_[1:])) else: filters.append(include('type', type_)) return filters def filter_workers(workers, filters): """Filter workers according to the set criteria""" return {name: worker for name, worker in workers.items() if all(check(worker) for check in filters)} def get_clock_offsets(workers, inspect): """Add a clock_offset entry for each worker""" err_msg = 'Could not get monotonic clock for {worker}' t = datetime.datetime.now(tz=datetime.timezone.utc) for worker, clock in inspect._request('monotonic').items(): monotonic = clock.get('monotonic') if monotonic is None: monotonic = 0 click.echo(err_msg.format(worker=worker), err=True) dt = datetime.timedelta(seconds=monotonic) workers[worker]['clock_offset'] = t - dt def worker_to_wallclock(worker, monotonic): """Convert a monotonic timestamp from a worker to a wall clock time""" dt = datetime.timedelta(seconds=monotonic) return worker['clock_offset'] + dt @click.group() @click.option('--instance-config', metavar='CONFIG', default=None, help='Use this worker instance configuration') @click.option('--host', metavar='HOSTNAME_FILTER', multiple=True, help='Filter by hostname') @click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True, help='Filter by worker type') @click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0, help='Timeout for remote control communication') @click.option('--debug/--no-debug', default=False, help='Turn on debugging') @click.pass_context def cli(ctx, debug, timeout, instance_config, host, type): """Manage the Software Heritage workers Filters support globs; a filter starting with a "-" excludes the corresponding values. 
""" if instance_config: os.environ['SWH_WORKER_INSTANCE'] = instance_config from swh.scheduler.celery_backend.config import app full_inspect = app.control.inspect(timeout=timeout) workers = filter_workers( list_remote_workers(full_inspect), make_filters(host, type) ) ctx.obj['workers'] = workers destination = list(workers) inspect = app.control.inspect(destination=destination, timeout=timeout) ctx.obj['inspect'] = inspect get_clock_offsets(workers, inspect) ctx.obj['control'] = app.control ctx.obj['destination'] = destination ctx.obj['timeout'] = timeout ctx.obj['debug'] = debug @cli.command() @click.pass_context def list_workers(ctx): """List the currently running workers""" workers = ctx.obj['workers'] for worker_name, worker in sorted(workers.items()): click.echo("{type} alive on {host}".format(**worker)) if not workers: sys.exit(2) @cli.command() @click.pass_context def list_tasks(ctx): """List the tasks currently running on workers""" task_template = ('{worker} {name}' '[{id} ' 'started={started:%Y-%m-%mT%H:%M:%S} ' 'pid={worker_pid}] {args} {kwargs}') inspect = ctx.obj['inspect'] workers = ctx.obj['workers'] active = inspect.active() if not active: click.echo('No reply from workers', err=True) sys.exit(2) has_tasks = False for worker_name, tasks in sorted(active.items()): worker = workers[worker_name] if not tasks: click.echo("No active tasks on {name}".format(**worker), err=True) for task in sorted(tasks, key=itemgetter('time_start')): task['started'] = worker_to_wallclock(worker, task['time_start']) click.echo(task_template.format(worker=worker_name, **task)) has_tasks = True if not has_tasks: sys.exit(2) @cli.command() @click.pass_context def list_queues(ctx): """List all the queues currently enabled on the workers""" inspect = ctx.obj['inspect'] active = inspect.active_queues() if not active: click.echo('No reply from workers', err=True) sys.exit(2) has_queues = False for worker_name, queues in sorted(active.items()): queues = sorted(queue['name'] for 
queue in queues) if queues: click.echo('{worker} {queues}'.format(worker=worker_name, queues=' '.join(queues))) has_queues = True else: click.echo('No queues for {worker}'.format(worker=worker_name), err=True) if not has_queues: sys.exit(2) @cli.command() @click.option('--noop', is_flag=True, default=False, help='Do not proceed') @click.argument('queues', nargs=-1) @click.pass_context def remove_queues(ctx, noop, queues): """Cancel the queue for the given workers""" msg_template = 'Canceling queue {queue} on worker {worker}{noop}' inspect = ctx.obj['inspect'] control = ctx.obj['control'] timeout = ctx.obj['timeout'] active = inspect.active_queues() if not queues: queues = ['*'] if not active: click.echo('No reply from workers', err=True) sys.exit(2) for worker, active_queues in sorted(active.items()): for queue in sorted(active_queues, key=itemgetter('name')): if any(fnmatch(queue['name'], name) for name in queues): msg = msg_template.format(queue=queue['name'], worker=worker, noop=' (noop)' if noop else '') click.echo(msg, err=True) if not noop: control.cancel_consumer(queue['name'], destination=[worker], timeout=timeout) @cli.command() @click.option('--noop', is_flag=True, default=False, help='Do not proceed') @click.argument('queues', nargs=-1) @click.pass_context def add_queues(ctx, noop, queues): """Start the queue for the given workers""" msg_template = 'Starting queue {queue} on worker {worker}{noop}' control = ctx.obj['control'] timeout = ctx.obj['timeout'] workers = ctx.obj['workers'] if not workers: click.echo('No reply from workers', err=True) sys.exit(2) for worker in sorted(workers): for queue in queues: msg = msg_template.format(queue=queue, worker=worker, noop=' (noop)' if noop else '') click.echo(msg, err=True) if not noop: ret = control.add_consumer(queue, destination=[worker], timeout=timeout) print(ret) if __name__ == '__main__': cli(obj={}) diff --git a/debian/control b/debian/control index f08c2a1..be81f18 100644 --- a/debian/control +++ 
b/debian/control @@ -1,32 +1,32 @@ Source: swh-scheduler Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-arrow, python3-celery, python3-click, python3-elasticsearch (>= 5.4.0), python3-flask, python3-hypothesis, python3-kombu, python3-nose, python3-psycopg2, python3-setuptools, - python3-swh.core (>= 0.0.40~), + python3-swh.core (>= 0.0.44~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/ Package: python3-swh.scheduler Architecture: all -Depends: python3-swh.core (>= 0.0.40~), ${misc:Depends}, ${python3:Depends} +Depends: python3-swh.core (>= 0.0.44~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Scheduler Package: python3-swh.scheduler.updater Architecture: all Depends: python3-swh.scheduler (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Scheduler Updater diff --git a/docs/index.rst b/docs/index.rst index ad59041..0cb463d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,17 +1,21 @@ .. _swh-scheduler: -Software Heritage - Development Documentation -============================================= +Software Heritage - Job scheduler +================================= + +Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., +listing a forge, loading new stuff from a Git repository) and one-off +activities (e.g., loading a specific version of a source package). + .. 
toctree:: :maxdepth: 2 :caption: Contents: - Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/requirements-swh.txt b/requirements-swh.txt index 9e6b6d1..e7de3e1 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1 @@ -swh.core >= 0.0.40 +swh.core >= 0.0.44 diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..8a26d2c --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,3 @@ +hypothesis +nose +celery diff --git a/requirements.txt b/requirements.txt index 37d1827..a5b03b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,15 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow celery Click -elasticsearch>5.4 +elasticsearch > 5.4 flask kombu psycopg2 vcversioner # test dependencies # hypothesis diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index 72825ac..65d2792 --- a/setup.py +++ b/setup.py @@ -1,32 +1,69 @@ +#!/usr/bin/env python3 +# Copyright (C) 2015-2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + from setuptools import setup, find_packages +from os import path +from io import open + +here = path.abspath(path.dirname(__file__)) + +# Get the long description from the README file +with open(path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = f.read() + + +def parse_requirements(name=None): + if name: + reqf = 'requirements-%s.txt' % name + else: + reqf = 'requirements.txt' -def parse_requirements(): requirements = [] - for reqf in ('requirements.txt', 'requirements-swh.txt'): - with open(reqf) as f: - for line in f.readlines(): - line = 
line.strip() - if not line or line.startswith('#'): - continue - requirements.append(line) + if not path.exists(reqf): + return requirements + + with open(reqf) as f: + for line in f.readlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + requirements.append(line) return requirements setup( name='swh.scheduler', description='Software Heritage Scheduler', + long_description=long_description, + long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DSCH/', packages=find_packages(), scripts=['bin/swh-worker-control'], - install_requires=parse_requirements(), + setup_requires=['vcversioner'], + install_requires=parse_requirements() + parse_requirements('swh'), + extras_require={'testing': parse_requirements('test')}, + vcversioner={}, + include_package_data=True, entry_points=''' [console_scripts] swh-scheduler=swh.scheduler.cli:cli ''', - setup_requires=['vcversioner'], - vcversioner={}, - include_package_data=True, + classifiers=[ + "Programming Language :: Python :: 3", + "Intended Audience :: Developers", + "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", + "Operating System :: OS Independent", + "Development Status :: 5 - Production/Stable", + ], + project_urls={ + 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', + 'Funding': 'https://www.softwareheritage.org/donate', + 'Source': 'https://forge.softwareheritage.org/source/swh-scheduler', + }, ) diff --git a/sql/Makefile b/sql/Makefile index 9f92954..5fc56d0 100644 --- a/sql/Makefile +++ b/sql/Makefile @@ -1,49 +1,71 @@ # Depends: postgresql-client, postgresql-autodoc DBNAME = softwareheritage-scheduler-dev DOCDIR = autodoc -SQL_SCHEMA = swh-scheduler-schema.sql -SQL_DATA = swh-scheduler-data.sql +SQL_SCHEMA = 30-swh-schema.sql +SQL_DATA = 40-swh-data.sql SQLS = $(SQL_SCHEMA) $(SQL_DATA) +SQL_FILES = $(abspath $(addprefix 
$(CURDIR)/../swh/scheduler/sql/,$(SQLS))) PSQL_BIN = psql PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1 PSQL = $(PSQL_BIN) $(PSQL_FLAGS) +PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) + all: createdb: createdb-stamp -createdb-stamp: $(SQL_INIT) +createdb-stamp: $(SQL_FILES) +ifndef PIFPAF + -dropdb $(DBNAME) +endif createdb $(DBNAME) +ifndef PIFPAF touch $@ +else + rm -f $@ +endif filldb: filldb-stamp filldb-stamp: createdb-stamp - cat $(SQLS) | $(PSQL) $(DBNAME) + cat $(SQL_FILES) | $(PSQL) $(DBNAME) +ifndef PIFPAF touch $@ +else + rm -f $@ +endif dropdb: -dropdb $(DBNAME) dumpdb: swh-scheduler.dump swh-scheduler.dump: filldb-stamp pg_dump -Fc $(DBNAME) > $@ -doc: autodoc-stamp $(DOCDIR)/swh-scheduler.pdf -autodoc-stamp: filldb-stamp +$(DOCDIR): test -d $(DOCDIR)/ || mkdir $(DOCDIR) - postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh + +doc: autodoc-stamp $(DOCDIR)/swh-scheduler.pdf +autodoc-stamp: filldb-stamp $(DOCDIR) + postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler cp -a $(DOCDIR)/swh-scheduler.dot $(DOCDIR)/swh-scheduler.dot.orig +ifndef PIFPAF touch $@ +else + rm -f $@ +endif -$(DOCDIR)/swh-scheduler.pdf: autodoc-stamp - dot -T pdf $(DOCDIR)/swh-scheduler.dot > $(DOCDIR)/swh-scheduler.pdf +$(DOCDIR)/swh-scheduler.pdf: $(DOCDIR)/swh-scheduler.dot autodoc-stamp + dot -T pdf $< > $@ +$(DOCDIR)/swh-scheduler.svg: $(DOCDIR)/swh-scheduler.dot autodoc-stamp + dot -T svg $< > $@ clean: rm -rf *-stamp $(DOCDIR)/ distclean: clean dropdb rm -f swh-scheduler.dump .PHONY: all initdb createdb dropdb doc clean diff --git a/sql/swh-scheduler-testdata.sql b/sql/swh-scheduler-testdata.sql deleted file mode 100644 index df1da98..0000000 --- a/sql/swh-scheduler-testdata.sql +++ /dev/null @@ -1,30 +0,0 @@ -begin; - -insert into task_type - (type, backend_name, description, min_interval, max_interval, default_interval, backoff_factor) -values - ('git-updater', 'swh.loader.git.tasks.UpdateGitRepository', 'Git repository updater', '12 hours'::interval, '30 
days'::interval, '1 day'::interval, 2); - - -select swh_scheduler_mktemp_task (); - - -insert into tmp_task - (type, arguments, next_run) -values - ('git-updater', - '{"args":["git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git"], "kwargs":{}}'::jsonb, - now()); - -select * from swh_scheduler_create_tasks_from_temp (); - -commit; - - -select * from swh_scheduler_grab_ready_tasks (); -select * from swh_scheduler_schedule_task_run(1, 'foo'); -select * from swh_scheduler_start_task_run('foo', '{"worker": "worker01"}'); -select * from swh_scheduler_end_task_run('foo', true, '{"logs": "foobarbaz"}'); - -select * from task; -select * from task_run; diff --git a/sql/updater/sql/Makefile b/sql/updater/sql/Makefile index cd976e0..30263f2 100644 --- a/sql/updater/sql/Makefile +++ b/sql/updater/sql/Makefile @@ -1,50 +1,73 @@ # Depends: postgresql-client, postgresql-autodoc DBNAME = softwareheritage-scheduler-updater-dev DOCDIR = autodoc -SQL_INIT = swh-init.sql -SQL_SCHEMA = swh-schema.sql -SQL_FUNC = swh-func.sql +SQL_INIT = 10-swh-init.sql +SQL_SCHEMA = 30-swh-schema.sql +SQL_FUNC = 40-swh-func.sql SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) +SQL_FILES = $(abspath $(addprefix \ + $(CURDIR)/../../../swh/scheduler/sql/updater/,$(SQLS))) PSQL_BIN = psql PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1 PSQL = $(PSQL_BIN) $(PSQL_FLAGS) +PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) + all: createdb: createdb-stamp -createdb-stamp: $(SQL_INIT) +createdb-stamp: $(SQL_FILES) +ifndef PIFPAF + -dropdb $(DBNAME) +endif createdb $(DBNAME) +ifndef PIFPAF touch $@ +else + rm -f $@ +endif filldb: filldb-stamp filldb-stamp: createdb-stamp - cat $(SQLS) | $(PSQL) $(DBNAME) + cat $(SQL_FILES) | $(PSQL) $(DBNAME) +ifndef PIFPAF touch $@ +else + rm -f $@ +endif dropdb: -dropdb $(DBNAME) dumpdb: swh-scheduler-updater.dump -swh-scheduler.dump: filldb-stamp +swh-scheduler-updater.dump: filldb-stamp pg_dump -Fc $(DBNAME) > $@ -doc: autodoc-stamp $(DOCDIR)/swh-scheduler-updater.pdf 
-autodoc-stamp: filldb-stamp +$(DOCDIR): test -d $(DOCDIR)/ || mkdir $(DOCDIR) - postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh - cp -a $(DOCDIR)/swh-scheduler.dot $(DOCDIR)/swh-scheduler-updater.dot.orig + +doc: autodoc-stamp $(DOCDIR)/swh-scheduler-updater.pdf +autodoc-stamp: filldb-stamp $(DOCDIR) + postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler-updater + cp -a $(DOCDIR)/swh-scheduler-updater.dot $(DOCDIR)/swh-scheduler-updater.dot.orig +ifndef PIFPAF touch $@ +else + rm -f $@ +endif -$(DOCDIR)/swh-scheduler.pdf: autodoc-stamp - dot -T pdf $(DOCDIR)/swh-scheduler-updater.dot > $(DOCDIR)/swh-scheduler-updater.pdf +$(DOCDIR)/swh-scheduler-updater.pdf: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp + dot -T pdf $< > $@ +$(DOCDIR)/swh-scheduler-updater.svg: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp + dot -T svg $< > $@ clean: rm -rf *-stamp $(DOCDIR)/ distclean: clean dropdb rm -f swh-scheduler-updater.dump .PHONY: all initdb createdb dropdb doc clean diff --git a/sql/updates/12.sql b/sql/updates/12.sql new file mode 100644 index 0000000..3ab8c42 --- /dev/null +++ b/sql/updates/12.sql @@ -0,0 +1,51 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 11 +-- to_version: 12 +-- description: Upgrade scheduler create_tasks routine + +insert into dbversion (version, release, description) + values (12, now(), 'Work In Progress'); + +create or replace function swh_scheduler_create_tasks_from_temp () + returns setof task + language plpgsql +as $$ +begin + -- update the default values in one go + -- this is separated from the insert/select to avoid too much + -- juggling + update tmp_task t + set current_interval = tt.default_interval, + retries_left = coalesce(retries_left, tt.num_retries, 0) + from task_type tt + where tt.type=t.type; + + insert into task (type, arguments, next_run, status, current_interval, policy, + retries_left, priority) + select type, arguments, next_run, status, current_interval, policy, + retries_left, priority + from 
tmp_task t + where not exists(select 1 + from task + where type = t.type and + arguments->'args' = t.arguments->'args' and + arguments->'kwargs' = t.arguments->'kwargs' and + policy = t.policy and + priority is not distinct from t.priority and + status = t.status); + + return query + select distinct t.* + from tmp_task tt inner join task t on ( + tt.type = t.type and + tt.arguments->'args' = t.arguments->'args' and + tt.arguments->'kwargs' = t.arguments->'kwargs' and + tt.policy = t.policy and + tt.priority is not distinct from t.priority and + tt.status = t.status + ); +end; +$$; + +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; + diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 2452bd9..783edcf 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,10 +1,65 @@ -Metadata-Version: 1.0 +Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.31 +Version: 0.0.32 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN -Description: UNKNOWN +Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate +Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler +Description: swh-scheduler + ============= + + Job scheduler for the Software Heritage project. + + Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., + listing a forge, loading new stuff from a Git repository) and one-off + activities (e.g., loading a specific version of a source package). + + + # Tests + + ## Running test manually + + ### Test data + + To be able to run (unit) tests, you need to have the + [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] + in the parent directory. 
If you have set your environment following the + [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] + document everythong should be set up just fine. + + Otherwise: + + ``` + ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata + ``` + + ### Required services + + Unit tests that require a running celery broker uses an in memory broker/result + backend by default, but you can choose to use a true broker by setting + `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. + + For example: + + ``` + $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests + + ..................................... + ---------------------------------------------------------------------- + Ran 37 tests in 15.578s + + OK + ``` + Platform: UNKNOWN +Classifier: Programming Language :: Python :: 3 +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) +Classifier: Operating System :: OS Independent +Classifier: Development Status :: 5 - Production/Stable +Description-Content-Type: text/markdown +Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index f52ba7a..6af5abd 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,84 +1,87 @@ .gitignore AUTHORS LICENSE LICENSE.Celery MANIFEST.in Makefile +README.md requirements-swh.txt +requirements-test.txt requirements.txt setup.py version.txt bin/swh-worker-control data/README.md data/elastic-template.json data/update-index-settings.json debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile -sql/swh-scheduler-data.sql -sql/swh-scheduler-schema.sql 
-sql/swh-scheduler-testdata.sql sql/updater/sql/Makefile -sql/updater/sql/swh-func.sql -sql/updater/sql/swh-init.sql -sql/updater/sql/swh-schema.sql sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql +sql/updates/12.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py +swh/scheduler/sql/30-swh-schema.sql +swh/scheduler/sql/40-swh-data.sql +swh/scheduler/sql/updater/10-swh-init.sql +swh/scheduler/sql/updater/30-swh-schema.sql +swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py +swh/scheduler/tests/celery_testing.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_task.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py swh/scheduler/updater/ghtorrent/cli.py swh/scheduler/updater/ghtorrent/fake.py \ No newline at end of file diff --git 
a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index 22d7b6c..c638937 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,9 +1,14 @@ Click arrow celery elasticsearch>5.4 flask kombu psycopg2 -swh.core>=0.0.40 +swh.core>=0.0.44 vcversioner + +[testing] +celery +hypothesis +nose diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index cb70aea..20cd310 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -1,181 +1,203 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import datetime +import logging import socket from arrow import utcnow from kombu import Queue from celery.events import EventReceiver from swh.scheduler import get_scheduler from .config import app as main_app class ReliableEventReceiver(EventReceiver): def __init__(self, channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix='celeryev', accept=None): super(ReliableEventReceiver, self).__init__( channel, handlers, routing_key, node_id, app, queue_prefix, accept) self.queue = Queue('.'.join([self.queue_prefix, self.node_id]), exchange=self.exchange, routing_key=self.routing_key, auto_delete=False, - durable=True, - queue_arguments=self._get_queue_arguments()) + durable=True) def get_consumers(self, consumer, channel): return [consumer(queues=[self.queue], callbacks=[self._receive], no_ack=False, accept=self.accept)] - def _receive(self, body, message): - type, body = self.event_from_message(body) - self.process(type, body, message) + def _receive(self, bodies, message): + logging.debug('## event-receiver: bodies: %s' % bodies) + logging.debug('## event-receiver: message: %s' % message) + if not 
isinstance(bodies, list): # celery<4 returned body as element + bodies = [bodies] + for body in bodies: + type, body = self.event_from_message(body) + self.process(type, body, message) def process(self, type, event, message): """Process the received event by dispatching it to the appropriate handler.""" handler = self.handlers.get(type) or self.handlers.get('*') + logging.debug('## event-receiver: type: %s' % type) + logging.debug('## event-receiver: event: %s' % event) + logging.debug('## event-receiver: handler: %s' % handler) handler and handler(event, message) ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) ACTION_QUEUE_MAX_LENGTH = 1000 def event_monitor(app, backend): actions = { 'last_send': utcnow() - 2*ACTION_SEND_DELAY, 'queue': [], } def try_perform_actions(actions=actions): if not actions['queue']: return if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \ len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH: perform_actions(actions) def perform_actions(actions, backend=backend): action_map = { 'start_task_run': backend.start_task_run, 'end_task_run': backend.end_task_run, } messages = [] cursor = backend.cursor() for action in actions['queue']: messages.append(action['message']) function = action_map[action['action']] args = action.get('args', ()) kwargs = action.get('kwargs', {}) kwargs['cursor'] = cursor function(*args, **kwargs) backend.commit() for message in messages: message.ack() actions['queue'] = [] actions['last_send'] = utcnow() def queue_action(action, actions=actions): actions['queue'].append(action) try_perform_actions() def catchall_event(event, message): message.ack() try_perform_actions() def task_started(event, message): + logging.debug('#### task_started: event: %s' % event) + logging.debug('#### task_started: message: %s' % message) + queue_action({ 'action': 'start_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'metadata': { 'worker': event['hostname'], }, }, 'message': message, }) def 
task_succeeded(event, message): + logging.debug('#### task_succeeded: event: %s' % event) + logging.debug('#### task_succeeded: message: %s' % message) result = event['result'] + logging.debug('#### task_succeeded: result: %s' % result) try: status = result.get('status') if status == 'success': status = 'eventful' if result.get('eventful') else 'uneventful' except Exception: status = 'eventful' if result else 'uneventful' queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': status, 'result': result, }, 'message': message, }) def task_failed(event, message): + logging.debug('#### task_failed: event: %s' % event) + logging.debug('#### task_failed: message: %s' % message) + queue_action({ 'action': 'end_task_run', 'args': [event['uuid']], 'kwargs': { 'timestamp': utcnow(), 'status': 'failed', }, 'message': message, }) recv = ReliableEventReceiver( main_app.connection(), app=main_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, 'task-failed': task_failed, '*': catchall_event, }, node_id='listener-%s' % socket.gethostname(), ) recv.capture(limit=None, timeout=None, wakeup=True) @click.command() @click.option('--cls', '-c', default='local', help="Scheduler's class, default to 'local'") @click.option( '--database', '-d', help='Scheduling database DSN') @click.option('--url', '-u', help="(Optional) Scheduler's url access") -def main(cls, database, url): +@click.option('--verbose', is_flag=True, default=False, + help='Default to be silent') +def main(cls, database, url, verbose): + if verbose: + logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) + scheduler = None override_config = {} if cls == 'local': if database: override_config = {'scheduling_db': database} scheduler = get_scheduler(cls, args=override_config) elif cls == 'remote': if url: override_config = {'url': url} scheduler = get_scheduler(cls, args=override_config) if not scheduler: raise ValueError('Scheduler 
class (local/remote) must be instantiated') event_monitor(main_app, backend=scheduler) if __name__ == '__main__': main() diff --git a/sql/swh-scheduler-schema.sql b/swh/scheduler/sql/30-swh-schema.sql similarity index 97% rename from sql/swh-scheduler-schema.sql rename to swh/scheduler/sql/30-swh-schema.sql index a8d4c0b..d7333e1 100644 --- a/sql/swh-scheduler-schema.sql +++ b/swh/scheduler/sql/30-swh-schema.sql @@ -1,520 +1,520 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) - values (11, now(), 'Work In Progress'); + values (12, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum 
('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval, status task_status not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. 
' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Policy of the given task'; create index on task(type); create index on task(next_run); create index task_args on task using btree ((arguments -> 'args')); create index task_kwargs on task using gin ((arguments -> 'kwargs')); create index on task(priority); create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. 
' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; create index on task_run(task); create index on task_run(backend_id); create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task alter column retries_left drop not null, drop column id; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin -- update the default values in one go -- this is separated from the insert/select to avoid too much -- juggling update tmp_task t set current_interval = tt.default_interval, retries_left = coalesce(retries_left, tt.num_retries, 0) from task_type tt where tt.type=t.type; insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority) select type, arguments, next_run, status, current_interval, policy, retries_left, priority from tmp_task t where not exists(select 1 from task where type = t.type and - arguments = t.arguments and + arguments->'args' = t.arguments->'args' and + arguments->'kwargs' = t.arguments->'kwargs' and policy = t.policy and - ((priority is null and t.priority is null) - or priority = t.priority) and + priority is not distinct from t.priority and status = t.status); return query select distinct t.* from tmp_task tt inner join task t on ( - t.type = tt.type and - t.arguments = tt.arguments and - t.status = tt.status and - ((t.priority is null and tt.priority is null) - or t.priority=tt.priority) and - t.policy=tt.policy + tt.type = t.type and + tt.arguments->'args' = t.arguments->'args' and + tt.arguments->'kwargs' = t.arguments->'kwargs' and + tt.policy = t.policy and + tt.priority is not distinct from t.priority and + tt.status = t.status ); end; $$; comment on function 
swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and type = task_type and status = 'next_run_not_scheduled' and priority is null order by next_run limit num_tasks for update skip locked; $$; comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint) is 'Retrieve tasks without priority'; create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority) returns numeric language sql stable as $$ select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric $$; comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority) is 'Given a priority task and a total number, compute the number of tasks to read'; create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL, task_priority task_priority default 'normal') returns setof task language sql stable as $$ select * from task t where t.next_run <= ts and t.type = task_type and t.status = 'next_run_not_scheduled' and t.priority = task_priority order by t.next_run limit num_tasks_priority for update skip locked; $$; comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority) is 'Retrieve tasks with a given priority'; create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_high bigint; nb_normal bigint; nb_low bigint; begin -- expected values to fetch select swh_scheduler_nb_priority_tasks(num_tasks_priority, 
'high') into nb_high; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low; nb_diff := 0; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_high then nb_normal := nb_normal + nb_high - count_row; end if; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_normal then nb_low := nb_low + nb_normal - count_row; end if; return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low'); end $$; comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint) is 'Retrieve priority tasks'; create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_tasks bigint; begin count_row := 0; for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority) order by priority, next_run loop count_row := count_row + 1; return next r; end loop; if count_row < num_tasks_priority then nb_tasks := num_tasks + num_tasks_priority - count_row; else nb_tasks := num_tasks; end if; for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks) order by priority, next_run loop return next r; end loop; return; end $$; comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint) is 'Retrieve tasks with/without priority in order'; create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint 
default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority) ) next_tasks where task.id = next_tasks.id returning task.*; $$; comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint) is 'Grab tasks ready for scheduling and change their status'; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled, status) values (task_id, backend_id, metadata, ts, 'scheduled') returning *; $$; create or replace function swh_scheduler_mktemp_task_run () returns void language sql as $$ create temporary table tmp_task_run ( like task_run excluding indexes ) on commit drop; alter table tmp_task_run drop column id, drop column status; $$; comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling'; create or replace function swh_scheduler_schedule_task_run_from_temp () returns void language plpgsql as $$ begin insert into task_run (task, backend_id, metadata, scheduled, status) select task, backend_id, metadata, scheduled, 'scheduled' from tmp_task_run; return; end; $$; create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set started = ts, status = 'started', metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, status task_run_status, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update 
task_run set ended = ts, status = swh_scheduler_end_task_run.status, metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create type task_record as ( task_id bigint, task_policy task_policy, task_status task_status, task_run_id bigint, arguments jsonb, type text, backend_id text, metadata jsonb, scheduled timestamptz, started timestamptz, ended timestamptz, task_run_status task_run_status ); create index task_run_id_asc_idx on task_run(task asc, started asc); create or replace function swh_scheduler_task_to_archive( ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, lim bigint default 10) returns setof task_record language sql stable as $$ select t.id as task_id, t.policy as task_policy, t.status as task_status, tr.id as task_run_id, t.arguments, t.type, tr.backend_id, tr.metadata, tr.scheduled, tr.started, tr.ended, tr.status as task_run_status from task_run tr inner join task t on tr.task=t.id where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or (t.policy = 'recurring' and t.status = 'disabled')) and ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and t.id > last_id order by tr.task, tr.started limit lim; $$; comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function'; create or replace function swh_scheduler_delete_archived_tasks( task_ids bigint[], task_run_ids bigint[]) returns void language sql as $$ -- clean up task_run_ids delete from task_run where id in (select * from unnest(task_run_ids)); -- clean up only tasks whose associated task_run are all cleaned up. 
-- Remaining tasks will stay there and will be cleaned up when -- remaining data have been indexed delete from task where id in (select t.id from task t left outer join task_run tr on t.id=tr.task where t.id in (select * from unnest(task_ids)) and tr.task is null); $$; comment on function swh_scheduler_delete_archived_tasks is 'Clean up archived tasks function'; create or replace function swh_scheduler_update_task_on_task_end () returns trigger language plpgsql as $$ declare cur_task task%rowtype; cur_task_type task_type%rowtype; adjustment_factor float; new_interval interval; begin select * from task where id = new.task into cur_task; select * from task_type where type = cur_task.type into cur_task_type; case when new.status = 'permfailed' then update task set status = 'disabled' where id = cur_task.id; when new.status in ('eventful', 'uneventful') then case when cur_task.policy = 'oneshot' then update task set status = 'completed' where id = cur_task.id; when cur_task.policy = 'recurring' then if new.status = 'uneventful' then adjustment_factor := 1/cur_task_type.backoff_factor; else adjustment_factor := 1/cur_task_type.backoff_factor; end if; new_interval := greatest( cur_task_type.min_interval, least( cur_task_type.max_interval, adjustment_factor * cur_task.current_interval)); update task set status = 'next_run_not_scheduled', next_run = now() + new_interval, current_interval = new_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; else -- new.status in 'failed', 'lost' if cur_task.retries_left > 0 then update task set status = 'next_run_not_scheduled', next_run = now() + coalesce(cur_task_type.retry_delay, interval '1 hour'), retries_left = cur_task.retries_left - 1 where id = cur_task.id; else -- no retries left case when cur_task.policy = 'oneshot' then update task set status = 'disabled' where id = cur_task.id; when cur_task.policy = 'recurring' then update task set status = 'next_run_not_scheduled', next_run = 
now() + cur_task.current_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; end if; -- retries end case; return null; end; $$; create trigger update_task_on_task_end after update of status on task_run for each row when (new.status NOT IN ('scheduled', 'started')) execute procedure swh_scheduler_update_task_on_task_end (); diff --git a/sql/swh-scheduler-data.sql b/swh/scheduler/sql/40-swh-data.sql similarity index 76% rename from sql/swh-scheduler-data.sql rename to swh/scheduler/sql/40-swh-data.sql index 400d0e1..b10d2e5 100644 --- a/sql/swh-scheduler-data.sql +++ b/swh/scheduler/sql/40-swh-data.sql @@ -1,155 +1,194 @@ insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-loader-mount-dump-and-load-svn-repository', 'Loading svn repositories from svn dump', - 'swh.loader.svn.tasks.MountAndLoadSvnRepositoryTsk', + 'swh.loader.svn.tasks.MountAndLoadSvnRepository', + '1 day', '1 day', '1 day', 1, + 1000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'origin-update-svn', + 'Create dump of a remote svn repository, mount it and load it', + 'swh.loader.svn.tasks.DumpMountAndLoadSvnRepository', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-loading', 'Loading deposit archive into swh through swh-loader-tar', 'swh.deposit.loader.tasks.LoadDepositArchiveTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-checks', 'Pre-checking deposit step before loading into swh archive', 
'swh.deposit.loader.tasks.ChecksDepositTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-vault-cooking', 'Cook a Vault bundle', 'swh.vault.cooking_tasks.SWHCookingTask', '1 day', '1 day', '1 day', 1, 10000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( - 'origin-load-hg', + 'origin-update-hg', 'Loading mercurial repository swh-loader-mercurial', - 'swh.loader.mercurial.tasks.LoadMercurialTsk', + 'swh.loader.mercurial.tasks.LoadMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-load-archive-hg', 'Loading archive mercurial repository swh-loader-mercurial', - 'swh.loader.mercurial.tasks.LoadArchiveMercurialTsk', + 'swh.loader.mercurial.tasks.LoadArchiveMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-git', 'Update an origin of type git', 'swh.loader.git.tasks.UpdateGitRepository', '64 days', '12:00:00', - '64 days', 2, 100000); + '64 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( -'swh-lister-github-incremental', + 'swh-lister-github-incremental', 'Incrementally list GitHub', 'swh.lister.github.tasks.IncrementalGitHubLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-github-full', 'Full update of GitHub repos list', 'swh.lister.github.tasks.FullGitHubRelister', '90 days', '90 days', '90 days', 1); 
insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-debian', 'List a Debian distribution', 'swh.lister.debian.tasks.DebianListerTask', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-incremental', 'Incrementally list a Gitlab instance', 'swh.lister.gitlab.tasks.IncrementalGitLabLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-full', 'Full update of a Gitlab instance''s repos list', 'swh.lister.gitlab.tasks.FullGitLabRelister', '90 days', '90 days', '90 days', 1); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor) +values ( + 'swh-lister-pypi', + 'Full pypi lister', + 'swh.lister.pypi.tasks.PyPIListerTask', + '1 days', + '1 days', + '1 days', 1); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'origin-update-pypi', + 'Load Pypi origin', + 'swh.loader.pypi.tasks.LoadPyPI', + '64 days', '12:00:00', '64 days', 2, + 5000); diff --git a/sql/updater/sql/swh-init.sql b/swh/scheduler/sql/updater/10-swh-init.sql similarity index 100% rename from sql/updater/sql/swh-init.sql rename to swh/scheduler/sql/updater/10-swh-init.sql diff --git a/sql/updater/sql/swh-schema.sql b/swh/scheduler/sql/updater/30-swh-schema.sql similarity index 100% rename from sql/updater/sql/swh-schema.sql rename to swh/scheduler/sql/updater/30-swh-schema.sql diff --git a/sql/updater/sql/swh-func.sql b/swh/scheduler/sql/updater/40-swh-func.sql similarity index 100% rename from sql/updater/sql/swh-func.sql rename to swh/scheduler/sql/updater/40-swh-func.sql diff --git 
a/swh/scheduler/tests/__init__.py b/swh/scheduler/tests/__init__.py index e69de29..04915f5 100644 --- a/swh/scheduler/tests/__init__.py +++ b/swh/scheduler/tests/__init__.py @@ -0,0 +1,5 @@ +from os import path +import swh.scheduler + + +SQL_DIR = path.join(path.dirname(swh.scheduler.__file__), 'sql') diff --git a/swh/scheduler/tests/celery_testing.py b/swh/scheduler/tests/celery_testing.py new file mode 100644 index 0000000..2859e9c --- /dev/null +++ b/swh/scheduler/tests/celery_testing.py @@ -0,0 +1,18 @@ +import os + + +def setup_celery(): + os.environ.setdefault('CELERY_BROKER_URL', 'memory://') + os.environ.setdefault('CELERY_RESULT_BACKEND', 'cache+memory://') + + +class CeleryTestFixture: + """Mix this in a test subject class to setup Celery config for testing + purpose. + + Can be overridden by CELERY_BROKER_URL and CELERY_RESULT_BACKEND env vars. + """ + + def setUp(self): + setup_celery() + super().setUp() diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index c395e8c..5e0c21d 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,36 +1,36 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from swh.core.tests.server_testing import ServerTestFixture from swh.scheduler import get_scheduler -from swh.scheduler.tests.test_scheduler import CommonSchedulerTest from swh.scheduler.api.server import app +from swh.scheduler.tests.test_scheduler import CommonSchedulerTest class RemoteSchedulerTest(CommonSchedulerTest, ServerTestFixture, unittest.TestCase): """Test the remote scheduler API. This class doesn't define any tests as we want identical functionality between local and remote scheduler. All the tests are therefore defined in CommonSchedulerTest. 
""" def setUp(self): self.config = { 'scheduler': { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=%s' % self.TEST_DB_NAME, } } } self.app = app # this will setup the local scheduler... super().setUp() # accessible through a remote scheduler accessible on the # given port self.backend = get_scheduler('remote', {'url': self.url()}) diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 6f41a72..caf1748 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,486 +1,474 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os import random import unittest import uuid +from collections import defaultdict +import psycopg2 from arrow import utcnow -from collections import defaultdict from nose.plugins.attrib import attr -from nose.tools import istest -import psycopg2 from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler import get_scheduler - -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') +from . 
import SQL_DIR @attr('db') class CommonSchedulerTest(SingleDbTestFixture): TEST_DB_NAME = 'softwareheritage-scheduler-test' - TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump') + TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') def setUp(self): super().setUp() tt = { 'type': 'update-git', 'description': 'Update a git repository', 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', 'default_interval': datetime.timedelta(days=64), 'min_interval': datetime.timedelta(hours=12), 'max_interval': datetime.timedelta(days=64), 'backoff_factor': 2, 'max_queue_length': None, 'num_retries': 7, 'retry_delay': datetime.timedelta(hours=2), } tt2 = tt.copy() tt2['type'] = 'update-hg' tt2['description'] = 'Update a mercurial repository' tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository' tt2['max_queue_length'] = 42 tt2['num_retries'] = None tt2['retry_delay'] = None self.task_types = { tt['type']: tt, tt2['type']: tt2, } self.task1_template = t1_template = { 'type': tt['type'], 'arguments': { 'args': [], 'kwargs': {}, }, 'next_run': None, } self.task2_template = t2_template = copy.deepcopy(t1_template) t2_template['type'] = tt2['type'] t2_template['policy'] = 'oneshot' def tearDown(self): self.backend.close_connection() self.empty_tables() super().tearDown() def empty_tables(self, whitelist=["priority_ratio"]): query = """SELECT table_name FROM information_schema.tables WHERE table_schema = %%s and table_name not in (%s) """ % ','.join(map(lambda t: "'%s'" % t, whitelist)) self.cursor.execute(query, ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() - @istest - def add_task_type(self): + def test_add_task_type(self): tt, tt2 = self.task_types.values() self.backend.create_task_type(tt) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) with self.assertRaisesRegex(psycopg2.IntegrityError, - '\(type\)=\(%s\)' % 
tt['type']): + r'\(type\)=\(%s\)' % tt['type']): self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertEqual(tt, self.backend.get_task_type(tt['type'])) self.assertEqual(tt2, self.backend.get_task_type(tt2['type'])) - @istest - def get_task_types(self): + def test_get_task_types(self): tt, tt2 = self.task_types.values() self.backend.create_task_type(tt) self.backend.create_task_type(tt2) self.assertCountEqual([tt2, tt], self.backend.get_task_types()) @staticmethod def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret['next_run'] = next_run if priority: ret['priority'] = priority if args: ret['arguments']['args'] = list(args) if kwargs: ret['arguments']['kwargs'] = kwargs return ret def _pop_priority(self, priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None def _tasks_from_template(self, template, max_timestamp, num, num_priority=0, priorities=None): if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = self._pop_priority(priorities) tasks.append(self._task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, 'argument-%03d' % i, **{'kwarg%03d' % i: 'bogus-kwarg'} )) return tasks def _create_task_types(self): for tt in self.task_types.values(): self.backend.create_task_type(tt) - @istest - def create_tasks(self): + def test_create_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() num_tasks_priority = 100 tasks_1 = self._tasks_from_template(self.task1_template, utcnow(), 100) tasks_2 = self._tasks_from_template( self.task2_template, utcnow(), 100, num_tasks_priority, priorities=priority_ratio) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = 
self.backend.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t['id'] for t in ret1]) # creating the same set result in the same ids ret = self.backend.create_tasks(tasks) set_ret = set([t['id'] for t in ret]) # Idempotence results self.assertEqual(set_ret, set_ret1) self.assertEqual(len(ret), len(ret1)) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = self.task_types[orig_task['type']] self.assertNotIn(task['id'], ids) self.assertEqual(task['status'], 'next_run_not_scheduled') self.assertEqual(task['current_interval'], task_type['default_interval']) self.assertEqual(task['policy'], orig_task.get('policy', 'recurring')) priority = task.get('priority') if priority: actual_priorities[priority] += 1 self.assertEqual(task['retries_left'], task_type['num_retries'] or 0) ids.add(task['id']) del task['id'] del task['status'] del task['current_interval'] del task['retries_left'] if 'policy' not in orig_task: del task['policy'] if 'priority' not in orig_task: del task['priority'] self.assertEqual(task, orig_task) self.assertEqual(dict(actual_priorities), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) - @istest - def peek_ready_tasks_no_priority(self): + def test_peek_ready_tasks_no_priority(self): self._create_task_types() t = utcnow() task_type = self.task1_template['type'] tasks = self._tasks_from_template(self.task1_template, t, 100) random.shuffle(tasks) self.backend.create_tasks(tasks) ready_tasks = self.backend.peek_ready_tasks(task_type) self.assertEqual(len(ready_tasks), len(tasks)) for i in range(len(ready_tasks) - 1): self.assertLessEqual(ready_tasks[i]['next_run'], ready_tasks[i+1]['next_run']) # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks)//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=limit) self.assertEqual(len(ready_tasks_limited), limit) 
self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit]) # Limit by timestamp max_ts = tasks[limit-1]['next_run'] ready_tasks_timestamped = self.backend.peek_ready_tasks( task_type, timestamp=max_ts) for ready_task in ready_tasks_timestamped: self.assertLessEqual(ready_task['next_run'], max_ts) # Make sure we get proper behavior for the first ready tasks self.assertCountEqual( ready_tasks[:len(ready_tasks_timestamped)], ready_tasks_timestamped, ) # Limit by both ready_tasks_both = self.backend.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit//3) self.assertLessEqual(len(ready_tasks_both), limit//3) for ready_task in ready_tasks_both: self.assertLessEqual(ready_task['next_run'], max_ts) self.assertIn(ready_task, ready_tasks[:limit//3]) def _priority_ratio(self): self.cursor.execute('select id, ratio from priority_ratio') priority_ratio = {} for row in self.cursor.fetchall(): priority_ratio[row[0]] = row[1] return priority_ratio - @istest - def peek_ready_tasks_mixed_priorities(self): + def test_peek_ready_tasks_mixed_priorities(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = self.task1_template['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( self.task1_template, t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) # take all available tasks ready_tasks = self.backend.peek_ready_tasks( task_type) self.assertEqual(len(ready_tasks), len(tasks)) self.assertEqual(num_tasks_priority + num_tasks_no_priority, len(ready_tasks)) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get('priority') if priority: count_tasks_per_priority[priority] += 1 self.assertEqual(dict(count_tasks_per_priority), { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() }) # Only 
get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2) num_tasks_priority = random.randrange(5, num_tasks_priority//2) ready_tasks_limited = self.backend.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get('priority') count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] self.assertTrue( actual_prio == expected_count or actual_prio == expected_count + 1) self.assertEqual(count_tasks_per_priority[None], num_tasks) - @istest - def grab_ready_tasks(self): + def test_grab_ready_tasks(self): priority_ratio = self._priority_ratio() self._create_task_types() t = utcnow() task_type = self.task1_template['type'] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = self._tasks_from_template( self.task1_template, t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio) random.shuffle(tasks) self.backend.create_tasks(tasks) first_ready_tasks = self.backend.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) grabbed_tasks = self.backend.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): self.assertEqual(peeked['status'], 'next_run_not_scheduled') del peeked['status'] self.assertEqual(grabbed['status'], 'next_run_scheduled') del grabbed['status'] self.assertEqual(peeked, grabbed) self.assertEqual(peeked['priority'], grabbed['priority']) - @istest - def get_tasks(self): + def test_get_tasks(self): self._create_task_types() t = utcnow() tasks = self._tasks_from_template(self.task1_template, t, 100) tasks = self.backend.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = 
random.randrange(1, len(tasks)) cur_tasks = tasks[:length] tasks[:length] = [] ret = self.backend.get_tasks(task['id'] for task in cur_tasks) self.assertCountEqual(ret, cur_tasks) - @istest - def filter_task_to_archive(self): + def test_filter_task_to_archive(self): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types() _time = utcnow() recurring = self._tasks_from_template(self.task1_template, _time, 12) oneshots = self._tasks_from_template(self.task2_template, _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') _tasks.append(t) # Randomly update task's status per policy status_per_policy = {'recurring': 0, 'oneshot': 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] 'recurring': [(1, 'disabled'), (0, 'completed'), (0, 'next_run_not_scheduled')], 'oneshot': [(0, 'next_run_not_scheduled'), (1, 'disabled'), (1, 'completed')] } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task['policy'] _task_ids[policy].append(task['id']) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task['id']) self.backend.disable_tasks(tasks_to_update['recurring']) # hack: change the status to something else than completed/disabled self.backend.set_status_tasks( _task_ids['oneshot'], status='next_run_not_scheduled') # complete the tasks to update 
self.backend.set_status_tasks( tasks_to_update['oneshot'], status='completed') total_tasks_filtered = (status_per_policy['recurring'] + status_per_policy['oneshot']) # retrieve tasks to archive after = _time.shift(days=-1).format('YYYY-MM-DD') before = utcnow().shift(days=1).format('YYYY-MM-DD') tasks_to_archive = list(self.backend.filter_task_to_archive( after_ts=after, before_ts=before, limit=total_tasks)) self.assertEqual(len(tasks_to_archive), total_tasks_filtered) actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} for task in tasks_to_archive: actual_filtered_per_status[task['task_policy']] += 1 self.assertEqual(actual_filtered_per_status, status_per_policy) - @istest - def delete_archived_tasks(self): + def test_delete_archived_tasks(self): self._create_task_types() _time = utcnow() recurring = self._tasks_from_template( self.task1_template, _time, 12) oneshots = self._tasks_from_template( self.task2_template, _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = self.backend.create_tasks(recurring + oneshots) backend_tasks = [{ 'task': task['id'], 'backend_id': str(uuid.uuid4()), 'scheduled': utcnow(), } for task in pending_tasks] self.backend.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = self.backend.end_task_run( task['backend_id'], status='eventful') c = random.randint(0, 100) if c <= percent: _tasks.append({'task_id': t['task'], 'task_run_id': t['id']}) self.backend.delete_archived_tasks(_tasks) self.cursor.execute('select count(*) from task') tasks_count = self.cursor.fetchone() self.cursor.execute('select count(*) from task_run') tasks_run_count = self.cursor.fetchone() self.assertEqual(tasks_count[0], total_tasks - len(_tasks)) self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks)) class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() self.config = {'scheduling_db': 
'dbname=' + self.TEST_DB_NAME} self.backend = get_scheduler('local', self.config) diff --git a/swh/scheduler/tests/test_task.py b/swh/scheduler/tests/test_task.py index 3d957ba..7e2130e 100644 --- a/swh/scheduler/tests/test_task.py +++ b/swh/scheduler/tests/test_task.py @@ -1,31 +1,28 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest - from swh.scheduler import task +from .celery_testing import CeleryTestFixture -class Task(unittest.TestCase): +class Task(CeleryTestFixture, unittest.TestCase): - @istest - def not_implemented_task(self): + def test_not_implemented_task(self): class NotImplementedTask(task.Task): pass with self.assertRaises(NotImplementedError): NotImplementedTask().run() - @istest - def add_task(self): + def test_add_task(self): class AddTask(task.Task): def run_task(self, x, y): return x + y r = AddTask().apply([2, 3]) self.assertTrue(r.successful()) self.assertEqual(r.result, 5) diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py index a539cea..192be61 100644 --- a/swh/scheduler/tests/test_utils.py +++ b/swh/scheduler/tests/test_utils.py @@ -1,60 +1,56 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest - from datetime import timezone -from nose.tools import istest from unittest.mock import patch from swh.scheduler import utils class UtilsTest(unittest.TestCase): - @istest @patch('swh.scheduler.utils.datetime') - def create_oneshot_task_dict_simple(self, mock_datetime): + def test_create_oneshot_task_dict_simple(self, mock_datetime): mock_datetime.now.return_value 
= 'some-date' actual_task = utils.create_oneshot_task_dict('some-task-type') expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-date', 'arguments': { 'args': [], 'kwargs': {}, }, 'priority': None, } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) - @istest @patch('swh.scheduler.utils.datetime') - def create_oneshot_task_dict_other_call(self, mock_datetime): + def test_create_oneshot_task_dict_other_call(self, mock_datetime): mock_datetime.now.return_value = 'some-other-date' actual_task = utils.create_oneshot_task_dict( 'some-task-type', 'arg0', 'arg1', priority='high', other_stuff='normal' ) expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-other-date', 'arguments': { 'args': ('arg0', 'arg1'), 'kwargs': {'other_stuff': 'normal'}, }, 'priority': 'high', } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index c141f6c..f08fca7 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,71 +1,65 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow -from nose.plugins.attrib import attr -from nose.tools import istest from hypothesis import given from hypothesis.strategies import sets +from nose.plugins.attrib import attr from swh.core.tests.db_testing import SingleDbTestFixture +from swh.scheduler.tests import SQL_DIR from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent from . 
import from_regex -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') - - @attr('db') class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' - TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, - 'dumps/swh-scheduler-updater.dump') + TEST_DB_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') def setUp(self): super().setUp() config = { 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): self.backend.close_connection() self._empty_tables() super().tearDown() - @istest @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) - def cache_read(self, urls): + def test_cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, 'type': 'create', 'origin_type': 'git', }) self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index 0944e48..463a05c 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,199 +1,194 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +from itertools import chain from hypothesis import given 
-from hypothesis.strategies import sampled_from, lists, tuples, text +from hypothesis.strategies import lists, sampled_from, text, tuples -from itertools import chain -from nose.tools import istest +from swh.scheduler.updater.consumer import UpdaterConsumer +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from . import UpdaterTestUtil, from_regex -from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS -from swh.scheduler.updater.consumer import UpdaterConsumer - class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): def __init__(self, backend_class=FakeSchedulerUpdaterBackend): super().__init__(backend_class=backend_class) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerRaise() - @istest - def running_raise(self): + def test_running_raise(self): """Raising during run should finish fine. 
""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerNoEvent() - @istest - def running_does_not_consume(self): + def test_running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): def __init__(self, messages): super().__init__() self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': 
event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], 'last_seen': event['created_at'], 'origin_type': event['origin_type'], } return SWHEvent(e) class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): - @istest @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(text(), # event type from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text(), # origin type sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) - def running(self, events, uninteresting_events, incomplete_events): + def test_running(self, events, uninteresting_events, incomplete_events): """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) updater = FakeUpdaterConsumer(list(chain( ready_events, ready_incomplete_events, ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py index cb7489e..2f00bd7 100644 --- 
a/swh/scheduler/tests/updater/test_events.py +++ b/swh/scheduler/tests/updater/test_events.py @@ -1,48 +1,44 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given -from hypothesis.strategies import text, sampled_from -from nose.tools import istest +from hypothesis.strategies import sampled_from, text -from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.ghtorrent import events from . import UpdaterTestUtil def event_values_ko(): return set(events['evt']).union( set(events['ent'])).difference( set(LISTENED_EVENTS)) WRONG_EVENTS = sorted(list(event_values_ko())) class EventTest(UpdaterTestUtil, unittest.TestCase): - @istest @given(sampled_from(LISTENED_EVENTS), text(), text()) - def is_interesting_ok(self, event_type, name, origin_type): + def test_is_interesting_ok(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertTrue(SWHEvent(evt).is_interesting()) - @istest @given(text(), text(), text()) - def is_interested_with_noisy_event_should_be_ko( + def test_is_interested_with_noisy_event_should_be_ko( self, event_type, name, origin_type): if event_type in LISTENED_EVENTS: # just in case something good is generated, skip it return evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) - @istest @given(sampled_from(WRONG_EVENTS), text(), text()) - def is_interesting_ko(self, event_type, name, origin_type): + def test_is_interesting_ko(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) diff --git 
a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index bfeecf2..b87e345 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,171 +1,164 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +from unittest.mock import patch from hypothesis import given from hypothesis.strategies import sampled_from -from nose.tools import istest -from unittest.mock import patch from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.updater.ghtorrent import ( - events, GHTorrentConsumer, INTERESTING_EVENT_KEYS) +from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, + GHTorrentConsumer, events) -from . import from_regex, UpdaterTestUtil +from . import UpdaterTestUtil, from_regex def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): self.fake_config = { 'conn': { 'url': 'amqp://u:p@https://somewhere:9807', }, 'debug': True, 'batch_cache_write': 10, 'rabbitmq_prefetch_read': 100, } self.consumer = 
GHTorrentConsumer(self.fake_config, _connection_class=FakeConnection) - @istest def test_init(self): # given # check init is ok self.assertEqual(self.consumer.debug, self.fake_config['debug']) self.assertEqual(self.consumer.batch, self.fake_config['batch_cache_write']) self.assertEqual(self.consumer.prefetch_read, self.fake_config['rabbitmq_prefetch_read']) self.assertEqual(self.consumer.config, self.fake_config) - @istest def test_has_events(self): self.assertTrue(self.consumer.has_events()) - @istest def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, self.fake_config['conn']['url']) self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) - @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) - def convert_event_ok(self, event_type, name): + def test_convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'cnt': 1, 'origin_type': 'git', } self.assertEqual(event, expected_event) - @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(INTERESTING_EVENT_KEYS)) - def convert_event_ko(self, event_type, name, missing_data_key): + def test_convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) actual_converted_event = self.consumer.convert_event(input_event) 
self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') - @istest - def consume_events(self, mock_collect_replies): + def test_consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ self._make_event('PushEvent', 'user/some-repo', 'git'), self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, limit=self.fake_config['rabbitmq_prefetch_read'] ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index 2d98f28..79a6f09 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,162 +1,158 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest - +from glob import glob from nose.plugins.attrib import attr -from nose.tools import istest +from swh.core.utils import numfile_sortkey as sortkey from swh.core.tests.db_testing import DbTestFixture -from swh.scheduler.updater.events import SWHEvent +from swh.scheduler.tests import SQL_DIR +from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.writer import UpdaterWriter -from swh.scheduler.updater.events import LISTENED_EVENTS from . 
import UpdaterTestUtil -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') - - @attr('db') class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' - TEST_SCHED_DUMP = os.path.join(TEST_DATA_DIR, - 'dumps/swh-scheduler.dump') + TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql') TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' - TEST_SCHED_UPDATER_DUMP = os.path.join(TEST_DATA_DIR, - 'dumps/swh-scheduler-updater.dump') + TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') @classmethod def setUpClass(cls): - cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP) - cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP) + cls.add_db(cls.TEST_SCHED_DB, + [(sqlfn, 'psql') for sqlfn in + sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)]) + cls.add_db(cls.TEST_SCHED_UPDATER_DB, + [(sqlfn, 'psql') for sqlfn in + sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)]) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { 'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-test', 'cache_read_limit': 5, }, 'pause': 0.1, 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend def tearDown(self): self.scheduler_backend.close_connection() self.scheduler_updater_backend.close_connection() super().tearDown() - @istest - def run_ko(self): + def test_run_ko(self): """Only git tasks are supported for now, 
other types are dismissed. """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) - @istest - def run_ok(self): + def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEquals(t['type'], 'origin-update-git') self.assertEquals(t['priority'], 'normal') self.assertEquals(t['policy'], 'oneshot') self.assertEquals(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of 
scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length) diff --git a/version.txt b/version.txt index e10cfc1..565ae72 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.31-0-g1c2899c \ No newline at end of file +v0.0.32-0-g8df816a \ No newline at end of file