diff --git a/PKG-INFO b/PKG-INFO index 55ef8e3..364054a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,37 +1,37 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.18.2 +Version: 0.19.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/debian/changelog b/debian/changelog index 2943212..aa33a44 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,1055 +1,1063 @@ -swh-scheduler (0.18.2-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-scheduler (0.19.0-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh - - -- Software Heritage autobuilder (on jenkins-debian1) Mon, 18 Oct 2021 13:21:35 +0000 + * New upstream release 0.19.0 - (tagged by Antoine R. Dumont + (@ardumont) on 2021-10-28 13:10:55 + +0200) + * Upstream changes: - v0.19.0 - Add a new cli endpoint to + schedule recurrent visits in Celery - grab_next_visits: avoid + time interval calculations in PostgreSQL - Restrict the click + version to avoid conflict version with celery's - Add docstring + to runner and listener modules - Drop deprecated listener module + - scheduler: Deprecate unused main celery runner + + -- Software Heritage autobuilder (on jenkins-debian1) Thu, 28 Oct 2021 11:15:10 +0000 swh-scheduler (0.18.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.2 - (tagged by Antoine R. Dumont (@ardumont) on 2021-10-18 15:11:59 +0200) * Upstream changes: - v0.18.2 - Use swh_storage fixture for cli tests -- Software Heritage autobuilder (on jenkins-debian1) Mon, 18 Oct 2021 13:18:56 +0000 swh-scheduler (0.18.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-10-15 15:49:35 +0200) * Upstream changes: - v0.18.1 - Return 0 slot if no more slots available in the queues -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Oct 2021 13:53:38 +0000 swh-scheduler (0.18.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-09-02 11:32:59 +0200) * Upstream changes: - v0.18.0 - Refine scheduling policy for origins with no known last update - Add a swh scheduler origin send-to-celery subcommand - runner: Improve help message on the task types flag. - send-to-celery: Add more options to allow scheduling of edge cases - Add table sampling option to grab_next_visits - journal_client: Only upsert if we have something to upsert -- Software Heritage autobuilder (on jenkins-debian1) Thu, 02 Sep 2021 09:35:32 +0000 swh-scheduler (0.17.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.17.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-08-26 10:30:12 +0200) * Upstream changes: - v0.17.1 - journal_client: Ensure queue position does not overflow -- Software Heritage autobuilder (on jenkins-debian1) Thu, 26 Aug 2021 08:41:41 +0000 swh-scheduler (0.17.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.17.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-08-05 15:29:18 +0200) * Upstream changes: - v0.17.0 - Introduce new scheduling policy to grab origins without last update - journal_client: Disable origins when too many visited attempts failed - journal_client: Record last_visited and last_successful in origin_visit_stats - Add a specific cooldown for notfound origins - Add a (longer) specific cooldown for failed origin visits - Make the origin visit scheduling cooldown configurable - Various refactoring to simplify the grab next visits logic and updates -- Software Heritage autobuilder (on jenkins-debian1) Fri, 06 Aug 2021 09:11:54 +0000 swh-scheduler (0.16.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.16.0 - (tagged by Antoine Lambert on 2021-06-22 17:35:55 +0200) * Upstream changes: - version 0.16.0 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 22 Jun 2021 15:39:45 +0000 swh-scheduler (0.15.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.15.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-06-10 16:09:06 +0200) * Upstream changes: - v0.15.0 - separate-runner runner: Separate scheduling tasks with and without priority concern - Refactor and extract a get_available_slots utility - Add typing stubs dependencies for mypy>0.900 - pytest_plugin: Explicitly set hostname in broker_url for celery TestApp -- Software Heritage autobuilder (on jenkins-debian1) Thu, 10 Jun 2021 14:48:52 +0000 swh-scheduler (0.14.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.14.2 - (tagged by Valentin Lorentz on 2021-05-06 17:09:00 +0200) * Upstream changes: - v0.14.2 - * Fix flaky tests -- Software Heritage autobuilder (on jenkins-debian1) Thu, 06 May 2021 15:13:11 +0000 swh-scheduler (0.14.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.14.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-05-06 16:00:07 +0200) * Upstream changes: - v0.14.1 - Use swh.core 0.14 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 06 May 2021 14:17:39 +0000 swh-scheduler (0.13.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.13.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-20 11:46:51 +0200) * Upstream changes: - v0.13.0 - scheduler: Clean up priority/ratio task dead code - Parse task_ids before calling set_status_tasks. - tests: Complete checks on message with priority consumption -- Software Heritage autobuilder (on jenkins-debian1) Tue, 20 Apr 2021 09:51:00 +0000 swh-scheduler (0.12.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.12.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-15 13:31:30 +0200) * Upstream changes: - v0.12.0 - Route priority tasks to dedicated save code now queues - Fix various Sphinx warnings -- Software Heritage autobuilder (on jenkins-debian1) Thu, 15 Apr 2021 11:36:13 +0000 swh-scheduler (0.11.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.11.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-14 18:15:53 +0200) * Upstream changes: - v0.11.0 - separate-queues backend: Open endpoints to peek/grab tasks with any priority - Make origin_visit_stats_get return results from all pages - journal client: Filter out status messages without type - Simplify max_date() - journal_client: Fix date computations for (un)eventful visits - journal_client: Deal with failed status message -- Software Heritage autobuilder (on jenkins-debian1) Wed, 14 Apr 2021 16:19:31 +0000 swh-scheduler (0.10.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.10.0 - (tagged by Nicolas Dandrimont on 2021-02-03 22:53:20 +0100) * Upstream changes: - Release swh.scheduler 0.10.0 - Eagerly acknowledge celery tasks - Loads of simulator improvements - grab_next_visits: - clean up query building - account for schedule time to avoid rescheduling visits too fast - allow overriding the scheduling timestamp for the simulator -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Feb 2021 22:10:13 +0000 swh-scheduler (0.9.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.2 - (tagged by Antoine Lambert on 2021-01-25 16:27:41 +0100) * Upstream changes: - version 0.9.2 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 25 Jan 2021 15:31:21 +0000 swh-scheduler (0.9.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.1 - (tagged by Vincent SELLIER on 2021-01-21 19:20:33 +0100) * Upstream changes: - v0.9.1 - * Solve uneventful/eventful with unordered messages with snapshots - * Do not consider duplicated messages as uneventful event - * Reorganize grab_next_visits tests to better check sorting behavior -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Jan 2021 18:28:00 +0000 swh-scheduler (0.9.0-1~swh2) unstable-swh; urgency=medium * Bump new release to unstuck packaging -- Antoine R. Dumont (@ardumont) Thu, 21 Jan 2021 13:20:14 +0000 swh-scheduler (0.9.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-01-21 11:54:47 +0100) * Upstream changes: - v0.9.0 - Populate origin_visit_stats table out of the origin_visit_status topic - Introduce a scheduler policy simulator (old task-based scheduler, ...) - Implement basic aggregated metrics on listed origins - scheduler.cli.journal: Add `swh scheduler journal-client` cli - Filter origins by visit type when scheduling the next visits - Introduce a `swh scheduler origin schedule-next` cli - Introduce a `swh scheduler origin grab-next` cli - Add an new origin visit stats model object and related backend api - Implement a basic endpoint for getting the next origins to visit - doc: Add a cli section to the doc -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Jan 2021 11:00:29 +0000 swh-scheduler (0.8.2-1~swh2) unstable-swh; urgency=medium * Bump dependency -- Antoine R. Dumont (@ardumont) Tue, 08 Dec 2020 09:29:26 +0000 swh-scheduler (0.8.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.2 - (tagged by Antoine R. Dumont (@ardumont) on 2020-12-07 09:52:28 +0100) * Upstream changes: - v0.8.2 - requirement: Adapt celery requirements - Replace usage of arrow datetime objects in favor of pure datetime ones - Stop using the deprecated configuration scheme - cli.task_type: All task_type clis without a scheduler should raise -- Software Heritage autobuilder (on jenkins-debian1) Mon, 07 Dec 2020 08:55:39 +0000 swh-scheduler (0.8.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.1 - (tagged by Antoine R. Dumont (@ardumont) on 2020-11-24 14:13:36 +0100) * Upstream changes: - v0.8.1 - conftest: Reference swh.core.db.pytest_plugin -- Software Heritage autobuilder (on jenkins-debian1) Tue, 24 Nov 2020 13:16:08 +0000 swh-scheduler (0.8.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-11-23 13:42:05 +0100) * Upstream changes: - v0.8.0 - requirements-test.txt: Drop no longer needed pytest-postgresql requirement - scheduler.pytest_plugin: Make scheduler tests faster -- Software Heritage autobuilder (on jenkins-debian1) Mon, 23 Nov 2020 12:44:40 +0000 swh-scheduler (0.7.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-10-19 09:30:36 +0200) * Upstream changes: - v0.7.0 - scheduler: Type and unify get_scheduler factory with other factories - pytest_plugin: Explicitly name the scheduler test db differently - test_server: Simplify exception manipulations - tox.ini: pin black to the pre-commit version (19.10b0) to avoid flip-flops -- Software Heritage autobuilder (on jenkins-debian1) Mon, 19 Oct 2020 07:33:54 +0000 swh-scheduler (0.6.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.0 - (tagged by David Douard on 2020-09-25 12:03:33 +0200) * Upstream changes: - v0.6.0 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 25 Sep 2020 10:06:32 +0000 swh-scheduler (0.5.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.3 - (tagged by Nicolas Dandrimont on 2020-09-24 17:49:27 +0200) * Upstream changes: - Release swh.scheduler v0.5.3 - Improve swh cli startup time - Add isort and update flake8 - Improve pytest execution time - Support recent kombu versions -- Software Heritage autobuilder (on jenkins-debian1) Thu, 24 Sep 2020 15:53:25 +0000 swh-scheduler (0.5.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.2 - (tagged by Antoine R. Dumont (@ardumont) on 2020-07-10 13:01:48 +0200) * Upstream changes: - v0.5.2 - Do no expose pytest-plugin through setuptools, let modules require it when needed -- Software Heritage autobuilder (on jenkins-debian1) Fri, 10 Jul 2020 11:08:30 +0000 swh-scheduler (0.5.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.1 - (tagged by Nicolas Dandrimont on 2020-07-09 10:18:03 +0200) * Upstream changes: - Release swh.scheduler 0.5.1 - Drop dependency on future (not needed anymore) -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 Jul 2020 09:51:38 +0000 swh-scheduler (0.5.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.0 - (tagged by Nicolas Dandrimont on 2020-07-09 10:16:57 +0200) * Upstream changes: - Release swh.scheduler v0.5.0 - Move celery fixtures to the pytest plugin -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 Jul 2020 08:20:42 +0000 swh-scheduler (0.4.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.0 - (tagged by Nicolas Dandrimont on 2020-07-06 16:47:28 +0200) * Upstream changes: - Release swh.scheduler 0.4.0 - Extract pytest fixtures to a pytest plugin -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 14:52:42 +0000 swh-scheduler (0.3.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.0 - (tagged by Nicolas Dandrimont on 2020-07-06 12:18:28 +0200) * Upstream changes: - Release swh.scheduler 0.3.0 - Add get_listed_origins endpoint -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 10:23:31 +0000 swh-scheduler (0.2.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.2 - (tagged by Nicolas Dandrimont on 2020-06-22 14:03:34 +0200) * Upstream changes: - Release swh.scheduler 0.2.2 - Re- introduce root endpoint for the RPC server -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 12:07:05 +0000 swh-scheduler (0.2.1-1~swh1) unstable-swh; urgency=medium [ Nicolas Dandrimont ] * Force celery >= 4.3 [ Software Heritage autobuilder (on jenkins-debian1) ] * New upstream release 0.2.1 - (tagged by Nicolas Dandrimont on 2020-06-22 12:09:32 +0200) * Upstream changes: - Release swh.scheduler 0.2.1 - Bump celery requirement to 4.3+ -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 10:12:50 +0000 swh-scheduler (0.2.0-1~swh1) unstable-swh; urgency=medium [ Nicolas Dandrimont ] * Switch from vcversioner to setuptools-scm * wrap-and-sort [ Software Heritage autobuilder (on jenkins-debian1) ] * New upstream release 0.2.0 - (tagged by Nicolas Dandrimont on 2020-06-22 10:33:11 +0200) * Upstream changes: - Release swh.scheduler 0.2.0 - Implement storage of lister and listed origin information - Add swh scheduler celery-monitor command - Overhaul RPC to use automatic generation -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 08:36:49 +0000 swh-scheduler (0.1.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.1 - (tagged by Nicolas Dandrimont on 2020-06-03 11:34:19 +0200) * Upstream changes: - Release swh.scheduler v0.1.1 - Add missing dependency on future for celery 4.4.4 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jun 2020 09:39:25 +0000 swh-scheduler (0.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.0 - (tagged by Nicolas Dandrimont on 2020-05-19 11:48:34 +0200) * Upstream changes: - Release swh.scheduler v0.1.0 - Blacken source code - Disable azure http logspam - Only schedule tasks when the buffer is somewhat empty -- Software Heritage autobuilder (on jenkins-debian1) Tue, 19 May 2020 09:52:31 +0000 swh-scheduler (0.0.72-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.72 - (tagged by Nicolas Dandrimont on 2020-03-23 13:07:38 +0100) * Upstream changes: - Release swh.scheduler v0.0.72 - Update instantiation of storage in tests - ensure that create_task_type is idempotent - introduce new listener based on pika -- Software Heritage autobuilder (on jenkins-debian1) Mon, 23 Mar 2020 12:12:00 +0000 swh-scheduler (0.0.71-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.71 - (tagged by Antoine R. Dumont (@ardumont) on 2020-01-23 14:24:56 +0100) * Upstream changes: - v0.0.71 - sentry: Fix initialization init_sentry call -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Jan 2020 13:29:33 +0000 swh-scheduler (0.0.70-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.70 - (tagged by Antoine R. Dumont (@ardumont) on 2020-01-23 13:43:35 +0100) * Upstream changes: - v0.0.70 - Use swh.core.sentry instead of calling sentry_sdk.init directly - backend_es: Fix configuration mapping -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Jan 2020 12:47:43 +0000 swh-scheduler (0.0.69-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.69 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 16:00:24 +0100) * Upstream changes: - v0.0.69 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 15:04:48 +0000 swh-scheduler (0.0.68-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.68 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 15:28:13 +0100) * Upstream changes: - v0.0.68 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 14:33:33 +0000 swh-scheduler (0.0.67-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.67 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 14:33:36 +0100) * Upstream changes: - v0.0.67 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 13:38:03 +0000 swh-scheduler (0.0.66-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.66 - (tagged by Nicolas Dandrimont on 2019-12-17 12:04:20 +0100) * Upstream changes: - Release swh.scheduler v0.0.66 - initialize sentry on celery worker startup - improve task archival endpoints in backend api -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 11:08:25 +0000 swh-scheduler (0.0.65-1~swh2) unstable-swh; urgency=medium * Add pytest-mock build-dependency. -- Nicolas Dandrimont Fri, 13 Dec 2019 11:57:41 +0100 swh-scheduler (0.0.65-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.65 - (tagged by Nicolas Dandrimont on 2019-12-13 11:45:55 +0100) * Upstream changes: - Release swh.scheduler v0.0.65 - Drop the scheduler updater - Add a statsd probe for task execution timestamps - Add listener and runner statsd probes - CLI updates - Python packaging housekeeping -- Software Heritage autobuilder (on jenkins-debian1) Fri, 13 Dec 2019 10:54:31 +0000 swh-scheduler (0.0.64-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.64 - (tagged by Antoine R. Dumont (@ardumont) on 2019-11-20 14:26:00 +0100) * Upstream changes: - v0.0.64 - req-swh*: Remove old package loader backend names -- Software Heritage autobuilder (on jenkins-debian1) Wed, 20 Nov 2019 13:29:37 +0000 swh-scheduler (0.0.63-1~swh2) unstable-swh; urgency=medium * Update build dependency -- Antoine R. Dumont (@ardumont) Tue, 19 Nov 2019 17:07:40 +0100 swh-scheduler (0.0.63-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.63 - (tagged by Antoine R. Dumont (@ardumont) on 2019-11-19 14:09:12 +0100) * Upstream changes: - v0.0.63 - swh.scheduler.cli: Add `swh scheduler task-type register` cli - Use the shared_task decorator instead of binding to a specific celery app - celery/tests: mostly revert e770eb30 to fix celery app initialization in tests -- Software Heritage autobuilder (on jenkins-debian1) Tue, 19 Nov 2019 13:14:59 +0000 swh-scheduler (0.0.62-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.62 - (tagged by Antoine R. Dumont (@ardumont) on 2019-10-18 13:39:27 +0200) * Upstream changes: - v0.0.62 - celery_backend.config: Make JournalHandler import optional - tests: rewrite tests using pytest fixtures -- Software Heritage autobuilder (on jenkins-debian1) Fri, 18 Oct 2019 11:46:26 +0000 swh-scheduler (0.0.61-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.61 - (tagged by Nicolas Dandrimont on 2019-10-07 16:33:17 +0200) * Upstream changes: - Release swh.scheduler v0.0.61 - Remove bogus dict.get(default=) statement -- Software Heritage autobuilder (on jenkins-debian1) Mon, 07 Oct 2019 14:37:37 +0000 swh-scheduler (0.0.60-1~swh2) unstable-swh; urgency=medium * Force postgresql executable to a pg_ctl that exists when running tests. -- Nicolas Dandrimont Tue, 01 Oct 2019 18:14:39 +0200 swh-scheduler (0.0.60-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.60 - (tagged by Stefano Zacchiroli on 2019-10-01 13:13:13 +0200) * Upstream changes: - v0.0.60 - * tox: anticipate mypy run to just after flake8 - * init.py: switch to documented way of extending path - * tox.ini: add mypy section - * typing: minimal changes to make a no-op mypy run pass - * fix typo in docstring and sample file name - * admin CLI: drop obsolete backward compatibility aliases - * click "required" param wants bool, not int -- Software Heritage autobuilder (on jenkins-debian1) Tue, 01 Oct 2019 11:22:43 +0000 swh-scheduler (0.0.59-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.59 - (tagged by David Douard on 2019-09-04 16:08:27 +0200) * Upstream changes: - v0.0.59 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 04 Sep 2019 14:11:48 +0000 swh-scheduler (0.0.58-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.58 - (tagged by Antoine R. Dumont (@ardumont) on 2019-09-03 10:19:34 +0200) * Upstream changes: - v0.0.58 - celery: auto add tasks declared in the swh.workers entry point in task_modules - api/client: use RPCClient instead of deprecated SWHRemoteAPI - Make schedule_origins use origin urls instead of ids in task arguments. - docs: add code of conduct document - docs: very beginning of a practical documentation on the scheduler - config: Add a pre-commit config file - data: Insert new cgit instance lister task - data: Insert load-tar task-type -- Software Heritage autobuilder (on jenkins-debian1) Tue, 03 Sep 2019 08:28:19 +0000 swh-scheduler (0.0.57-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.57 - (tagged by David Douard on 2019-06-26 14:56:32 +0200) * Upstream changes: - v0.0.57 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 26 Jun 2019 13:05:20 +0000 swh-scheduler (0.0.56-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.56 - (tagged by Nicolas Dandrimont on 2019-05-07 18:16:20 +0200) * Upstream changes: - listener: Release the db object after using it - This is the contract that get_db/put_db is supposed to conform to. -- Software Heritage autobuilder (on jenkins-debian1) Tue, 14 May 2019 12:40:09 +0000 swh-scheduler (0.0.55-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.55 - (tagged by Antoine Lambert on 2019-05-06 11:47:43 +0200) * Upstream changes: - version 0.0.55 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 May 2019 09:54:51 +0000 swh-scheduler (0.0.54-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.54 - (tagged by Antoine R. Dumont (@ardumont) on 2019-04-11 11:33:40 +0200) * Upstream changes: - v0.0.54 - cli_utils: Use yaml.safe_load instead of yaml.load - Fix support of latest versions of swh- core and psycopg2 - sql/data: Add npm related task types -- Software Heritage autobuilder (on jenkins-debian1) Thu, 11 Apr 2019 09:40:14 +0000 swh-scheduler (0.0.53-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.53 - (tagged by Antoine Lambert on 2019-04-04 16:45:56 +0200) * Upstream changes: - version 0.0.53 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 04 Apr 2019 14:55:20 +0000 swh-scheduler (0.0.52-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.52 - (tagged by Nicolas Dandrimont on 2019-04-03 10:54:06 +0200) * Upstream changes: - Release swh.scheduler v0.0.52 - Move to result_serializer = json to work around celery 4.3 bug - Fix db initialization -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Apr 2019 08:59:00 +0000 swh-scheduler (0.0.51-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.51 - (tagged by Antoine R. Dumont (@ardumont) on 2019-03-22 12:09:22 +0100) * Upstream changes: - v0.0.51 - requirements.txt: Remove kombu dependency -- Software Heritage autobuilder (on jenkins-debian1) Fri, 22 Mar 2019 11:16:06 +0000 swh-scheduler (0.0.50-1~swh2) unstable-swh; urgency=medium * Update build- and runtime dependencies -- Nicolas Dandrimont Fri, 15 Mar 2019 18:24:11 +0100 swh-scheduler (0.0.50-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.50 - (tagged by Nicolas Dandrimont on 2019-03-15 18:07:24 +0100) * Upstream changes: - Release swh.scheduler v0.0.50 - Add an explicit log target for stdout and/or journald - Avoid useless log lines - Improve test coverage - Add support for non- string options in the CLI -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 17:16:03 +0000 swh-scheduler (0.0.49-1~swh2) unstable-swh; urgency=medium * Export LC_ALL=C.UTF-8 -- Nicolas Dandrimont Thu, 14 Mar 2019 13:42:24 +0100 swh-scheduler (0.0.49-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.49 - (tagged by Nicolas Dandrimont on 2019-03-03 08:48:04 +0100) * Upstream changes: - Release swh.scheduler v0.0.49 - various fixes around celery behavior - move wsgi endpoint to a separate module - add tests for the CLI -- Software Heritage autobuilder (on jenkins-debian1) Sun, 03 Mar 2019 07:55:41 +0000 swh-scheduler (0.0.48-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.48 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-22 16:11:51 +0100) * Upstream changes: - v0.0.48 - Fix comment on main scheduler schema -- Software Heritage autobuilder (on jenkins-debian1) Fri, 22 Feb 2019 15:17:20 +0000 swh-scheduler (0.0.47-1~swh2) unstable-swh; urgency=low * Upstream release to fix build dependencies issue -- Antoine Romain Dumont (@ardumont) Thu, 21 Feb 2019 15:41:24 +0100 swh-scheduler (0.0.47-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.47 - (tagged by Valentin Lorentz on 2019-02-20 16:53:20 +0100) * Upstream changes: - Fix crash of SchedulerBackend.search_tasks when no argument is given. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Feb 2019 09:13:07 +0000 swh-scheduler (0.0.46-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.46 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-15 15:05:47 +0100) * Upstream changes: - v0.0.46 - scheduler.task: Remove no longer used Task class -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Feb 2019 14:15:26 +0000 swh-scheduler (0.0.45-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.45 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-15 10:43:07 +0100) * Upstream changes: - v0.0.45 - celery_backend/config: Fix loglevel for amqp module -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Feb 2019 09:48:25 +0000 swh-scheduler (0.0.44-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.44 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-13 16:29:05 +0100) * Upstream changes: - v0.0.44 - swh-scheduler-api: Fix configuration read too many times -- Software Heritage autobuilder (on jenkins-debian1) Wed, 13 Feb 2019 15:34:34 +0000 swh-scheduler (0.0.43-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.43 - (tagged by David Douard on 2019-02-13 15:27:27 +0100) * Upstream changes: - v0.0.43 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 13 Feb 2019 14:46:59 +0000 swh-scheduler (0.0.42-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.42 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-11 14:28:10 +0100) * Upstream changes: - v0.0.42 - Fix dependency requirements for hypothesis -- Software Heritage autobuilder (on jenkins-debian1) Mon, 11 Feb 2019 13:33:48 +0000 swh-scheduler (0.0.41-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.41 - (tagged by David Douard on 2019-02-06 15:25:56 +0100) * Upstream changes: - v0.0.41 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 06 Feb 2019 15:33:04 +0000 swh-scheduler (0.0.40-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.40 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-28 16:24:04 +0100) * Upstream changes: - v0.0.40 - swh.scheduler.tests: Mark db tests as such - Force tox environment to C.UTF-8 locale - Add debug logging in the SWHTask class -- Software Heritage autobuilder (on jenkins-debian1) Mon, 28 Jan 2019 15:30:41 +0000 swh-scheduler (0.0.39-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.39 - (tagged by David Douard on 2019-01-16 13:37:58 +0100) * Upstream changes: - v0.0.39 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 16 Jan 2019 12:42:37 +0000 swh-scheduler (0.0.38-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.38 - (tagged by David Douard on 2018-12-20 14:39:59 +0100) * Upstream changes: - v0.0.38 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 09 Jan 2019 18:32:14 +0000 swh-scheduler (0.0.35-1~swh1) unstable-swh; urgency=medium * v0.0.35 * tests: Add SchedulerTestFixture * swh.scheduler.utils: Allow to add more task information * sql/40-swh-data: Update new indexer task types for local db -- Antoine R. Dumont (@ardumont) Mon, 29 Oct 2018 10:07:08 +0100 swh-scheduler (0.0.34-1~swh1) unstable-swh; urgency=medium * v0.0.34 * Finalize pytest migration -- Antoine R. Dumont (@ardumont) Thu, 25 Oct 2018 17:52:03 +0200 swh-scheduler (0.0.33-1~swh1) unstable-swh; urgency=medium * v0.0.33 -- David Douard Thu, 25 Oct 2018 16:03:16 +0200 swh-scheduler (0.0.32-1~swh1) unstable-swh; urgency=medium * v0.0.32 * tests: Add celery fixture to ease tests * tests: make tests use sql/ files from the package * tests: Starting migration towards pytest * listener: Make the listener code compatible with new celery (debian buster) * Make swh_scheduler_create_tasks_from_temp use indexes * setup: prepare for pypi upload * docs: add a simple README file -- Antoine R. Dumont (@ardumont) Mon, 22 Oct 2018 15:37:51 +0200 swh-scheduler (0.0.31-1~swh1) unstable-swh; urgency=medium * v0.0.31 * sql/swh-scheduler: Make the create_tasks call idempotent * swh.scheduler.utils: Open create_task_dict function * sql/scheduler-data: Add lister gitlab task types * sql/scheduler-data: Reference the existing production lister data * swh.scheduler.backend_es: Open sniffing options -- Antoine R. Dumont (@ardumont) Tue, 31 Jul 2018 06:55:39 +0200 swh-scheduler (0.0.30-1~swh1) unstable-swh; urgency=medium * v0.0.30 * swh-scheduler-schema.sql: Archive disabled oneshot tasks as well * swh.scheduler.cli: Add policy to pretty printing task routine * swh.scheduler.cli: Fix broken cli list-pending since api change -- Antoine R. Dumont (@ardumont) Fri, 22 Jun 2018 18:07:02 +0200 swh-scheduler (0.0.29-1~swh1) unstable-swh; urgency=medium * v0.0.29 * swh.scheduler.cli: Change archival period to rolling month - 1 week * swh.scheduler.updater.writer: Force filter resolution to list * swh.scheduler.cli: Change default archival period to current month * swh.scheduler.cli: Improve logging message * swh.scheduler.updater.backend: Adapt configuration path accordingly -- Antoine R. Dumont (@ardumont) Thu, 31 May 2018 11:42:51 +0200 swh-scheduler (0.0.28-1~swh1) unstable-swh; urgency=medium * v0.0.28 * Fix wrong runtime dependencies -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 14:12:15 +0200 swh-scheduler (0.0.27-1~swh1) unstable-swh; urgency=medium * v0.0.27 * scheduler: Deal with priority in tasks * scheduler-update: new package python3-swh.scheduler.updater * Contains tools in charge of consuming events from arbitrary sources * and update the scheduler db -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 12:27:34 +0200 swh-scheduler (0.0.26-1~swh1) unstable-swh; urgency=medium * v0.0.26 * swh.scheduler: Fix package build * swh.scheduler.tests: Test remote scheduler api as well * swh.scheduler: Add tests around removing archivable tasks * swh.scheduler: Add tests around filtering archivable tasks * swh-scheduler-schema: Fix unneeded drop instructions * swh.scheduler.cli: Improve docstring * swh.scheduler.cli: Permit to specify the backend to use in cli * swh.scheduler.api: Bootstrap scheduler's remote api * swh.scheduler: Use `get_scheduler` api to instantiate a scheduler * swh.scheduler.backend: Fix docstring -- Antoine R. Dumont (@ardumont) Thu, 26 Apr 2018 17:34:07 +0200 swh-scheduler (0.0.25-1~swh1) unstable-swh; urgency=medium * v0.0.25 * swh.scheduler.cli.archive: Index arguments.kwargs as text -- Antoine R. Dumont (@ardumont) Wed, 18 Apr 2018 12:34:43 +0200 swh-scheduler (0.0.24-1~swh1) unstable-swh; urgency=medium * v0.0.24 * data/template: Do not index the arguments field (it's in _source) * data/README: Add a small readme to explain es install step * swh.scheduler.cli: Add a bulk index flag to separate read from index -- Antoine R. Dumont (@ardumont) Fri, 13 Apr 2018 14:55:32 +0200 swh-scheduler (0.0.23-1~swh1) unstable-swh; urgency=medium * swh.scheduler.cli.archive: Delete only completely indexed tasks * Prior to this commit, it could happen that we removed tasks even * though we did not yet index associated task_run. * Related T986 -- Antoine R. Dumont (@ardumont) Tue, 10 Apr 2018 17:43:07 +0200 swh-scheduler (0.0.22-1~swh1) unstable-swh; urgency=medium * v0.0.22 * Update to a more recent python3-elasticsearch client -- Antoine R. Dumont (@ardumont) Mon, 09 Apr 2018 16:09:16 +0200 swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium * v0.0.21 * Adapt default configuration * Fix typo in configuration variable name -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 15:02:55 +0200 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium * v0.0.20 * swh.scheduler.cli.archive: Open completed oneshot or disabled * recurring tasks archival endpoint * swh.core.serializer: Move to msgpack serialization format * swh.scheduler.cli: Unify pretty print output * sql/data: Add new task type for loading mercurial dump * swh.scheduler.cli: Add sample use case for the scheduling cli * swh.scheduler.cli: Open policy column to the scheduling cli * swh.scheduler.cli: Open the delimiter option as cli argument * Fix issue when updating task-type without any retry delay defined * swh-scheduler/data: Add new oneshot scheduling load-mercurial task * backend: fix default scheduling_db value for consistency * backend: doc: fix return value of create_tasks -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium * v0.0.19 * swh.scheduler.utils: Open utility function to create oneshot task -- Antoine R. Dumont (@ardumont) Wed, 29 Nov 2017 12:51:15 +0100 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.18 * Celery 4 compatibility -- Nicolas Dandrimont Wed, 08 Nov 2017 17:06:22 +0100 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.17 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 18:49:02 +0200 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.16 * add some tests * implement one-shot tasks * implement retry on temporary failure -- Nicolas Dandrimont Mon, 07 Aug 2017 18:44:03 +0200 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.15 * Add some methods to get the length of task queues * worker: Show logs on stdout if loglevel = debug -- Nicolas Dandrimont Mon, 19 Jun 2017 19:44:56 +0200 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler 0.0.14 * Make the return value of tasks available in the listener -- Nicolas Dandrimont Mon, 12 Jun 2017 17:50:32 +0200 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.13 * Use systemd for logging rather than PostgreSQL -- Nicolas Dandrimont Fri, 07 Apr 2017 11:57:50 +0200 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.12 * Only log to database if the configuration is present -- Nicolas Dandrimont Thu, 09 Mar 2017 11:12:45 +0100 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.11 * add utils.get_task -- Nicolas Dandrimont Tue, 14 Feb 2017 19:49:34 +0100 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.10 * Allow disabling tasks -- Nicolas Dandrimont Thu, 20 Oct 2016 17:20:17 +0200 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.9 * Revert management of one shot tasks * Add possibility of launching several worker instances -- Nicolas Dandrimont Fri, 02 Sep 2016 17:09:18 +0200 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium * v0.0.7 * Add oneshot task -- Antoine R. Dumont (@ardumont) Fri, 01 Jul 2016 16:42:45 +0200 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.6 * More reliability and efficiency when scheduling a lot of tasks -- Nicolas Dandrimont Wed, 24 Feb 2016 18:46:57 +0100 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.5 * Use copy for task mass-scheduling -- Nicolas Dandrimont Wed, 24 Feb 2016 12:13:38 +0100 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.4 * general cleanup of the backend * use arrow instead of dateutil * add new cli program -- Nicolas Dandrimont Tue, 23 Feb 2016 17:46:04 +0100 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.3 * Implement the timestamp arguments to the task_run functions * Make the celery event listener use a reliable queue -- Nicolas Dandrimont Mon, 22 Feb 2016 15:14:28 +0100 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.2 * Multiple schema changes * Initial releases for the celery job runner and the event listener -- Nicolas Dandrimont Fri, 19 Feb 2016 18:50:47 +0100 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * Release swh.scheduler v0.0.1 * Move swh.core.scheduling and swh.core.worker to swh.scheduler -- Nicolas Dandrimont Mon, 15 Feb 2016 11:07:30 +0100 diff --git a/requirements-test.txt b/requirements-test.txt index fc6c024..726caf0 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,10 +1,11 @@ pytest pytest-mock celery >= 4.3 hypothesis >= 3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests +types-Deprecated diff --git a/requirements.txt b/requirements.txt index 87ed09c..1b8e3a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,17 +1,17 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html attrs attrs-strict celery >= 4.3, != 5.0.3 -click +click < 8.0 elasticsearch > 5.4 flask humanize pika >= 1.1.0 psycopg2 pyyaml requests setuptools typing-extensions diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 55ef8e3..364054a 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,37 +1,37 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.18.2 +Version: 0.19.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index d76e928..cf90d4a 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,136 +1,137 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini requirements-journal.txt requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql sql/updates/12.sql sql/updates/13.sql sql/updates/14.sql sql/updates/15.sql sql/updates/16.sql sql/updates/17.sql sql/updates/18.sql sql/updates/19.sql sql/updates/20.sql sql/updates/23.sql sql/updates/24.sql sql/updates/25.sql sql/updates/26.sql sql/updates/27.sql sql/updates/28.sql sql/updates/29.sql sql/updates/30-bis.sql sql/updates/30.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/elasticsearch_memory.py swh/scheduler/exc.py swh/scheduler/interface.py swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py -swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/pika_listener.py +swh/scheduler/celery_backend/recurrent_visits.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/journal.py swh/scheduler/cli/origin.py swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py swh/scheduler/simulator/__init__.py swh/scheduler/simulator/common.py swh/scheduler/simulator/origin_scheduler.py swh/scheduler/simulator/origins.py swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_journal.py swh/scheduler/tests/test_cli_origin.py swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py swh/scheduler/tests/test_config.py swh/scheduler/tests/test_init.py swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py +swh/scheduler/tests/test_recurrent_visits.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/es/__init__.py swh/scheduler/tests/es/conftest.py swh/scheduler/tests/es/test_backend_es.py swh/scheduler/tests/es/test_cli_task.py swh/scheduler/tests/es/test_elasticsearch_memory.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index a772f88..0fb232c 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,37 +1,38 @@ attrs attrs-strict celery!=5.0.3,>=4.3 -click +click<8.0 elasticsearch>5.4 flask humanize pika>=1.1.0 psycopg2 pyyaml requests setuptools typing-extensions swh.core[db,http]>=0.14.0 swh.storage>=0.11.1 [journal] swh.journal [simulator] plotille simpy<4,>=3 [testing] pytest pytest-mock celery>=4.3 hypothesis>=3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests +types-Deprecated swh.journal plotille simpy<4,>=3 diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 19b48a0..309957a 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,1105 +1,1102 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import attr from psycopg2.errors import CardinalityViolation from psycopg2.extensions import AsIs import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow from .exc import SchedulerException, StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList from .model import ( LastVisitStatus, ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics, ) logger = logging.getLogger(__name__) def adapt_LastVisitStatus(v: LastVisitStatus): return AsIs(f"'{v.value}'::last_visit_status") psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(LastVisitStatus, adapt_LastVisitStatus) psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. """ def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query("select {keys} from task_type", self.task_type_keys,) cur.execute(query) return cur.fetchall() @db_transaction() def get_listers(self, db=None, cur=None) -> List[Lister]: """Retrieve information about all listers from the database. """ select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers """ cur.execute(query) return [Lister(**ret) for ret in cur.fetchall()] @db_transaction() def get_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers where (name, instance_name) = (%s, %s) """ cur.execute(query, (name, instance_name)) ret = cur.fetchone() if not ret: return None return Lister(**ret) @db_transaction() def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) insert_cols, insert_meta = ( ", ".join(tup) for tup in Lister.insert_columns_and_metavars() ) query = f""" with added as ( insert into listers ({insert_cols}) values ({insert_meta}) on conflict do nothing returning {select_cols} ) select {select_cols} from added union all select {select_cols} from listers where (name, instance_name) = (%(name)s, %(instance_name)s); """ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) return Lister(**cur.fetchone()) @db_transaction() def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ select_cols = ", ".join(Lister.select_columns()) set_vars = ", ".join( f"{col} = {meta}" for col, meta in zip(*Lister.insert_columns_and_metavars()) ) query = f"""update listers set {set_vars} where id=%(id)s and updated=%(updated)s returning {select_cols}""" cur.execute(query, attr.asdict(lister)) updated = cur.fetchone() if not updated: raise StaleData("Stale data; Lister state not updated") return Lister(**updated) @db_transaction() def record_listed_origins( self, listed_origins: Iterable[ListedOrigin], db=None, cur=None ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ pk_cols = ListedOrigin.primary_key_columns() select_cols = ListedOrigin.select_columns() insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} RETURNING {", ".join(select_cols)} """ ret = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=(attr.asdict(origin) for origin in listed_origins), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=True, ) return [ListedOrigin(**d) for d in ret] @db_transaction() def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, db=None, cur=None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. """ query_filters: List[str] = [] query_params: List[Union[int, str, UUID, Tuple[UUID, str]]] = [] if lister_id: query_filters.append("lister_id = %s") query_params.append(lister_id) if url is not None: query_filters.append("url = %s") query_params.append(url) if page_token is not None: query_filters.append("(lister_id, url) > %s") # the typeshed annotation for tuple() is too strict. query_params.append(tuple(page_token)) # type: ignore query_params.append(limit) select_cols = ", ".join(ListedOrigin.select_columns()) if query_filters: where_clause = "where %s" % (" and ".join(query_filters)) else: where_clause = "" query = f"""SELECT {select_cols} from listed_origins {where_clause} ORDER BY lister_id, url LIMIT %s""" cur.execute(query, tuple(query_params)) origins = [ListedOrigin(**d) for d in cur] if len(origins) == limit: page_token = (str(origins[-1].lister_id), origins[-1].url) else: page_token = None return PaginatedListedOriginList(origins, page_token) @db_transaction() def grab_next_visits( self, visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, db=None, cur=None, ) -> List[ListedOrigin]: if timestamp is None: timestamp = utcnow() origin_select_cols = ", ".join(ListedOrigin.select_columns()) query_args: List[Any] = [] where_clauses = [] # list of (name, query) handled as CTEs before the main query common_table_expressions: List[Tuple[str, str]] = [] # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled" if enabled else "not enabled") # Only schedule visits of the given type where_clauses.append("visit_type = %s") query_args.append(visit_type) if scheduled_cooldown: # Don't re-schedule visits if they're already scheduled but we haven't # recorded a result yet, unless they've been scheduled more than a week # ago (it probably means we've lost them in flight somewhere). where_clauses.append( """origin_visit_stats.last_scheduled IS NULL OR origin_visit_stats.last_scheduled < GREATEST( - %s - %s, + %s, origin_visit_stats.last_visit ) """ ) - query_args.append(timestamp) - query_args.append(scheduled_cooldown) + query_args.append(timestamp - scheduled_cooldown) if failed_cooldown: # Don't retry failed origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'failed' " - "or origin_visit_stats.last_visit < %s - %s" + "or origin_visit_stats.last_visit < %s" ) - query_args.append(timestamp) - query_args.append(failed_cooldown) + query_args.append(timestamp - failed_cooldown) if not_found_cooldown: # Don't retry not found origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'not_found' " - "or origin_visit_stats.last_visit < %s - %s" + "or origin_visit_stats.last_visit < %s" ) - query_args.append(timestamp) - query_args.append(not_found_cooldown) + query_args.append(timestamp - not_found_cooldown) if policy == "oldest_scheduled_first": order_by = "origin_visit_stats.last_scheduled NULLS FIRST" elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") # order by increasing last_update (oldest first) where_clauses.append("listed_origins.last_update IS NOT NULL") order_by = "listed_origins.last_update" elif policy == "already_visited_order_by_lag": # TODO: store "visit lag" in a materialized view? # visited origins have a NOT NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") # ignore origins we have visited after the known last update where_clauses.append("listed_origins.last_update IS NOT NULL") where_clauses.append( "listed_origins.last_update > origin_visit_stats.last_successful" ) # order by decreasing visit lag order_by = ( "listed_origins.last_update - origin_visit_stats.last_successful DESC" ) elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") order_by = ", ".join( [ # By default, sort using the queue position. If the queue # position is null, then the origin has never been visited, # which we want to handle first "origin_visit_stats.next_visit_queue_position nulls first", # Schedule unknown origins in the order we've seen them "listed_origins.first_seen", ] ) # fmt: off # This policy requires updating the global queue position for this # visit type common_table_expressions.append(("update_queue_position", """ INSERT INTO visit_scheduler_queue_position(visit_type, position) SELECT visit_type, COALESCE(MAX(next_visit_queue_position), now()) FROM selected_origins GROUP BY visit_type ON CONFLICT(visit_type) DO UPDATE SET position=GREATEST( visit_scheduler_queue_position.position, EXCLUDED.position ) """)) # fmt: on else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") if tablesample: table = "listed_origins tablesample SYSTEM (%s)" query_args.insert(0, tablesample) else: table = "listed_origins" if lister_uuid: where_clauses.append("lister_id = %s") query_args.append(lister_uuid) # fmt: off common_table_expressions.insert(0, ("selected_origins", f""" SELECT {origin_select_cols}, next_visit_queue_position FROM {table} LEFT JOIN origin_visit_stats USING (url, visit_type) WHERE ({") AND (".join(where_clauses)}) ORDER BY {order_by} LIMIT %s """)) # fmt: on query_args.append(count) # fmt: off common_table_expressions.append(("update_stats", """ INSERT INTO origin_visit_stats (url, visit_type, last_scheduled) SELECT url, visit_type, %s FROM selected_origins ON CONFLICT (url, visit_type) DO UPDATE SET last_scheduled = GREATEST( origin_visit_stats.last_scheduled, EXCLUDED.last_scheduled ) """)) # fmt: on query_args.append(timestamp) formatted_ctes = ",\n".join( f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions ) query = f""" WITH {formatted_ctes} SELECT {origin_select_cols} FROM selected_origins """ cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, db=None, cur=None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ if not task_ids: return query = ["UPDATE task SET status = %s"] args: List[Any] = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_no_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s", task_type, cur.rowcount) return cur.fetchall() @db_transaction() def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s", task_type, cur.rowcount) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)", (backend_id, status, metadata, timestamp), ) return cur.fetchone() @db_transaction() def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ assert not page_token or isinstance(page_token, str) last_id = -1 if page_token is None else int(page_token) tasks = [] cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit + 1), ) for row in cur: task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this task["arguments"]["args"] = { i: v for i, v in enumerate(task["arguments"]["args"]) } kwargs = task["arguments"]["kwargs"] task["arguments"]["kwargs"] = json.dumps(kwargs) tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { "tasks": tasks[:limit], "next_page_token": str(tasks[-1]["task_id"]), } else: result = {"tasks": tasks} return result @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: pk_cols = OriginVisitStats.primary_key_columns() insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join( f"{col} = coalesce(EXCLUDED.{col}, ovi.{col})" for col in upsert_cols ) query = f""" INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} """ try: psycopg2.extras.execute_values( cur=cur, sql=query, argslist=( attr.asdict(visit_stats) for visit_stats in origin_visit_stats ), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=False, ) except CardinalityViolation as e: raise SchedulerException(repr(e)) @db_transaction() def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]], db=None, cur=None ) -> List[OriginVisitStats]: if not ids: return [] primary_keys = tuple((origin, visit_type) for (origin, visit_type) in ids) query = format_query( """ SELECT {keys} FROM (VALUES %s) as stats(url, visit_type) INNER JOIN origin_visit_stats USING (url, visit_type) """, OriginVisitStats.select_columns(), ) rows = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=primary_keys, fetch=True ) return [OriginVisitStats(**row) for row in rows] @db_transaction() def visit_scheduler_queue_position_get( self, db=None, cur=None, ) -> Dict[str, datetime.datetime]: cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") return {row["visit_type"]: row["position"] for row in cur} @db_transaction() def visit_scheduler_queue_position_set( self, visit_type: str, position: datetime.datetime, db=None, cur=None, ) -> None: query = """ INSERT INTO visit_scheduler_queue_position(visit_type, position) VALUES(%s, %s) ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position """ cur.execute(query, (visit_type, position)) @db_transaction() def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ query = format_query( "SELECT {keys} FROM update_metrics(%s, %s)", SchedulerMetrics.select_columns(), ) cur.execute(query, (lister_id, timestamp)) return [SchedulerMetrics(**row) for row in cur.fetchall()] @db_transaction() def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ where_filters = [] where_args = [] if lister_id: where_filters.append("lister_id = %s") where_args.append(str(lister_id)) if visit_type: where_filters.append("visit_type = %s") where_args.append(visit_type) where_clause = "" if where_filters: where_clause = f"where {' and '.join(where_filters)}" query = format_query( "SELECT {keys} FROM scheduler_metrics %s" % where_clause, SchedulerMetrics.select_columns(), ) cur.execute(query, tuple(where_args)) return [SchedulerMetrics(**row) for row in cur.fetchall()] diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py deleted file mode 100644 index 9306fa0..0000000 --- a/swh/scheduler/celery_backend/listener.py +++ /dev/null @@ -1,220 +0,0 @@ -# Copyright (C) 2015-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import logging -import sys -import time - -import celery -from celery.events import EventReceiver -import click -from kombu import Queue - -from swh.core.statsd import statsd -from swh.scheduler.utils import utcnow - - -class ReliableEventReceiver(EventReceiver): - def __init__( - self, - channel, - handlers=None, - routing_key="#", - node_id=None, - app=None, - queue_prefix="celeryev", - accept=None, - ): - super(ReliableEventReceiver, self).__init__( - channel, handlers, routing_key, node_id, app, queue_prefix, accept - ) - - self.queue = Queue( - ".".join([self.queue_prefix, self.node_id]), - exchange=self.exchange, - routing_key=self.routing_key, - auto_delete=False, - durable=True, - ) - - def get_consumers(self, consumer, channel): - return [ - consumer( - queues=[self.queue], - callbacks=[self._receive], - no_ack=False, - accept=self.accept, - ) - ] - - def _receive(self, bodies, message): - if not isinstance(bodies, list): # celery<4 returned body as element - bodies = [bodies] - for body in bodies: - type, body = self.event_from_message(body) - self.process(type, body, message) - - def process(self, type, event, message): - """Process the received event by dispatching it to the appropriate - handler.""" - handler = self.handlers.get(type) or self.handlers.get("*") - if handler: - handler(event, message) - statsd.increment( - "swh_scheduler_listener_handled_event_total", tags={"event_type": type} - ) - - -ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) -ACTION_QUEUE_MAX_LENGTH = 1000 - - -def event_monitor(app, backend): - logger = logging.getLogger("swh.scheduler.listener") - actions = { - "last_send": utcnow() - 2 * ACTION_SEND_DELAY, - "queue": [], - } - - def try_perform_actions(actions=actions): - logger.debug("Try perform pending actions") - if actions["queue"] and ( - len(actions["queue"]) > ACTION_QUEUE_MAX_LENGTH - or utcnow() - actions["last_send"] > ACTION_SEND_DELAY - ): - perform_actions(actions) - - def perform_actions(actions, backend=backend): - logger.info("Perform %s pending actions" % len(actions["queue"])) - action_map = { - "start_task_run": backend.start_task_run, - "end_task_run": backend.end_task_run, - } - - messages = [] - db = backend.get_db() - try: - cursor = db.cursor(None) - for action in actions["queue"]: - messages.append(action["message"]) - function = action_map[action["action"]] - args = action.get("args", ()) - kwargs = action.get("kwargs", {}) - kwargs["cur"] = cursor - function(*args, **kwargs) - - except Exception: - db.conn.rollback() - else: - db.conn.commit() - finally: - backend.put_db(db) - - for message in messages: - if not message.acknowledged: - message.ack() - actions["queue"] = [] - actions["last_send"] = utcnow() - - def queue_action(action, actions=actions): - actions["queue"].append(action) - try_perform_actions() - - def catchall_event(event, message): - logger.debug("event: %s %s", event["type"], event.get("name", "N/A")) - if not message.acknowledged: - message.ack() - try_perform_actions() - - def task_started(event, message): - logger.debug("task_started: %s %s", event["type"], event.get("name", "N/A")) - - queue_action( - { - "action": "start_task_run", - "args": [event["uuid"]], - "kwargs": { - "timestamp": utcnow(), - "metadata": {"worker": event["hostname"],}, - }, - "message": message, - } - ) - - def task_succeeded(event, message): - logger.debug("task_succeeded: event: %s" % event) - logger.debug(" message: %s" % message) - result = event["result"] - - logger.debug("task_succeeded: result: %s" % result) - try: - status = result.get("status") - if status == "success": - status = "eventful" if result.get("eventful") else "uneventful" - except Exception: - status = "eventful" if result else "uneventful" - - queue_action( - { - "action": "end_task_run", - "args": [event["uuid"]], - "kwargs": {"timestamp": utcnow(), "status": status, "result": result,}, - "message": message, - } - ) - - def task_failed(event, message): - logger.debug("task_failed: event: %s" % event) - logger.debug(" message: %s" % message) - - queue_action( - { - "action": "end_task_run", - "args": [event["uuid"]], - "kwargs": {"timestamp": utcnow(), "status": "failed",}, - "message": message, - } - ) - - recv = ReliableEventReceiver( - celery.current_app.connection(), - app=celery.current_app, - handlers={ - "task-started": task_started, - "task-result": task_succeeded, - "task-failed": task_failed, - "*": catchall_event, - }, - node_id="listener", - ) - - errors = 0 - while True: - try: - recv.capture(limit=None, timeout=None, wakeup=True) - errors = 0 - except KeyboardInterrupt: - logger.exception("Keyboard interrupt, exiting") - break - except Exception: - logger.exception("Unexpected exception") - if errors < 5: - time.sleep(errors) - errors += 1 - else: - logger.error("Too many consecutive errors, exiting") - sys.exit(1) - - -@click.command() -@click.pass_context -def main(ctx): - click.echo("Deprecated! Use 'swh-scheduler listener' instead.", err=True) - ctx.exit(1) - - -if __name__ == "__main__": - main() diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py index 7df7bec..d895d5d 100644 --- a/swh/scheduler/celery_backend/pika_listener.py +++ b/swh/scheduler/celery_backend/pika_listener.py @@ -1,98 +1,107 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +"""This is the scheduler listener. It is in charge of listening to rabbitmq events (the +task result) and flushes the "oneshot" tasks' status in the scheduler backend. It's the +final step after a task is done. + +The scheduler runner :mod:`swh.scheduler.celery_backend.runner` is the module in charge +of pushing tasks in the queue. + +""" + import json import logging import sys import pika from swh.core.statsd import statsd from swh.scheduler import get_scheduler from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) def get_listener(broker_url, queue_name, scheduler_backend): connection = pika.BlockingConnection(pika.URLParameters(broker_url)) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) exchange = "celeryev" routing_key = "#" channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) channel.basic_qos(prefetch_count=1000) channel.basic_consume( queue=queue_name, on_message_callback=get_on_message(scheduler_backend), ) return channel def get_on_message(scheduler_backend): def on_message(channel, method_frame, properties, body): try: events = json.loads(body) except Exception: logger.warning("Could not parse body %r", body) events = [] if not isinstance(events, list): events = [events] for event in events: logger.debug("Received event %r", event) process_event(event, scheduler_backend) channel.basic_ack(delivery_tag=method_frame.delivery_tag) return on_message def process_event(event, scheduler_backend): uuid = event.get("uuid") if not uuid: return event_type = event["type"] statsd.increment( "swh_scheduler_listener_handled_event_total", tags={"event_type": event_type} ) if event_type == "task-started": scheduler_backend.start_task_run( uuid, timestamp=utcnow(), metadata={"worker": event.get("hostname")}, ) elif event_type == "task-result": result = event["result"] status = None if isinstance(result, dict) and "status" in result: status = result["status"] if status == "success": status = "eventful" if result.get("eventful") else "uneventful" if status is None: status = "eventful" if result else "uneventful" scheduler_backend.end_task_run( uuid, timestamp=utcnow(), status=status, result=result ) elif event_type == "task-failed": scheduler_backend.end_task_run(uuid, timestamp=utcnow(), status="failed") if __name__ == "__main__": url = sys.argv[1] logging.basicConfig(level=logging.DEBUG) scheduler_backend = get_scheduler("local", args={"db": "service=swh-scheduler"}) channel = get_listener(url, "celeryev.test", scheduler_backend) logger.info("Start consuming") channel.start_consuming() diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py new file mode 100644 index 0000000..04d2045 --- /dev/null +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -0,0 +1,301 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This schedules the recurrent visits, for listed origins, in Celery. + +For "oneshot" (save code now, lister) tasks, check the +:mod:`swh.scheduler.celery_backend.runner` and +:mod:`swh.scheduler.celery_backend.pika_listener` modules. + +""" + +from __future__ import annotations + +from itertools import chain +import logging +from queue import Empty, Queue +import random +from threading import Thread +import time +from typing import TYPE_CHECKING, Any, Dict, List, Tuple + +from kombu.utils.uuid import uuid + +from swh.scheduler.celery_backend.config import get_available_slots + +if TYPE_CHECKING: + from ..interface import SchedulerInterface + from ..model import ListedOrigin + +logger = logging.getLogger(__name__) + + +_VCS_POLICY_RATIOS = { + "already_visited_order_by_lag": 0.49, + "never_visited_oldest_update_first": 0.49, + "origins_without_last_update": 0.02, +} + +# Default policy ratio, let's start that configuration in the module first +POLICY_RATIO: Dict[str, Dict[str, float]] = { + "default": { + "already_visited_order_by_lag": 0.5, + "never_visited_oldest_update_first": 0.5, + }, + "git": _VCS_POLICY_RATIOS, + "hg": _VCS_POLICY_RATIOS, + "svn": _VCS_POLICY_RATIOS, + "cvs": _VCS_POLICY_RATIOS, + "bzr": _VCS_POLICY_RATIOS, +} + + +MIN_SLOTS_RATIO = 0.05 +"""Quantity of slots that need to be available (with respect to max_queue_length) for +`grab_next_visits` to trigger""" + +QUEUE_FULL_BACKOFF = 60 +"""Backoff time (in seconds) if there's fewer than `MIN_SLOTS_RATIO` slots available in +the queue.""" + +NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 +"""Backoff time (in seconds) if no origins have been scheduled in the current +iteration""" + +BACKOFF_SPLAY = 5.0 +"""Amplitude of the fuzziness between backoffs""" + +TERMINATE = object() +"""Termination request received from command queue (singleton used for identity +comparison)""" + + +def grab_next_visits_policy_ratio( + scheduler: SchedulerInterface, visit_type: str, num_visits: int +) -> List[ListedOrigin]: + """Get the next `num_visits` for the given `visit_type` using the corresponding + set of scheduling policies. + + The `POLICY_RATIO` dict sets, for each visit type, the scheduling policies + used to pull the next tasks, and what proportion of the available num_visits + they take. + + This function emits a warning if the ratio of retrieved origins is off of + the requested ratio by more than 5%. + + Returns: + at most `num_visits` `ListedOrigin` objects + """ + policy_ratio = POLICY_RATIO.get(visit_type, POLICY_RATIO["default"]) + + fetched_origins: Dict[str, List[ListedOrigin]] = {} + + for policy, ratio in policy_ratio.items(): + num_tasks_to_send = int(num_visits * ratio) + fetched_origins[policy] = scheduler.grab_next_visits( + visit_type, num_tasks_to_send, policy=policy + ) + + all_origins: List[ListedOrigin] = list( + chain.from_iterable(fetched_origins.values()) + ) + if not all_origins: + return [] + + # Check whether the ratios of origins fetched are skewed with respect to the + # ones we requested + fetched_origin_ratios = { + policy: len(origins) / len(all_origins) + for policy, origins in fetched_origins.items() + } + + for policy, expected_ratio in policy_ratio.items(): + # 5% of skew with respect to request + if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: + logger.info( + "Skewed fetch for visit type %s with policy %s: fetched %s, " + "requested %s", + visit_type, + policy, + fetched_origin_ratios[policy], + expected_ratio, + ) + + return all_origins + + +def splay(): + """Return a random short interval by which to vary the backoffs for the visit + scheduling threads""" + return random.uniform(0, BACKOFF_SPLAY) + + +def send_visits_for_visit_type( + scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, +) -> float: + """Schedule the next batch of visits for the given `visit_type`. + + First, we determine the number of available slots by introspecting the + RabbitMQ queue. + + If there's fewer than `MIN_SLOTS_RATIO` slots available in the queue, we wait + for `QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive + `grab_next_visits` queries when there's not many jobs to queue. + + Once there's more than `MIN_SLOTS_RATIO` slots available, we run + `get_next_visits` to retrieve the next set of origin visits to schedule, and + we send them to celery. + + If the last scheduling attempt didn't return any origins, we sleep for + `NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + `grab_next_visits` queries too often if there's nothing left to schedule. + + Returns: + the earliest `time.monotonic` value at which to run the next iteration of + the loop. + + """ + queue_name = task_type["backend_name"] + max_queue_length = task_type.get("max_queue_length") or 0 + min_available_slots = max_queue_length * MIN_SLOTS_RATIO + + current_iteration_start = time.monotonic() + + # Check queue level + available_slots = get_available_slots(app, queue_name, max_queue_length) + logger.debug( + "%s available slots for visit type %s in queue %s", + available_slots, + visit_type, + queue_name, + ) + if available_slots < min_available_slots: + return current_iteration_start + QUEUE_FULL_BACKOFF + + origins = grab_next_visits_policy_ratio(scheduler, visit_type, available_slots) + + if not origins: + logger.debug("No origins to visit for type %s", visit_type) + return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF + + # Try to smooth the ingestion load, origins pulled by different + # scheduling policies have different resource usage patterns + random.shuffle(origins) + + for origin in origins: + task_dict = origin.as_task_dict() + app.send_task( + queue_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + logger.info( + "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, + ) + + # When everything worked, we can try to schedule origins again ASAP. + return time.monotonic() + + +def visit_scheduler_thread( + config: Dict, + visit_type: str, + command_queue: Queue[object], + exc_queue: Queue[Tuple[str, BaseException]], +): + """Target function for the visit sending thread, which initializes local + connections and handles exceptions by sending them back to the main thread.""" + + from swh.scheduler import get_scheduler + from swh.scheduler.celery_backend.config import build_app + + try: + # We need to reinitialize these connections because they're not generally + # thread-safe + app = build_app(config.get("celery")) + scheduler = get_scheduler(**config["scheduler"]) + task_type = scheduler.get_task_type(f"load-{visit_type}") + + if task_type is None: + raise ValueError(f"Unknown task type: load-{visit_type}") + + next_iteration = time.monotonic() + + while True: + # vary the next iteration time a little bit + next_iteration = next_iteration + splay() + while time.monotonic() < next_iteration: + # Wait for next iteration to start. Listen for termination message. + try: + msg = command_queue.get(block=True, timeout=1) + except Empty: + continue + + if msg is TERMINATE: + return + else: + logger.warn("Received unexpected message %s in command queue", msg) + + next_iteration = send_visits_for_visit_type( + scheduler, app, visit_type, task_type + ) + + except BaseException as e: + exc_queue.put((visit_type, e)) + + +VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] +"""Dict storing the visit scheduler threads and their command queues""" + + +def spawn_visit_scheduler_thread( + threads: VisitSchedulerThreads, + exc_queue: Queue[Tuple[str, BaseException]], + config: Dict[str, Any], + visit_type: str, +): + """Spawn a new thread to schedule the visits of type `visit_type`.""" + command_queue: Queue[object] = Queue() + thread = Thread( + target=visit_scheduler_thread, + kwargs={ + "config": config, + "visit_type": visit_type, + "command_queue": command_queue, + "exc_queue": exc_queue, + }, + ) + threads[visit_type] = (thread, command_queue) + thread.start() + + +def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: + """Terminate all visit scheduler threads""" + logger.info("Termination requested...") + for _, command_queue in threads.values(): + command_queue.put(TERMINATE) + + loops = 0 + while threads and loops < 10: + logger.info( + "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) + ) + loops += 1 + for visit_type, (thread, _) in list(threads.items()): + thread.join(timeout=1) + if not thread.is_alive(): + logger.debug("Thread %s terminated", visit_type) + del threads[visit_type] + + if threads: + logger.warn( + "Could not reap the following threads after 10 attempts: %s", + ", ".join(sorted(threads)), + ) + + return list(sorted(threads)) diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index 52ce42a..53b2e25 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,169 +1,180 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +"""This is the first scheduler runner. It is in charge of scheduling "oneshot" tasks +(e.g save code now, indexer, vault, deposit, ...). To do this, it reads tasks ouf of the +scheduler backend and pushes those to their associated rabbitmq queues. + +The scheduler listener :mod:`swh.scheduler.celery_backend.pika_listener` is the module +in charge of finalizing the task results. + +""" + import logging from typing import Dict, List, Tuple +from deprecated import deprecated from kombu.utils.uuid import uuid from swh.core.statsd import statsd from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import get_available_slots from swh.scheduler.interface import SchedulerInterface from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks( backend: SchedulerInterface, app, task_types: List[Dict] = [], with_priority: bool = False, ) -> List[Dict]: """Schedule tasks ready to be scheduled. This lookups any tasks per task type and mass schedules those accordingly (send messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler backend). If tasks (per task type) with priority exist, they will get redirected to dedicated high priority queue (standard queue name prefixed with `save_code_now:`). Args: backend: scheduler backend to interact with (read/update tasks) app (App): Celery application to send tasks to task_types: The list of task types dict to iterate over. By default, empty. When empty, the full list of task types referenced in the scheduler will be used. with_priority: If True, only tasks with priority set will be fetched and scheduled. By default, False. Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks: List[Dict] = [] while True: if not task_types: task_types = backend.get_task_types() task_types_d = {} pending_tasks = [] for task_type in task_types: task_type_name = task_type["type"] task_types_d[task_type_name] = task_type max_queue_length = task_type["max_queue_length"] if max_queue_length is None: max_queue_length = 0 backend_name = task_type["backend_name"] if with_priority: # grab max_queue_length (or 10) potential tasks with any priority for # the same type (limit the result to avoid too long running queries) grabbed_priority_tasks = backend.grab_ready_priority_tasks( task_type_name, num_tasks=max_queue_length or 10 ) if grabbed_priority_tasks: pending_tasks.extend(grabbed_priority_tasks) logger.info( "Grabbed %s tasks %s (priority)", len(grabbed_priority_tasks), task_type_name, ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", len(grabbed_priority_tasks), tags={"task_type": task_type_name}, ) else: num_tasks = get_available_slots(app, backend_name, max_queue_length) # only pull tasks if the buffer is at least 1/5th empty (= 80% # full), to help postgresql use properly indexed queries. if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: # Only grab num_tasks tasks with no priority grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks ) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) logger.info( "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", len(grabbed_tasks), tags={"task_type": task_type_name}, ) if not pending_tasks: return all_backend_tasks backend_tasks = [] celery_tasks: List[Tuple[bool, str, str, List, Dict]] = [] for task in pending_tasks: args = task["arguments"]["args"] kwargs = task["arguments"]["kwargs"] backend_name = task_types_d[task["type"]]["backend_name"] backend_id = uuid() celery_tasks.append( ( task.get("priority") is not None, backend_name, backend_id, args, kwargs, ) ) data = { "task": task["id"], "backend_id": backend_id, "scheduled": utcnow(), } backend_tasks.append(data) logger.debug("Sent %s celery tasks", len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks) for with_priority, backend_name, backend_id, args, kwargs in celery_tasks: kw = dict(task_id=backend_id, args=args, kwargs=kwargs,) if with_priority: kw["queue"] = f"save_code_now:{backend_name}" app.send_task(backend_name, **kw) all_backend_tasks.extend(backend_tasks) +@deprecated(version="0.18", reason="Use `swh scheduler start-runner` instead") def main(): from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler("local") try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py index 0becbe8..861651e 100644 --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -1,137 +1,226 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import time +from typing import List, Tuple import click from . import cli @cli.command("start-runner") @click.option( "--period", "-p", default=0, help=( "Period (in s) at witch pending tasks are checked and " "executed. Set to 0 (default) for a one shot." ), ) @click.option( "--task-type", "task_type_names", multiple=True, default=[], help=( "Task types to schedule. If not provided, this iterates over every " "task types referenced in the scheduler backend." ), ) @click.option( "--with-priority/--without-priority", is_flag=True, default=False, help=( "Determine if those tasks should be the ones with priority or not." "By default, this deals with tasks without any priority." ), ) @click.pass_context def runner(ctx, period, task_type_names, with_priority): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.config import build_app from swh.scheduler.celery_backend.runner import run_ready_tasks config = ctx.obj["config"] app = build_app(config.get("celery")) app.set_current() logger = logging.getLogger(__name__ + ".runner") scheduler = ctx.obj["scheduler"] - logger.debug("Scheduler %s" % scheduler) + logger.debug("Scheduler %s", scheduler) task_types = [] for task_type_name in task_type_names: task_type = scheduler.get_task_type(task_type_name) if not task_type: raise ValueError(f"Unknown {task_type_name}") task_types.append(task_type) try: while True: logger.debug("Run ready tasks") try: ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) if ntasks: logger.info("Scheduled %s tasks", ntasks) except Exception: logger.exception("Unexpected error in run_ready_tasks()") if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command("start-listener") @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. This service is responsible for listening at task lifecycle events and handle their workflow status in the database.""" scheduler_backend = ctx.obj["scheduler"] if not scheduler_backend: raise ValueError("Scheduler class (local/remote) must be instantiated") broker = ( ctx.obj["config"] .get("celery", {}) .get("task_broker", "amqp://guest@localhost/%2f") ) from swh.scheduler.celery_backend.pika_listener import get_listener listener = get_listener(broker, "celeryev.listener", scheduler_backend) try: listener.start_consuming() finally: listener.stop_consuming() +@cli.command("schedule-recurrent") +@click.option( + "--visit-type", + "visit_types", + multiple=True, + default=[], + help=( + "Visit types to schedule. If not provided, this iterates over every " + "corresponding load task types referenced in the scheduler backend." + ), +) +@click.pass_context +def schedule_recurrent(ctx, visit_types: List[str]): + """Starts the scheduler for recurrent visits. + + This runs one thread for each visit type, which regularly sends new visits + to celery. + + """ + from queue import Queue + + from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + logger, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + ) + + config = ctx.obj["config"] + scheduler = ctx.obj["scheduler"] + + if not visit_types: + visit_types = [] + # Figure out which visit types exist in the scheduler + all_task_types = scheduler.get_task_types() + for task_type in all_task_types: + if not task_type["type"].startswith("load-"): + # only consider loading tasks as recurring ones, the rest is dismissed + continue + # get visit type name from task type + visit_types.append(task_type["type"][5:]) + else: + # Check that the passed visit types exist in the scheduler + for visit_type in visit_types: + task_type_name = f"load-{visit_type}" + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task type: {task_type_name}") + + exc_queue: Queue[Tuple[str, BaseException]] = Queue() + threads: VisitSchedulerThreads = {} + + try: + # Spawn initial threads + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + # Handle exceptions from child threads + while True: + visit_type, exc_info = exc_queue.get(block=True) + + logger.exception( + "Thread %s died with exception; respawning", + visit_type, + exc_info=exc_info, + ) + + dead_thread = threads[visit_type][0] + dead_thread.join(timeout=1) + + if dead_thread.is_alive(): + logger.warn( + "The thread for %s is still alive after sending an exception?! " + "Respawning anyway.", + visit_type, + ) + + spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type) + + except SystemExit: + remaining_threads = terminate_visit_scheduler_threads(threads) + if remaining_threads: + ctx.exit(1) + ctx.exit(0) + + @cli.command("rpc-serve") @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") @click.option("--port", default=5008, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=None, help=( "Indicates if the server should run in debug mode. " "Defaults to True if log-level is DEBUG, False otherwise." ), ) @click.pass_context def rpc_server(ctx, host, port, debug): """Starts a swh-scheduler API HTTP server. """ if ctx.obj["config"]["scheduler"]["cls"] == "remote": click.echo( "The API server can only be started with a 'local' " "configuration", err=True, ) ctx.exit(1) from swh.scheduler.api import server server.app.config.update(ctx.obj["config"]) if debug is None: debug = ctx.obj["log_level"] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 775b38a..4872516 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,62 +1,72 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os from typing import Dict, List +from unittest.mock import patch import pytest from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" return swh_scheduler.get_or_create_lister(**LISTERS[0]) @pytest.fixture def visit_types() -> List[str]: """Possible visit types in `ListedOrigin`s""" return ["git", "svn"] @pytest.fixture def listed_origins_by_type( stored_lister: Lister, visit_types: List[str] ) -> Dict[str, List[ListedOrigin]]: """A fixed list of `ListedOrigin`s, for each `visit_type`.""" count_per_type = 1000 assert stored_lister.id return { visit_type: [ ListedOrigin( lister_id=stored_lister.id, url=f"https://{visit_type}.example.com/{i:04d}", visit_type=visit_type, last_update=datetime( 2020, 6, 15, 16, 0, 0, j * count_per_type + i, tzinfo=timezone.utc ), ) for i in range(count_per_type) ] for j, visit_type in enumerate(visit_types) } @pytest.fixture def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: """Return a (fixed) set of listed origins""" return sum(listed_origins_by_type.values(), []) + + +@pytest.fixture +def storage(swh_storage): + """An instance of in-memory storage that gets injected + into the CLI functions.""" + with patch("swh.storage.get_storage") as get_storage_mock: + get_storage_mock.return_value = swh_storage + yield swh_storage diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py index 044a1d8..62b83fb 100644 --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -1,856 +1,847 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from itertools import islice import logging import random import re import tempfile from unittest.mock import patch from click.testing import CliRunner import pytest from swh.core.api.classes import stream_results from swh.model.model import Origin from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict, utcnow CLI_CONFIG = """ scheduler: cls: foo args: {} """ def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with patch( "swh.scheduler.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = ["-C" + config_fd.name,] + args result = runner.invoke(cli, args, obj={"log_level": logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_schedule_tasks(swh_scheduler): csv_data = ( b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" ) with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(csv_data) csv_fd.seek(0) result = invoke( swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name] ) expected = r""" Created 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg1', 'arg2'\] Keyword args: key: 'value' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg3', 'arg4'\] Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_tasks_columns(swh_scheduler): with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n') csv_fd.seek(0) result = invoke( swh_scheduler, False, [ "task", "schedule", "-c", "type", "-c", "policy", "-c", "args", "-c", "kwargs", "-d", ";", csv_fd.name, ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_task(swh_scheduler): result = invoke( swh_scheduler, False, ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value",], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_none(swh_scheduler): result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 1 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter(swh_scheduler): task = create_task_dict("swh-test-multiping", "oneshot", key="value") swh_scheduler.create_tasks([task]) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter_2(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-multiping", "oneshot", key="value"), create_task_dict("swh-test-ping", "oneshot", key="value2"), ] ) result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",]) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # Fails because "task list-pending --limit 3" only returns 2 tasks, because # of how compute_nb_tasks_from works. @pytest.mark.xfail def test_list_pending_tasks_limit(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-ping", "oneshot", key="value%d" % i) for i in range(10) ] ) result = invoke( swh_scheduler, False, ["task", "list-pending", "swh-test-ping", "--limit", "3",] ) expected = r""" Found 2 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value0' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3) task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: tomorrow \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke(swh_scheduler, False, ["task", "list",]) expected = r""" Found 2 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2",]) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id_2(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"] ) expected = r""" Found 2 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_type(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"] ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_limit(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, ["task", "list", "--limit", "2",]) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_after(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--after", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def _fill_storage_with_origins(storage, nb_origins): origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)] storage.origin_add(origins) return origins -@pytest.fixture -def storage(swh_storage): - """An instance of in-memory storage that gets injected - into the CLI functions.""" - with patch("swh.storage.get_storage") as get_storage_mock: - get_storage_mock.return_value = swh_storage - yield swh_storage - - @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a divisor of nb_origins.""" _fill_storage_with_origins(storage, 90) result = invoke( swh_scheduler, False, ["task", "schedule_origins", "--dry-run", "swh-test-ping",], ) # Check the output expected = r""" Scheduled 3 tasks \(30 origins\). Scheduled 6 tasks \(60 origins\). Scheduled 9 tasks \(90 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check scheduled tasks tasks = swh_scheduler.search_tasks() assert len(tasks) == 0 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins): # check there are not too many tasks assert len(tasks) <= max_tasks # check tasks are not too large assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks) # check the tasks are exhaustive assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len( expected_origins ) assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == { origin.url for origin in expected_origins } @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" origins = _fill_storage_with_origins(storage, 70) result = invoke( swh_scheduler, False, ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20",], ) # Check the output expected = r""" Scheduled 3 tasks \(60 origins\). Scheduled 4 tasks \(70 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 4, 20, origins) assert all(task["arguments"]["kwargs"] == {} for task in tasks) def test_task_schedule_origins_kwargs(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" origins = _fill_storage_with_origins(storage, 30) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", 'key1="value1"', 'key2="value2"', ], ) # Check the output expected = r""" Scheduled 2 tasks \(30 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all( task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) def test_task_schedule_origins_with_limit(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" _fill_storage_with_origins(storage, 50) limit = 20 expected_origins = list(islice(stream_results(storage.origin_list), limit)) nb_origins = len(expected_origins) assert nb_origins == limit max_task_size = 5 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 # made the numbers go round result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--limit", limit, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_task_schedule_origins_with_page_token(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" nb_total_origins = 50 origins = _fill_storage_with_origins(storage, nb_total_origins) # prepare page_token and origins result expectancy page_result = storage.origin_list(limit=10) assert len(page_result.results) == 10 page_token = page_result.next_page_token assert page_token is not None # remove the first 10 origins listed as we won't see those in tasks expected_origins = [o for o in origins if o not in page_result.results] nb_origins = len(expected_origins) assert nb_origins == nb_total_origins - len(page_result.results) max_task_size = 10 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--page-token", page_token, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_cli_task_runner_unknown_task_types(swh_scheduler, storage): """When passing at least one unknown task type, the runner should fail.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] known_task_type = random.choice(task_type_names) unknown_task_type = "unknown-task-type" assert unknown_task_type not in task_type_names with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "start-runner", "--task-type", known_task_type, "--task-type", unknown_task_type, ], ) @pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"]) def test_cli_task_runner_with_known_tasks( swh_scheduler, storage, caplog, flag_priority ): """Trigger runner with known tasks runs smoothly.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] task_type_name = random.choice(task_type_names) task_type_name2 = random.choice(task_type_names) # The runner will just iterate over the following known tasks and do noop. We are # just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", flag_priority, "--task-type", task_type_name, "--task-type", task_type_name2, ], ) assert result.exit_code == 0, result.output def test_cli_task_runner_no_task(swh_scheduler, storage): """Trigger runner with no parameter should run as before.""" # The runner will just iterate over the existing tasks from the scheduler and do # noop. We are just checking the runner does not explode here. result = invoke(swh_scheduler, False, ["start-runner",],) assert result.exit_code == 0, result.output diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py new file mode 100644 index 0000000..d118fbf --- /dev/null +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -0,0 +1,180 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import timedelta +import logging +from queue import Queue +from unittest.mock import MagicMock + +import pytest + +from swh.scheduler.celery_backend.recurrent_visits import ( + VisitSchedulerThreads, + send_visits_for_visit_type, + spawn_visit_scheduler_thread, + terminate_visit_scheduler_threads, + visit_scheduler_thread, +) + +from .test_cli import invoke + +TEST_MAX_QUEUE = 10000 +MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" + + +def _compute_backend_name(visit_type: str) -> str: + "Build a dummy reproducible backend name" + return f"swh.loader.{visit_type}.tasks" + + +@pytest.fixture +def swh_scheduler(swh_scheduler): + """Override default fixture of the scheduler to install some more task types.""" + for visit_type in ["git", "hg", "svn"]: + task_type = f"load-{visit_type}" + swh_scheduler.create_task_type( + { + "type": task_type, + "max_queue_length": TEST_MAX_QUEUE, + "description": "The {} testing task".format(task_type), + "backend_name": _compute_backend_name(visit_type), + "default_interval": timedelta(days=1), + "min_interval": timedelta(hours=6), + "max_interval": timedelta(days=12), + } + ) + return swh_scheduler + + +def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): + """When passed an unknown visit type, the recurrent visit scheduler should refuse + to start.""" + + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"], + ) + + +def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): + """When passing no visit types, the recurrent visit scheduler should start.""" + + spawn_visit_scheduler_thread = mocker.patch( + f"{MODULE_NAME}.spawn_visit_scheduler_thread" + ) + spawn_visit_scheduler_thread.side_effect = SystemExit + # The actual scheduling threads won't spawn, they'll immediately terminate. This + # only exercises the logic to pull task types out of the database + + result = invoke(swh_scheduler, False, ["schedule-recurrent"]) + assert result.exit_code == 0, result.output + + +def test_recurrent_visit_scheduling( + swh_scheduler, caplog, listed_origins_by_type, mocker, +): + """Scheduling known tasks is ok.""" + + caplog.set_level(logging.DEBUG, MODULE_NAME) + nb_origins = 1000 + + mock_celery_app = MagicMock() + mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") + mock_available_slots.return_value = nb_origins # Slots available in queue + + # Make sure the scheduler is properly configured in terms of visit/task types + all_task_types = { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + visit_types = list(listed_origins_by_type.keys()) + assert len(visit_types) > 0 + + task_types = [] + origins = [] + for visit_type, _origins in listed_origins_by_type.items(): + origins.extend(swh_scheduler.record_listed_origins(_origins)) + task_type_name = f"load-{visit_type}" + assert task_type_name in all_task_types.keys() + task_type = all_task_types[task_type_name] + task_type["visit_type"] = visit_type + # we'll limit the orchestrator to the origins' type we know + task_types.append(task_type) + + for visit_type in ["git", "svn"]: + task_type = f"load-{visit_type}" + send_visits_for_visit_type( + swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type] + ) + + assert mock_available_slots.called, "The available slots functions should be called" + + records = [record.message for record in caplog.records] + + # Mapping over the dict ratio/policies entries can change overall order so let's + # check the set of records + expected_records = set() + for task_type in task_types: + visit_type = task_type["visit_type"] + queue_name = task_type["backend_name"] + msg = ( + f"{nb_origins} available slots for visit type {visit_type} " + f"in queue {queue_name}" + ) + expected_records.add(msg) + + for expected_record in expected_records: + assert expected_record in set(records) + + +@pytest.fixture +def scheduler_config(swh_scheduler_config): + return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} + + +def test_visit_scheduler_thread_unknown_task( + swh_scheduler, scheduler_config, +): + """Starting a thread with unknown task type reports the error""" + + unknown_visit_type = "unknown" + command_queue = Queue() + exc_queue = Queue() + + visit_scheduler_thread( + scheduler_config, unknown_visit_type, command_queue, exc_queue + ) + + assert command_queue.empty() is True + assert exc_queue.empty() is False + assert len(exc_queue.queue) == 1 + result = exc_queue.queue.pop() + assert result[0] == unknown_visit_type + assert isinstance(result[1], ValueError) + + +def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker): + """Spawning and terminating threads runs smoothly""" + + threads: VisitSchedulerThreads = {} + exc_queue = Queue() + mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") + mock_build_app.return_value = MagicMock() + + assert len(threads) == 0 + for visit_type in visit_types: + spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) + + # This actually only checks the spawning and terminating logic is sound + + assert len(threads) == len(visit_types) + + actual_threads = terminate_visit_scheduler_threads(threads) + assert not len(actual_threads) + + assert mock_build_app.called