diff --git a/PKG-INFO b/PKG-INFO index 70bb0b2..92333fc 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running test manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document everythong should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker uses an in memory broker/result backend by default, but you can choose to use a true broker by setting `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/debian/changelog b/debian/changelog index 3b37191..9017b72 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,289 +1,292 @@ -swh-scheduler (0.0.34-1~swh1~bpo9+1) stretch-swh; urgency=medium +swh-scheduler (0.0.35-1~swh1) unstable-swh; urgency=medium - * Rebuild for stretch-backports. + * v0.0.35 + * tests: Add SchedulerTestFixture + * swh.scheduler.utils: Allow to add more task information + * sql/40-swh-data: Update new indexer task types for local db - -- Antoine R. Dumont (@ardumont) Thu, 25 Oct 2018 17:52:03 +0200 + -- Antoine R. Dumont (@ardumont) Mon, 29 Oct 2018 10:07:08 +0100 swh-scheduler (0.0.34-1~swh1) unstable-swh; urgency=medium * v0.0.34 * Finalize pytest migration -- Antoine R. Dumont (@ardumont) Thu, 25 Oct 2018 17:52:03 +0200 swh-scheduler (0.0.33-1~swh1) unstable-swh; urgency=medium * v0.0.33 -- David Douard Thu, 25 Oct 2018 16:03:16 +0200 swh-scheduler (0.0.32-1~swh1) unstable-swh; urgency=medium * v0.0.32 * tests: Add celery fixture to ease tests * tests: make tests use sql/ files from the package * tests: Starting migration towards pytest * listener: Make the listener code compatible with new celery (debian buster) * Make swh_scheduler_create_tasks_from_temp use indexes * setup: prepare for pypi upload * docs: add a simple README file -- Antoine R. Dumont (@ardumont) Mon, 22 Oct 2018 15:37:51 +0200 swh-scheduler (0.0.31-1~swh1) unstable-swh; urgency=medium * v0.0.31 * sql/swh-scheduler: Make the create_tasks call idempotent * swh.scheduler.utils: Open create_task_dict function * sql/scheduler-data: Add lister gitlab task types * sql/scheduler-data: Reference the existing production lister data * swh.scheduler.backend_es: Open sniffing options -- Antoine R. Dumont (@ardumont) Tue, 31 Jul 2018 06:55:39 +0200 swh-scheduler (0.0.30-1~swh1) unstable-swh; urgency=medium * v0.0.30 * swh-scheduler-schema.sql: Archive disabled oneshot tasks as well * swh.scheduler.cli: Add policy to pretty printing task routine * swh.scheduler.cli: Fix broken cli list-pending since api change -- Antoine R. Dumont (@ardumont) Fri, 22 Jun 2018 18:07:02 +0200 swh-scheduler (0.0.29-1~swh1) unstable-swh; urgency=medium * v0.0.29 * swh.scheduler.cli: Change archival period to rolling month - 1 week * swh.scheduler.updater.writer: Force filter resolution to list * swh.scheduler.cli: Change default archival period to current month * swh.scheduler.cli: Improve logging message * swh.scheduler.updater.backend: Adapt configuration path accordingly -- Antoine R. Dumont (@ardumont) Thu, 31 May 2018 11:42:51 +0200 swh-scheduler (0.0.28-1~swh1) unstable-swh; urgency=medium * v0.0.28 * Fix wrong runtime dependencies -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 14:12:15 +0200 swh-scheduler (0.0.27-1~swh1) unstable-swh; urgency=medium * v0.0.27 * scheduler: Deal with priority in tasks * scheduler-update: new package python3-swh.scheduler.updater * Contains tools in charge of consuming events from arbitrary sources * and update the scheduler db -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 12:27:34 +0200 swh-scheduler (0.0.26-1~swh1) unstable-swh; urgency=medium * v0.0.26 * swh.scheduler: Fix package build * swh.scheduler.tests: Test remote scheduler api as well * swh.scheduler: Add tests around removing archivable tasks * swh.scheduler: Add tests around filtering archivable tasks * swh-scheduler-schema: Fix unneeded drop instructions * swh.scheduler.cli: Improve docstring * swh.scheduler.cli: Permit to specify the backend to use in cli * swh.scheduler.api: Bootstrap scheduler's remote api * swh.scheduler: Use `get_scheduler` api to instantiate a scheduler * swh.scheduler.backend: Fix docstring -- Antoine R. Dumont (@ardumont) Thu, 26 Apr 2018 17:34:07 +0200 swh-scheduler (0.0.25-1~swh1) unstable-swh; urgency=medium * v0.0.25 * swh.scheduler.cli.archive: Index arguments.kwargs as text -- Antoine R. Dumont (@ardumont) Wed, 18 Apr 2018 12:34:43 +0200 swh-scheduler (0.0.24-1~swh1) unstable-swh; urgency=medium * v0.0.24 * data/template: Do not index the arguments field (it's in _source) * data/README: Add a small readme to explain es install step * swh.scheduler.cli: Add a bulk index flag to separate read from index -- Antoine R. Dumont (@ardumont) Fri, 13 Apr 2018 14:55:32 +0200 swh-scheduler (0.0.23-1~swh1) unstable-swh; urgency=medium * swh.scheduler.cli.archive: Delete only completely indexed tasks * Prior to this commit, it could happen that we removed tasks even * though we did not yet index associated task_run. * Related T986 -- Antoine R. Dumont (@ardumont) Tue, 10 Apr 2018 17:43:07 +0200 swh-scheduler (0.0.22-1~swh1) unstable-swh; urgency=medium * v0.0.22 * Update to a more recent python3-elasticsearch client -- Antoine R. Dumont (@ardumont) Mon, 09 Apr 2018 16:09:16 +0200 swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium * v0.0.21 * Adapt default configuration * Fix typo in configuration variable name -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 15:02:55 +0200 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium * v0.0.20 * swh.scheduler.cli.archive: Open completed oneshot or disabled * recurring tasks archival endpoint * swh.core.serializer: Move to msgpack serialization format * swh.scheduler.cli: Unify pretty print output * sql/data: Add new task type for loading mercurial dump * swh.scheduler.cli: Add sample use case for the scheduling cli * swh.scheduler.cli: Open policy column to the scheduling cli * swh.scheduler.cli: Open the delimiter option as cli argument * Fix issue when updating task-type without any retry delay defined * swh-scheduler/data: Add new oneshot scheduling load-mercurial task * backend: fix default scheduling_db value for consistency * backend: doc: fix return value of create_tasks -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium * v0.0.19 * swh.scheduler.utils: Open utility function to create oneshot task -- Antoine R. Dumont (@ardumont) Wed, 29 Nov 2017 12:51:15 +0100 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.18 * Celery 4 compatibility -- Nicolas Dandrimont Wed, 08 Nov 2017 17:06:22 +0100 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.17 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 18:49:02 +0200 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.16 * add some tests * implement one-shot tasks * implement retry on temporary failure -- Nicolas Dandrimont Mon, 07 Aug 2017 18:44:03 +0200 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.15 * Add some methods to get the length of task queues * worker: Show logs on stdout if loglevel = debug -- Nicolas Dandrimont Mon, 19 Jun 2017 19:44:56 +0200 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler 0.0.14 * Make the return value of tasks available in the listener -- Nicolas Dandrimont Mon, 12 Jun 2017 17:50:32 +0200 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.13 * Use systemd for logging rather than PostgreSQL -- Nicolas Dandrimont Fri, 07 Apr 2017 11:57:50 +0200 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.12 * Only log to database if the configuration is present -- Nicolas Dandrimont Thu, 09 Mar 2017 11:12:45 +0100 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.11 * add utils.get_task -- Nicolas Dandrimont Tue, 14 Feb 2017 19:49:34 +0100 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.10 * Allow disabling tasks -- Nicolas Dandrimont Thu, 20 Oct 2016 17:20:17 +0200 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.9 * Revert management of one shot tasks * Add possibility of launching several worker instances -- Nicolas Dandrimont Fri, 02 Sep 2016 17:09:18 +0200 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium * v0.0.7 * Add oneshot task -- Antoine R. Dumont (@ardumont) Fri, 01 Jul 2016 16:42:45 +0200 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.6 * More reliability and efficiency when scheduling a lot ot tasks -- Nicolas Dandrimont Wed, 24 Feb 2016 18:46:57 +0100 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.5 * Use copy for task mass-scheduling -- Nicolas Dandrimont Wed, 24 Feb 2016 12:13:38 +0100 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.4 * general cleanup of the backend * use arrow instead of dateutil * add new cli program -- Nicolas Dandrimont Tue, 23 Feb 2016 17:46:04 +0100 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.3 * Implement the timestamp arguments to the task_run functions * Make the celery event listener use a reliable queue -- Nicolas Dandrimont Mon, 22 Feb 2016 15:14:28 +0100 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.2 * Multiple schema changes * Initial releases for the celery job runner and the event listener -- Nicolas Dandrimont Fri, 19 Feb 2016 18:50:47 +0100 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * Release swh.scheduler v0.0.1 * Move swh.core.scheduling and swh.core.worker to swh.scheduler -- Nicolas Dandrimont Mon, 15 Feb 2016 11:07:30 +0100 diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 70bb0b2..92333fc 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,65 +1,65 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). # Tests ## Running test manually ### Test data To be able to run (unit) tests, you need to have the [[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]] in the parent directory. If you have set your environment following the [[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]] document everythong should be set up just fine. Otherwise: ``` ~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata ``` ### Required services Unit tests that require a running celery broker uses an in memory broker/result backend by default, but you can choose to use a true broker by setting `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables up. For example: ``` $ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests ..................................... ---------------------------------------------------------------------- Ran 37 tests in 15.578s OK ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index a4bf015..c70dbeb 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,89 +1,91 @@ .gitignore AUTHORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.py tox.ini version.txt bin/swh-worker-control data/README.md data/elastic-template.json data/update-index-settings.json debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile sql/updater/sql/Makefile sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql sql/updates/12.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-data.sql swh/scheduler/sql/updater/10-swh-init.sql swh/scheduler/sql/updater/30-swh-schema.sql swh/scheduler/sql/updater/40-swh-func.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/celery_testing.py +swh/scheduler/tests/scheduler_testing.py swh/scheduler/tests/test_api_client.py +swh/scheduler/tests/test_fixtures.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_task.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/updater/__init__.py swh/scheduler/tests/updater/test_backend.py swh/scheduler/tests/updater/test_consumer.py swh/scheduler/tests/updater/test_events.py swh/scheduler/tests/updater/test_ghtorrent.py swh/scheduler/tests/updater/test_writer.py swh/scheduler/updater/__init__.py swh/scheduler/updater/backend.py swh/scheduler/updater/consumer.py swh/scheduler/updater/events.py swh/scheduler/updater/writer.py swh/scheduler/updater/ghtorrent/__init__.py swh/scheduler/updater/ghtorrent/cli.py swh/scheduler/updater/ghtorrent/fake.py \ No newline at end of file diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index 25a0705..bae11d8 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,94 +1,105 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import time - import arrow from celery import group from swh.scheduler import get_scheduler, compute_nb_tasks_from from .config import app as main_app # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to + Returns: + A list of dictionaries: + { + 'task': the scheduler's task id, + 'backend_id': Celery's task id, + 'scheduler': arrow.utcnow() + } + + The result can be used to block-wait for the tasks' results: + + backend_tasks = run_ready_tasks(self.scheduler, app) + for task in backend_tasks: + AsyncResult(id=task['backend_id']).get() + """ + all_backend_tasks = [] while True: - throttled = False cursor = backend.cursor() task_types = {} pending_tasks = [] for task_type in backend.get_task_types(cursor=cursor): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] - if max_queue_length: - backend_name = task_type['backend_name'] + backend_name = task_type['backend_name'] + if max_queue_length and backend_name in app.tasks: queue_name = app.tasks[backend_name].task_queue queue_length = app.get_queue_length(queue_name) num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) pending_tasks.extend( backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority, cursor=cursor)) if not pending_tasks: - break + return all_backend_tasks celery_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] celery_task = app.tasks[ task_types[task['type']]['backend_name'] ].s(*args, **kwargs) celery_tasks.append(celery_task) group_result = group(celery_tasks).delay() backend_tasks = [{ 'task': task['id'], 'backend_id': group_result.results[i].id, 'scheduled': arrow.utcnow(), } for i, task in enumerate(pending_tasks)] backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) backend.commit() - if throttled: - time.sleep(10) + all_backend_tasks.extend(backend_tasks) if __name__ == '__main__': for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise diff --git a/swh/scheduler/sql/40-swh-data.sql b/swh/scheduler/sql/40-swh-data.sql index b10d2e5..b41ed1b 100644 --- a/swh/scheduler/sql/40-swh-data.sql +++ b/swh/scheduler/sql/40-swh-data.sql @@ -1,194 +1,246 @@ insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-loader-mount-dump-and-load-svn-repository', 'Loading svn repositories from svn dump', 'swh.loader.svn.tasks.MountAndLoadSvnRepository', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-svn', 'Create dump of a remote svn repository, mount it and load it', 'swh.loader.svn.tasks.DumpMountAndLoadSvnRepository', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-loading', 'Loading deposit archive into swh through swh-loader-tar', 'swh.deposit.loader.tasks.LoadDepositArchiveTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, num_retries, max_queue_length) values ( 'swh-deposit-archive-checks', 'Pre-checking deposit step before loading into swh archive', 'swh.deposit.loader.tasks.ChecksDepositTsk', '1 day', '1 day', '1 day', 1, 3, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'swh-vault-cooking', 'Cook a Vault bundle', 'swh.vault.cooking_tasks.SWHCookingTask', '1 day', '1 day', '1 day', 1, 10000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-hg', 'Loading mercurial repository swh-loader-mercurial', 'swh.loader.mercurial.tasks.LoadMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-load-archive-hg', 'Loading archive mercurial repository swh-loader-mercurial', 'swh.loader.mercurial.tasks.LoadArchiveMercurial', '1 day', '1 day', '1 day', 1, 1000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-git', 'Update an origin of type git', 'swh.loader.git.tasks.UpdateGitRepository', '64 days', '12:00:00', '64 days', 2, 5000); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-github-incremental', 'Incrementally list GitHub', 'swh.lister.github.tasks.IncrementalGitHubLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-github-full', 'Full update of GitHub repos list', 'swh.lister.github.tasks.FullGitHubRelister', '90 days', '90 days', '90 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-debian', 'List a Debian distribution', 'swh.lister.debian.tasks.DebianListerTask', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-incremental', 'Incrementally list a Gitlab instance', 'swh.lister.gitlab.tasks.IncrementalGitLabLister', '1 day', '1 day', '1 day', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-gitlab-full', 'Full update of a Gitlab instance''s repos list', 'swh.lister.gitlab.tasks.FullGitLabRelister', '90 days', '90 days', '90 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor) values ( 'swh-lister-pypi', 'Full pypi lister', 'swh.lister.pypi.tasks.PyPIListerTask', '1 days', '1 days', '1 days', 1); insert into task_type( type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length) values ( 'origin-update-pypi', 'Load Pypi origin', 'swh.loader.pypi.tasks.LoadPyPI', '64 days', '12:00:00', '64 days', 2, 5000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'orchestrator', + 'swh.indexer orchestrator task', + 'swh.indexer.tasks.OrchestratorAllContents', + '1 day', '12:00:00', '1 days', 2, + 5000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'orchestrator_text', + 'swh.indexer text orchestrator task', + 'swh.indexer.tasks.OrchestratorTextContents', + '1 day', '12:00:00', '1 days', 2, + 5000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'indexer_mimetype', + 'Mimetype indexer task', + 'swh.indexer.tasks.ContentMimetype', + '1 day', '12:00:00', '1 days', 2, + 5000); + +insert into task_type( + type, + description, + backend_name, + default_interval, min_interval, max_interval, backoff_factor, + max_queue_length) +values ( + 'indexer_fossology_license', + 'Fossology license indexer task', + 'swh.indexer.tasks.ContentFossologyLicense', + '1 day', '12:00:00', '1 days', 2, + 5000); diff --git a/swh/scheduler/tests/scheduler_testing.py b/swh/scheduler/tests/scheduler_testing.py new file mode 100644 index 0000000..ba11ec3 --- /dev/null +++ b/swh/scheduler/tests/scheduler_testing.py @@ -0,0 +1,71 @@ +import glob +import pytest +import os.path +import datetime + +from celery.result import AsyncResult +from celery.contrib.testing.worker import start_worker +import celery.contrib.testing.tasks # noqa + +from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES +from swh.core.utils import numfile_sortkey as sortkey + +from swh.scheduler import get_scheduler +from swh.scheduler.celery_backend.runner import run_ready_tasks +from swh.scheduler.celery_backend.config import app +from swh.scheduler.tests.celery_testing import CeleryTestFixture + +from . import SQL_DIR + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + + +@pytest.mark.db +class SchedulerTestFixture(CeleryTestFixture, DbTestFixture): + """Base class for test case classes, providing an SWH scheduler as + the `scheduler` attribute.""" + SCHEDULER_DB_NAME = 'softwareheritage-scheduler-test-fixture' + + def add_scheduler_task_type(self, task_type, backend_name): + task_type = { + 'type': task_type, + 'description': 'Update a git repository', + 'backend_name': backend_name, + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + } + self.scheduler.create_task_type(task_type) + + def run_ready_tasks(self): + """Runs the scheduler and a Celery worker, then blocks until + all tasks are completed.""" + with start_worker(app): + backend_tasks = run_ready_tasks(self.scheduler, app) + for task in backend_tasks: + AsyncResult(id=task['backend_id']).get() + + @classmethod + def setUpClass(cls): + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) + for x in all_dump_files] + + cls.add_db(name=cls.SCHEDULER_DB_NAME, + dumps=all_dump_files) + super().setUpClass() + + def setUp(self): + super().setUp() + self.scheduler_config = { + 'scheduling_db': 'dbname=' + self.SCHEDULER_DB_NAME} + self.scheduler = get_scheduler('local', self.scheduler_config) + + def tearDown(self): + self.scheduler.close_connection() + super().tearDown() diff --git a/swh/scheduler/tests/test_fixtures.py b/swh/scheduler/tests/test_fixtures.py new file mode 100644 index 0000000..9e9146e --- /dev/null +++ b/swh/scheduler/tests/test_fixtures.py @@ -0,0 +1,38 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture +from swh.scheduler.task import Task +from swh.scheduler.utils import create_task_dict + +task_has_run = False + + +class SomeTestTask(Task): + def run(self, *, foo): + global task_has_run + assert foo == 'bar' + task_has_run = True + + +class FixtureTest(SchedulerTestFixture, unittest.TestCase): + def setUp(self): + super().setUp() + self.add_scheduler_task_type( + 'some_test_task_type', + 'swh.scheduler.tests.test_fixtures.SomeTestTask') + + def test_task_run(self): + self.scheduler.create_tasks([create_task_dict( + 'some_test_task_type', + 'oneshot', + foo='bar', + )]) + + self.assertEqual(task_has_run, False) + self.run_ready_tasks() + self.assertEqual(task_has_run, True) diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py index 192be61..2e34570 100644 --- a/swh/scheduler/tests/test_utils.py +++ b/swh/scheduler/tests/test_utils.py @@ -1,56 +1,79 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from datetime import timezone from unittest.mock import patch from swh.scheduler import utils class UtilsTest(unittest.TestCase): @patch('swh.scheduler.utils.datetime') def test_create_oneshot_task_dict_simple(self, mock_datetime): mock_datetime.now.return_value = 'some-date' actual_task = utils.create_oneshot_task_dict('some-task-type') expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-date', 'arguments': { 'args': [], 'kwargs': {}, }, - 'priority': None, } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) @patch('swh.scheduler.utils.datetime') def test_create_oneshot_task_dict_other_call(self, mock_datetime): mock_datetime.now.return_value = 'some-other-date' actual_task = utils.create_oneshot_task_dict( 'some-task-type', 'arg0', 'arg1', priority='high', other_stuff='normal' ) expected_task = { 'policy': 'oneshot', 'type': 'some-task-type', 'next_run': 'some-other-date', 'arguments': { 'args': ('arg0', 'arg1'), 'kwargs': {'other_stuff': 'normal'}, }, 'priority': 'high', } self.assertEqual(actual_task, expected_task) mock_datetime.now.assert_called_once_with(tz=timezone.utc) + + @patch('swh.scheduler.utils.datetime') + def test_create_task_dict(self, mock_datetime): + mock_datetime.now.return_value = 'date' + + actual_task = utils.create_task_dict( + 'task-type', 'recurring', 'arg0', 'arg1', + priority='low', other_stuff='normal', retries_left=3 + ) + + expected_task = { + 'policy': 'recurring', + 'type': 'task-type', + 'next_run': 'date', + 'arguments': { + 'args': ('arg0', 'arg1'), + 'kwargs': {'other_stuff': 'normal'}, + }, + 'priority': 'low', + 'retries_left': 3, + } + + self.assertEqual(actual_task, expected_task) + mock_datetime.now.assert_called_once_with(tz=timezone.utc) diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py index bf848c2..07c4c7b 100644 --- a/swh/scheduler/utils.py +++ b/swh/scheduler/utils.py @@ -1,71 +1,75 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone def get_task(task_name): """Retrieve task object in our application instance by its fully qualified python name. Args: task_name (str): task's name (e.g swh.loader.git.tasks.LoadDiskGitRepository) Returns: Instance of task """ from swh.scheduler.celery_backend.config import app for module in app.conf.CELERY_IMPORTS: __import__(module) return app.tasks[task_name] def create_task_dict(type, policy, *args, **kwargs): """Create a task with type and policy, scheduled for as soon as possible. Args: type (str): Type of oneshot task as per swh-scheduler's db table task_type's column (Ex: origin-update-git, swh-deposit-archive-checks) policy (str): oneshot or recurring policy Returns: Expected dictionary for the one-shot task scheduling api (swh.scheduler.backend.create_tasks) """ - priority = None - if 'priority' in kwargs: - priority = kwargs.pop('priority') - return { + task_extra = {} + for extra_key in ['priority', 'retries_left']: + if extra_key in kwargs: + extra_val = kwargs.pop(extra_key) + task_extra[extra_key] = extra_val + + task = { 'policy': policy, 'type': type, 'next_run': datetime.now(tz=timezone.utc), 'arguments': { 'args': args if args else [], 'kwargs': kwargs if kwargs else {}, }, - 'priority': priority, } + task.update(task_extra) + return task def create_oneshot_task_dict(type, *args, **kwargs): """Create a oneshot task scheduled for as soon as possible. Args: type (str): Type of oneshot task as per swh-scheduler's db table task_type's column (Ex: origin-update-git, swh-deposit-archive-checks) Returns: Expected dictionary for the one-shot task scheduling api (swh.scheduler.backend.create_tasks) """ return create_task_dict(type, 'oneshot', *args, **kwargs) diff --git a/version.txt b/version.txt index 5b8e11d..83ca342 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.34-0-gf326cef \ No newline at end of file +v0.0.35-0-g762b528 \ No newline at end of file