diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 38b09c9..5d09303 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,41 +1,41 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.3.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - - repo: https://gitlab.com/pycqa/flake8 - rev: 4.0.1 + - repo: https://github.com/pycqa/flake8 + rev: 5.0.4 hooks: - id: flake8 - additional_dependencies: [flake8-bugbear==22.3.23] + additional_dependencies: [flake8-bugbear==22.9.23] - repo: https://github.com/codespell-project/codespell - rev: v2.1.0 + rev: v2.2.2 hooks: - id: codespell name: Check source code spelling args: [-L simpy, -L hist] stages: [commit] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort - repo: https://github.com/python/black - rev: 22.3.0 + rev: 22.10.0 hooks: - id: black diff --git a/PKG-INFO b/PKG-INFO index 4b1a7af..c9ad4f8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,33 +1,33 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 1.2.3 +Version: 1.4.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/debian/changelog b/debian/changelog index 0aa6a3a..078da13 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,1187 +1,1208 @@ -swh-scheduler (1.2.3-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-scheduler (1.4.0-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 1.4.0 - (tagged by Antoine R. Dumont + (@ardumont) on 2022-12-12 11:45:31 + +0100) + * Upstream changes: - v1.4.0 - cli.add_forge_now: Open + `register-lister` with sensible defaults - cli.add_forge_now: + Open `schedule-first-visits` with sensible defaults + + -- Software Heritage autobuilder (on jenkins-debian1) Mon, 12 Dec 2022 10:51:31 +0000 - -- Software Heritage autobuilder (on jenkins-debian1) Mon, 03 Oct 2022 12:10:15 +0000 +swh-scheduler (1.3.0-1~swh1) unstable-swh; urgency=medium + + * New upstream release 1.3.0 - (tagged by Antoine R. Dumont + (@ardumont) on 2022-12-07 13:45:04 + +0100) + * Upstream changes: - v1.3.0 - task add: Ensure task type + provided exist and raise otherwise - grab_next_visits: Open + lister name and instance name filtering - send-to-celery: Adapt + to schedule from lister name & instance_name - Ensure origins + are not visited faster than twice a day - Refresh task type data + from the database every time recurrent tasks are run - Use json + instead of msgpack for serializers - pre-commit, tox: Bump pre- + commit, codespell, black and flake8 + + -- Software Heritage autobuilder (on jenkins-debian1) Wed, 07 Dec 2022 12:50:33 +0000 swh-scheduler (1.2.3-1~swh1) unstable-swh; urgency=medium * New upstream release 1.2.3 - (tagged by Antoine R. Dumont (@ardumont) on 2022-10-03 14:02:07 +0200) * Upstream changes: - v1.2.3 - Fix compatibility issue with latest dependency version - backend: Prevent query exception when lister ids is empty -- Software Heritage autobuilder (on jenkins-debian1) Mon, 03 Oct 2022 12:07:44 +0000 swh-scheduler (1.2.2-1~swh1) unstable-swh; urgency=medium * New upstream release 1.2.2 - (tagged by Antoine Lambert on 2022-09-15 13:54:36 +0200) * Upstream changes: - version 1.2.2 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 15 Sep 2022 12:00:17 +0000 swh-scheduler (1.2.1-1~swh1) unstable-swh; urgency=medium * New upstream release 1.2.1 - (tagged by David Douard on 2022-07-08 14:57:07 +0200) * Upstream changes: - v1.2.1 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 08 Jul 2022 14:53:11 +0000 swh-scheduler (1.2.0-1~swh1) unstable-swh; urgency=medium * New upstream release 1.2.0 - (tagged by Antoine R. Dumont (@ardumont) on 2022-06-03 15:37:19 +0200) * Upstream changes: - v1.2.0 - Remove unused get_current_version method - tests: use stock pytest_postgresql factory function -- Software Heritage autobuilder (on jenkins-debian1) Fri, 03 Jun 2022 13:47:47 +0000 swh-scheduler (1.1.2-1~swh1) unstable-swh; urgency=medium * New upstream release 1.1.2 - (tagged by Antoine Lambert on 2022-05-12 13:49:42 +0200) * Upstream changes: - version 1.1.2 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 12 May 2022 11:55:14 +0000 swh-scheduler (1.1.1-1~swh1) unstable-swh; urgency=medium * New upstream release 1.1.1 - (tagged by Valentin Lorentz on 2022-04-28 11:31:00 +0200) * Upstream changes: - v1.1.1 - * cli/utils: Fix parsing of empty strings - * Bump mypy version -- Software Heritage autobuilder (on jenkins-debian1) Thu, 28 Apr 2022 09:36:24 +0000 swh-scheduler (1.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 1.1.0 - (tagged by Valentin Lorentz on 2022-04-26 12:29:19 +0200) * Upstream changes: - v1.1.0 - * Add 'lister_name' and 'lister_instance_name' arguments to all tasks created from ListedOrigin - * Make scheduling policy used in schedule_recurrent configurable - * Update a bit the documentation for the new origin visit scheduler - * test/lint maitenance -- Software Heritage autobuilder (on jenkins-debian1) Tue, 26 Apr 2022 10:35:51 +0000 swh-scheduler (1.0.0-1~swh1) unstable-swh; urgency=medium * New upstream release 1.0.0 - (tagged by David Douard on 2022-02-24 16:56:30 +0100) * Upstream changes: - v1.0.0 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 24 Feb 2022 16:03:55 +0000 swh-scheduler (0.23.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.23.0 - (tagged by Vincent SELLIER on 2022-01-06 09:35:24 +0100) * Upstream changes: - v0.23.0 - Changelog: - * Allow to specify the visit grab parameters per visit type and policy - * Pin mypy and drop type annotations which makes mypy unhappy - * Use a temporary table to update scheduler metrics - * Clean up disabled scheduler archival task related services -- Software Heritage autobuilder (on jenkins-debian1) Thu, 06 Jan 2022 08:39:47 +0000 swh-scheduler (0.22.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.22.0 - (tagged by Vincent SELLIER on 2021-12-08 09:28:57 +0100) * Upstream changes: - v0.22.0 - changelog: - Make next_visit_queue_position an integer -- Software Heritage autobuilder (on jenkins-debian1) Wed, 08 Dec 2021 09:06:01 +0000 swh-scheduler (0.21.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.21.0 - (tagged by Vincent SELLIER on 2021-12-07 08:41:11 +0100) * Upstream changes: - v0.21.0 - Changelog: - Ensure there is no duplicated origins in the insertion batches -- Software Heritage autobuilder (on jenkins-debian1) Tue, 07 Dec 2021 07:45:40 +0000 swh-scheduler (0.20.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.20.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-11-22 16:11:16 +0100) * Upstream changes: - v0.20.0 - recurrent visits scheduler: use policy weights instead of ratios - recurrent visits scheduler: Improve docs rendering - backend: Fix CardinalityViolation in grab_next_visits on duplicate origins -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Nov 2021 15:14:56 +0000 swh-scheduler (0.19.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.19.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-10-28 13:10:55 +0200) * Upstream changes: - v0.19.0 - Add a new cli endpoint to schedule recurrent visits in Celery - grab_next_visits: avoid time interval calculations in PostgreSQL - Restrict the click version to avoid conflict version with celery's - Add docstring to runner and listener modules - Drop deprecated listener module - scheduler: Deprecate unused main celery runner -- Software Heritage autobuilder (on jenkins-debian1) Thu, 28 Oct 2021 11:15:10 +0000 swh-scheduler (0.18.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.2 - (tagged by Antoine R. Dumont (@ardumont) on 2021-10-18 15:11:59 +0200) * Upstream changes: - v0.18.2 - Use swh_storage fixture for cli tests -- Software Heritage autobuilder (on jenkins-debian1) Mon, 18 Oct 2021 13:18:56 +0000 swh-scheduler (0.18.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-10-15 15:49:35 +0200) * Upstream changes: - v0.18.1 - Return 0 slot if no more slots available in the queues -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Oct 2021 13:53:38 +0000 swh-scheduler (0.18.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.18.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-09-02 11:32:59 +0200) * Upstream changes: - v0.18.0 - Refine scheduling policy for origins with no known last update - Add a swh scheduler origin send-to-celery subcommand - runner: Improve help message on the task types flag. - send-to-celery: Add more options to allow scheduling of edge cases - Add table sampling option to grab_next_visits - journal_client: Only upsert if we have something to upsert -- Software Heritage autobuilder (on jenkins-debian1) Thu, 02 Sep 2021 09:35:32 +0000 swh-scheduler (0.17.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.17.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-08-26 10:30:12 +0200) * Upstream changes: - v0.17.1 - journal_client: Ensure queue position does not overflow -- Software Heritage autobuilder (on jenkins-debian1) Thu, 26 Aug 2021 08:41:41 +0000 swh-scheduler (0.17.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.17.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-08-05 15:29:18 +0200) * Upstream changes: - v0.17.0 - Introduce new scheduling policy to grab origins without last update - journal_client: Disable origins when too many visited attempts failed - journal_client: Record last_visited and last_successful in origin_visit_stats - Add a specific cooldown for notfound origins - Add a (longer) specific cooldown for failed origin visits - Make the origin visit scheduling cooldown configurable - Various refactoring to simplify the grab next visits logic and updates -- Software Heritage autobuilder (on jenkins-debian1) Fri, 06 Aug 2021 09:11:54 +0000 swh-scheduler (0.16.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.16.0 - (tagged by Antoine Lambert on 2021-06-22 17:35:55 +0200) * Upstream changes: - version 0.16.0 -- Software Heritage autobuilder (on jenkins-debian1) Tue, 22 Jun 2021 15:39:45 +0000 swh-scheduler (0.15.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.15.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-06-10 16:09:06 +0200) * Upstream changes: - v0.15.0 - separate-runner runner: Separate scheduling tasks with and without priority concern - Refactor and extract a get_available_slots utility - Add typing stubs dependencies for mypy>0.900 - pytest_plugin: Explicitly set hostname in broker_url for celery TestApp -- Software Heritage autobuilder (on jenkins-debian1) Thu, 10 Jun 2021 14:48:52 +0000 swh-scheduler (0.14.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.14.2 - (tagged by Valentin Lorentz on 2021-05-06 17:09:00 +0200) * Upstream changes: - v0.14.2 - * Fix flaky tests -- Software Heritage autobuilder (on jenkins-debian1) Thu, 06 May 2021 15:13:11 +0000 swh-scheduler (0.14.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.14.1 - (tagged by Antoine R. Dumont (@ardumont) on 2021-05-06 16:00:07 +0200) * Upstream changes: - v0.14.1 - Use swh.core 0.14 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 06 May 2021 14:17:39 +0000 swh-scheduler (0.13.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.13.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-20 11:46:51 +0200) * Upstream changes: - v0.13.0 - scheduler: Clean up priority/ratio task dead code - Parse task_ids before calling set_status_tasks. - tests: Complete checks on message with priority consumption -- Software Heritage autobuilder (on jenkins-debian1) Tue, 20 Apr 2021 09:51:00 +0000 swh-scheduler (0.12.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.12.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-15 13:31:30 +0200) * Upstream changes: - v0.12.0 - Route priority tasks to dedicated save code now queues - Fix various Sphinx warnings -- Software Heritage autobuilder (on jenkins-debian1) Thu, 15 Apr 2021 11:36:13 +0000 swh-scheduler (0.11.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.11.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-04-14 18:15:53 +0200) * Upstream changes: - v0.11.0 - separate-queues backend: Open endpoints to peek/grab tasks with any priority - Make origin_visit_stats_get return results from all pages - journal client: Filter out status messages without type - Simplify max_date() - journal_client: Fix date computations for (un)eventful visits - journal_client: Deal with failed status message -- Software Heritage autobuilder (on jenkins-debian1) Wed, 14 Apr 2021 16:19:31 +0000 swh-scheduler (0.10.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.10.0 - (tagged by Nicolas Dandrimont on 2021-02-03 22:53:20 +0100) * Upstream changes: - Release swh.scheduler 0.10.0 - Eagerly acknowledge celery tasks - Loads of simulator improvements - grab_next_visits: - clean up query building - account for schedule time to avoid rescheduling visits too fast - allow overriding the scheduling timestamp for the simulator -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Feb 2021 22:10:13 +0000 swh-scheduler (0.9.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.2 - (tagged by Antoine Lambert on 2021-01-25 16:27:41 +0100) * Upstream changes: - version 0.9.2 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 25 Jan 2021 15:31:21 +0000 swh-scheduler (0.9.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.1 - (tagged by Vincent SELLIER on 2021-01-21 19:20:33 +0100) * Upstream changes: - v0.9.1 - * Solve uneventful/eventful with unordered messages with snapshots - * Do not consider duplicated messages as uneventful event - * Reorganize grab_next_visits tests to better check sorting behavior -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Jan 2021 18:28:00 +0000 swh-scheduler (0.9.0-1~swh2) unstable-swh; urgency=medium * Bump new release to unstuck packaging -- Antoine R. Dumont (@ardumont) Thu, 21 Jan 2021 13:20:14 +0000 swh-scheduler (0.9.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.9.0 - (tagged by Antoine R. Dumont (@ardumont) on 2021-01-21 11:54:47 +0100) * Upstream changes: - v0.9.0 - Populate origin_visit_stats table out of the origin_visit_status topic - Introduce a scheduler policy simulator (old task-based scheduler, ...) - Implement basic aggregated metrics on listed origins - scheduler.cli.journal: Add `swh scheduler journal-client` cli - Filter origins by visit type when scheduling the next visits - Introduce a `swh scheduler origin schedule-next` cli - Introduce a `swh scheduler origin grab-next` cli - Add an new origin visit stats model object and related backend api - Implement a basic endpoint for getting the next origins to visit - doc: Add a cli section to the doc -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Jan 2021 11:00:29 +0000 swh-scheduler (0.8.2-1~swh2) unstable-swh; urgency=medium * Bump dependency -- Antoine R. Dumont (@ardumont) Tue, 08 Dec 2020 09:29:26 +0000 swh-scheduler (0.8.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.2 - (tagged by Antoine R. Dumont (@ardumont) on 2020-12-07 09:52:28 +0100) * Upstream changes: - v0.8.2 - requirement: Adapt celery requirements - Replace usage of arrow datetime objects in favor of pure datetime ones - Stop using the deprecated configuration scheme - cli.task_type: All task_type clis without a scheduler should raise -- Software Heritage autobuilder (on jenkins-debian1) Mon, 07 Dec 2020 08:55:39 +0000 swh-scheduler (0.8.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.1 - (tagged by Antoine R. Dumont (@ardumont) on 2020-11-24 14:13:36 +0100) * Upstream changes: - v0.8.1 - conftest: Reference swh.core.db.pytest_plugin -- Software Heritage autobuilder (on jenkins-debian1) Tue, 24 Nov 2020 13:16:08 +0000 swh-scheduler (0.8.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.8.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-11-23 13:42:05 +0100) * Upstream changes: - v0.8.0 - requirements-test.txt: Drop no longer needed pytest-postgresql requirement - scheduler.pytest_plugin: Make scheduler tests faster -- Software Heritage autobuilder (on jenkins-debian1) Mon, 23 Nov 2020 12:44:40 +0000 swh-scheduler (0.7.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.7.0 - (tagged by Antoine R. Dumont (@ardumont) on 2020-10-19 09:30:36 +0200) * Upstream changes: - v0.7.0 - scheduler: Type and unify get_scheduler factory with other factories - pytest_plugin: Explicitly name the scheduler test db differently - test_server: Simplify exception manipulations - tox.ini: pin black to the pre-commit version (19.10b0) to avoid flip-flops -- Software Heritage autobuilder (on jenkins-debian1) Mon, 19 Oct 2020 07:33:54 +0000 swh-scheduler (0.6.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.6.0 - (tagged by David Douard on 2020-09-25 12:03:33 +0200) * Upstream changes: - v0.6.0 -- Software Heritage autobuilder (on jenkins-debian1) Fri, 25 Sep 2020 10:06:32 +0000 swh-scheduler (0.5.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.3 - (tagged by Nicolas Dandrimont on 2020-09-24 17:49:27 +0200) * Upstream changes: - Release swh.scheduler v0.5.3 - Improve swh cli startup time - Add isort and update flake8 - Improve pytest execution time - Support recent kombu versions -- Software Heritage autobuilder (on jenkins-debian1) Thu, 24 Sep 2020 15:53:25 +0000 swh-scheduler (0.5.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.2 - (tagged by Antoine R. Dumont (@ardumont) on 2020-07-10 13:01:48 +0200) * Upstream changes: - v0.5.2 - Do no expose pytest-plugin through setuptools, let modules require it when needed -- Software Heritage autobuilder (on jenkins-debian1) Fri, 10 Jul 2020 11:08:30 +0000 swh-scheduler (0.5.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.1 - (tagged by Nicolas Dandrimont on 2020-07-09 10:18:03 +0200) * Upstream changes: - Release swh.scheduler 0.5.1 - Drop dependency on future (not needed anymore) -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 Jul 2020 09:51:38 +0000 swh-scheduler (0.5.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.5.0 - (tagged by Nicolas Dandrimont on 2020-07-09 10:16:57 +0200) * Upstream changes: - Release swh.scheduler v0.5.0 - Move celery fixtures to the pytest plugin -- Software Heritage autobuilder (on jenkins-debian1) Thu, 09 Jul 2020 08:20:42 +0000 swh-scheduler (0.4.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.4.0 - (tagged by Nicolas Dandrimont on 2020-07-06 16:47:28 +0200) * Upstream changes: - Release swh.scheduler 0.4.0 - Extract pytest fixtures to a pytest plugin -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 14:52:42 +0000 swh-scheduler (0.3.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.3.0 - (tagged by Nicolas Dandrimont on 2020-07-06 12:18:28 +0200) * Upstream changes: - Release swh.scheduler 0.3.0 - Add get_listed_origins endpoint -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 Jul 2020 10:23:31 +0000 swh-scheduler (0.2.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.2 - (tagged by Nicolas Dandrimont on 2020-06-22 14:03:34 +0200) * Upstream changes: - Release swh.scheduler 0.2.2 - Re- introduce root endpoint for the RPC server -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 12:07:05 +0000 swh-scheduler (0.2.1-1~swh1) unstable-swh; urgency=medium [ Nicolas Dandrimont ] * Force celery >= 4.3 [ Software Heritage autobuilder (on jenkins-debian1) ] * New upstream release 0.2.1 - (tagged by Nicolas Dandrimont on 2020-06-22 12:09:32 +0200) * Upstream changes: - Release swh.scheduler 0.2.1 - Bump celery requirement to 4.3+ -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 10:12:50 +0000 swh-scheduler (0.2.0-1~swh1) unstable-swh; urgency=medium [ Nicolas Dandrimont ] * Switch from vcversioner to setuptools-scm * wrap-and-sort [ Software Heritage autobuilder (on jenkins-debian1) ] * New upstream release 0.2.0 - (tagged by Nicolas Dandrimont on 2020-06-22 10:33:11 +0200) * Upstream changes: - Release swh.scheduler 0.2.0 - Implement storage of lister and listed origin information - Add swh scheduler celery-monitor command - Overhaul RPC to use automatic generation -- Software Heritage autobuilder (on jenkins-debian1) Mon, 22 Jun 2020 08:36:49 +0000 swh-scheduler (0.1.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.1 - (tagged by Nicolas Dandrimont on 2020-06-03 11:34:19 +0200) * Upstream changes: - Release swh.scheduler v0.1.1 - Add missing dependency on future for celery 4.4.4 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Jun 2020 09:39:25 +0000 swh-scheduler (0.1.0-1~swh1) unstable-swh; urgency=medium * New upstream release 0.1.0 - (tagged by Nicolas Dandrimont on 2020-05-19 11:48:34 +0200) * Upstream changes: - Release swh.scheduler v0.1.0 - Blacken source code - Disable azure http logspam - Only schedule tasks when the buffer is somewhat empty -- Software Heritage autobuilder (on jenkins-debian1) Tue, 19 May 2020 09:52:31 +0000 swh-scheduler (0.0.72-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.72 - (tagged by Nicolas Dandrimont on 2020-03-23 13:07:38 +0100) * Upstream changes: - Release swh.scheduler v0.0.72 - Update instantiation of storage in tests - ensure that create_task_type is idempotent - introduce new listener based on pika -- Software Heritage autobuilder (on jenkins-debian1) Mon, 23 Mar 2020 12:12:00 +0000 swh-scheduler (0.0.71-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.71 - (tagged by Antoine R. Dumont (@ardumont) on 2020-01-23 14:24:56 +0100) * Upstream changes: - v0.0.71 - sentry: Fix initialization init_sentry call -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Jan 2020 13:29:33 +0000 swh-scheduler (0.0.70-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.70 - (tagged by Antoine R. Dumont (@ardumont) on 2020-01-23 13:43:35 +0100) * Upstream changes: - v0.0.70 - Use swh.core.sentry instead of calling sentry_sdk.init directly - backend_es: Fix configuration mapping -- Software Heritage autobuilder (on jenkins-debian1) Thu, 23 Jan 2020 12:47:43 +0000 swh-scheduler (0.0.69-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.69 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 16:00:24 +0100) * Upstream changes: - v0.0.69 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 15:04:48 +0000 swh-scheduler (0.0.68-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.68 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 15:28:13 +0100) * Upstream changes: - v0.0.68 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 14:33:33 +0000 swh-scheduler (0.0.67-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.67 - (tagged by Antoine R. Dumont (@ardumont) on 2019-12-17 14:33:36 +0100) * Upstream changes: - v0.0.67 - Fix scheduler's archive task cli - Make the filter task endpoint a paginated endpoint - Add coverage on the archive task cli -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 13:38:03 +0000 swh-scheduler (0.0.66-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.66 - (tagged by Nicolas Dandrimont on 2019-12-17 12:04:20 +0100) * Upstream changes: - Release swh.scheduler v0.0.66 - initialize sentry on celery worker startup - improve task archival endpoints in backend api -- Software Heritage autobuilder (on jenkins-debian1) Tue, 17 Dec 2019 11:08:25 +0000 swh-scheduler (0.0.65-1~swh2) unstable-swh; urgency=medium * Add pytest-mock build-dependency. -- Nicolas Dandrimont Fri, 13 Dec 2019 11:57:41 +0100 swh-scheduler (0.0.65-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.65 - (tagged by Nicolas Dandrimont on 2019-12-13 11:45:55 +0100) * Upstream changes: - Release swh.scheduler v0.0.65 - Drop the scheduler updater - Add a statsd probe for task execution timestamps - Add listener and runner statsd probes - CLI updates - Python packaging housekeeping -- Software Heritage autobuilder (on jenkins-debian1) Fri, 13 Dec 2019 10:54:31 +0000 swh-scheduler (0.0.64-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.64 - (tagged by Antoine R. Dumont (@ardumont) on 2019-11-20 14:26:00 +0100) * Upstream changes: - v0.0.64 - req-swh*: Remove old package loader backend names -- Software Heritage autobuilder (on jenkins-debian1) Wed, 20 Nov 2019 13:29:37 +0000 swh-scheduler (0.0.63-1~swh2) unstable-swh; urgency=medium * Update build dependency -- Antoine R. Dumont (@ardumont) Tue, 19 Nov 2019 17:07:40 +0100 swh-scheduler (0.0.63-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.63 - (tagged by Antoine R. Dumont (@ardumont) on 2019-11-19 14:09:12 +0100) * Upstream changes: - v0.0.63 - swh.scheduler.cli: Add `swh scheduler task-type register` cli - Use the shared_task decorator instead of binding to a specific celery app - celery/tests: mostly revert e770eb30 to fix celery app initialization in tests -- Software Heritage autobuilder (on jenkins-debian1) Tue, 19 Nov 2019 13:14:59 +0000 swh-scheduler (0.0.62-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.62 - (tagged by Antoine R. Dumont (@ardumont) on 2019-10-18 13:39:27 +0200) * Upstream changes: - v0.0.62 - celery_backend.config: Make JournalHandler import optional - tests: rewrite tests using pytest fixtures -- Software Heritage autobuilder (on jenkins-debian1) Fri, 18 Oct 2019 11:46:26 +0000 swh-scheduler (0.0.61-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.61 - (tagged by Nicolas Dandrimont on 2019-10-07 16:33:17 +0200) * Upstream changes: - Release swh.scheduler v0.0.61 - Remove bogus dict.get(default=) statement -- Software Heritage autobuilder (on jenkins-debian1) Mon, 07 Oct 2019 14:37:37 +0000 swh-scheduler (0.0.60-1~swh2) unstable-swh; urgency=medium * Force postgresql executable to a pg_ctl that exists when running tests. -- Nicolas Dandrimont Tue, 01 Oct 2019 18:14:39 +0200 swh-scheduler (0.0.60-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.60 - (tagged by Stefano Zacchiroli on 2019-10-01 13:13:13 +0200) * Upstream changes: - v0.0.60 - * tox: anticipate mypy run to just after flake8 - * init.py: switch to documented way of extending path - * tox.ini: add mypy section - * typing: minimal changes to make a no-op mypy run pass - * fix typo in docstring and sample file name - * admin CLI: drop obsolete backward compatibility aliases - * click "required" param wants bool, not int -- Software Heritage autobuilder (on jenkins-debian1) Tue, 01 Oct 2019 11:22:43 +0000 swh-scheduler (0.0.59-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.59 - (tagged by David Douard on 2019-09-04 16:08:27 +0200) * Upstream changes: - v0.0.59 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 04 Sep 2019 14:11:48 +0000 swh-scheduler (0.0.58-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.58 - (tagged by Antoine R. Dumont (@ardumont) on 2019-09-03 10:19:34 +0200) * Upstream changes: - v0.0.58 - celery: auto add tasks declared in the swh.workers entry point in task_modules - api/client: use RPCClient instead of deprecated SWHRemoteAPI - Make schedule_origins use origin urls instead of ids in task arguments. - docs: add code of conduct document - docs: very beginning of a practical documentation on the scheduler - config: Add a pre-commit config file - data: Insert new cgit instance lister task - data: Insert load-tar task-type -- Software Heritage autobuilder (on jenkins-debian1) Tue, 03 Sep 2019 08:28:19 +0000 swh-scheduler (0.0.57-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.57 - (tagged by David Douard on 2019-06-26 14:56:32 +0200) * Upstream changes: - v0.0.57 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 26 Jun 2019 13:05:20 +0000 swh-scheduler (0.0.56-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.56 - (tagged by Nicolas Dandrimont on 2019-05-07 18:16:20 +0200) * Upstream changes: - listener: Release the db object after using it - This is the contract that get_db/put_db is supposed to conform to. -- Software Heritage autobuilder (on jenkins-debian1) Tue, 14 May 2019 12:40:09 +0000 swh-scheduler (0.0.55-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.55 - (tagged by Antoine Lambert on 2019-05-06 11:47:43 +0200) * Upstream changes: - version 0.0.55 -- Software Heritage autobuilder (on jenkins-debian1) Mon, 06 May 2019 09:54:51 +0000 swh-scheduler (0.0.54-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.54 - (tagged by Antoine R. Dumont (@ardumont) on 2019-04-11 11:33:40 +0200) * Upstream changes: - v0.0.54 - cli_utils: Use yaml.safe_load instead of yaml.load - Fix support of latest versions of swh- core and psycopg2 - sql/data: Add npm related task types -- Software Heritage autobuilder (on jenkins-debian1) Thu, 11 Apr 2019 09:40:14 +0000 swh-scheduler (0.0.53-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.53 - (tagged by Antoine Lambert on 2019-04-04 16:45:56 +0200) * Upstream changes: - version 0.0.53 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 04 Apr 2019 14:55:20 +0000 swh-scheduler (0.0.52-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.52 - (tagged by Nicolas Dandrimont on 2019-04-03 10:54:06 +0200) * Upstream changes: - Release swh.scheduler v0.0.52 - Move to result_serializer = json to work around celery 4.3 bug - Fix db initialization -- Software Heritage autobuilder (on jenkins-debian1) Wed, 03 Apr 2019 08:59:00 +0000 swh-scheduler (0.0.51-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.51 - (tagged by Antoine R. Dumont (@ardumont) on 2019-03-22 12:09:22 +0100) * Upstream changes: - v0.0.51 - requirements.txt: Remove kombu dependency -- Software Heritage autobuilder (on jenkins-debian1) Fri, 22 Mar 2019 11:16:06 +0000 swh-scheduler (0.0.50-1~swh2) unstable-swh; urgency=medium * Update build- and runtime dependencies -- Nicolas Dandrimont Fri, 15 Mar 2019 18:24:11 +0100 swh-scheduler (0.0.50-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.50 - (tagged by Nicolas Dandrimont on 2019-03-15 18:07:24 +0100) * Upstream changes: - Release swh.scheduler v0.0.50 - Add an explicit log target for stdout and/or journald - Avoid useless log lines - Improve test coverage - Add support for non- string options in the CLI -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Mar 2019 17:16:03 +0000 swh-scheduler (0.0.49-1~swh2) unstable-swh; urgency=medium * Export LC_ALL=C.UTF-8 -- Nicolas Dandrimont Thu, 14 Mar 2019 13:42:24 +0100 swh-scheduler (0.0.49-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.49 - (tagged by Nicolas Dandrimont on 2019-03-03 08:48:04 +0100) * Upstream changes: - Release swh.scheduler v0.0.49 - various fixes around celery behavior - move wsgi endpoint to a separate module - add tests for the CLI -- Software Heritage autobuilder (on jenkins-debian1) Sun, 03 Mar 2019 07:55:41 +0000 swh-scheduler (0.0.48-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.48 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-22 16:11:51 +0100) * Upstream changes: - v0.0.48 - Fix comment on main scheduler schema -- Software Heritage autobuilder (on jenkins-debian1) Fri, 22 Feb 2019 15:17:20 +0000 swh-scheduler (0.0.47-1~swh2) unstable-swh; urgency=low * Upstream release to fix build dependencies issue -- Antoine Romain Dumont (@ardumont) Thu, 21 Feb 2019 15:41:24 +0100 swh-scheduler (0.0.47-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.47 - (tagged by Valentin Lorentz on 2019-02-20 16:53:20 +0100) * Upstream changes: - Fix crash of SchedulerBackend.search_tasks when no argument is given. -- Software Heritage autobuilder (on jenkins-debian1) Thu, 21 Feb 2019 09:13:07 +0000 swh-scheduler (0.0.46-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.46 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-15 15:05:47 +0100) * Upstream changes: - v0.0.46 - scheduler.task: Remove no longer used Task class -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Feb 2019 14:15:26 +0000 swh-scheduler (0.0.45-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.45 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-15 10:43:07 +0100) * Upstream changes: - v0.0.45 - celery_backend/config: Fix loglevel for amqp module -- Software Heritage autobuilder (on jenkins-debian1) Fri, 15 Feb 2019 09:48:25 +0000 swh-scheduler (0.0.44-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.44 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-13 16:29:05 +0100) * Upstream changes: - v0.0.44 - swh-scheduler-api: Fix configuration read too many times -- Software Heritage autobuilder (on jenkins-debian1) Wed, 13 Feb 2019 15:34:34 +0000 swh-scheduler (0.0.43-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.43 - (tagged by David Douard on 2019-02-13 15:27:27 +0100) * Upstream changes: - v0.0.43 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 13 Feb 2019 14:46:59 +0000 swh-scheduler (0.0.42-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.42 - (tagged by Antoine R. Dumont (@ardumont) on 2019-02-11 14:28:10 +0100) * Upstream changes: - v0.0.42 - Fix dependency requirements for hypothesis -- Software Heritage autobuilder (on jenkins-debian1) Mon, 11 Feb 2019 13:33:48 +0000 swh-scheduler (0.0.41-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.41 - (tagged by David Douard on 2019-02-06 15:25:56 +0100) * Upstream changes: - v0.0.41 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 06 Feb 2019 15:33:04 +0000 swh-scheduler (0.0.40-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.40 - (tagged by Antoine R. Dumont (@ardumont) on 2019-01-28 16:24:04 +0100) * Upstream changes: - v0.0.40 - swh.scheduler.tests: Mark db tests as such - Force tox environment to C.UTF-8 locale - Add debug logging in the SWHTask class -- Software Heritage autobuilder (on jenkins-debian1) Mon, 28 Jan 2019 15:30:41 +0000 swh-scheduler (0.0.39-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.39 - (tagged by David Douard on 2019-01-16 13:37:58 +0100) * Upstream changes: - v0.0.39 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 16 Jan 2019 12:42:37 +0000 swh-scheduler (0.0.38-1~swh1) unstable-swh; urgency=medium * New upstream release 0.0.38 - (tagged by David Douard on 2018-12-20 14:39:59 +0100) * Upstream changes: - v0.0.38 -- Software Heritage autobuilder (on jenkins-debian1) Wed, 09 Jan 2019 18:32:14 +0000 swh-scheduler (0.0.35-1~swh1) unstable-swh; urgency=medium * v0.0.35 * tests: Add SchedulerTestFixture * swh.scheduler.utils: Allow to add more task information * sql/40-swh-data: Update new indexer task types for local db -- Antoine R. Dumont (@ardumont) Mon, 29 Oct 2018 10:07:08 +0100 swh-scheduler (0.0.34-1~swh1) unstable-swh; urgency=medium * v0.0.34 * Finalize pytest migration -- Antoine R. Dumont (@ardumont) Thu, 25 Oct 2018 17:52:03 +0200 swh-scheduler (0.0.33-1~swh1) unstable-swh; urgency=medium * v0.0.33 -- David Douard Thu, 25 Oct 2018 16:03:16 +0200 swh-scheduler (0.0.32-1~swh1) unstable-swh; urgency=medium * v0.0.32 * tests: Add celery fixture to ease tests * tests: make tests use sql/ files from the package * tests: Starting migration towards pytest * listener: Make the listener code compatible with new celery (debian buster) * Make swh_scheduler_create_tasks_from_temp use indexes * setup: prepare for pypi upload * docs: add a simple README file -- Antoine R. Dumont (@ardumont) Mon, 22 Oct 2018 15:37:51 +0200 swh-scheduler (0.0.31-1~swh1) unstable-swh; urgency=medium * v0.0.31 * sql/swh-scheduler: Make the create_tasks call idempotent * swh.scheduler.utils: Open create_task_dict function * sql/scheduler-data: Add lister gitlab task types * sql/scheduler-data: Reference the existing production lister data * swh.scheduler.backend_es: Open sniffing options -- Antoine R. Dumont (@ardumont) Tue, 31 Jul 2018 06:55:39 +0200 swh-scheduler (0.0.30-1~swh1) unstable-swh; urgency=medium * v0.0.30 * swh-scheduler-schema.sql: Archive disabled oneshot tasks as well * swh.scheduler.cli: Add policy to pretty printing task routine * swh.scheduler.cli: Fix broken cli list-pending since api change -- Antoine R. Dumont (@ardumont) Fri, 22 Jun 2018 18:07:02 +0200 swh-scheduler (0.0.29-1~swh1) unstable-swh; urgency=medium * v0.0.29 * swh.scheduler.cli: Change archival period to rolling month - 1 week * swh.scheduler.updater.writer: Force filter resolution to list * swh.scheduler.cli: Change default archival period to current month * swh.scheduler.cli: Improve logging message * swh.scheduler.updater.backend: Adapt configuration path accordingly -- Antoine R. Dumont (@ardumont) Thu, 31 May 2018 11:42:51 +0200 swh-scheduler (0.0.28-1~swh1) unstable-swh; urgency=medium * v0.0.28 * Fix wrong runtime dependencies -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 14:12:15 +0200 swh-scheduler (0.0.27-1~swh1) unstable-swh; urgency=medium * v0.0.27 * scheduler: Deal with priority in tasks * scheduler-update: new package python3-swh.scheduler.updater * Contains tools in charge of consuming events from arbitrary sources * and update the scheduler db -- Antoine R. Dumont (@ardumont) Tue, 29 May 2018 12:27:34 +0200 swh-scheduler (0.0.26-1~swh1) unstable-swh; urgency=medium * v0.0.26 * swh.scheduler: Fix package build * swh.scheduler.tests: Test remote scheduler api as well * swh.scheduler: Add tests around removing archivable tasks * swh.scheduler: Add tests around filtering archivable tasks * swh-scheduler-schema: Fix unneeded drop instructions * swh.scheduler.cli: Improve docstring * swh.scheduler.cli: Permit to specify the backend to use in cli * swh.scheduler.api: Bootstrap scheduler's remote api * swh.scheduler: Use `get_scheduler` api to instantiate a scheduler * swh.scheduler.backend: Fix docstring -- Antoine R. Dumont (@ardumont) Thu, 26 Apr 2018 17:34:07 +0200 swh-scheduler (0.0.25-1~swh1) unstable-swh; urgency=medium * v0.0.25 * swh.scheduler.cli.archive: Index arguments.kwargs as text -- Antoine R. Dumont (@ardumont) Wed, 18 Apr 2018 12:34:43 +0200 swh-scheduler (0.0.24-1~swh1) unstable-swh; urgency=medium * v0.0.24 * data/template: Do not index the arguments field (it's in _source) * data/README: Add a small readme to explain es install step * swh.scheduler.cli: Add a bulk index flag to separate read from index -- Antoine R. Dumont (@ardumont) Fri, 13 Apr 2018 14:55:32 +0200 swh-scheduler (0.0.23-1~swh1) unstable-swh; urgency=medium * swh.scheduler.cli.archive: Delete only completely indexed tasks * Prior to this commit, it could happen that we removed tasks even * though we did not yet index associated task_run. * Related T986 -- Antoine R. Dumont (@ardumont) Tue, 10 Apr 2018 17:43:07 +0200 swh-scheduler (0.0.22-1~swh1) unstable-swh; urgency=medium * v0.0.22 * Update to a more recent python3-elasticsearch client -- Antoine R. Dumont (@ardumont) Mon, 09 Apr 2018 16:09:16 +0200 swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium * v0.0.21 * Adapt default configuration * Fix typo in configuration variable name -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 15:02:55 +0200 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium * v0.0.20 * swh.scheduler.cli.archive: Open completed oneshot or disabled * recurring tasks archival endpoint * swh.core.serializer: Move to msgpack serialization format * swh.scheduler.cli: Unify pretty print output * sql/data: Add new task type for loading mercurial dump * swh.scheduler.cli: Add sample use case for the scheduling cli * swh.scheduler.cli: Open policy column to the scheduling cli * swh.scheduler.cli: Open the delimiter option as cli argument * Fix issue when updating task-type without any retry delay defined * swh-scheduler/data: Add new oneshot scheduling load-mercurial task * backend: fix default scheduling_db value for consistency * backend: doc: fix return value of create_tasks -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium * v0.0.19 * swh.scheduler.utils: Open utility function to create oneshot task -- Antoine R. Dumont (@ardumont) Wed, 29 Nov 2017 12:51:15 +0100 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.18 * Celery 4 compatibility -- Nicolas Dandrimont Wed, 08 Nov 2017 17:06:22 +0100 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.17 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 18:49:02 +0200 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.16 * add some tests * implement one-shot tasks * implement retry on temporary failure -- Nicolas Dandrimont Mon, 07 Aug 2017 18:44:03 +0200 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.15 * Add some methods to get the length of task queues * worker: Show logs on stdout if loglevel = debug -- Nicolas Dandrimont Mon, 19 Jun 2017 19:44:56 +0200 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler 0.0.14 * Make the return value of tasks available in the listener -- Nicolas Dandrimont Mon, 12 Jun 2017 17:50:32 +0200 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.13 * Use systemd for logging rather than PostgreSQL -- Nicolas Dandrimont Fri, 07 Apr 2017 11:57:50 +0200 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.12 * Only log to database if the configuration is present -- Nicolas Dandrimont Thu, 09 Mar 2017 11:12:45 +0100 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.11 * add utils.get_task -- Nicolas Dandrimont Tue, 14 Feb 2017 19:49:34 +0100 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.10 * Allow disabling tasks -- Nicolas Dandrimont Thu, 20 Oct 2016 17:20:17 +0200 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.9 * Revert management of one shot tasks * Add possibility of launching several worker instances -- Nicolas Dandrimont Fri, 02 Sep 2016 17:09:18 +0200 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium * v0.0.7 * Add oneshot task -- Antoine R. Dumont (@ardumont) Fri, 01 Jul 2016 16:42:45 +0200 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.6 * More reliability and efficiency when scheduling a lot of tasks -- Nicolas Dandrimont Wed, 24 Feb 2016 18:46:57 +0100 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.5 * Use copy for task mass-scheduling -- Nicolas Dandrimont Wed, 24 Feb 2016 12:13:38 +0100 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.4 * general cleanup of the backend * use arrow instead of dateutil * add new cli program -- Nicolas Dandrimont Tue, 23 Feb 2016 17:46:04 +0100 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.3 * Implement the timestamp arguments to the task_run functions * Make the celery event listener use a reliable queue -- Nicolas Dandrimont Mon, 22 Feb 2016 15:14:28 +0100 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.2 * Multiple schema changes * Initial releases for the celery job runner and the event listener -- Nicolas Dandrimont Fri, 19 Feb 2016 18:50:47 +0100 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * Release swh.scheduler v0.0.1 * Move swh.core.scheduling and swh.core.worker to swh.scheduler -- Nicolas Dandrimont Mon, 15 Feb 2016 11:07:30 +0100 diff --git a/requirements-test.txt b/requirements-test.txt index 726caf0..2cc88ed 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,11 +1,10 @@ pytest pytest-mock -celery >= 4.3 hypothesis >= 3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests types-Deprecated diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 4b1a7af..c9ad4f8 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,33 +1,33 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 1.2.3 +Version: 1.4.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator License-File: LICENSE License-File: LICENSE.Celery License-File: AUTHORS swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 7532bff..caa4ae1 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,134 +1,137 @@ .git-blame-ignore-revs .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini requirements-journal.txt requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/cli_utils.py swh/scheduler/exc.py swh/scheduler/interface.py swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/pika_listener.py swh/scheduler/celery_backend/recurrent_visits.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py +swh/scheduler/cli/add_forge_now.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/journal.py swh/scheduler/cli/origin.py swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py +swh/scheduler/cli/test_cli_utils.py swh/scheduler/cli/utils.py swh/scheduler/simulator/__init__.py swh/scheduler/simulator/common.py swh/scheduler/simulator/origin_scheduler.py swh/scheduler/simulator/origins.py swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/sql/upgrades/02.sql swh/scheduler/sql/upgrades/03.sql swh/scheduler/sql/upgrades/04.sql swh/scheduler/sql/upgrades/05.sql swh/scheduler/sql/upgrades/06.sql swh/scheduler/sql/upgrades/07.sql swh/scheduler/sql/upgrades/08.sql swh/scheduler/sql/upgrades/09.sql swh/scheduler/sql/upgrades/10.sql swh/scheduler/sql/upgrades/11.sql swh/scheduler/sql/upgrades/12.sql swh/scheduler/sql/upgrades/13.sql swh/scheduler/sql/upgrades/14.sql swh/scheduler/sql/upgrades/15.sql swh/scheduler/sql/upgrades/16.sql swh/scheduler/sql/upgrades/17.sql swh/scheduler/sql/upgrades/18.sql swh/scheduler/sql/upgrades/19.sql swh/scheduler/sql/upgrades/20.sql swh/scheduler/sql/upgrades/23.sql swh/scheduler/sql/upgrades/24.sql swh/scheduler/sql/upgrades/25.sql swh/scheduler/sql/upgrades/26.sql swh/scheduler/sql/upgrades/27.sql swh/scheduler/sql/upgrades/28.sql swh/scheduler/sql/upgrades/29.sql swh/scheduler/sql/upgrades/30-bis.sql swh/scheduler/sql/upgrades/30.sql swh/scheduler/sql/upgrades/31.sql swh/scheduler/sql/upgrades/32.sql swh/scheduler/sql/upgrades/33.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py +swh/scheduler/tests/test_cli_add_forge_now.py swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_journal.py swh/scheduler/tests/test_cli_origin.py swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py swh/scheduler/tests/test_config.py swh/scheduler/tests/test_init.py swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py swh/scheduler/tests/test_recurrent_visits.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index 51cc624..49de3b2 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,39 +1,38 @@ attrs attrs-strict celery!=5.0.3,>=4.3 click flask humanize importlib_metadata<5.0 pika>=1.1.0 psycopg2 pyyaml requests sentry-sdk setuptools typing-extensions swh.core[db,http]>=2.9 swh.storage>=0.11.1 [journal] swh.journal [simulator] plotille simpy<4,>=3 [testing] pytest pytest-mock -celery>=4.3 hypothesis>=3.11.0 swh.lister swh.storage[testing] types-click types-flask types-pyyaml types-requests types-Deprecated swh.journal plotille simpy<4,>=3 diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index f0df1ba..9862cdf 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,1146 +1,1175 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import attr from psycopg2.errors import CardinalityViolation from psycopg2.extensions import AsIs import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow from .exc import SchedulerException, StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList from .model import ( LastVisitStatus, ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics, ) logger = logging.getLogger(__name__) def adapt_LastVisitStatus(v: LastVisitStatus): return AsIs(f"'{v.value}'::last_visit_status") psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(LastVisitStatus, adapt_LastVisitStatus) psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database.""" current_version = 33 def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query( "select {keys} from task_type", self.task_type_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def get_listers(self, db=None, cur=None) -> List[Lister]: """Retrieve information about all listers from the database.""" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers """ cur.execute(query) return [Lister(**ret) for ret in cur.fetchall()] @db_transaction() def get_listers_by_id( self, lister_ids: List[str], db=None, cur=None ) -> List[Lister]: """Retrieve listers in batch, using their UUID""" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers where id in %s """ if not lister_ids: return [] cur.execute(query, (tuple(lister_ids),)) return [Lister(**row) for row in cur] @db_transaction() def get_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) query = f""" select {select_cols} from listers where (name, instance_name) = (%s, %s) """ cur.execute(query, (name, instance_name)) ret = cur.fetchone() if not ret: return None return Lister(**ret) @db_transaction() def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) insert_cols, insert_meta = ( ", ".join(tup) for tup in Lister.insert_columns_and_metavars() ) query = f""" with added as ( insert into listers ({insert_cols}) values ({insert_meta}) on conflict do nothing returning {select_cols} ) select {select_cols} from added union all select {select_cols} from listers where (name, instance_name) = (%(name)s, %(instance_name)s); """ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) return Lister(**cur.fetchone()) @db_transaction() def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ select_cols = ", ".join(Lister.select_columns()) set_vars = ", ".join( f"{col} = {meta}" for col, meta in zip(*Lister.insert_columns_and_metavars()) ) query = f"""update listers set {set_vars} where id=%(id)s and updated=%(updated)s returning {select_cols}""" cur.execute(query, attr.asdict(lister)) updated = cur.fetchone() if not updated: raise StaleData("Stale data; Lister state not updated") return Lister(**updated) @db_transaction() def record_listed_origins( self, listed_origins: Iterable[ListedOrigin], db=None, cur=None ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ pk_cols = ListedOrigin.primary_key_columns() select_cols = ListedOrigin.select_columns() insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() deduplicated_origins = { tuple(getattr(origin, k) for k in pk_cols): origin for origin in listed_origins } upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} RETURNING {", ".join(select_cols)} """ ret = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=(attr.asdict(origin) for origin in deduplicated_origins.values()), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=True, ) return [ListedOrigin(**d) for d in ret] @db_transaction() def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, enabled: Optional[bool] = True, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, db=None, cur=None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. """ query_filters: List[str] = [] query_params: List[Union[int, str, UUID, Tuple[UUID, str]]] = [] if lister_id: query_filters.append("lister_id = %s") query_params.append(lister_id) if url is not None: query_filters.append("url = %s") query_params.append(url) if enabled is not None: query_filters.append("enabled = %s") query_params.append(enabled) if page_token is not None: query_filters.append("(lister_id, url) > %s") # the typeshed annotation for tuple() is too strict. query_params.append(tuple(page_token)) # type: ignore query_params.append(limit) select_cols = ", ".join(ListedOrigin.select_columns()) if query_filters: where_clause = "where %s" % (" and ".join(query_filters)) else: where_clause = "" query = f"""SELECT {select_cols} from listed_origins {where_clause} ORDER BY lister_id, url LIMIT %s""" cur.execute(query, tuple(query_params)) origins = [ListedOrigin(**d) for d in cur] if len(origins) == limit: page_token = (str(origins[-1].lister_id), origins[-1].url) else: page_token = None return PaginatedListedOriginList(origins, page_token) @db_transaction() def grab_next_visits( self, visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, + absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12), scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, db=None, cur=None, ) -> List[ListedOrigin]: if timestamp is None: timestamp = utcnow() origin_select_cols = ", ".join(ListedOrigin.select_columns()) + joins: Dict[str, str] = { + "origin_visit_stats": "USING (url, visit_type)", + } + query_args: List[Any] = [] where_clauses = [] # list of (name, query) handled as CTEs before the main query common_table_expressions: List[Tuple[str, str]] = [] # "NOT enabled" = the lister said the origin no longer exists where_clauses.append("enabled" if enabled else "not enabled") # Only schedule visits of the given type where_clauses.append("visit_type = %s") query_args.append(visit_type) + if absolute_cooldown: + # Don't schedule visits if they've been scheduled since the absolute cooldown + where_clauses.append( + """origin_visit_stats.last_scheduled IS NULL + OR origin_visit_stats.last_scheduled < %s + """ + ) + query_args.append(timestamp - absolute_cooldown) + if scheduled_cooldown: # Don't re-schedule visits if they're already scheduled but we haven't # recorded a result yet, unless they've been scheduled more than a week # ago (it probably means we've lost them in flight somewhere). where_clauses.append( """origin_visit_stats.last_scheduled IS NULL OR origin_visit_stats.last_scheduled < GREATEST( %s, origin_visit_stats.last_visit ) """ ) query_args.append(timestamp - scheduled_cooldown) if failed_cooldown: # Don't retry failed origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'failed' " "or origin_visit_stats.last_visit < %s" ) query_args.append(timestamp - failed_cooldown) if not_found_cooldown: # Don't retry not found origins too often where_clauses.append( "origin_visit_stats.last_visit_status is distinct from 'not_found' " "or origin_visit_stats.last_visit < %s" ) query_args.append(timestamp - not_found_cooldown) if policy == "oldest_scheduled_first": order_by = "origin_visit_stats.last_scheduled NULLS FIRST" elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") # order by increasing last_update (oldest first) where_clauses.append("listed_origins.last_update IS NOT NULL") order_by = "listed_origins.last_update" elif policy == "already_visited_order_by_lag": # TODO: store "visit lag" in a materialized view? # visited origins have a NOT NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NOT NULL") # ignore origins we have visited after the known last update where_clauses.append("listed_origins.last_update IS NOT NULL") where_clauses.append( "listed_origins.last_update > origin_visit_stats.last_successful" ) # order by decreasing visit lag order_by = ( "listed_origins.last_update - origin_visit_stats.last_successful DESC" ) elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") order_by = ", ".join( [ # By default, sort using the queue position. If the queue # position is null, then the origin has never been visited, # which we want to handle first "origin_visit_stats.next_visit_queue_position nulls first", # Schedule unknown origins in the order we've seen them "listed_origins.first_seen", ] ) # fmt: off # This policy requires updating the global queue position for this # visit type common_table_expressions.append(("update_queue_position", """ INSERT INTO visit_scheduler_queue_position(visit_type, position) SELECT visit_type, COALESCE(MAX(next_visit_queue_position), 0) FROM selected_origins GROUP BY visit_type ON CONFLICT(visit_type) DO UPDATE SET position=GREATEST( visit_scheduler_queue_position.position, EXCLUDED.position ) """)) # fmt: on else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") if tablesample: table = "listed_origins tablesample SYSTEM (%s)" query_args.insert(0, tablesample) else: table = "listed_origins" if lister_uuid: where_clauses.append("lister_id = %s") query_args.append(lister_uuid) + if lister_name: + joins["listers"] = "on listed_origins.lister_id=listers.id" + where_clauses.append("listers.name = %s") + query_args.append(lister_name) + + if lister_instance_name: + joins["listers"] = "on listed_origins.lister_id=listers.id" + where_clauses.append("listers.instance_name = %s") + query_args.append(lister_instance_name) + + join_clause = "\n".join( + f"left join {table} {clause}" for table, clause in joins.items() + ) + # fmt: off common_table_expressions.insert(0, ("selected_origins", f""" SELECT {origin_select_cols}, next_visit_queue_position FROM {table} - LEFT JOIN - origin_visit_stats USING (url, visit_type) + {join_clause} WHERE ({") AND (".join(where_clauses)}) ORDER BY {order_by} LIMIT %s """)) # fmt: on query_args.append(count) # fmt: off common_table_expressions.append(("deduplicated_selected_origins", """ SELECT DISTINCT url, visit_type FROM selected_origins """)) # fmt: on # fmt: off common_table_expressions.append(("update_stats", """ INSERT INTO origin_visit_stats (url, visit_type, last_scheduled) SELECT url, visit_type, %s FROM deduplicated_selected_origins ON CONFLICT (url, visit_type) DO UPDATE SET last_scheduled = GREATEST( origin_visit_stats.last_scheduled, EXCLUDED.last_scheduled ) """)) # fmt: on query_args.append(timestamp) formatted_ctes = ",\n".join( f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions ) query = f""" WITH {formatted_ctes} SELECT {origin_select_cols} FROM selected_origins """ cur.execute(query, tuple(query_args)) return [ListedOrigin(**d) for d in cur] task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, db=None, cur=None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ if not task_ids: return query = ["UPDATE task SET status = %s"] args: List[Any] = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_no_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("PEEK %s => %s", task_type, cur.rowcount) return cur.fetchall() @db_transaction() def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, db=None, cur=None, ) -> List[Dict]: if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_any_ready_priority_tasks( %s, %s, %s :: bigint)""", (task_type, timestamp, num_tasks), ) logger.debug("GRAB %s => %s", task_type, cur.rowcount) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)", (backend_id, status, metadata, timestamp), ) return cur.fetchone() @db_transaction() def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ assert not page_token or isinstance(page_token, str) last_id = -1 if page_token is None else int(page_token) tasks = [] cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit + 1), ) for row in cur: task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this task["arguments"]["args"] = { i: v for i, v in enumerate(task["arguments"]["args"]) } kwargs = task["arguments"]["kwargs"] task["arguments"]["kwargs"] = json.dumps(kwargs) tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { "tasks": tasks[:limit], "next_page_token": str(tasks[-1]["task_id"]), } else: result = {"tasks": tasks} return result @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None ) -> None: pk_cols = OriginVisitStats.primary_key_columns() insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join( f"{col} = coalesce(EXCLUDED.{col}, ovi.{col})" for col in upsert_cols ) query = f""" INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} """ try: psycopg2.extras.execute_values( cur=cur, sql=query, argslist=( attr.asdict(visit_stats) for visit_stats in origin_visit_stats ), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=False, ) except CardinalityViolation as e: raise SchedulerException(repr(e)) @db_transaction() def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]], db=None, cur=None ) -> List[OriginVisitStats]: if not ids: return [] primary_keys = tuple((origin, visit_type) for (origin, visit_type) in ids) query = format_query( """ SELECT {keys} FROM (VALUES %s) as stats(url, visit_type) INNER JOIN origin_visit_stats USING (url, visit_type) """, OriginVisitStats.select_columns(), ) rows = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=primary_keys, fetch=True ) return [OriginVisitStats(**row) for row in rows] @db_transaction() def visit_scheduler_queue_position_get(self, db=None, cur=None) -> Dict[str, int]: cur.execute("SELECT visit_type, position FROM visit_scheduler_queue_position") return {row["visit_type"]: row["position"] for row in cur} @db_transaction() def visit_scheduler_queue_position_set( self, visit_type: str, position: int, db=None, cur=None, ) -> None: query = """ INSERT INTO visit_scheduler_queue_position(visit_type, position) VALUES(%s, %s) ON CONFLICT(visit_type) DO UPDATE SET position=EXCLUDED.position """ cur.execute(query, (visit_type, position)) @db_transaction() def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ query = format_query( "SELECT {keys} FROM update_metrics(%s, %s)", SchedulerMetrics.select_columns(), ) cur.execute(query, (lister_id, timestamp)) return [SchedulerMetrics(**row) for row in cur.fetchall()] @db_transaction() def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None, db=None, cur=None, ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ where_filters = [] where_args = [] if lister_id: where_filters.append("lister_id = %s") where_args.append(str(lister_id)) if visit_type: where_filters.append("visit_type = %s") where_args.append(visit_type) where_clause = "" if where_filters: where_clause = f"where {' and '.join(where_filters)}" query = format_query( "SELECT {keys} FROM scheduler_metrics %s" % where_clause, SchedulerMetrics.select_columns(), ) cur.execute(query, tuple(where_args)) return [SchedulerMetrics(**row) for row in cur.fetchall()] diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py index b7c2315..6b8d8e5 100644 --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -1,368 +1,368 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import os from time import monotonic as _monotonic import traceback from typing import Any, Dict, Optional import urllib.parse from celery import Celery from celery.signals import celeryd_after_setup, setup_logging, worker_init from celery.utils.log import ColorFormatter from celery.worker.control import Panel from kombu import Exchange, Queue import pkg_resources import requests from swh.core.config import load_named_config, merge_configs from swh.core.sentry import init_sentry from swh.scheduler import CONFIG as SWH_CONFIG try: from swh.core.logger import JournalHandler except ImportError: JournalHandler = None # type: ignore DEFAULT_CONFIG_NAME = "worker" CONFIG_NAME_ENVVAR = "SWH_WORKER_INSTANCE" CONFIG_NAME_TEMPLATE = "worker/%s" DEFAULT_CONFIG = { "task_broker": ("str", "amqp://guest@localhost//"), "task_modules": ("list[str]", []), "task_queues": ("list[str]", []), "task_soft_time_limit": ("int", 0), } logger = logging.getLogger(__name__) # Celery eats tracebacks in signal callbacks, this decorator catches # and prints them. # Also tries to notify Sentry if possible. def _print_errors(f): @functools.wraps(f) def newf(*args, **kwargs): try: return f(*args, **kwargs) except Exception as exc: traceback.print_exc() try: import sentry_sdk sentry_sdk.capture_exception(exc) except Exception: traceback.print_exc() return newf @setup_logging.connect @_print_errors def setup_log_handler( loglevel=None, logfile=None, format=None, colorize=None, log_console=None, log_journal=None, **kwargs, ): """Setup logging according to Software Heritage preferences. We use the command-line loglevel for tasks only, as we never really care about the debug messages from celery. """ if loglevel is None: loglevel = logging.DEBUG if isinstance(loglevel, str): loglevel = logging._nameToLevel[loglevel] formatter = logging.Formatter(format) root_logger = logging.getLogger("") root_logger.setLevel(logging.INFO) log_target = os.environ.get("SWH_LOG_TARGET", "console") if log_target == "console": log_console = True elif log_target == "journal": log_journal = True # this looks for log levels *higher* than DEBUG if loglevel <= logging.DEBUG and log_console is None: log_console = True if log_console: color_formatter = ColorFormatter(format) if colorize else formatter console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(color_formatter) root_logger.addHandler(console) if log_journal: if not JournalHandler: root_logger.warning( "JournalHandler is not available, skipping. " "Please install swh-core[logging]." ) else: systemd_journal = JournalHandler() systemd_journal.setLevel(logging.DEBUG) systemd_journal.setFormatter(formatter) root_logger.addHandler(systemd_journal) logging.getLogger("celery").setLevel(logging.INFO) # Silence amqp heartbeat_tick messages logger = logging.getLogger("amqp") logger.addFilter(lambda record: not record.msg.startswith("heartbeat_tick")) logger.setLevel(loglevel) # Silence useless "Starting new HTTP connection" messages logging.getLogger("urllib3").setLevel(logging.WARNING) # Completely disable azure logspam azure_logger = logging.getLogger("azure.core.pipeline.policies.http_logging_policy") azure_logger.setLevel(logging.WARNING) logging.getLogger("swh").setLevel(loglevel) # get_task_logger makes the swh tasks loggers children of celery.task logging.getLogger("celery.task").setLevel(loglevel) return loglevel @celeryd_after_setup.connect @_print_errors def setup_queues_and_tasks(sender, instance, **kwargs): """Signal called on worker start. This automatically registers swh.scheduler.task.Task subclasses as available celery tasks. This also subscribes the worker to the "implicit" per-task queues defined for these task classes. """ logger.info("Setup Queues & Tasks for %s", sender) instance.app.conf["worker_name"] = sender @worker_init.connect @_print_errors def on_worker_init(*args, **kwargs): try: from sentry_sdk.integrations.celery import CeleryIntegration except ImportError: integrations = [] else: integrations = [CeleryIntegration()] sentry_dsn = None # will be set in `init_sentry` function init_sentry(sentry_dsn, integrations=integrations) @Panel.register def monotonic(state): """Get the current value for the monotonic clock""" return {"monotonic": _monotonic()} def route_for_task(name, args, kwargs, options, task=None, **kw): """Route tasks according to the task_queue attribute in the task class""" if name is not None and name.startswith("swh."): return {"queue": name} def get_queue_stats(app, queue_name): """Get the statistics regarding a queue on the broker. Arguments: queue_name: name of the queue to check Returns a dictionary raw from the RabbitMQ management API; or `None` if the current configuration does not use RabbitMQ. Interesting keys: - Consumers (number of consumers for the queue) - messages (number of messages in queue) - messages_unacknowledged (number of messages currently being processed) Documentation: https://www.rabbitmq.com/management.html#http-api """ conn_info = app.connection().info() if conn_info["transport"] == "memory": # We're running in a test environment, without RabbitMQ. return None url = "http://{hostname}:{port}/api/queues/{vhost}/{queue}".format( hostname=conn_info["hostname"], port=conn_info["port"] + 10000, vhost=urllib.parse.quote(conn_info["virtual_host"], safe=""), queue=urllib.parse.quote(queue_name, safe=""), ) credentials = (conn_info["userid"], conn_info["password"]) r = requests.get(url, auth=credentials) if r.status_code == 404: return {} if r.status_code != 200: raise ValueError( "Got error %s when reading queue stats: %s" % (r.status_code, r.json()) ) return r.json() def get_queue_length(app, queue_name): """Shortcut to get a queue's length""" stats = get_queue_stats(app, queue_name) if stats: return stats.get("messages") MAX_NUM_TASKS = 10000 def get_available_slots(app, queue_name: str, max_length: Optional[int]): """Get the number of tasks that can be sent to `queue_name`, when the queue is limited to `max_length`. Returns: The number of available slots in the queue. That result should be positive. """ if not max_length: return MAX_NUM_TASKS try: queue_length = get_queue_length(app, queue_name) # Clamp the return value to MAX_NUM_TASKS max_val = max(0, min(max_length - queue_length, MAX_NUM_TASKS)) except (ValueError, TypeError): # Unknown queue length, just schedule all the tasks max_val = MAX_NUM_TASKS return max_val def register_task_class(app, name, cls): """Register a class-based task under the given name""" if name in app.tasks: return task_instance = cls() task_instance.name = name app.register_task(task_instance) INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) CONFIG_NAME = os.environ.get("SWH_CONFIG_FILENAME") CONFIG = {} # type: Dict[str, Any] if CONFIG_NAME: # load the celery config from the main config file given as # SWH_CONFIG_FILENAME environment variable. # This is expected to have a [celery] section in which we have the # celery specific configuration. SWH_CONFIG.clear() SWH_CONFIG.update(load_named_config(CONFIG_NAME)) CONFIG = SWH_CONFIG.get("celery", {}) if not CONFIG: # otherwise, back to compat config loading mechanism if INSTANCE_NAME: CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME else: CONFIG_NAME = DEFAULT_CONFIG_NAME # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) CONFIG.setdefault("task_modules", []) # load tasks modules declared as plugin entry points for entrypoint in pkg_resources.iter_entry_points("swh.workers"): worker_registrer_fn = entrypoint.load() # The registry function is expected to return a dict which the 'tasks' key # is a string (or a list of strings) with the name of the python module in # which celery tasks are defined. task_modules = worker_registrer_fn().get("task_modules", []) CONFIG["task_modules"].extend(task_modules) # Celery Queues CELERY_QUEUES = [Queue("celery", Exchange("celery"), routing_key="celery")] CELERY_DEFAULT_CONFIG = dict( # Timezone configuration: all in UTC enable_utc=True, timezone="UTC", # Imported modules imports=CONFIG.get("task_modules", []), # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. None means to never expire results. result_expires=None, # A string identifying the default serialization method to use. Can # be json (default), pickle, yaml, msgpack, or any custom # serialization methods that have been registered with - task_serializer="msgpack", + task_serializer="json", # Result serialization format - result_serializer="msgpack", + result_serializer="json", # Acknowledge tasks as soon as they're received. We can do this as we have # external monitoring to decide if we need to retry tasks. task_acks_late=False, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry accept_content=["msgpack", "json"], # If True the task will report its status as “started” # when the task is executed by a worker. task_track_started=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # result_compression='bzip2', # task_compression='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) worker_disable_rate_limits=True, # Task routing task_routes=route_for_task, # Allow pool restarts from remote worker_pool_restarts=True, # Do not prefetch tasks worker_prefetch_multiplier=1, # Send events worker_send_task_events=True, # Do not send useless task_sent events task_send_sent_event=False, ) def build_app(config=None): config = merge_configs( {k: v for (k, (_, v)) in DEFAULT_CONFIG.items()}, config or {} ) config["task_queues"] = CELERY_QUEUES + [ Queue(queue, Exchange(queue), routing_key=queue) for queue in config.get("task_queues", ()) ] logger.debug("Creating a Celery app with %s", config) # Instantiate the Celery app app = Celery(broker=config["task_broker"], task_cls="swh.scheduler.task:SWHTask") app.add_defaults(CELERY_DEFAULT_CONFIG) app.add_defaults(config) return app app = build_app(CONFIG) # XXX for BW compat Celery.get_queue_length = get_queue_length diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py index 62389f1..005fb9f 100644 --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -1,369 +1,376 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This schedules the recurrent visits, for listed origins, in Celery. For "oneshot" (save code now, lister) tasks, check the :mod:`swh.scheduler.celery_backend.runner` and :mod:`swh.scheduler.celery_backend.pika_listener` modules. """ from __future__ import annotations from itertools import chain import logging from queue import Empty, Queue import random from threading import Thread import time from typing import TYPE_CHECKING, Any, Dict, List, Tuple from kombu.utils.uuid import uuid from swh.scheduler.celery_backend.config import get_available_slots from swh.scheduler.utils import create_origin_task_dicts if TYPE_CHECKING: from ..interface import SchedulerInterface from ..model import ListedOrigin logger = logging.getLogger(__name__) DEFAULT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 50}, {"policy": "never_visited_oldest_update_first", "weight": 50}, ] DEFAULT_DVCS_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49}, {"policy": "never_visited_oldest_update_first", "weight": 49}, {"policy": "origins_without_last_update", "weight": 2}, ] DEFAULT_GIT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49, "tablesample": 0.1}, {"policy": "never_visited_oldest_update_first", "weight": 49, "tablesample": 0.1}, {"policy": "origins_without_last_update", "weight": 2, "tablesample": 0.1}, ] DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = { "default": DEFAULT_POLICY, "hg": DEFAULT_DVCS_POLICY, "svn": DEFAULT_DVCS_POLICY, "cvs": DEFAULT_DVCS_POLICY, "bzr": DEFAULT_DVCS_POLICY, "git": DEFAULT_GIT_POLICY, } """Scheduling policies to use to retrieve visits for the given visit types, with their relative weights""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" QUEUE_FULL_BACKOFF = 60 """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue.""" DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current iteration""" BACKOFF_SPLAY = 5.0 """Amplitude of the fuzziness between backoffs""" TERMINATE = object() """Termination request received from command queue (singleton used for identity comparison)""" def grab_next_visits_policy_weights( scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]], ) -> List[ListedOrigin]: """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. The :py:data:`POLICY_CFG` list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list should at least have a 'policy' (policy name) and a 'weight' entry. For each policy in this policy_cfg list, the number of returned origins to visit will be weighted using this weight config option so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration will be passed to the scheduler.grab_next_visit() method. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%. Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ policies = [cfg["policy"] for cfg in policy_cfg] if len(set(policies)) != len(policies): raise ValueError( "A policy weights can only appear once; check your policy " f"configuration for visit type {visit_type}" ) weights = [cfg["weight"] for cfg in policy_cfg] total_weight = sum(weights) if not total_weight: raise ValueError(f"No policy weights set for visit type {visit_type}") ratios = [weight / total_weight for weight in weights] extra_kws = [ {k: v for k, v in cfg.items() if k not in ("weight", "policy")} for cfg in policy_cfg ] fetched_origins: Dict[str, List[ListedOrigin]] = {} for policy, ratio, extra_kw in zip(policies, ratios, extra_kws): num_tasks_to_send = int(num_visits * ratio) fetched_origins[policy] = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy, **extra_kw, ) all_origins: List[ListedOrigin] = list( chain.from_iterable(fetched_origins.values()) ) if not all_origins: return [] # Check whether the ratios of origins fetched are skewed with respect to the # ones we requested fetched_origin_ratios = { policy: len(origins) / len(all_origins) for policy, origins in fetched_origins.items() } for policy, expected_ratio in zip(policies, ratios): # 5% of skew with respect to request if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: logger.info( "Skewed fetch for visit type %s with policy %s: fetched %s, " "requested %s", visit_type, policy, fetched_origin_ratios[policy], expected_ratio, ) return all_origins def splay(): """Return a random short interval by which to vary the backoffs for the visit scheduling threads""" return random.uniform(0, BACKOFF_SPLAY) def send_visits_for_visit_type( scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, policy_cfg: List[Dict[str, Any]], no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF, ) -> float: """Schedule the next batch of visits for the given ``visit_type``. First, we determine the number of available slots by introspecting the RabbitMQ queue. If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when there's not many jobs to queue. Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits to schedule, and we send them to celery. If the last scheduling attempt didn't return any origins, we sleep by default for :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too often if there's nothing left to schedule. The :py:data:`POLICY_CFG` argument is the policy configuration used to choose the next origins to visit. It is passed directly to the :py:func:`grab_next_visits_policy_weights()` function. Returns: the earliest :py:func:`time.monotonic` value at which to run the next iteration of the loop. """ queue_name = task_type["backend_name"] max_queue_length = task_type.get("max_queue_length") or 0 min_available_slots = max_queue_length * MIN_SLOTS_RATIO current_iteration_start = time.monotonic() # Check queue level available_slots = get_available_slots(app, queue_name, max_queue_length) logger.debug( "%s available slots for visit type %s in queue %s", available_slots, visit_type, queue_name, ) if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF origins = grab_next_visits_policy_weights( scheduler, visit_type, available_slots, policy_cfg ) if not origins: logger.debug("No origins to visit for type %s", visit_type) return current_iteration_start + no_origins_scheduled_backoff # Try to smooth the ingestion load, origins pulled by different # scheduling policies have different resource usage patterns random.shuffle(origins) for task_dict in create_origin_task_dicts(origins, scheduler): app.send_task( queue_name, task_id=uuid(), args=task_dict["arguments"]["args"], kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) logger.info( "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, ) # When everything worked, we can try to schedule origins again ASAP. return time.monotonic() def visit_scheduler_thread( config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]], ): """Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.""" from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import build_app try: # We need to reinitialize these connections because they're not generally # thread-safe app = build_app(config.get("celery")) scheduler = get_scheduler(**config["scheduler"]) - task_type = scheduler.get_task_type(f"load-{visit_type}") + task_name = f"load-{visit_type}" + task_type = scheduler.get_task_type(task_name) if task_type is None: - raise ValueError(f"Unknown task type: load-{visit_type}") + raise ValueError(f"Unknown task type: {task_name}") policy_cfg = config.get("scheduling_policy", DEFAULT_POLICY_CONFIG) for policies in policy_cfg.values(): for policy in policies: if "weight" not in policy or "policy" not in policy: raise ValueError( "Each policy configuration needs at least a 'policy' " "and a 'weight' entry" ) policy_cfg = {**DEFAULT_POLICY_CONFIG, **policy_cfg} next_iteration = time.monotonic() while True: # vary the next iteration time a little bit next_iteration = next_iteration + splay() while time.monotonic() < next_iteration: # Wait for next iteration to start. Listen for termination message. try: msg = command_queue.get(block=True, timeout=1) except Empty: continue if msg is TERMINATE: return else: logger.warn("Received unexpected message %s in command queue", msg) + # Refresh the task_type object from the database for new parameters, e.g. + # the max queue length + task_type = scheduler.get_task_type(task_name) + if task_type is None: + raise ValueError(f"Unknown task type: {task_name}") + next_iteration = send_visits_for_visit_type( scheduler, app, visit_type, task_type, policy_cfg.get(visit_type, policy_cfg["default"]), config.get( "no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF ), ) except BaseException as e: exc_queue.put((visit_type, e)) VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] """Dict storing the visit scheduler threads and their command queues""" def spawn_visit_scheduler_thread( threads: VisitSchedulerThreads, exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str, ): """Spawn a new thread to schedule the visits of type ``visit_type``.""" command_queue: Queue[object] = Queue() thread = Thread( target=visit_scheduler_thread, kwargs={ "config": config, "visit_type": visit_type, "command_queue": command_queue, "exc_queue": exc_queue, }, ) threads[visit_type] = (thread, command_queue) thread.start() def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: """Terminate all visit scheduler threads""" logger.info("Termination requested...") for _, command_queue in threads.values(): command_queue.put(TERMINATE) loops = 0 while threads and loops < 10: logger.info( "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) ) loops += 1 for visit_type, (thread, _) in list(threads.items()): thread.join(timeout=1) if not thread.is_alive(): logger.debug("Thread %s terminated", visit_type) del threads[visit_type] if threads: logger.warn( "Could not reap the following threads after 10 attempts: %s", ", ".join(sorted(threads)), ) return list(sorted(threads)) diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index 8eeadaa..b4cd057 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,102 +1,111 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group # If you're looking for subcommand imports, they are further down this file to # avoid a circular import! @swh_cli_group.group( name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.option( "--database", "-d", default=None, help="Scheduling database DSN (imply cls is 'postgresql')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). """ try: from psycopg2 import OperationalError except ImportError: class OperationalError(Exception): pass from swh.core import config from swh.scheduler import DEFAULT_CONFIG, get_scheduler ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf["scheduler"]["cls"] = "postgresql" conf["scheduler"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["url"] = url sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s", sched_conf) scheduler = get_scheduler(**sched_conf) except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf -from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa +from . import ( # noqa + add_forge_now, + admin, + celery_monitor, + journal, + origin, + simulator, + task, + task_type, +) def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py new file mode 100644 index 0000000..e1aad07 --- /dev/null +++ b/swh/scheduler/cli/add_forge_now.py @@ -0,0 +1,163 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# WARNING: do not import unnecessary things here to keep cli startup time under +# control + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import click + +from . import cli + +if TYPE_CHECKING: + from typing import Dict, List, Optional + + +@cli.group("add-forge-now") +@click.option( + "-p", + "--preset", + "preset", + default="production", + type=click.Choice(["production", "staging"]), + help='Determine preset to use, "production" by default.', +) +@click.pass_context +def add_forge_now(ctx, preset): + """Manipulate add-forge-now requests.""" + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + ctx.obj["preset"] = preset + + +@add_forge_now.command("register-lister") +@click.argument("lister_name", nargs=1, required=True) +@click.argument("options", nargs=-1) +@click.pass_context +def register_lister_cli( + ctx, + lister_name, + options, +): + """Register the lister tasks in the scheduler. + + The specifics of what tasks are registered depends on the add-forge-now --preset + option: + - staging preset: a single oneshot full listing task is scheduled. This "full" + listing is limited to 3 pages and 10 origins per page. The origins are recorded as + disabled (to avoid their recurrent loading). + - production preset: a recurrent full and incremental (if the loader has such a + task) listing task are scheduled. The first run of the full lister is scheduled + immediately, and the first run of the incremental lister is delayed by a day. + + """ + from .utils import lister_task_type, parse_options, task_add + + scheduler = ctx.obj["scheduler"] + preset = ctx.obj["preset"] + + # Map the associated task types for the lister + task_type_names: Dict[str, str] = { + listing_type: lister_task_type(lister_name, listing_type) + for listing_type in ["full", "incremental"] + } + + task_types: Dict[str, Dict] = {} + for listing_type, task_type_name in task_type_names.items(): + task_type = scheduler.get_task_type(task_type_name) + if task_type: + task_types[listing_type] = task_type + + if not task_types: + raise ValueError(f"Unknown lister type {lister_name}.") + + (args, kw) = parse_options(options) + + # Recurring policy on production + if preset == "production": + policy = "recurring" + else: # staging, "full" but limited listing as a oneshot + policy = "oneshot" + kw.update({"max_pages": 3, "max_origins_per_page": 10, "enable_origins": False}) + # We want a "full" listing in production if both incremental and full exists + if "full" in task_types: + task_types.pop("incremental", None) + + from datetime import timedelta + + from swh.scheduler.utils import utcnow + + for listing_type, task_type in task_types.items(): + now = utcnow() + next_run = now if listing_type == "full" else now + timedelta(days=1) + task_add( + scheduler, + task_type_name=task_type["type"], + args=args, + kw=kw, + policy=policy, + next_run=next_run, + ) + + +@add_forge_now.command("schedule-first-visits") +@click.option( + "--type-name", + "-t", + "visit_type_names", + help="Visit/loader type (can be provided multiple times)", + type=str, + multiple=True, +) +@click.option( + "--lister-name", + default=None, + help="Limit origins to those listed from lister with provided name", +) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.pass_context +def schedule_first_visits_cli( + ctx, + visit_type_names: List[str], + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, +): + """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the + associated add_forge_now queue(s). + + """ + from .utils import get_task_type, send_to_celery + + scheduler = ctx.obj["scheduler"] + preset = ctx.obj["preset"] + + visit_type_to_queue: Dict[str, str] = {} + unknown_task_types = [] + for visit_type_name in visit_type_names: + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + unknown_task_types.append(visit_type_name) + continue + queue_name = task_type["backend_name"] + visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}" + + if unknown_task_types: + raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.") + + send_to_celery( + scheduler, + visit_type_to_queue=visit_type_to_queue, + enabled=preset == "production", + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py index 61ab1a5..dfccdcd 100644 --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -1,258 +1,253 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations -from typing import TYPE_CHECKING, Iterable, List, Optional +from typing import TYPE_CHECKING import click from . import cli from ..utils import create_origin_task_dicts if TYPE_CHECKING: + from typing import Iterable, List, Optional from uuid import UUID from ..interface import SchedulerInterface from ..model import ListedOrigin @cli.group("origin") @click.pass_context def origin(ctx): """Manipulate listed origins.""" if not ctx.obj["scheduler"]: raise ValueError("Scheduler class (local/remote) must be instantiated") def format_origins( origins: List[ListedOrigin], fields: Optional[List[str]] = None, with_header: bool = True, ) -> Iterable[str]: """Format a list of origins as CSV. Arguments: origins: list of origins to output fields: optional list of fields to output (defaults to all fields) with_header: if True, output a CSV header. """ import csv from io import StringIO import attr from ..model import ListedOrigin expected_fields = [field.name for field in attr.fields(ListedOrigin)] if not fields: fields = expected_fields unknown_fields = set(fields) - set(expected_fields) if unknown_fields: raise ValueError( "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields) ) output = StringIO() writer = csv.writer(output) def csv_row(data): """Return a single CSV-formatted row. We clear the output buffer after we're done to keep it reasonably sized.""" writer.writerow(data) output.seek(0) ret = output.read().rstrip() output.seek(0) output.truncate() return ret if with_header: yield csv_row(fields) for origin in origins: yield csv_row(str(getattr(origin, field)) for field in fields) @origin.command("grab-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--fields", "-f", default=None, help="Listed origin fields to print on output" ) @click.option( "--with-header/--without-header", is_flag=True, default=True, help="Print the CSV header?", ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def grab_next( ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int ): """Grab the next COUNT origins to visit using the TYPE loader from the listed origins table.""" if fields: parsed_fields: Optional[List[str]] = fields.split(",") else: parsed_fields = None scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) for line in format_origins(origins, fields=parsed_fields, with_header=with_header): click.echo(line) @origin.command("schedule-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def schedule_next(ctx, policy: str, type: str, count: int): """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow - from .task import pretty_print_task + from .utils import pretty_print_task scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) created = scheduler.create_tasks( [ { **task_dict, "policy": "oneshot", "next_run": utcnow(), "retries_left": 1, } for task_dict in create_origin_task_dicts(origins, scheduler) ] ) output = ["Created %d tasks\n" % len(created)] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @origin.command("send-to-celery") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--queue", "-q", help="Target celery queue", type=str, ) @click.option( "--tablesample", help="Table sampling percentage", type=float, ) @click.option( "--only-enabled/--only-disabled", "enabled", is_flag=True, default=True, help="""Determine whether we want to scheduled enabled or disabled origins. As default, we want to reasonably deal with enabled origins. For some edge case though, we might want the disabled ones.""", ) @click.option( - "--lister-uuid", + "--lister-name", default=None, - help="Limit origins to those listed from such lister", + help="Limit origins to those listed from lister with provided name", ) -@click.argument("type", type=str) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.argument("visit_type_name", type=str) @click.pass_context -def send_to_celery( +def send_to_celery_cli( ctx, policy: str, queue: Optional[str], tablesample: Optional[float], - type: str, + visit_type_name: str, enabled: bool, - lister_uuid: Optional[str] = None, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, ): - """Send the next origin visits of the TYPE loader to celery, filling the queue.""" - from kombu.utils.uuid import uuid + """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue.""" - from swh.scheduler.celery_backend.config import app, get_available_slots + from .utils import get_task_type, send_to_celery scheduler = ctx.obj["scheduler"] - task_type = scheduler.get_task_type(f"load-{type}") + task_type = get_task_type(scheduler, visit_type_name) + if not task_type: + raise ValueError(f"Unknown task type {task_type}.") - task_name = task_type["backend_name"] - queue_name = queue or task_name + queue_name = queue or task_type["backend_name"] - num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) - - click.echo(f"{num_tasks} slots available in celery queue") - origins = scheduler.grab_next_visits( - type, - num_tasks, + send_to_celery( + scheduler, + visit_type_to_queue={visit_type_name: queue_name}, policy=policy, tablesample=tablesample, enabled=enabled, - lister_uuid=lister_uuid, + lister_name=lister_name, + lister_instance_name=lister_instance_name, ) - click.echo(f"{len(origins)} visits to send to celery") - for task_dict in create_origin_task_dicts(origins, scheduler): - app.send_task( - task_name, - task_id=uuid(), - args=task_dict["arguments"]["args"], - kwargs=task_dict["arguments"]["kwargs"], - queue=queue_name, - ) - @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for this lister") @click.option( "--instance", default=None, help="Only update metrics for this lister instance" ) @click.pass_context def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): """Update the scheduler metrics on listed origins. Examples: swh scheduler origin update-metrics swh scheduler origin update-metrics --lister github swh scheduler origin update-metrics --lister phabricator --instance llvm """ import json import attr scheduler: SchedulerInterface = ctx.obj["scheduler"] lister_id: Optional[UUID] = None if lister is not None: lister_instance = scheduler.get_lister(name=lister, instance_name=instance) if not lister_instance: click.echo(f"Lister not found: {lister} instance={instance}") ctx.exit(2) assert False # for mypy lister_id = lister_instance.id def dictify_metrics(d): return {k: str(v) for (k, v) in attr.asdict(d).items()} ret = scheduler.update_metrics(lister_id=lister_id) click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True)) diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py index 72a1d96..bc37fb4 100644 --- a/swh/scheduler/cli/task.py +++ b/swh/scheduler/cli/task.py @@ -1,595 +1,470 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations # WARNING: do not import unnecessary things here to keep cli startup time under # control import locale from typing import TYPE_CHECKING, Iterator, List, Optional import click from . import cli if TYPE_CHECKING: import datetime # importing swh.storage.interface triggers the load of 300+ modules, so... import swh.model.model from swh.storage.interface import StorageInterface locale.setlocale(locale.LC_ALL, "") CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) DATETIME = click.DateTime() -def format_dict(d): - """Recursively format date objects in the dict passed as argument""" - import datetime - - ret = {} - for k, v in d.items(): - if isinstance(v, (datetime.date, datetime.datetime)): - v = v.isoformat() - elif isinstance(v, dict): - v = format_dict(v) - ret[k] = v - return ret - - -def pretty_print_list(list, indent=0): - """Pretty-print a list""" - return "".join("%s%r\n" % (" " * indent, item) for item in list) - - -def pretty_print_dict(dict, indent=0): - """Pretty-print a list""" - return "".join( - "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) - for key, value in sorted(dict.items()) - ) - - -def pretty_print_run(run, indent=4): - fmt = ( - "{indent}{backend_id} [{status}]\n" - "{indent} scheduled: {scheduled} [{started}:{ended}]" - ) - return fmt.format(indent=" " * indent, **format_dict(run)) - - -def pretty_print_task(task, full=False): - """Pretty-print a task - - If 'full' is True, also print the status and priority fields. - - >>> import datetime - >>> task = { - ... 'id': 1234, - ... 'arguments': { - ... 'args': ['foo', 'bar', True], - ... 'kwargs': {'key': 'value', 'key2': 42}, - ... }, - ... 'current_interval': datetime.timedelta(hours=1), - ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), - ... 'policy': 'oneshot', - ... 'priority': None, - ... 'status': 'next_run_not_scheduled', - ... 'type': 'test_task', - ... } - >>> print(click.unstyle(pretty_print_task(task))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - >>> print(click.unstyle(pretty_print_task(task, full=True))) - Task 1234 - Next run: ... (2019-02-21T13:52:35.407818) - Interval: 1:00:00 - Type: test_task - Policy: oneshot - Status: next_run_not_scheduled - Priority:\x20 - Args: - 'foo' - 'bar' - True - Keyword args: - key: 'value' - key2: 42 - - """ - import humanize - - next_run = task["next_run"] - lines = [ - "%s %s\n" % (click.style("Task", bold=True), task["id"]), - click.style(" Next run: ", bold=True), - "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), - "\n", - click.style(" Interval: ", bold=True), - str(task["current_interval"]), - "\n", - click.style(" Type: ", bold=True), - task["type"] or "", - "\n", - click.style(" Policy: ", bold=True), - task["policy"] or "", - "\n", - ] - if full: - lines += [ - click.style(" Status: ", bold=True), - task["status"] or "", - "\n", - click.style(" Priority: ", bold=True), - task["priority"] or "", - "\n", - ] - lines += [ - click.style(" Args:\n", bold=True), - pretty_print_list(task["arguments"]["args"], indent=4), - click.style(" Keyword args:\n", bold=True), - pretty_print_dict(task["arguments"]["kwargs"], indent=4), - ] - - return "".join(lines) - - @cli.group("task") @click.pass_context def task(ctx): """Manipulate tasks.""" pass @task.command("schedule") @click.option( "--columns", "-c", multiple=True, default=["type", "args", "kwargs", "next_run"], type=click.Choice(["type", "args", "kwargs", "policy", "next_run"]), help="columns present in the CSV file", ) @click.option("--delimiter", "-d", default=",") @click.argument("file", type=click.File(encoding="utf-8")) @click.pass_context def schedule_tasks(ctx, columns, delimiter, file): """Schedule tasks from a CSV input file. The following columns are expected, and can be set through the -c option: - type: the type of the task to be scheduled (mandatory) - args: the arguments passed to the task (JSON list, defaults to an empty list) - kwargs: the keyword arguments passed to the task (JSON object, defaults to an empty dict) - next_run: the date at which the task should run (datetime, defaults to now) The CSV can be read either from a named file, or from stdin (use - as filename). Use sample: cat scheduling-task.txt | \ python3 -m swh.scheduler.cli \ --database 'service=swh-scheduler-dev' \ task schedule \ --columns type --columns kwargs --columns policy \ --delimiter ';' - """ import csv import json from swh.scheduler.utils import utcnow + from .utils import pretty_print_task + tasks = [] now = utcnow() scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") reader = csv.reader(file, delimiter=delimiter) for line in reader: task = dict(zip(columns, line)) args = json.loads(task.pop("args", "[]")) kwargs = json.loads(task.pop("kwargs", "{}")) task["arguments"] = { "args": args, "kwargs": kwargs, } task["next_run"] = task.get("next_run", now) tasks.append(task) created = scheduler.create_tasks(tasks) output = [ "Created %d tasks\n" % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @task.command("add") -@click.argument("type", nargs=1, required=True) +@click.argument("task_type_name", nargs=1, required=True) @click.argument("options", nargs=-1) @click.option( "--policy", "-p", default="recurring", type=click.Choice(["recurring", "oneshot"]) ) @click.option( "--priority", "-P", default=None, type=click.Choice(["low", "normal", "high"]) ) @click.option("--next-run", "-n", default=None) @click.pass_context -def schedule_task(ctx, type, options, policy, priority, next_run): +def schedule_task(ctx, task_type_name, options, policy, priority, next_run): """Schedule one task from arguments. - The first argument is the name of the task type, further ones are - positional and keyword argument(s) of the task, in YAML format. - Keyword args are of the form key=value. + The first argument is the name of the task type. Flag options (policy, priority) are + task configuration. Further options are positional and keyword argument(s) of the + task, in YAML format. Keyword args are of the form key=value. Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task add list-pypi swh-scheduler --database 'service=swh-scheduler' \ task add list-debian-distribution --policy=oneshot distribution=stretch Note: if the priority is not given, the task won't have the priority set, which is considered as the lowest priority level. - """ - from swh.scheduler.utils import utcnow - from .utils import parse_options + """ + from .utils import parse_options, task_add scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") - now = utcnow() + task_type = scheduler.get_task_type(task_type_name) + if not task_type: + raise ValueError(f"Unknown task name {task_type_name}.") (args, kw) = parse_options(options) - task = { - "type": type, - "policy": policy, - "priority": priority, - "arguments": { - "args": args, - "kwargs": kw, - }, - "next_run": next_run or now, - } - created = scheduler.create_tasks([task]) - - output = [ - "Created %d tasks\n" % len(created), - ] - for task in created: - output.append(pretty_print_task(task)) - - click.echo("\n".join(output)) + task_add( + scheduler, + task_type_name=task_type_name, + policy=policy, + priority=priority, + next_run=next_run, + args=args, + kw=kw, + ) def iter_origins( # use string annotations to prevent some pkg loading storage: StorageInterface, page_token: Optional[str] = None, ) -> Iterator[swh.model.model.Origin]: """Iterate over origins in the storage. Optionally starting from page_token. This logs regularly an info message during pagination with the page_token. This, in order to feed it back to the cli if the process interrupted. Yields origin model objects from the storage """ while True: page_result = storage.origin_list(page_token=page_token) page_token = page_result.next_page_token yield from page_result.results if not page_token: break click.echo(f"page_token: {page_token}\n") @task.command("schedule_origins") @click.argument("type", nargs=1, required=True) @click.argument("options", nargs=-1) @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--page-token", default=0, show_default=True, type=str, help="Only schedule tasks for origins whose ID is greater", ) @click.option( "--limit", default=None, type=int, help="Limit the tasks scheduling up to this number of tasks", ) @click.option("--storage-url", "-g", help="URL of the (graph) storage API") @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule_origin_metadata_index( ctx, type, options, storage_url, origin_batch_size, page_token, limit, dry_run ): """Schedules tasks for origins that are already known. The first argument is the name of the task type, further ones are keyword argument(s) of the task in the form key=value, where value is in YAML format. Usage sample: swh-scheduler --database 'service=swh-scheduler' \ task schedule_origins index-origin-metadata """ from itertools import islice from swh.storage import get_storage from .utils import parse_options, schedule_origin_batches scheduler = ctx.obj["scheduler"] storage = get_storage("remote", url=storage_url) if dry_run: scheduler = None (args, kw) = parse_options(options) if args: raise click.ClickException("Only keywords arguments are allowed.") origins = iter_origins(storage, page_token=page_token) if limit: origins = islice(origins, limit) origin_urls = (origin.url for origin in origins) schedule_origin_batches(scheduler, type, origin_urls, origin_batch_size, kw) @task.command("list-pending") @click.argument("task-types", required=True, nargs=-1) @click.option( "--limit", "-l", "num_tasks", required=False, type=click.INT, help="The maximum number of tasks to fetch", ) @click.option( "--before", "-b", required=False, type=DATETIME, help="List all jobs supposed to run before the given date", ) @click.pass_context def list_pending_tasks(ctx, task_types, num_tasks, before): """List tasks with no priority that are going to be run. You can override the number of tasks to fetch with the --limit flag. """ + from .utils import pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") output = [] for task_type in task_types: pending = scheduler.peek_ready_tasks( task_type, timestamp=before, num_tasks=num_tasks, ) output.append("Found %d %s tasks\n" % (len(pending), task_type)) for task in pending: output.append(pretty_print_task(task)) click.echo("\n".join(output)) @task.command("list") @click.option( "--task-id", "-i", default=None, multiple=True, metavar="ID", help="List only tasks whose id is ID.", ) @click.option( "--task-type", "-t", default=None, multiple=True, metavar="TYPE", help="List only tasks of type TYPE", ) @click.option( "--limit", "-l", required=False, type=click.INT, help="The maximum number of tasks to fetch.", ) @click.option( "--status", "-s", multiple=True, metavar="STATUS", type=click.Choice( ("next_run_not_scheduled", "next_run_scheduled", "completed", "disabled") ), default=None, help="List tasks whose status is STATUS.", ) @click.option( "--policy", "-p", default=None, type=click.Choice(["recurring", "oneshot"]), help="List tasks whose policy is POLICY.", ) @click.option( "--priority", "-P", default=None, multiple=True, type=click.Choice(["all", "low", "normal", "high"]), help="List tasks whose priority is PRIORITY.", ) @click.option( "--before", "-b", required=False, type=DATETIME, metavar="DATETIME", help="Limit to tasks supposed to run before the given date.", ) @click.option( "--after", "-a", required=False, type=DATETIME, metavar="DATETIME", help="Limit to tasks supposed to run after the given date.", ) @click.option( "--list-runs", "-r", is_flag=True, default=False, help="Also list past executions of each task.", ) @click.pass_context def list_tasks( ctx, task_id, task_type, limit, status, policy, priority, before, after, list_runs ): """List tasks.""" from operator import itemgetter + from .utils import pretty_print_run, pretty_print_task + scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") if not task_type: task_type = [x["type"] for x in scheduler.get_task_types()] # if task_id is not given, default value for status is # 'next_run_not_scheduled' # if task_id is given, default status is 'all' if task_id is None and status is None: status = ["next_run_not_scheduled"] if status and "all" in status: status = None if priority and "all" in priority: priority = None output = [] tasks = scheduler.search_tasks( task_id=task_id, task_type=task_type, status=status, priority=priority, policy=policy, before=before, after=after, limit=limit, ) if list_runs: runs = {t["id"]: [] for t in tasks} for r in scheduler.get_task_runs([task["id"] for task in tasks]): runs[r["task"]].append(r) else: runs = {} output.append("Found %d tasks\n" % (len(tasks))) for task in sorted(tasks, key=itemgetter("id")): output.append(pretty_print_task(task, full=True)) if runs.get(task["id"]): output.append(click.style(" Executions:", bold=True)) for run in sorted(runs[task["id"]], key=itemgetter("id")): output.append(pretty_print_run(run, indent=4)) click.echo("\n".join(output)) @task.command("respawn") @click.argument("task-ids", required=True, nargs=-1) @click.option( "--next-run", "-n", required=False, type=DATETIME, metavar="DATETIME", default=None, help="Re spawn the selected tasks at this date", ) @click.pass_context def respawn_tasks(ctx, task_ids: List[str], next_run: datetime.datetime): """Respawn tasks. Respawn tasks given by their ids (see the 'task list' command to find task ids) at the given date (immediately by default). Eg. swh-scheduler task respawn 1 3 12 """ from swh.scheduler.utils import utcnow scheduler = ctx.obj["scheduler"] if not scheduler: raise ValueError("Scheduler class (local/remote) must be instantiated") if next_run is None: next_run = utcnow() output = [] task_ids_int = [int(id_) for id_ in task_ids] scheduler.set_status_tasks( task_ids_int, status="next_run_not_scheduled", next_run=next_run ) output.append("Respawn tasks %s\n" % (task_ids_int,)) click.echo("\n".join(output)) diff --git a/swh/scheduler/cli/test_cli_utils.py b/swh/scheduler/cli/test_cli_utils.py new file mode 100644 index 0000000..755407e --- /dev/null +++ b/swh/scheduler/cli/test_cli_utils.py @@ -0,0 +1,17 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.scheduler.cli.utils import lister_task_type + + +@pytest.mark.parametrize( + "lister_name,listing_type", [("foo", "full"), ("bar", "incremental")] +) +def test_lister_task_type(lister_name, listing_type): + assert lister_task_type(lister_name, listing_type) == ( + f"list-{lister_name}-{listing_type}" + ) diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py index 4832212..cb2e865 100644 --- a/swh/scheduler/cli/utils.py +++ b/swh/scheduler/cli/utils.py @@ -1,102 +1,336 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control +from __future__ import annotations + +from typing import TYPE_CHECKING + import click +if TYPE_CHECKING: + from typing import Dict, List, Optional, Tuple + + from swh.scheduler.interface import SchedulerInterface + + TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler def schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs): from itertools import islice from swh.scheduler.utils import create_task_dict nb_origins = 0 nb_tasks = 0 while True: task_batch = [] for _ in range(TASK_BATCH_SIZE): # Group origins origin_batch = [] for origin in islice(origins, origin_batch_size): origin_batch.append(origin) nb_origins += len(origin_batch) if not origin_batch: break # Create a task for these origins args = [origin_batch] task_dict = create_task_dict(task_type, "oneshot", *args, **kwargs) task_batch.append(task_dict) # Schedule a batch of tasks if not task_batch: break nb_tasks += len(task_batch) if scheduler: scheduler.create_tasks(task_batch) click.echo("Scheduled %d tasks (%d origins)." % (nb_tasks, nb_origins)) # Print final status. if nb_tasks: click.echo("Done.") else: click.echo("Nothing to do (no origin metadata matched the criteria).") def parse_argument(option): import yaml if option == "": # yaml.safe_load("") returns None return "" try: return yaml.safe_load(option) except Exception: raise click.ClickException("Invalid argument: {}".format(option)) -def parse_options(options): +def parse_options(options: List[str]) -> Tuple[List[str], Dict]: """Parses options from a CLI as YAML and turns it into Python args and kwargs. >>> parse_options([]) ([], {}) >>> parse_options(['foo', 'bar']) (['foo', 'bar'], {}) >>> parse_options(['[foo, bar]']) ([['foo', 'bar']], {}) >>> parse_options(['"foo"', '"bar"']) (['foo', 'bar'], {}) >>> parse_options(['foo="bar"']) ([], {'foo': 'bar'}) >>> parse_options(['"foo"', 'bar="baz"']) (['foo'], {'bar': 'baz'}) >>> parse_options(['42', 'bar=False']) ([42], {'bar': False}) >>> parse_options(['42', 'bar=false']) ([42], {'bar': False}) >>> parse_options(['foo', '']) (['foo', ''], {}) >>> parse_options(['foo', 'bar=']) (['foo'], {'bar': ''}) >>> parse_options(['foo', 'null']) (['foo', None], {}) >>> parse_options(['foo', 'bar=null']) (['foo'], {'bar': None}) >>> parse_options(['42', '"foo']) Traceback (most recent call last): ... click.exceptions.ClickException: Invalid argument: "foo """ kw_pairs = [x.split("=", 1) for x in options if "=" in x] args = [parse_argument(x) for x in options if "=" not in x] kw = {k: parse_argument(v) for (k, v) in kw_pairs} return (args, kw) + + +def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]: + "Given a visit type, return its associated task type." + return scheduler.get_task_type(f"load-{visit_type}") + + +def send_to_celery( + scheduler: SchedulerInterface, + visit_type_to_queue: Dict[str, str], + enabled: bool = True, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, + policy: str = "oldest_scheduled_first", + tablesample: Optional[float] = None, +): + """Utility function to read tasks from the scheduler and send those directly to + celery. + + Args: + visit_type_to_queue: Optional mapping of visit/loader type (e.g git, svn, ...) + to queue to send task to. + enabled: Determine whether we want to list enabled or disabled origins. As + default, we want reasonably enabled origins. For some edge case, we might + want the others. + lister_name: Determine the list of origins listed from the lister with name + lister_instance_name: Determine the list of origins listed from the lister + with instance name + policy: the scheduling policy used to select which visits to schedule + tablesample: the percentage of the table on which we run the query + (None: no sampling) + + """ + + from kombu.utils.uuid import uuid + + from swh.scheduler.celery_backend.config import app, get_available_slots + + from ..utils import create_origin_task_dicts + + for visit_type_name, queue_name in visit_type_to_queue.items(): + task_type = get_task_type(scheduler, visit_type_name) + assert task_type is not None + task_name = task_type["backend_name"] + num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) + + click.echo(f"{num_tasks} slots available in celery queue") + + origins = scheduler.grab_next_visits( + visit_type_name, + num_tasks, + policy=policy, + tablesample=tablesample, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + ) + + click.echo(f"{len(origins)} visits to send to celery") + for task_dict in create_origin_task_dicts(origins, scheduler): + app.send_task( + task_name, + task_id=uuid(), + args=task_dict["arguments"]["args"], + kwargs=task_dict["arguments"]["kwargs"], + queue=queue_name, + ) + + +def pretty_print_list(list, indent=0): + """Pretty-print a list""" + return "".join("%s%r\n" % (" " * indent, item) for item in list) + + +def pretty_print_dict(dict, indent=0): + """Pretty-print a list""" + return "".join( + "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value) + for key, value in sorted(dict.items()) + ) + + +def format_dict(d): + """Recursively format date objects in the dict passed as argument""" + import datetime + + ret = {} + for k, v in d.items(): + if isinstance(v, (datetime.date, datetime.datetime)): + v = v.isoformat() + elif isinstance(v, dict): + v = format_dict(v) + ret[k] = v + return ret + + +def pretty_print_run(run, indent=4): + fmt = ( + "{indent}{backend_id} [{status}]\n" + "{indent} scheduled: {scheduled} [{started}:{ended}]" + ) + return fmt.format(indent=" " * indent, **format_dict(run)) + + +def pretty_print_task(task, full=False): + """Pretty-print a task + + If 'full' is True, also print the status and priority fields. + + >>> import datetime + >>> task = { + ... 'id': 1234, + ... 'arguments': { + ... 'args': ['foo', 'bar', True], + ... 'kwargs': {'key': 'value', 'key2': 42}, + ... }, + ... 'current_interval': datetime.timedelta(hours=1), + ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818), + ... 'policy': 'oneshot', + ... 'priority': None, + ... 'status': 'next_run_not_scheduled', + ... 'type': 'test_task', + ... } + >>> print(click.unstyle(pretty_print_task(task))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + >>> print(click.unstyle(pretty_print_task(task, full=True))) + Task 1234 + Next run: ... (2019-02-21T13:52:35.407818) + Interval: 1:00:00 + Type: test_task + Policy: oneshot + Status: next_run_not_scheduled + Priority:\x20 + Args: + 'foo' + 'bar' + True + Keyword args: + key: 'value' + key2: 42 + + """ + import humanize + + next_run = task["next_run"] + lines = [ + "%s %s\n" % (click.style("Task", bold=True), task["id"]), + click.style(" Next run: ", bold=True), + "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()), + "\n", + click.style(" Interval: ", bold=True), + str(task["current_interval"]), + "\n", + click.style(" Type: ", bold=True), + task["type"] or "", + "\n", + click.style(" Policy: ", bold=True), + task["policy"] or "", + "\n", + ] + if full: + lines += [ + click.style(" Status: ", bold=True), + task["status"] or "", + "\n", + click.style(" Priority: ", bold=True), + task["priority"] or "", + "\n", + ] + lines += [ + click.style(" Args:\n", bold=True), + pretty_print_list(task["arguments"]["args"], indent=4), + click.style(" Keyword args:\n", bold=True), + pretty_print_dict(task["arguments"]["kwargs"], indent=4), + ] + + return "".join(lines) + + +def task_add( + scheduler: SchedulerInterface, + task_type_name: str, + args: List[str], + kw: Dict, + policy: str, + priority: Optional[str] = None, + next_run: Optional[str] = None, +): + """Add a task task_type_name in the scheduler.""" + from swh.scheduler.utils import utcnow + + task = { + "type": task_type_name, + "policy": policy, + "priority": priority, + "arguments": { + "args": args, + "kwargs": kw, + }, + "next_run": next_run or utcnow(), + } + created = scheduler.create_tasks([task]) + + output = [f"Created {len(created)} tasks\n"] + for task in created: + output.append(pretty_print_task(task)) + + click.echo("\n".join(output)) + + +def lister_task_type(lister_name: str, lister_type: str) -> str: + return f"list-{lister_name}-{lister_type}" diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py index c20e4a0..2f3f733 100644 --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -1,515 +1,522 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult from swh.scheduler.model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics ListedOriginPageToken = Tuple[str, str] class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]): """A list of listed origins, with a continuation token""" def __init__( self, results: List[ListedOrigin], next_page_token: Union[None, ListedOriginPageToken, List[str]], ): parsed_next_page_token: Optional[Tuple[str, str]] = None if next_page_token is not None: if len(next_page_token) != 2: raise TypeError("Expected Tuple[str, str] or list of size 2.") parsed_next_page_token = tuple(next_page_token) # type: ignore super().__init__(results, parsed_next_page_token) @runtime_checkable class SchedulerInterface(Protocol): @remote_api_endpoint("task_type/create") def create_task_type(self, task_type): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ ... @remote_api_endpoint("task_type/get") def get_task_type(self, task_type_name): """Retrieve the task type with id task_type_name""" ... @remote_api_endpoint("task_type/get_all") def get_task_types(self): """Retrieve all registered task types""" ... @remote_api_endpoint("task/create") def create_tasks(self, tasks, policy="recurring"): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ ... @remote_api_endpoint("task/set_status") def set_status_tasks( self, task_ids: List[int], status: str = "disabled", next_run: Optional[datetime.datetime] = None, ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ ... @remote_api_endpoint("task/disable") def disable_tasks(self, task_ids): """Disable the tasks whose ids are listed.""" ... @remote_api_endpoint("task/search") def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, ): """Search tasks from selected criterions""" ... @remote_api_endpoint("task/get") def get_tasks(self, task_ids): """Retrieve the info of tasks whose ids are listed.""" ... @remote_api_endpoint("task/peek_ready") def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch the list of tasks (with no priority) to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: the list of tasks which would be scheduled """ ... @remote_api_endpoint("task/grab_ready") def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch and schedule the list of tasks (with no priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: the list of scheduled tasks """ ... @remote_api_endpoint("task/peek_ready_with_priority") def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: a list of tasks """ ... @remote_api_endpoint("task/grab_ready_with_priority") def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Dict]: """Fetch and schedule the list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: a list of tasks """ ... @remote_api_endpoint("task_run/schedule_one") def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ ... @remote_api_endpoint("task_run/schedule") def mass_schedule_task_runs(self, task_runs): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ ... @remote_api_endpoint("task_run/start") def start_task_run(self, backend_id, metadata=None, timestamp=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... @remote_api_endpoint("task_run/end") def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... @remote_api_endpoint("task/filter_for_archive") def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ ... @remote_api_endpoint("task/delete_archived") def delete_archived_tasks(self, task_ids): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ ... @remote_api_endpoint("task_run/get") def get_task_runs(self, task_ids, limit=None): """Search task run for a task id""" ... @remote_api_endpoint("listers/get") def get_listers(self) -> List[Lister]: """Retrieve information about all listers from the database.""" ... @remote_api_endpoint("listers/get_by_id") def get_listers_by_id(self, lister_ids: List[str]) -> List[Lister]: """Retrieve listers in batch, using their UUID""" @remote_api_endpoint("lister/get") def get_lister( self, name: str, instance_name: Optional[str] = None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ ... @remote_api_endpoint("lister/get_or_create") def get_or_create_lister( self, name: str, instance_name: Optional[str] = None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ ... @remote_api_endpoint("lister/update") def update_lister(self, lister: Lister) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ ... @remote_api_endpoint("origins/record") def record_listed_origins( self, listed_origins: Iterable[ListedOrigin] ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ ... @remote_api_endpoint("origins/get") def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, enabled: Optional[bool] = True, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, ) -> PaginatedListedOriginList: """Get information on listed origins, possibly filtered, in a paginated way. Args: lister_id: if provided, return origins discovered with that lister url: if provided, return origins matching that URL enabled: If :const:`True` return only enabled origins, if :const:`False` return only disabled origins, if :const:`None` return all origins. limit: maximum number of origins per page page_token: to get the next page of origins, is returned in the :class:`PaginatedListedOriginList` object Returns: A page of listed origins """ ... @remote_api_endpoint("origins/grab_next") def grab_next_visits( self, visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, + absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12), scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin. Arguments: visit_type: type of visits to schedule count: number of visits to schedule policy: the scheduling policy used to select which visits to schedule enabled: Determine whether we want to list enabled or disabled origins. As default, we want reasonably enabled origins. For some edge case, we might want the others. lister_uuid: Determine the list of origins listed from the lister with uuid + lister_name: Determine the list of origins listed from the lister with name + lister_instance_name: Determine the list of origins listed from the lister + with instance name timestamp: the mocked timestamp at which we're recording that the visits are being scheduled (defaults to the current time) + absolute_cooldown: the minimal interval between two visits of the same origin scheduled_cooldown: the minimal interval before which we can schedule - the same origin again + the same origin again if it's not been visited failed_cooldown: the minimal interval before which we can reschedule a failed origin not_found_cooldown: the minimal interval before which we can reschedule a not_found origin tablesample: the percentage of the table on which we run the query (None: no sampling) """ ... @remote_api_endpoint("visit_stats/upsert") def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats] ) -> None: """Create a new origin visit stats""" ... @remote_api_endpoint("visit_stats/get") def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]] ) -> List[OriginVisitStats]: """Retrieve the stats for an origin with a given visit type If some visit_stats are not found, they are filtered out of the result. So the output list may be of length inferior to the length of the input list. """ ... @remote_api_endpoint("visit_scheduler/get") def visit_scheduler_queue_position_get( self, ) -> Dict[str, int]: """Retrieve all current queue positions for the recurrent visit scheduler. Returns Mapping of visit type to their current queue position """ ... @remote_api_endpoint("visit_scheduler/set") def visit_scheduler_queue_position_set( self, visit_type: str, position: int ) -> None: """Set the current queue position of the recurrent visit scheduler for `visit_type`.""" ... @remote_api_endpoint("scheduler_metrics/update") def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ ... @remote_api_endpoint("scheduler_metrics/get") def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ ... diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py index d25c4f6..e256b9d 100644 --- a/swh/scheduler/tests/common.py +++ b/swh/scheduler/tests/common.py @@ -1,127 +1,127 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime from typing import Dict, List, Optional TEMPLATES = { "test-git": { "type": "load-test-git", "arguments": { "args": [], "kwargs": {}, }, "next_run": None, }, "test-hg": { "type": "load-test-hg", "arguments": { "args": [], "kwargs": {}, }, "next_run": None, "policy": "oneshot", }, } TASK_TYPES = { "test-git": { "type": "load-test-git", "description": "Update a git repository", "backend_name": "swh.loader.git.tasks.UpdateGitRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, "test-hg": { "type": "load-test-hg", "description": "Update a mercurial repository", "backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, } def _task_from_template( template: Dict, next_run: datetime.datetime, priority: Optional[str], *args, **kwargs, ) -> Dict: ret = copy.deepcopy(template) ret["next_run"] = next_run if priority: ret["priority"] = priority if args: ret["arguments"]["args"] = list(args) if kwargs: ret["arguments"]["kwargs"] = kwargs return ret def tasks_from_template( template: Dict, max_timestamp: datetime.datetime, num: Optional[int] = None, priority: Optional[str] = None, num_priorities: Dict[Optional[str], int] = {}, ) -> List[Dict]: """Build ``num`` tasks from template""" assert bool(num) != bool(num_priorities), "mutually exclusive" if not num_priorities: assert num is not None # to please mypy num_priorities = {None: num} tasks: List[Dict] = [] for (priority, num) in num_priorities.items(): for _ in range(num): i = len(tasks) tasks.append( _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, **{"kwarg%03d" % i: "bogus-kwarg"}, ) ) return tasks def tasks_with_priority_from_template( template: Dict, max_timestamp: datetime.datetime, num: int, priority: str ) -> List[Dict]: """Build tasks with priority from template""" return [ _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, **{"kwarg%03d" % i: "bogus-kwarg"}, ) for i in range(num) ] LISTERS = ( - {"name": "github"}, + {"name": "github", "instance_name": "github"}, {"name": "gitlab", "instance_name": "gitlab"}, {"name": "gitlab", "instance_name": "freedesktop"}, {"name": "npm"}, {"name": "pypi"}, ) diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py index 6a55071..acf6223 100644 --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -1,946 +1,960 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from itertools import islice import logging import random import re import tempfile from unittest.mock import patch from click.testing import CliRunner import pytest from swh.core.api.classes import stream_results from swh.model.model import Origin from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict, utcnow CLI_CONFIG = """ scheduler: cls: foo args: {} """ def invoke(scheduler, catch_exceptions, args, config=CLI_CONFIG): runner = CliRunner() with patch( "swh.scheduler.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: config_fd.write(config) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = [ "-C" + config_fd.name, ] + args result = runner.invoke(cli, args, obj={"log_level": logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_schedule_tasks(swh_scheduler): csv_data = ( b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" ) with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(csv_data) csv_fd.seek(0) result = invoke( swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name] ) expected = r""" Created 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg1', 'arg2'\] Keyword args: key: 'value' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg3', 'arg4'\] Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_tasks_columns(swh_scheduler): with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n') csv_fd.seek(0) result = invoke( swh_scheduler, False, [ "task", "schedule", "-c", "type", "-c", "policy", "-c", "args", "-c", "kwargs", "-d", ";", csv_fd.name, ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_task(swh_scheduler): result = invoke( swh_scheduler, False, [ "task", "add", "swh-test-ping", "arg1", "arg2", "key=value", ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output +def test_schedule_unknown_task_type(swh_scheduler): + """When scheduling unknown task type, the cli should raise.""" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + False, + [ + "task", + "add", + "unknown-task-type-should-raise", + ], + ) + + def test_list_pending_tasks_none(swh_scheduler): result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 1 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter(swh_scheduler): task = create_task_dict("swh-test-multiping", "oneshot", key="value") swh_scheduler.create_tasks([task]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter_2(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-multiping", "oneshot", key="value"), create_task_dict("swh-test-ping", "oneshot", key="value2"), ] ) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # Fails because "task list-pending --limit 3" only returns 2 tasks, because # of how compute_nb_tasks_from works. @pytest.mark.xfail def test_list_pending_tasks_limit(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-ping", "oneshot", key="value%d" % i) for i in range(10) ] ) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--limit", "3", ], ) expected = r""" Found 2 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value0' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3) task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: tomorrow \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", ], ) expected = r""" Found 2 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, [ "task", "list", "--task-id", "2", ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id_2(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"] ) expected = r""" Found 2 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_type(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"] ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_limit(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, [ "task", "list", "--limit", "2", ], ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_after(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--after", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def _fill_storage_with_origins(storage, nb_origins): origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)] storage.origin_add(origins) return origins @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a divisor of nb_origins.""" _fill_storage_with_origins(storage, 90) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "--dry-run", "swh-test-ping", ], ) # Check the output expected = r""" Scheduled 3 tasks \(30 origins\). Scheduled 6 tasks \(60 origins\). Scheduled 9 tasks \(90 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check scheduled tasks tasks = swh_scheduler.search_tasks() assert len(tasks) == 0 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins): # check there are not too many tasks assert len(tasks) <= max_tasks # check tasks are not too large assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks) # check the tasks are exhaustive assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len( expected_origins ) assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == { origin.url for origin in expected_origins } @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" origins = _fill_storage_with_origins(storage, 70) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", ], ) # Check the output expected = r""" Scheduled 3 tasks \(60 origins\). Scheduled 4 tasks \(70 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 4, 20, origins) assert all(task["arguments"]["kwargs"] == {} for task in tasks) def test_task_schedule_origins_kwargs(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" origins = _fill_storage_with_origins(storage, 30) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", 'key1="value1"', 'key2="value2"', ], ) # Check the output expected = r""" Scheduled 2 tasks \(30 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all( task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) def test_task_schedule_origins_with_limit(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" _fill_storage_with_origins(storage, 50) limit = 20 expected_origins = list(islice(stream_results(storage.origin_list), limit)) nb_origins = len(expected_origins) assert nb_origins == limit max_task_size = 5 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 # made the numbers go round result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--limit", limit, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_task_schedule_origins_with_page_token(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" nb_total_origins = 50 origins = _fill_storage_with_origins(storage, nb_total_origins) # prepare page_token and origins result expectancy page_result = storage.origin_list(limit=10) assert len(page_result.results) == 10 page_token = page_result.next_page_token assert page_token is not None # remove the first 10 origins listed as we won't see those in tasks expected_origins = [o for o in origins if o not in page_result.results] nb_origins = len(expected_origins) assert nb_origins == nb_total_origins - len(page_result.results) max_task_size = 10 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--page-token", page_token, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_cli_task_runner_unknown_task_types(swh_scheduler, storage): """When passing at least one unknown task type, the runner should fail.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] known_task_type = random.choice(task_type_names) unknown_task_type = "unknown-task-type" assert unknown_task_type not in task_type_names with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "start-runner", "--task-type", known_task_type, "--task-type", unknown_task_type, ], ) @pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"]) def test_cli_task_runner_with_known_tasks( swh_scheduler, storage, caplog, flag_priority ): """Trigger runner with known tasks runs smoothly.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] task_type_name = random.choice(task_type_names) task_type_name2 = random.choice(task_type_names) # The runner will just iterate over the following known tasks and do noop. We are # just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", flag_priority, "--task-type", task_type_name, "--task-type", task_type_name2, ], ) assert result.exit_code == 0, result.output def test_cli_task_runner_no_task(swh_scheduler, storage): """Trigger runner with no parameter should run as before.""" # The runner will just iterate over the existing tasks from the scheduler and do # noop. We are just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", ], ) assert result.exit_code == 0, result.output diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py new file mode 100644 index 0000000..69fe9e1 --- /dev/null +++ b/swh/scheduler/tests/test_cli_add_forge_now.py @@ -0,0 +1,164 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Tuple + +import attr +import pytest + +from swh.scheduler.cli.utils import lister_task_type +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.tests.common import TASK_TYPES +from swh.scheduler.tests.test_cli import invoke as basic_invoke + + +def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): + return basic_invoke( + scheduler, args=["add-forge-now", *args], catch_exceptions=catch_exceptions + ) + + +def test_schedule_first_visits_cli_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + args=( + "schedule-first-visits", + "-t", + "unknown-vt0", + "--type-name", + "unknown-visit-type1", + ), + ) + + +@pytest.mark.parametrize( + "cmd_args, subcmd_args", + [ + ([], []), + ([], ["--lister-name", "github", "--lister-instance-name", "github"]), + (["--preset", "staging"], []), + ], +) +def test_schedule_first_visits_cli( + mocker, + swh_scheduler, + swh_scheduler_celery_app, + listed_origins_by_type, + cmd_args, + subcmd_args, +): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + visit_type = next(iter(listed_origins_by_type)) + + # enabled origins by default except when --staging flag is provided + enabled = "staging" not in cmd_args + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins( + (attr.evolve(o, enabled=enabled) for o in origins) + ) + + get_queue_length = mocker.patch( + "swh.scheduler.celery_backend.config.get_queue_length" + ) + get_queue_length.return_value = None + + send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") + send_task.return_value = None + + command_args = ( + cmd_args + ["schedule-first-visits", "--type-name", visit_type] + subcmd_args + ) + + result = invoke(swh_scheduler, args=tuple(command_args)) + assert result.exit_code == 0 + + scheduled_tasks = { + (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list + } + + expected_tasks = { + (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) + for origin in listed_origins_by_type[visit_type] + } + + assert scheduled_tasks == expected_tasks + + +def _create_task_type( + swh_scheduler: SchedulerInterface, lister_name: str, listing_type: str = "full" +) -> Dict: + task_type = { + "type": lister_task_type(lister_name, listing_type), # only relevant bit + "description": f"{listing_type} listing", + "backend_name": "swh.example.backend", + "default_interval": "1 day", + "min_interval": "1 day", + "max_interval": "1 day", + "backoff_factor": "1", + "max_queue_length": "100", + "num_retries": 3, + } + swh_scheduler.create_task_type(task_type) + task_type = swh_scheduler.get_task_type(task_type["type"]) + assert task_type is not None + return task_type + + +@pytest.mark.parametrize("preset", ["staging", "production"]) +def test_schedule_register_lister(swh_scheduler, stored_lister, preset): + # given + assert stored_lister is not None + lister_name = stored_lister.name + # Let's create all possible associated lister task types + full = _create_task_type(swh_scheduler, lister_name, "full") + incremental = _create_task_type(swh_scheduler, lister_name, "incremental") + + # Let's trigger the registering of that lister + result = invoke( + swh_scheduler, + [ + "--preset", + preset, + "register-lister", + lister_name, + "url=https://example.org", + ], + ) + + output = result.output.lstrip() + + expected_msgs = [] + if preset == "production": + # 2 tasks: 1 full + 1 incremental (tomorrow) with recurring policy + expected_msgs = ["Policy: recurring", incremental["type"], "Next run: tomorrow"] + else: + # 1 task full with policy oneshot + expected_msgs = ["Policy: oneshot"] + + # In any case, there is the full listing type too + expected_msgs.append(full["type"]) + + assert len(expected_msgs) > 0 + for msg in expected_msgs: + assert msg in output + + +def test_register_lister_unknown_task_type(swh_scheduler): + """When scheduling unknown task type, the cli should raise.""" + with pytest.raises(ValueError, match="Unknown"): + invoke( + swh_scheduler, + [ + "register-lister", + "unknown-lister-type-should-raise", + ], + ) diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py index 90644c5..b6bb85c 100644 --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -1,162 +1,177 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Tuple import pytest from swh.scheduler.cli.origin import format_origins from swh.scheduler.tests.common import TASK_TYPES from swh.scheduler.tests.test_cli import invoke as basic_invoke def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): return basic_invoke( scheduler, args=["origin", *args], catch_exceptions=catch_exceptions ) def test_cli_origin(swh_scheduler): """Check that swh scheduler origin returns its help text""" result = invoke(swh_scheduler) assert "Commands:" in result.stdout def test_format_origins_basic(listed_origins): listed_origins = listed_origins[:100] basic_output = list(format_origins(listed_origins)) # 1 header line + all origins assert len(basic_output) == len(listed_origins) + 1 no_header_output = list(format_origins(listed_origins, with_header=False)) assert basic_output[1:] == no_header_output def test_format_origins_fields_unknown(listed_origins): listed_origins = listed_origins[:10] it = format_origins(listed_origins, fields=["unknown_field"]) with pytest.raises(ValueError, match="unknown_field"): next(it) def test_format_origins_fields(listed_origins): listed_origins = listed_origins[:10] fields = ["lister_id", "url", "visit_type"] output = list(format_origins(listed_origins, fields=fields)) assert output[0] == ",".join(fields) for i, origin in enumerate(listed_origins): assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}" def test_grab_next(swh_scheduler, listed_origins_by_type): NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested # XXX: should test all of 'listed_origins_by_type' here... visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 out_lines = result.stdout.splitlines() assert len(out_lines) == NUM_RESULTS + 1 fields = out_lines[0].split(",") returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]] # Check that we've received origins we had listed in the first place assert set(origin["url"] for origin in returned_origins) <= set( origin.url for origin in listed_origins_by_type[visit_type] ) def test_schedule_next(swh_scheduler, listed_origins_by_type): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 # pull all tasks out of the scheduler tasks = swh_scheduler.search_tasks() assert len(tasks) == NUM_RESULTS scheduled_tasks = { (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks } all_possible_tasks = { (f"load-{origin.visit_type}", origin.url) for origin in listed_origins_by_type[visit_type] } assert scheduled_tasks <= all_possible_tasks +def test_send_to_celery_unknown_visit_type( + swh_scheduler, +): + "Calling cli without a known visit type should raise" + with pytest.raises(ValueError, match="Unknown"): + invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type")) + + +@pytest.mark.parametrize( + "extra_cmd_args", + [[], ["--lister-name", "github", "--lister-instance-name", "github"]], +) def test_send_to_celery( mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type, + extra_cmd_args, ): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) visit_type = next(iter(listed_origins_by_type)) for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) get_queue_length = mocker.patch( "swh.scheduler.celery_backend.config.get_queue_length" ) get_queue_length.return_value = None send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") send_task.return_value = None - result = invoke(swh_scheduler, args=("send-to-celery", visit_type)) + cmd_args = ["send-to-celery", visit_type] + extra_cmd_args + + result = invoke(swh_scheduler, args=tuple(cmd_args)) assert result.exit_code == 0 scheduled_tasks = { (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list } expected_tasks = { (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) for origin in listed_origins_by_type[visit_type] } assert expected_tasks == scheduled_tasks def test_update_metrics(swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) assert swh_scheduler.get_metrics() == [] result = invoke(swh_scheduler, args=("update-metrics",)) assert result.exit_code == 0 assert swh_scheduler.get_metrics() != [] diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 7aafc62..b20c9b9 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,1582 +1,1614 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import copy import datetime from datetime import timedelta import inspect import random from typing import Any, Dict, List, Optional, Tuple import uuid import attr from psycopg2.extras import execute_values import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface from swh.scheduler.model import ( LastVisitStatus, ListedOrigin, OriginVisitStats, SchedulerMetrics, ) from swh.scheduler.utils import utcnow from .common import ( LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template, tasks_with_priority_from_template, ) ONEDAY = timedelta(days=1) NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: return (m.lister_id, m.visit_type) def assert_metrics_equal(left, right): assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key) class TestScheduler: def test_interface(self, swh_scheduler): """Checks all methods of SchedulerInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SchedulerInterface,), {})() assert "create_task_type" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(swh_scheduler, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES["test-git"] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) tt2 = TASK_TYPES["test-hg"] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt["type"]) assert tt2 == swh_scheduler.get_task_type(tt2["type"]) def test_create_task_type_idempotence(self, swh_scheduler): tt = TASK_TYPES["test-git"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES["test-git"], TASK_TYPES["test-hg"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) num_git = 100 tasks_1 = tasks_from_template(TEMPLATES["test-git"], utcnow(), num_git) tasks_2 = tasks_from_template( TEMPLATES["test-hg"], utcnow(), num_priorities=NUM_PRIORITY_TASKS ) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks) set_ret1 = set([t["id"] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t["id"] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task["type"].split("-", 1)[-1]] assert task["id"] not in ids assert task["status"] == "next_run_not_scheduled" assert task["current_interval"] == task_type["default_interval"] assert task["policy"] == orig_task.get("policy", "recurring") priority = task.get("priority") actual_priorities[priority] += 1 assert task["retries_left"] == (task_type["num_retries"] or 0) ids.add(task["id"]) del task["id"] del task["status"] del task["current_interval"] del task["retries_left"] if "policy" not in orig_task: del task["policy"] if "priority" not in orig_task: del task["priority"] assert task == orig_task expected_priorities = NUM_PRIORITY_TASKS.copy() expected_priorities[None] += num_git assert dict(actual_priorities) == expected_priorities def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["test-git"]["type"] tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks) // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit - 1]["next_run"] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts ) for ready_task in ready_tasks_timestamped: assert ready_task["next_run"] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit // 3 ) assert len(ready_tasks_both) <= limit // 3 for ready_task in ready_tasks_both: assert ready_task["next_run"] <= max_ts assert ready_task in ready_tasks[: limit // 3] def test_peek_ready_tasks_returns_only_no_priority_tasks(self, swh_scheduler): """Peek ready tasks only return standard tasks (no priority)""" self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["test-git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["test-git"], t, num_priorities=NUM_PRIORITY_TASKS, ) count_priority = 0 for task in tasks: count_priority += 0 if task.get("priority") is None else 1 assert count_priority > 0, "Some created tasks should have some priority" random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available no priority tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) - count_priority # No read task should have any priority for task in ready_tasks: assert task.get("priority") is None def test_grab_ready_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["test-git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["test-git"], t, num_priorities=NUM_PRIORITY_TASKS ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_tasks(task_type, num_tasks=50) first_ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed priority = grabbed["priority"] assert priority == peeked["priority"] assert priority is None def test_grab_ready_priority_tasks(self, swh_scheduler): """check the grab and peek priority tasks endpoint behave as expected""" self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["test-git"]["type"] num_tasks = 100 # Create tasks with and without priorities tasks0 = tasks_with_priority_from_template( TEMPLATES["test-git"], t, num_tasks, "high", ) tasks1 = tasks_with_priority_from_template( TEMPLATES["test-hg"], t, num_tasks, "low", ) tasks2 = tasks_with_priority_from_template( TEMPLATES["test-hg"], t, num_tasks, "normal", ) tasks = tasks0 + tasks1 + tasks2 random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_priority_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_priority_tasks(task_type, num_tasks=50) ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed assert peeked["priority"] == grabbed["priority"] assert peeked["priority"] is not None def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x["id"]) tasks[:length] = [] ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x["id"]) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(lst): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in lst] self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks) def assert_filtered_task_ok( self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime ) -> None: """Ensure filtered tasks have the right expected properties (within the range, recurring disabled, etc..) """ started = task["started"] date = started if started is not None else task["scheduled"] assert after <= date and date <= before if task["task_policy"] == "oneshot": assert task["task_status"] in ["completed", "disabled"] if task["task_policy"] == "recurring": assert task["task_status"] in ["disabled"] def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks""" self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") _tasks.append(t) # Randomly update task's status per policy status_per_policy = {"recurring": 0, "oneshot": 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] "recurring": [ (1, "disabled"), (0, "completed"), (0, "next_run_not_scheduled"), ], "oneshot": [ (0, "next_run_not_scheduled"), (1, "disabled"), (1, "completed"), ], } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task["policy"] _task_ids[policy].append(task["id"]) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task["id"]) swh_scheduler.disable_tasks(tasks_to_update["recurring"]) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( _task_ids["oneshot"], status="next_run_not_scheduled" ) # complete the tasks to update swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed") total_tasks_filtered = ( status_per_policy["recurring"] + status_per_policy["oneshot"] ) # no pagination scenario # retrieve tasks to archive after = _time - ONEDAY after_ts = after.strftime("%Y-%m-%d") before = utcnow() + ONEDAY before_ts = before.strftime("%Y-%m-%d") tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks ) tasks_to_archive = tasks_result["tasks"] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get("next_page_token") is None actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in tasks_to_archive: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario nb_tasks = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) == nb_tasks next_page_token = tasks_result["next_page_token"] assert next_page_token is not None all_tasks = tasks_to_archive2 while next_page_token is not None: # Retrieve paginated results tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks, page_token=next_page_token, ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) <= nb_tasks all_tasks.extend(tasks_to_archive2) next_page_token = tasks_result.get("next_page_token") actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in all_tasks: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") c = random.randint(0, 100) if c <= percent: _tasks.append({"task_id": t["task"], "task_run_id": t["id"]}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task["id"] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): """No task exist in the scheduler's db, get_task_runs() should always return an empty list. """ assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): """No task has been executed yet, get_task_runs() should always return an empty list. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): """Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } def test_get_or_create_lister(self, swh_scheduler): db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): assert lister.name == lister_args["name"] assert lister.instance_name == lister_args.get("instance_name", "") lister_get_again = swh_scheduler.get_or_create_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_lister(self, swh_scheduler): for lister_args in LISTERS: assert swh_scheduler.get_lister(**lister_args) is None db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): lister_get_again = swh_scheduler.get_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_listers(self, swh_scheduler): assert swh_scheduler.get_listers() == [] db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) assert swh_scheduler.get_listers() == db_listers def test_get_listers_by_id(self, swh_scheduler): assert swh_scheduler.get_listers_by_id([str(uuid.uuid4())]) == [] db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) id0 = db_listers[0].id id1 = db_listers[1].id assert swh_scheduler.get_listers_by_id([id0]) == [db_listers[0]] assert swh_scheduler.get_listers_by_id([id1]) == [db_listers[1]] assert swh_scheduler.get_listers_by_id([id0, id1]) == [ db_listers[0], db_listers[1], ] assert swh_scheduler.get_listers_by_id([id0, str(uuid.uuid4())]) == [ db_listers[0] ] def test_update_lister(self, swh_scheduler, stored_lister): lister = attr.evolve(stored_lister, current_state={"updated": "now"}) updated_lister = swh_scheduler.update_lister(lister) assert updated_lister.updated > lister.updated assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) def test_update_lister_stale(self, swh_scheduler, stored_lister): swh_scheduler.update_lister(stored_lister) with pytest.raises(StaleData) as exc: swh_scheduler.update_lister(stored_lister) assert "state not updated" in exc.value.args[0] def test_record_listed_origins(self, swh_scheduler, listed_origins): ret = swh_scheduler.record_listed_origins(listed_origins) assert set(returned.url for returned in ret) == set( origin.url for origin in listed_origins ) assert all(origin.first_seen == origin.last_seen for origin in ret) def test_record_listed_origins_with_duplicate(self, swh_scheduler, listed_origins): # the duplicates must be in the same page to raise the "on conflict error" listed_origins.insert(0, listed_origins[0]) ret = swh_scheduler.record_listed_origins(listed_origins) # without the duplicate assert len(ret) == len(listed_origins) - 1 def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins): # First, insert `cutoff` origins cutoff = 100 assert cutoff < len(listed_origins) ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff]) assert len(ret) == cutoff # Then, insert all origins, including the `cutoff` first. ret = swh_scheduler.record_listed_origins(listed_origins) assert len(ret) == len(listed_origins) # Two different "first seen" values assert len(set(origin.first_seen for origin in ret)) == 2 # But a single "last seen" value assert len(set(origin.last_seen for origin in ret)) == 1 def test_get_listed_origins_exact(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) for i, origin in enumerate(listed_origins): ret = swh_scheduler.get_listed_origins( lister_id=origin.lister_id, url=origin.url ) assert ret.next_page_token is None assert len(ret.results) == 1 assert ret.results[0].lister_id == origin.lister_id assert ret.results[0].url == origin.url @pytest.mark.parametrize("num_origins,limit", [(20, 6), (5, 42), (20, 20)]) def test_get_listed_origins_limit( self, swh_scheduler, listed_origins, num_origins, limit ) -> None: added_origins = sorted( listed_origins[:num_origins], key=lambda o: (o.lister_id, o.url) ) swh_scheduler.record_listed_origins(added_origins) returned_origins: List[ListedOrigin] = [] call_count = 0 next_page_token: Optional[ListedOriginPageToken] = None while True: call_count += 1 ret = swh_scheduler.get_listed_origins( lister_id=listed_origins[0].lister_id, limit=limit, page_token=next_page_token, ) returned_origins.extend(ret.results) next_page_token = ret.next_page_token if next_page_token is None: break assert call_count == (num_origins // limit) + 1 assert len(returned_origins) == num_origins assert [(origin.lister_id, origin.url) for origin in returned_origins] == [ (origin.lister_id, origin.url) for origin in added_origins ] def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) assert ret.next_page_token is None assert len(ret.results) == len(listed_origins) def test_get_listed_origins_with_enabled_parameter( self, swh_scheduler, listed_origins_with_non_enabled ) -> None: swh_scheduler.record_listed_origins(listed_origins_with_non_enabled) # get all enabled listed origins ret = swh_scheduler.get_listed_origins( enabled=True, limit=len(listed_origins_with_non_enabled) + 1 ) assert ret.next_page_token is None assert len(ret.results) == len( [lo for lo in listed_origins_with_non_enabled if lo.enabled] ) assert all([lo.enabled for lo in ret.results]) # get all disabled listed origins ret = swh_scheduler.get_listed_origins( enabled=False, limit=len(listed_origins_with_non_enabled) + 1 ) assert ret.next_page_token is None assert len(ret.results) == len( [lo for lo in listed_origins_with_non_enabled if not lo.enabled] ) assert all([not lo.enabled for lo in ret.results]) # get all listed origins ret = swh_scheduler.get_listed_origins( enabled=None, limit=len(listed_origins_with_non_enabled) + 1 ) assert ret.next_page_token is None assert len(ret.results) == len(listed_origins_with_non_enabled) - def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type): + def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type, limit=100): """Basic origins setup for scheduling policy tests""" visit_type = next(iter(listed_origins_by_type)) - origins = listed_origins_by_type[visit_type][:100] - assert len(origins) > 0 - recorded_origins = swh_scheduler.record_listed_origins(origins) + all_origins = listed_origins_by_type[visit_type] + origins = all_origins[:limit] if limit else all_origins + assert len(origins) > 0 - return visit_type, recorded_origins + return visit_type, swh_scheduler.record_listed_origins(origins) def _check_grab_next_visit_basic( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Calls grab_next_visits with the passed policy, and check that: - all the origins returned are the expected ones (in the same order) - no extra origins are returned - the last_scheduled field has been set properly. Pass the extra keyword arguments to the calls to grab_next_visits. Returns a timestamp greater than all `last_scheduled` values for the grabbed visits. """ assert len(expected) != 0 before = utcnow() ret = swh_scheduler.grab_next_visits( visit_type=visit_type, # Request one more than expected to check that no extra origin is returned count=len(expected) + 1, policy=policy, **kwargs, ) after = utcnow() assert ret == expected visit_stats_list = swh_scheduler.origin_visit_stats_get( [(origin.url, origin.visit_type) for origin in expected] ) assert len(visit_stats_list) == len(expected) for visit_stats in visit_stats_list: # Check that last_scheduled got updated assert before <= visit_stats.last_scheduled <= after # They should not be scheduled again ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, **kwargs ) assert ret == [], "grab_next_visits returned already-scheduled origins" return after def _check_grab_next_visit( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Run the same check as _check_grab_next_visit_basic, but also checks the origin visits have been marked as scheduled, and are only re-scheduled a week later """ after = self._check_grab_next_visit_basic( swh_scheduler, visit_type, policy, expected, **kwargs ) # But a week, later, they should ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, timestamp=after + timedelta(days=7), ) # We need to sort them because their 'last_scheduled' field is updated to # exactly the same value, so the order is not deterministic assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after a week" def _prepare_oldest_scheduled_first_origins( self, swh_scheduler, listed_origins_by_type ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Give all origins but one a last_scheduled date base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, last_successful=None, last_visit=None, last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve the origin with a NULL last_scheduled # as well as those with the oldest values (i.e. the last ones), in order. expected = [origins[0]] + origins[1:][::-1] return visit_type, origins, expected def test_grab_next_visits_oldest_scheduled_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) - @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "not_found")) + @pytest.mark.parametrize( + "which_cooldown", ("scheduled", "failed", "not_found", "absolute") + ) @pytest.mark.parametrize("cooldown", (7, 15)) def test_grab_next_visits_cooldowns( self, swh_scheduler, listed_origins_by_type, which_cooldown, cooldown, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) after = self._check_grab_next_visit_basic( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) - # Mark all the visits as scheduled, failed or notfound on the `after` timestamp + # Mark all the visits as scheduled, failed or not_found on the `after` timestamp. + # If we're testing the `absolute_cooldown`, mark the visit as successful. ovs_args = { "last_visit": None, "last_visit_status": None, "last_scheduled": None, + "last_successful": None, + "last_snapshot": None, } if which_cooldown == "scheduled": ovs_args["last_scheduled"] = after + elif which_cooldown == "absolute": + ovs_args["last_visit"] = after + ovs_args["last_successful"] = after + ovs_args["last_visit_status"] = LastVisitStatus.successful + ovs_args["last_snapshot"] = b"\x00" * 20 else: ovs_args["last_visit"] = after ovs_args["last_visit_status"] = LastVisitStatus(which_cooldown) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, - last_snapshot=None, - last_successful=None, **ovs_args, ) for i, origin in enumerate(origins) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) cooldown_td = timedelta(days=cooldown) cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, "not_found_cooldown": None, + "absolute_cooldown": None, } cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td - timedelta(seconds=1), **cooldown_args, ) assert ret == [], f"{which_cooldown}_cooldown ignored" ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td + timedelta(seconds=1), **cooldown_args, ) assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after the configured cooldown" def test_grab_next_visits_tablesample( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) ret = swh_scheduler.grab_next_visits( visit_type=visit_type, policy="oldest_scheduled_first", tablesample=50, count=len(expected), ) # Just a smoke test, not obvious how to test this more reliably assert ret is not None def test_grab_next_visits_never_visited_oldest_update_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # We expect to retrieve origins with the oldest update date, that is # origins at the end of our updated_origins list. expected_origins = sorted(updated_origins, key=lambda o: o.last_update) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="never_visited_oldest_update_first", expected=expected_origins, ) def test_grab_next_visits_already_visited_order_by_lag( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # Update the visit stats with a known visit at a controlled date for # half the origins. Pick the date in the middle of the # updated_origins' `last_update` range visit_date = updated_origins[len(updated_origins) // 2].last_update visited_origins = updated_origins[::2] visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), last_successful=visit_date, last_visit=visit_date, ) for origin in visited_origins ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve visited origins with the largest lag, but only # those which haven't been visited since their last update expected_origins = sorted( [origin for origin in visited_origins if origin.last_update > visit_date], key=lambda o: visit_date - o.last_update, ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="already_visited_order_by_lag", expected=expected_origins, ) def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins_by_type): """Check that grab_next_visits works when there not enough origins in the database""" visit_type = next(iter(listed_origins_by_type)) # Only add 5 origins to the database origins = listed_origins_by_type[visit_type][:5] assert origins swh_scheduler.record_listed_origins(origins) ret = swh_scheduler.grab_next_visits( visit_type, len(origins) + 2, policy="oldest_scheduled_first" ) assert len(ret) == 5 def test_grab_next_visits_no_last_update_nor_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update (nor visit stats)""" visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == {} # nor any visit statuses actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in origins ) assert len(actual_visit_stats) == 0 # Grab some new visits next_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) # we do have the one without any last update assert len(next_visits) == len(origins) # Now the global state got updated current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state[visit_type] is not None actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in next_visits ) # Visit stats got algo created assert len(actual_visit_stats) == len(origins) def test_grab_next_visits_no_last_update_with_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update""" visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == {} # Simulate some of those origins have associated visit stats (some with an # existing queue position and some without any) visit_stats = ( [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), next_visit_queue_position=int(24 * 3600 * random.uniform(-10, 1)), ) for origin in origins[:100] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), next_visit_queue_position=int( 24 * 3600 * random.uniform(1, 10) ), # definitely > 0 ) for origin in origins[100:150] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), ) for origin in origins[150:] ] ) swh_scheduler.origin_visit_stats_upsert(visit_stats) # Grab next visits actual_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) assert len(actual_visits) == len(origins) actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in actual_visits ) assert len(actual_visit_stats) == len(origins) current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == { visit_type: max( s.next_visit_queue_position for s in actual_visit_stats if s.next_visit_queue_position is not None ) } def test_grab_next_visits_unknown_policy(self, swh_scheduler): unknown_policy = "non_existing_policy" NUM_RESULTS = 5 with pytest.raises(UnknownPolicy, match=unknown_policy): swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy) def test_grab_next_visit_duplicates(self, swh_scheduler, listed_origins): """Checks grab_next_visits does not crash when there are rows with duplicated (origin_url, visit_type) in the database """ lister2 = swh_scheduler.get_or_create_lister(**LISTERS[1]) assert lister2.id != listed_origins[0].lister_id # Create two origins with the same url and visit_type, but different listers # (and also differing value for last_update so they are returned in # deterministic order) origin1 = attr.evolve( listed_origins[0], first_seen=utcnow(), last_seen=utcnow() ) origin2 = attr.evolve( origin1, lister_id=lister2.id, last_update=origin1.last_update + datetime.timedelta(seconds=10), ) origins = [origin1, origin2] recorded_origins = swh_scheduler.record_listed_origins(origins) expected_origins = sorted(recorded_origins, key=lambda o: o.last_update) self._check_grab_next_visit( swh_scheduler, visit_type=origin1.visit_type, policy="never_visited_oldest_update_first", expected=expected_origins, ) + def test_grab_next_visit_for_specific_lister( + self, swh_scheduler, listed_origins_by_type, stored_lister + ): + """Checks grab_next_visits filters on the given lister {name, instance name}""" + + visit_type, origins = self._grab_next_visits_setup( + swh_scheduler, listed_origins_by_type, limit=None + ) + + expected_origins = [origin for origin in listed_origins_by_type[visit_type]] + + ret = swh_scheduler.grab_next_visits( + visit_type=visit_type, + count=len(expected_origins), + policy="never_visited_oldest_update_first", + lister_name=stored_lister.name, + lister_instance_name=stored_lister.instance_name, + ) + + assert len(ret) == len(expected_origins) + for origin in ret: + assert origin.lister_id == stored_lister.id + def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt) def test_origin_visit_stats_get_empty(self, swh_scheduler) -> None: assert swh_scheduler.origin_visit_stats_get([]) == [] def test_origin_visit_stats_get_pagination(self, swh_scheduler) -> None: page_size = inspect.signature(execute_values).parameters["page_size"].default visit_stats = [ OriginVisitStats( url=f"https://example.com/origin-{i:03d}", visit_type="git", last_successful=utcnow(), last_visit=utcnow(), ) for i in range( page_size + 1 ) # Ensure overflow of the psycopg2.extras.execute_values page_size ] swh_scheduler.origin_visit_stats_upsert(visit_stats) assert set( swh_scheduler.origin_visit_stats_get( [(ovs.url, ovs.visit_type) for ovs in visit_stats] ) ) == set(visit_stats) def test_origin_visit_stats_upsert(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/test" visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_visit=eventful_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] new_visit_date = utcnow() visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=None, last_visit=new_visit_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) expected_visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_visit=new_visit_date, ) assert uneventful_visits == [expected_visit_stats] def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/666/test" visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] def test_origin_visit_stats_upsert_batch(self, swh_scheduler) -> None: """Batch upsert is ok""" visit_stats = [ OriginVisitStats( url="foo", visit_type="git", last_successful=utcnow(), last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), OriginVisitStats( url="bar", visit_type="git", last_visit=utcnow(), last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), ), ] swh_scheduler.origin_visit_stats_upsert(visit_stats) for visit_stat in swh_scheduler.origin_visit_stats_get( [(vs.url, vs.visit_type) for vs in visit_stats] ): assert visit_stat is not None def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: """Batch upsert does not support altering multiple times the same origin-visit-status""" with pytest.raises(SchedulerException, match="CardinalityViolation"): swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url="foo", visit_type="git", last_successful=None, last_visit=utcnow(), ), OriginVisitStats( url="foo", visit_type="git", last_successful=utcnow(), last_visit=None, ), ] ) def test_visit_scheduler_queue_position( self, swh_scheduler, listed_origins ) -> None: result = swh_scheduler.visit_scheduler_queue_position_get() assert result == {} expected_result = {} visit_types = set() for origin in listed_origins: visit_type = origin.visit_type if visit_type in visit_types: continue visit_types.add(visit_type) position = 42 swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) expected_result[visit_type] = position result = swh_scheduler.visit_scheduler_queue_position_get() assert result == expected_result def test_metrics_origins_known(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.update_metrics() assert sum(metric.origins_known for metric in ret) == len(listed_origins) def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) disabled_origin = attr.evolve(listed_origins[0], enabled=False) swh_scheduler.record_listed_origins([disabled_origin]) ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id) for metric in ret: if metric.visit_type == disabled_origin.visit_type: # We disabled one of these origins assert metric.origins_known - metric.origins_enabled == 1 else: # But these are still all enabled assert metric.origins_known == metric.origins_enabled def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin visited_origin = listed_origins[0] swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, last_successful=utcnow(), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins assert metric.origins_known - metric.origins_never_visited == 1 else: # But none of these have been visited assert metric.origins_known == metric.origins_never_visited def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin, in the past with # respect to the "last update" time for the origin visited_origin = listed_origins[0] assert visited_origin.last_update is not None swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, last_successful=visited_origin.last_update - timedelta(days=1), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins, in the past assert metric.origins_with_pending_changes == 1 else: # But none of these have been visited assert metric.origins_with_pending_changes == 0 def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) fake_uuid = uuid.uuid4() assert all(fake_uuid != origin.lister_id for origin in listed_origins) ret = swh_scheduler.update_metrics(lister_id=fake_uuid) assert len(ret) == 0 def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) def test_update_metrics_twice(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = utcnow() ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) second_ts = ts + timedelta(seconds=1) ret = swh_scheduler.update_metrics(timestamp=second_ts) assert all(metric.last_update == second_ts for metric in ret) def test_get_metrics(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics() assert_metrics_equal(updated, retrieved) def test_get_metrics_by_lister(self, swh_scheduler, listed_origins): lister_id = listed_origins[0].lister_id assert lister_id is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(lister_id=lister_id) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.lister_id == lister_id], retrieved ) def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins): visit_type = listed_origins[0].visit_type assert visit_type is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(visit_type=visit_type) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.visit_type == visit_type], retrieved ) diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py index e518984..2d46058 100644 --- a/swh/scheduler/tests/test_utils.py +++ b/swh/scheduler/tests/test_utils.py @@ -1,191 +1,191 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timezone from unittest.mock import patch import uuid from swh.scheduler import model, utils from .common import LISTERS @patch("swh.scheduler.utils.datetime") def test_create_oneshot_task_dict_simple(mock_datetime): mock_datetime.now.return_value = "some-date" actual_task = utils.create_oneshot_task_dict("some-task-type") expected_task = { "policy": "oneshot", "type": "some-task-type", "next_run": "some-date", "arguments": { "args": [], "kwargs": {}, }, } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) @patch("swh.scheduler.utils.datetime") def test_create_oneshot_task_dict_other_call(mock_datetime): mock_datetime.now.return_value = "some-other-date" actual_task = utils.create_oneshot_task_dict( "some-task-type", "arg0", "arg1", priority="high", other_stuff="normal" ) expected_task = { "policy": "oneshot", "type": "some-task-type", "next_run": "some-other-date", "arguments": { "args": ("arg0", "arg1"), "kwargs": {"other_stuff": "normal"}, }, "priority": "high", } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) @patch("swh.scheduler.utils.datetime") def test_create_task_dict(mock_datetime): mock_datetime.now.return_value = "date" actual_task = utils.create_task_dict( "task-type", "recurring", "arg0", "arg1", priority="low", other_stuff="normal", retries_left=3, ) expected_task = { "policy": "recurring", "type": "task-type", "next_run": "date", "arguments": { "args": ("arg0", "arg1"), "kwargs": {"other_stuff": "normal"}, }, "priority": "low", "retries_left": 3, } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) def test_create_origin_task_dict(): lister = model.Lister(**LISTERS[1], id=uuid.uuid4()) origin = model.ListedOrigin( lister_id=lister.id, url="http://example.com/", visit_type="git", ) task = utils.create_origin_task_dict(origin, lister) assert task == { "type": "load-git", "arguments": { "args": [], "kwargs": { "url": "http://example.com/", "lister_name": LISTERS[1]["name"], "lister_instance_name": LISTERS[1]["instance_name"], }, }, } loader_args = {"foo": "bar", "baz": {"foo": "bar"}} origin_w_args = model.ListedOrigin( lister_id=lister.id, url="http://example.com/svn/", visit_type="svn", extra_loader_arguments=loader_args, ) task_w_args = utils.create_origin_task_dict(origin_w_args, lister) assert task_w_args == { "type": "load-svn", "arguments": { "args": [], "kwargs": { "url": "http://example.com/svn/", "lister_name": LISTERS[1]["name"], "lister_instance_name": LISTERS[1]["instance_name"], **loader_args, }, }, } def test_create_origin_task_dicts(swh_scheduler): listers = [] for lister_args in LISTERS: listers.append(swh_scheduler.get_or_create_lister(**lister_args)) origin1 = model.ListedOrigin( lister_id=listers[0].id, url="http://example.com/1", visit_type="git", ) origin2 = model.ListedOrigin( lister_id=listers[0].id, url="http://example.com/2", visit_type="git", ) origin3 = model.ListedOrigin( lister_id=listers[1].id, url="http://example.com/3", visit_type="git", ) origins = [origin1, origin2, origin3] tasks = utils.create_origin_task_dicts(origins, swh_scheduler) assert tasks == [ { "type": "load-git", "arguments": { "args": [], "kwargs": { "url": "http://example.com/1", "lister_name": LISTERS[0]["name"], - "lister_instance_name": None, + "lister_instance_name": LISTERS[0]["instance_name"], }, }, }, { "type": "load-git", "arguments": { "args": [], "kwargs": { "url": "http://example.com/2", "lister_name": LISTERS[0]["name"], - "lister_instance_name": None, + "lister_instance_name": LISTERS[0]["instance_name"], }, }, }, { "type": "load-git", "arguments": { "args": [], "kwargs": { "url": "http://example.com/3", "lister_name": LISTERS[1]["name"], "lister_instance_name": LISTERS[1]["instance_name"], }, }, }, ] diff --git a/tox.ini b/tox.ini index 76bf662..d473f85 100644 --- a/tox.ini +++ b/tox.ini @@ -1,81 +1,82 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov dev: ipdb setenv = LC_ALL=C.UTF-8 LC_CTYPE=C.UTF-8 LANG=C.UTF-8 commands = pytest --doctest-modules \ !slow: --hypothesis-profile=fast \ slow: --hypothesis-profile=slow \ --cov={envsitepackagesdir}/swh/scheduler \ {envsitepackagesdir}/swh/scheduler \ --cov-branch {posargs} [testenv:black] skip_install = true deps = - black==22.3.0 + black==22.10.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = - flake8==4.0.1 - flake8-bugbear==22.3.23 + flake8==5.0.4 + flake8-bugbear==22.9.23 + pycodestyle==2.9.1 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.942 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs