diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cc45b3..ec7bad1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,49 +1,50 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - repo: https://gitlab.com/pycqa/flake8 rev: 3.8.3 hooks: - id: flake8 - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell + args: [-L simpy, -L hist] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.5.2 hooks: - id: isort - repo: https://github.com/python/black rev: 19.10b0 hooks: - id: black # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] diff --git a/PKG-INFO b/PKG-INFO index 57efe61..14a97e4 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,30 +1,32 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.8.2 +Version: 0.9.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing +Provides-Extra: journal +Provides-Extra: simulator diff --git a/docs/cli.rst b/docs/cli.rst new file mode 100644 index 0000000..c1b25bf --- /dev/null +++ b/docs/cli.rst @@ -0,0 +1,42 @@ +.. _swh-scheduler-cli: + +Command-line interface +====================== + +Shared command-line interface +----------------------------- + +.. click:: swh.scheduler.cli:cli + :prog: swh scheduler + :nested: short + +Scheduler task utilities +------------------------ + +.. click:: swh.scheduler.cli.task:task + :prog: swh scheduler task + :nested: full + +.. click:: swh.scheduler.cli.task_type:task_type + :prog: swh scheduler task_type + :nested: full + + +Scheduler server utilities +-------------------------- + +.. click:: swh.scheduler.cli.admin:runner + :prog: swh scheduler runner + :nested: full + +.. click:: swh.scheduler.cli.admin:listener + :prog: swh scheduler listener + :nested: full + +.. click:: swh.scheduler.cli.admin:rpc_server + :prog: swh scheduler rpc-serve + :nested: full + +.. 
click:: swh.scheduler.cli.celery_monitor:celery_monitor + :prog: swh scheduler celery-monitor + :nested: full diff --git a/docs/index.rst b/docs/index.rst index 5935f97..bf185cb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,168 +1,170 @@ .. _swh-scheduler: Software Heritage - Job scheduler ================================= Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). Description ----------- This module provides a scheduler service for the Software Heritage platform. It allows to define tasks with a number of properties. In this documentation, we will call these swh-tasks to prevent confusion. These swh-tasks are stored in a database, and a HTTP-based RPC service is provided to create or find existing swh-task declarations. The execution model for these swh-tasks is using Celery. Thus, each swh-task type defined in the database must have a (series of) celery worker capable of executing such a swh-task. Then a number of services are also provided to manage the scheduling of these swh-tasks as Celery tasks. The `scheduler-runner` service is a daemon that regularly looks for swh-tasks in the database that should be scheduled. For each of the selected swh-task, a Celery task is instantiated. The `scheduler-listener` service is a daemon that listen to the Celery event bus and maintain scheduled swh-tasks workflow status. SWH Task Model ~~~~~~~~~~~~~~ Each swh-task-type is the declaration of a type of swh-task. Each swh-task-type have the following fields: - `type`: Name of the swh-task type; can be anything but must be unique, - `description`: Human-readable task description - `backend_name`: Name of the task in the job-running backend, - `default_interval`: Default interval for newly scheduled tasks, - `min_interval`: Minimum interval between two runs of a task, - `max_interval`: Maximum interval between two runs of a task, - `backoff_factor`: Adjustment factor for the backoff between two task runs, - `max_queue_length`: Maximum length of the queue for this type of tasks, - `num_retries`: Default number of retries on transient failures, - `retry_delay`: Retry delay for the task, Existing swh-task-types can be listed using the `swh scheduler` command line tool:: $ swh scheduler task-type list Known task types: check-deposit: Pre-checking deposit step before loading into swh archive index-fossology-license: Fossology license indexer task load-git: Update an origin of type git load-hg: Update an origin of type mercurial You can see the details of a swh-task-type:: $ swh scheduler task-type list -v -t load-git Known task types: load-git: swh.loader.git.tasks.UpdateGitRepository Update an origin of type git interval: 64 days, 0:00:00 [12:00:00, 64 days, 0:00:00] backoff_factor: 2.0 max_queue_length: 5000 num_retries: None retry_delay: None An swh-task is an 'instance' of such a swh-task-type, and consists in: - `arguments`: Arguments passed to the underlying job scheduler, - `next_run`: Next run of this task should be run on or after that time, - `current_interval`: Interval between two runs of this task, taking into account the backoff factor, - `policy`: Whether the task is "one-shot" or "recurring", - `retries_left`: Number of "short delay" retries of the task in case of transient failure, - `priority`: Priority of the task, - `id`: Internal task identifier, - `type`: References task_type table, - `status`: Task status ( 
among "next_run_not_scheduled", "next_run_scheduled", "completed", "disabled"). So a swh-task consist basically in: - a set of parameters defining how the scheduling of the swh-task is handled, - a set of parameters to specify the retry policy in case of transient failure upon execution, - a set of parameters that defines the job to be done (`bakend_name` + `arguments`). You can list pending swh-tasks (tasks that are to be scheduled ASAP):: $ swh scheduler task list-pending load-git --limit 2 Found 1 load-git tasks Task 9052257 Next run: 15 days ago (2019-06-25 10:35:10+00:00) Interval: 2 days, 0:00:00 Type: load-git Policy: recurring Args: 'https://github.com/turtl/mobile' Keyword args: Looking for existing swh-task can be done via the command line tool:: $ swh scheduler task list -t load-hg --limit 2 Found 2 tasks Task 168802702 Next run: in 4 hours (2019-07-10 17:56:48+00:00) Interval: 1 day, 0:00:00 Type: load-hg Policy: recurring Status: next_run_not_scheduled Priority: Args: 'https://bitbucket.org/kepung/pypy' Keyword args: Task 169800445 Next run: in a month (2019-08-10 17:54:24+00:00) Interval: 32 days, 0:00:00 Type: load-hg Policy: recurring Status: next_run_not_scheduled Priority: Args: 'https://bitbucket.org/lunixbochs/pypy-1' Keyword args: Writing a new worker for a new swh-task-type -------------------------------------------- When you want to add a new swh-task-type, you need a celery worker backend capable of executing this new task-type instances. Celery workers for swh-scheduler based tasks should be started using the Celery app in `swh.scheduler.celery_config`. This later, among other things, provides a loading mechanism for task types based on pkg_resources declared plugins under the `[swh.workers]` entry point. TODO: add a fully working example of a dumb task. Reference Documentation ----------------------- .. toctree:: :maxdepth: 2 + cli + simulator /apidoc/swh.scheduler diff --git a/docs/simulator.rst b/docs/simulator.rst new file mode 100644 index 0000000..923d71a --- /dev/null +++ b/docs/simulator.rst @@ -0,0 +1,65 @@ +.. _swh-scheduler-simulator: + +Software Heritage Scheduler Simulator +===================================== + +This component simulates the interaction between the scheduling and loading +infrastructure of Software Heritage. This allows quick(er) development of new +task scheduling policies without having to wait for the actual infrastructure +to perform (heavy) loading tasks. + +Simulator components +-------------------- + +- real instance of the scheduler database +- simulated task queues: replaces RabbitMQ with simple in-memory structures +- simulated workers: replaces Celery with simple while loops +- simulated load tasks: replaces loaders with noops that take a certain time, + and generate synthetic OriginVisitStatus objects +- simulated archive -> scheduler feedback loop: OriginVisitStatus objects are + pushed to a simple queue which gets processed by the scheduler journal + client's process function directly (instead of going through swh.storage and + swh.journal (kafka)) + +In short, only the scheduler database and scheduler logic is kept; every other +component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either replaced +with an barebones in-process utility, or removed entirely. + +Installing the simulator +------------------------ + +The simulator depends on SimPy and other specific libraries. To install them, +please use: + +.. 
code-block:: bash + + pip install 'swh.scheduler[simulator]' + +Running the simulator +--------------------- + +The simulator uses a real instance of the scheduler database, which is (at +least for now) persistent across runs of the simulator. You need to set that up +beforehand: + +.. code-block:: bash + + # if you want to use a temporary instance of postgresql + eval `pifpaf run postgresql` + + # Set this variable for the simulator to know which db to connect to. pifpaf + # sets other variables like PGPORT, PGHOST, ... + export PGDATABASE=swh-scheduler + + # Create/initialize the scheduler database + swh db create scheduler -d $PGDATABASE + swh db init scheduler -d $PGDATABASE + + # This generates some data in the scheduler database. You can also feed the + # database with more realistic data, e.g. from a lister or from a dump of the + # production database. + swh scheduler -d "dbname=$PGDATABASE" simulator fill-test-data + + # Run the simulator itself, interacting with the scheduler database you've + # just seeded. + swh scheduler -d "dbname=$PGDATABASE" simulator run --scheduler origin_scheduler diff --git a/mypy.ini b/mypy.ini index 070dde4..f8f8bcc 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,36 +1,45 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-arrow.*] ignore_missing_imports = True [mypy-celery.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-elasticsearch.*] ignore_missing_imports = True [mypy-humanize.*] ignore_missing_imports = True [mypy-kombu.*] ignore_missing_imports = True [mypy-pika.*] ignore_missing_imports = True +[mypy-plotille.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-pytest_postgresql.*] ignore_missing_imports = True + +[mypy-simpy.*] +ignore_missing_imports = True diff --git a/requirements-journal.txt b/requirements-journal.txt new file mode 100644 index 0000000..d85a23c --- /dev/null +++ b/requirements-journal.txt @@ -0,0 +1 @@ +swh.journal diff --git a/requirements-simulator.txt b/requirements-simulator.txt new file mode 100644 index 0000000..41acb4f --- /dev/null +++ b/requirements-simulator.txt @@ -0,0 +1,2 @@ +plotille +simpy>=3,<4 diff --git a/setup.py b/setup.py index b216dce..1d1e5e4 100755 --- a/setup.py +++ b/setup.py @@ -1,70 +1,76 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() -def parse_requirements(name=None): - if name: - reqf = "requirements-%s.txt" % name - else: - reqf = "requirements.txt" - +def parse_requirements(*names): requirements = [] - if not path.exists(reqf): - return requirements + for name in names: + if name: + reqf = "requirements-%s.txt" % name + else: + reqf = "requirements.txt" + + if not path.exists(reqf): + return requirements - with open(reqf) as f: - for line in f.readlines(): - line = line.strip() - if not line or line.startswith("#"): - continue - requirements.append(line) + with 
open(reqf) as f: + for line in f.readlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + requirements.append(line) return requirements setup( name="swh.scheduler", description="Software Heritage Scheduler", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSCH/", packages=find_packages(), setup_requires=["setuptools-scm"], use_scm_version=True, - install_requires=parse_requirements() + parse_requirements("swh"), - extras_require={"testing": parse_requirements("test")}, + install_requires=parse_requirements(None, "swh"), + extras_require={ + "testing": parse_requirements("test", "journal", "simulator"), + "journal": parse_requirements("journal"), + "simulator": parse_requirements("simulator"), + }, include_package_data=True, entry_points=""" [swh.cli.subcommands] scheduler=swh.scheduler.cli + scheduler-journal=swh.scheduler.cli.journal """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-scheduler", "Documentation": "https://docs.softwareheritage.org/devel/swh-scheduler/", }, ) diff --git a/sql/updates/18.sql b/sql/updates/18.sql new file mode 100644 index 0000000..f8cb915 --- /dev/null +++ b/sql/updates/18.sql @@ -0,0 +1,8 @@ + +insert into dbversion (version, release, description) + values (18, now(), 'Work In Progress'); + +alter table listed_origins add column last_scheduled timestamptz; +comment on column listed_origins.last_scheduled is 'Time when this origin was scheduled to be visited last'; + +create index on listed_origins (last_scheduled); diff --git a/sql/updates/19.sql b/sql/updates/19.sql new file mode 100644 index 0000000..f1a5736 --- /dev/null +++ b/sql/updates/19.sql @@ -0,0 +1,22 @@ +insert into dbversion (version, release, description) + values (19, now(), 'Work In Progress'); + +create table origin_visit_stats ( + url text not null, + visit_type text not null, + last_eventful timestamptz, + last_uneventful timestamptz, + last_failed timestamptz, + last_notfound timestamptz, + last_snapshot bytea, + + primary key (url, visit_type) +); + +comment on column origin_visit_stats.url is 'Origin URL'; +comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; +comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; +comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event'; +comment on column origin_visit_stats.last_failed is 'Date of the last failed event'; +comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event'; +comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; diff --git a/sql/updates/20.sql b/sql/updates/20.sql new file mode 100644 index 0000000..3498c07 --- /dev/null +++ b/sql/updates/20.sql @@ -0,0 +1,6 @@ + +insert into dbversion (version, release, description) + values (20, now(), 'Work In Progress'); + +create index on listed_origins (visit_type, last_scheduled); +drop index listed_origins_last_scheduled_idx; 
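For context, the `(visit_type, last_scheduled)` index created in 20.sql is shaped around the origin-selection query behind the `oldest_scheduled_first` policy that `grab_next_visits` implements further down in this diff. A minimal sketch of that kind of query, against the schema as it stands after this migration (the visit type and limit are placeholder values, not part of the migration itself):

.. code-block:: sql

   -- Illustrative only: pick the enabled origins of one visit type that were
   -- scheduled least recently, with never-scheduled origins first.
   select url, visit_type
   from listed_origins
   where enabled
     and visit_type = 'git'
   order by last_scheduled nulls first
   limit 1000;

Note that 24.sql below moves `last_scheduled` from `listed_origins` to `origin_visit_stats`, so the query actually shipped in `swh/scheduler/backend.py` joins the two tables rather than reading `listed_origins` alone.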
diff --git a/sql/updates/23.sql b/sql/updates/23.sql new file mode 100644 index 0000000..392f818 --- /dev/null +++ b/sql/updates/23.sql @@ -0,0 +1,71 @@ +insert into dbversion (version, release, description) + values (23, now(), 'Work In Progress'); + +create or replace function swh_scheduler_update_task_on_task_end () + returns trigger + language plpgsql +as $$ +declare + cur_task task%rowtype; + cur_task_type task_type%rowtype; + adjustment_factor float; + new_interval interval; +begin + select * from task where id = new.task into cur_task; + select * from task_type where type = cur_task.type into cur_task_type; + + case + when new.status = 'permfailed' then + update task + set status = 'disabled' + where id = cur_task.id; + when new.status in ('eventful', 'uneventful') then + case + when cur_task.policy = 'oneshot' then + update task + set status = 'completed' + where id = cur_task.id; + when cur_task.policy = 'recurring' then + if new.status = 'uneventful' then + adjustment_factor := 1/cur_task_type.backoff_factor; + else + adjustment_factor := 1/cur_task_type.backoff_factor; + end if; + new_interval := greatest( + cur_task_type.min_interval, + least( + cur_task_type.max_interval, + adjustment_factor * cur_task.current_interval)); + update task + set status = 'next_run_not_scheduled', + next_run = new.ended + new_interval, + current_interval = new_interval, + retries_left = coalesce(cur_task_type.num_retries, 0) + where id = cur_task.id; + end case; + else -- new.status in 'failed', 'lost' + if cur_task.retries_left > 0 then + update task + set status = 'next_run_not_scheduled', + next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'), + retries_left = cur_task.retries_left - 1 + where id = cur_task.id; + else -- no retries left + case + when cur_task.policy = 'oneshot' then + update task + set status = 'disabled' + where id = cur_task.id; + when cur_task.policy = 'recurring' then + update task + set status = 'next_run_not_scheduled', + next_run = new.ended + cur_task.current_interval, + retries_left = coalesce(cur_task_type.num_retries, 0) + where id = cur_task.id; + end case; + end if; -- retries + end case; + return null; +end; +$$; + diff --git a/sql/updates/24.sql b/sql/updates/24.sql new file mode 100644 index 0000000..25d43d0 --- /dev/null +++ b/sql/updates/24.sql @@ -0,0 +1,9 @@ +insert into dbversion (version, release, description) + values (24, now(), 'Work In Progress'); + +alter table origin_visit_stats add column last_scheduled timestamptz; +comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; + +-- no need for a proper migration script of this last_schedules column: this +-- have not been published or deployed; just drop it +alter table listed_origins drop column last_scheduled; diff --git a/sql/updates/25.sql b/sql/updates/25.sql new file mode 100644 index 0000000..c70f87d --- /dev/null +++ b/sql/updates/25.sql @@ -0,0 +1,64 @@ +insert into dbversion (version, release, description) + values (25, now(), 'Work In Progress'); + +create table scheduler_metrics ( + lister_id uuid not null references listers(id), + visit_type text not null, + last_update timestamptz not null, + origins_known int not null default 0, + origins_enabled int not null default 0, + origins_never_visited int not null default 0, + origins_with_pending_changes int not null default 0, + + primary key (lister_id, visit_type) +); + +comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, 
collated between the listed_origins and origin_visit_stats tables.'; +comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated'; +comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated'; +comment on column scheduler_metrics.last_update is 'Last update of these metrics'; +comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins'; +comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing'; +comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited'; +comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit'; + + +create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now()) + returns setof scheduler_metrics + language sql +as $$ + insert into scheduler_metrics ( + lister_id, visit_type, last_update, + origins_known, origins_enabled, + origins_never_visited, origins_with_pending_changes + ) + select + lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update, + count(*) as origins_known, + count(*) filter (where enabled) as origins_enabled, + count(*) filter (where + enabled and last_snapshot is NULL + ) as origins_never_visited, + count(*) filter (where + enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful) + ) as origins_with_pending_changes + from listed_origins lo + left join origin_visit_stats ovs using (url, visit_type) + where + -- update only for the requested lister + update_metrics.lister_id = lo.lister_id + -- or for all listers if the function argument is null + or update_metrics.lister_id is null + group by (lister_id, visit_type) + on conflict (lister_id, visit_type) do update + set + last_update = EXCLUDED.last_update, + origins_known = EXCLUDED.origins_known, + origins_enabled = EXCLUDED.origins_enabled, + origins_never_visited = EXCLUDED.origins_never_visited, + origins_with_pending_changes = EXCLUDED.origins_with_pending_changes + returning * +$$; + + +comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id'; diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index 57efe61..14a97e4 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,30 +1,32 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.8.2 +Version: 0.9.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing +Provides-Extra: journal +Provides-Extra: simulator diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index c3d8a89..cb760dc 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,106 +1,129 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini +requirements-journal.txt +requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile +docs/cli.rst docs/conf.py docs/index.rst +docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql sql/updates/12.sql sql/updates/13.sql sql/updates/14.sql sql/updates/15.sql sql/updates/16.sql sql/updates/17.sql +sql/updates/18.sql +sql/updates/19.sql +sql/updates/20.sql +sql/updates/23.sql +sql/updates/24.sql +sql/updates/25.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/elasticsearch_memory.py swh/scheduler/exc.py swh/scheduler/interface.py +swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/pika_listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py +swh/scheduler/cli/journal.py +swh/scheduler/cli/origin.py +swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py +swh/scheduler/simulator/__init__.py +swh/scheduler/simulator/common.py +swh/scheduler/simulator/origin_scheduler.py +swh/scheduler/simulator/origins.py +swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py swh/scheduler/tests/test_cli_celery_monitor.py +swh/scheduler/tests/test_cli_journal.py +swh/scheduler/tests/test_cli_origin.py 
swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py swh/scheduler/tests/test_init.py +swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py +swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/es/__init__.py swh/scheduler/tests/es/conftest.py swh/scheduler/tests/es/test_backend_es.py swh/scheduler/tests/es/test_cli_task.py swh/scheduler/tests/es/test_elasticsearch_memory.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/entry_points.txt b/swh.scheduler.egg-info/entry_points.txt index 7f54aac..85f4ea9 100644 --- a/swh.scheduler.egg-info/entry_points.txt +++ b/swh.scheduler.egg-info/entry_points.txt @@ -1,4 +1,5 @@ [swh.cli.subcommands] scheduler=swh.scheduler.cli + scheduler-journal=swh.scheduler.cli.journal \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index d33f785..ab6f0f2 100644 --- a/swh.scheduler.egg-info/requires.txt +++ b/swh.scheduler.egg-info/requires.txt @@ -1,21 +1,31 @@ attrs attrs-strict celery!=5.0.3,>=4.3 Click elasticsearch>5.4 flask humanize pika>=1.1.0 psycopg2 pyyaml setuptools typing-extensions swh.core[db,http]>=0.5 swh.storage>=0.11.1 +[journal] +swh.journal + +[simulator] +plotille +simpy<4,>=3 + [testing] pytest pytest-mock celery>=4.3 hypothesis>=3.11.0 swh.lister +swh.journal +plotille +simpy<4,>=3 diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 6234f7c..f6edcb1 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,727 +1,956 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime import json import logging from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from uuid import UUID import attr +from psycopg2.errors import CardinalityViolation import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.scheduler.utils import utcnow -from .exc import StaleData +from .exc import SchedulerException, StaleData, UnknownPolicy from .model import ( ListedOrigin, ListedOriginPageToken, Lister, + OriginVisitStats, PaginatedListedOriginList, + SchedulerMetrics, ) logger = logging.getLogger(__name__) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. 
""" def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query("select {keys} from task_type", self.task_type_keys,) cur.execute(query) return cur.fetchall() + @db_transaction() + def get_lister( + self, name: str, instance_name: Optional[str] = None, db=None, cur=None + ) -> Optional[Lister]: + """Retrieve information about the given instance of the lister from the + database. + """ + if instance_name is None: + instance_name = "" + + select_cols = ", ".join(Lister.select_columns()) + + query = f""" + select {select_cols} from listers + where (name, instance_name) = (%s, %s) + """ + + cur.execute(query, (name, instance_name)) + + ret = cur.fetchone() + if not ret: + return None + + return Lister(**ret) + @db_transaction() def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, db=None, cur=None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. 
""" if instance_name is None: instance_name = "" select_cols = ", ".join(Lister.select_columns()) insert_cols, insert_meta = ( ", ".join(tup) for tup in Lister.insert_columns_and_metavars() ) query = f""" with added as ( insert into listers ({insert_cols}) values ({insert_meta}) on conflict do nothing returning {select_cols} ) select {select_cols} from added union all select {select_cols} from listers where (name, instance_name) = (%(name)s, %(instance_name)s); """ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) return Lister(**cur.fetchone()) @db_transaction() def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ select_cols = ", ".join(Lister.select_columns()) set_vars = ", ".join( f"{col} = {meta}" for col, meta in zip(*Lister.insert_columns_and_metavars()) ) query = f"""update listers set {set_vars} where id=%(id)s and updated=%(updated)s returning {select_cols}""" cur.execute(query, attr.asdict(lister)) updated = cur.fetchone() if not updated: raise StaleData("Stale data; Lister state not updated") return Lister(**updated) @db_transaction() def record_listed_origins( self, listed_origins: Iterable[ListedOrigin], db=None, cur=None ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ pk_cols = ListedOrigin.primary_key_columns() select_cols = ListedOrigin.select_columns() insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() upsert_cols = [col for col in insert_cols if col not in pk_cols] upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) VALUES %s ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE SET {upsert_set} RETURNING {", ".join(select_cols)} """ ret = psycopg2.extras.execute_values( cur=cur, sql=query, argslist=(attr.asdict(origin) for origin in listed_origins), template=f"({', '.join(insert_meta)})", page_size=1000, fetch=True, ) return [ListedOrigin(**d) for d in ret] @db_transaction() def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, db=None, cur=None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. """ query_filters: List[str] = [] query_params: List[Union[int, str, UUID, Tuple[UUID, str]]] = [] if lister_id: query_filters.append("lister_id = %s") query_params.append(lister_id) if url is not None: query_filters.append("url = %s") query_params.append(url) if page_token is not None: query_filters.append("(lister_id, url) > %s") # the typeshed annotation for tuple() is too strict. 
query_params.append(tuple(page_token)) # type: ignore query_params.append(limit) select_cols = ", ".join(ListedOrigin.select_columns()) if query_filters: where_clause = "where %s" % (" and ".join(query_filters)) else: where_clause = "" query = f"""SELECT {select_cols} from listed_origins {where_clause} ORDER BY lister_id, url LIMIT %s""" cur.execute(query, tuple(query_params)) origins = [ListedOrigin(**d) for d in cur] if len(origins) == limit: page_token = (origins[-1].lister_id, origins[-1].url) else: page_token = None return PaginatedListedOriginList(origins, page_token) + @db_transaction() + def grab_next_visits( + self, visit_type: str, count: int, policy: str, db=None, cur=None, + ) -> List[ListedOrigin]: + """Get at most the `count` next origins that need to be visited with + the `visit_type` loader according to the given scheduling `policy`. + + This will mark the origins as "being visited" in the listed_origins + table, to avoid scheduling multiple visits to the same origin. + """ + origin_select_cols = ", ".join(ListedOrigin.select_columns()) + + # TODO: filter on last_scheduled "too recent" to avoid always + # re-scheduling the same tasks. + where_clauses = [ + "enabled", # "NOT enabled" = the lister said the origin no longer exists + "visit_type = %s", + ] + if policy == "oldest_scheduled_first": + order_by = "origin_visit_stats.last_scheduled NULLS FIRST" + else: + raise UnknownPolicy(f"Unknown scheduling policy {policy}") + + select_query = f""" + SELECT + {origin_select_cols} + FROM + listed_origins + LEFT JOIN + origin_visit_stats USING (url, visit_type) + WHERE + {" AND ".join(where_clauses)} + ORDER BY + {order_by} + LIMIT %s + """ + + query = f""" + WITH selected_origins AS ( + {select_query} + ), + update_stats AS ( + INSERT INTO + origin_visit_stats ( + url, visit_type, last_scheduled + ) + SELECT + url, visit_type, now() + FROM + selected_origins + ON CONFLICT (url, visit_type) DO UPDATE + SET last_scheduled = GREATEST( + origin_visit_stats.last_scheduled, + EXCLUDED.last_scheduled + ) + ) + SELECT + * + FROM + selected_origins + """ + + cur.execute(query, (visit_type, count)) + return [ListedOrigin(**d) for d in cur] + task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids, status="disabled", next_run=None, db=None, cur=None ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. 
""" if not task_ids: return query = ["UPDATE task SET status = %s"] args = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None, ): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)""", (task_type, timestamp, num_tasks, num_tasks_priority), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None, ): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)""", (task_type, timestamp, num_tasks, num_tasks_priority), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def 
schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)", (backend_id, status, metadata, timestamp), ) return cur.fetchone() @db_transaction() def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... 
""" assert not page_token or isinstance(page_token, str) last_id = -1 if page_token is None else int(page_token) tasks = [] cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit + 1), ) for row in cur: task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this task["arguments"]["args"] = { i: v for i, v in enumerate(task["arguments"]["args"]) } kwargs = task["arguments"]["kwargs"] task["arguments"]["kwargs"] = json.dumps(kwargs) tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { "tasks": tasks[:limit], "next_page_token": str(tasks[-1]["task_id"]), } else: result = {"tasks": tasks} return result @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_priority_ratios(self, db=None, cur=None): cur.execute("select id, ratio from priority_ratio") return {row["id"]: row["ratio"] for row in cur.fetchall()} + + @db_transaction() + def origin_visit_stats_upsert( + self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None + ) -> None: + pk_cols = OriginVisitStats.primary_key_columns() + insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() + + query = f""" + INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) + VALUES %s + ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE + SET last_eventful = ( + select max(eventful.date) from (values + (excluded.last_eventful), + (ovi.last_eventful) + ) as eventful(date) + ), + last_uneventful = ( + select max(uneventful.date) from (values + (excluded.last_uneventful), + (ovi.last_uneventful) + ) as uneventful(date) + ), + last_failed = ( + select max(failed.date) from (values + (excluded.last_failed), + (ovi.last_failed) + ) as failed(date) + ), + last_notfound = ( + select max(notfound.date) from (values + (excluded.last_notfound), + (ovi.last_notfound) + ) as notfound(date) + ), + last_snapshot = (select + case + when ovi.last_eventful < excluded.last_eventful then excluded.last_snapshot + else coalesce(ovi.last_snapshot, excluded.last_snapshot) + end + ) + """ # noqa + + try: + psycopg2.extras.execute_values( + cur=cur, + sql=query, + argslist=( + attr.asdict(visit_stats) for visit_stats in origin_visit_stats + ), + template=f"({', '.join(insert_meta)})", + page_size=1000, + fetch=False, + ) + except CardinalityViolation as e: + raise SchedulerException(repr(e)) + + @db_transaction() + def origin_visit_stats_get( + self, ids: Iterable[Tuple[str, str]], db=None, cur=None 
+ ) -> List[OriginVisitStats]: + if not ids: + return [] + primary_keys = tuple((origin, visit_type) for (origin, visit_type) in ids) + query = format_query( + """ + SELECT {keys} + FROM (VALUES %s) as stats(url, visit_type) + INNER JOIN origin_visit_stats USING (url, visit_type) + """, + OriginVisitStats.select_columns(), + ) + psycopg2.extras.execute_values(cur=cur, sql=query, argslist=primary_keys) + return [OriginVisitStats(**row) for row in cur.fetchall()] + + @db_transaction() + def update_metrics( + self, + lister_id: Optional[UUID] = None, + timestamp: Optional[datetime.datetime] = None, + db=None, + cur=None, + ) -> List[SchedulerMetrics]: + """Update the performance metrics of this scheduler instance. + + Returns the updated metrics. + + Args: + lister_id: if passed, update the metrics only for this lister instance + timestamp: if passed, the date at which we're updating the metrics, + defaults to the database NOW() + """ + query = format_query( + "SELECT {keys} FROM update_metrics(%s, %s)", + SchedulerMetrics.select_columns(), + ) + cur.execute(query, (lister_id, timestamp)) + return [SchedulerMetrics(**row) for row in cur.fetchall()] + + @db_transaction() + def get_metrics( + self, + lister_id: Optional[UUID] = None, + visit_type: Optional[str] = None, + db=None, + cur=None, + ) -> List[SchedulerMetrics]: + """Retrieve the performance metrics of this scheduler instance. + + Args: + lister_id: filter the metrics for this lister instance only + visit_type: filter the metrics for this visit type only + """ + + where_filters = [] + where_args = [] + if lister_id: + where_filters.append("lister_id = %s") + where_args.append(str(lister_id)) + if visit_type: + where_filters.append("visit_type = %s") + where_args.append(visit_type) + + where_clause = "" + if where_filters: + where_clause = f"where {' and '.join(where_filters)}" + + query = format_query( + "SELECT {keys} FROM scheduler_metrics %s" % where_clause, + SchedulerMetrics.select_columns(), + ) + + cur.execute(query, tuple(where_args)) + return [SchedulerMetrics(**row) for row in cur.fetchall()] diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index 49c6f38..9a2ab19 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,96 +1,99 @@ -# Copyright (C) 2016-2020 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group +# If you're looking for subcommand imports, they are further down this file to +# avoid a circular import! 
+ @swh_cli_group.group( name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--database", "-d", default=None, help="Scheduling database DSN (imply cls is 'local')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). """ try: from psycopg2 import OperationalError except ImportError: class OperationalError(Exception): pass from swh.core import config from swh.scheduler import DEFAULT_CONFIG, get_scheduler ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf["scheduler"]["cls"] = "local" conf["scheduler"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["url"] = url sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s", sched_conf) scheduler = get_scheduler(**sched_conf) except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf -from . import admin, celery_monitor, task, task_type # noqa +from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/journal.py b/swh/scheduler/cli/journal.py new file mode 100644 index 0000000..9551164 --- /dev/null +++ b/swh/scheduler/cli/journal.py @@ -0,0 +1,59 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import click + +from . import cli as cli_scheduler_group + + +@cli_scheduler_group.command("journal-client") +@click.pass_context +@click.option( + "--stop-after-objects", + "-m", + default=None, + type=int, + help="Maximum number of objects to replay. 
Default is to run forever.", +) +def visit_stats_journal_client(ctx, stop_after_objects): + """Keep the the origin visits stats table up to date from a swh kafka journal + """ + from functools import partial + + from swh.journal.client import get_journal_client + from swh.scheduler.journal_client import process_journal_objects + + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + scheduler = ctx.obj["scheduler"] + config = ctx.obj["config"] + + if "journal" not in config: + raise ValueError("Missing 'journal' configuration key") + + journal_cfg = config["journal"] + journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( + "stop_after_objects" + ) + + client = get_journal_client( + cls="kafka", + object_types=["origin_visit_status"], + prefix="swh.journal.objects", + **journal_cfg, + ) + worker_fn = partial(process_journal_objects, scheduler=scheduler,) + nb_messages = 0 + try: + nb_messages = client.process(worker_fn) + print(f"Processed {nb_messages} message(s).") + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py new file mode 100644 index 0000000..1c72fe1 --- /dev/null +++ b/swh/scheduler/cli/origin.py @@ -0,0 +1,182 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from __future__ import annotations + +from typing import TYPE_CHECKING, Iterable, List, Optional + +import click + +from . import cli + +if TYPE_CHECKING: + from uuid import UUID + + from ..interface import SchedulerInterface + from ..model import ListedOrigin + + +@cli.group("origin") +@click.pass_context +def origin(ctx): + """Manipulate listed origins.""" + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + +def format_origins( + origins: List[ListedOrigin], + fields: Optional[List[str]] = None, + with_header: bool = True, +) -> Iterable[str]: + """Format a list of origins as CSV. + + Arguments: + origins: list of origins to output + fields: optional list of fields to output (defaults to all fields) + with_header: if True, output a CSV header. + """ + import csv + from io import StringIO + + import attr + + from ..model import ListedOrigin + + expected_fields = [field.name for field in attr.fields(ListedOrigin)] + if not fields: + fields = expected_fields + + unknown_fields = set(fields) - set(expected_fields) + if unknown_fields: + raise ValueError( + "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields) + ) + + output = StringIO() + writer = csv.writer(output) + + def csv_row(data): + """Return a single CSV-formatted row. 
We clear the output buffer after we're + done to keep it reasonably sized.""" + writer.writerow(data) + output.seek(0) + ret = output.read().rstrip() + output.seek(0) + output.truncate() + return ret + + if with_header: + yield csv_row(fields) + + for origin in origins: + yield csv_row(str(getattr(origin, field)) for field in fields) + + +@origin.command("grab-next") +@click.option( + "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" +) +@click.option( + "--fields", "-f", default=None, help="Listed origin fields to print on output" +) +@click.option( + "--with-header/--without-header", + is_flag=True, + default=True, + help="Print the CSV header?", +) +@click.argument("type", type=str) +@click.argument("count", type=int) +@click.pass_context +def grab_next( + ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int +): + """Grab the next COUNT origins to visit using the TYPE loader from the + listed origins table.""" + + if fields: + parsed_fields: Optional[List[str]] = fields.split(",") + else: + parsed_fields = None + + scheduler = ctx.obj["scheduler"] + + origins = scheduler.grab_next_visits(type, count, policy=policy) + for line in format_origins(origins, fields=parsed_fields, with_header=with_header): + click.echo(line) + + +@origin.command("schedule-next") +@click.option( + "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" +) +@click.argument("type", type=str) +@click.argument("count", type=int) +@click.pass_context +def schedule_next(ctx, policy: str, type: str, count: int): + """Send the next COUNT origin visits of the TYPE loader to the scheduler as + one-shot tasks.""" + from ..utils import utcnow + from .task import pretty_print_task + + scheduler = ctx.obj["scheduler"] + + origins = scheduler.grab_next_visits(type, count, policy=policy) + + created = scheduler.create_tasks( + [ + { + **origin.as_task_dict(), + "policy": "oneshot", + "next_run": utcnow(), + "retries_left": 1, + } + for origin in origins + ] + ) + + output = ["Created %d tasks\n" % len(created)] + for task in created: + output.append(pretty_print_task(task)) + + click.echo_via_pager("\n".join(output)) + + +@origin.command("update-metrics") +@click.option("--lister", default=None, help="Only update metrics for this lister") +@click.option( + "--instance", default=None, help="Only update metrics for this lister instance" +) +@click.pass_context +def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): + """Update the scheduler metrics on listed origins. 
+ + Examples: + swh scheduler origin update-metrics + swh scheduler origin update-metrics --lister github + swh scheduler origin update-metrics --lister phabricator --instance llvm + """ + import json + + import attr + + scheduler: SchedulerInterface = ctx.obj["scheduler"] + + lister_id: Optional[UUID] = None + if lister is not None: + lister_instance = scheduler.get_lister(name=lister, instance_name=instance) + if not lister_instance: + click.echo(f"Lister not found: {lister} instance={instance}") + ctx.exit(2) + assert False # for mypy + + lister_id = lister_instance.id + + def dictify_metrics(d): + return {k: str(v) for (k, v) in attr.asdict(d).items()} + + ret = scheduler.update_metrics(lister_id=lister_id) + click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True)) diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py new file mode 100644 index 0000000..bade0d4 --- /dev/null +++ b/swh/scheduler/cli/simulator.py @@ -0,0 +1,68 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import time + +import click + +from . import cli + + +@cli.group("simulator") +def simulator(): + """Scheduler simulator.""" + pass + + +@simulator.command("fill-test-data") +@click.pass_context +def fill_test_data_command(ctx): + """Fill the scheduler with test data for simulation purposes.""" + from swh.scheduler.simulator import fill_test_data + + click.echo("Filling test data...") + start = time.monotonic() + fill_test_data(ctx.obj["scheduler"]) + runtime = time.monotonic() - start + click.echo(f"Completed in {runtime:.2f} seconds") + + +@simulator.command("run") +@click.option( + "--scheduler", + "-s", + type=click.Choice(["task_scheduler", "origin_scheduler"]), + default="origin_scheduler", + help="Scheduler to simulate", +) +@click.option( + "--policy", + "-p", + type=click.Choice(["oldest_scheduled_first"]), + default="oldest_scheduled_first", + help="Scheduling policy to simulate (only for origin_scheduler)", +) +@click.option("--runtime", "-t", type=float, help="Simulated runtime") +@click.pass_context +def run_command(ctx, scheduler, policy, runtime): + """Run the scheduler simulator. + + By default, the simulation runs forever. You can cap the simulated runtime + with the --runtime option, and you can always press Ctrl+C to interrupt the + running simulation. + + 'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler' + is the new origin-visit-aware simulator. The latter uses --policy to decide + which origins to schedule first based on information from listers. 
+ """ + from swh.scheduler.simulator import run + + policy = policy if scheduler == "origin_scheduler" else None + run( + scheduler=ctx.obj["scheduler"], + scheduler_type=scheduler, + policy=policy, + runtime=runtime, + ) diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py index 0c92e43..aff3360 100644 --- a/swh/scheduler/exc.py +++ b/swh/scheduler/exc.py @@ -1,17 +1,22 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information __all__ = [ "SchedulerException", "StaleData", + "UnknownPolicy", ] class SchedulerException(Exception): pass class StaleData(SchedulerException): pass + + +class UnknownPolicy(SchedulerException): + pass diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py index a2f8198..289d95d 100644 --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -1,314 +1,388 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List, Optional + +import datetime +from typing import Any, Dict, Iterable, List, Optional, Tuple from uuid import UUID from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint from swh.scheduler.model import ( ListedOrigin, ListedOriginPageToken, Lister, + OriginVisitStats, PaginatedListedOriginList, + SchedulerMetrics, ) @runtime_checkable class SchedulerInterface(Protocol): @remote_api_endpoint("task_type/create") def create_task_type(self, task_type): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ ... @remote_api_endpoint("task_type/get") def get_task_type(self, task_type_name): """Retrieve the task type with id task_type_name""" ... @remote_api_endpoint("task_type/get_all") def get_task_types(self): """Retrieve all registered task types""" ... @remote_api_endpoint("task/create") def create_tasks(self, tasks, policy="recurring"): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ ... @remote_api_endpoint("task/set_status") def set_status_tasks(self, task_ids, status="disabled", next_run=None): """Set the tasks' status whose ids are listed. If given, also set the next_run date. """ ... 
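For orientation, here is a minimal sketch of how the task endpoints above might be driven from client code, assuming a local scheduler instance and a registered "load-git" task type; the database DSN and repository URL are placeholders::

    from datetime import datetime, timezone

    from swh.scheduler import get_scheduler

    # instantiate a scheduler backend, as the CLI does with --database
    scheduler = get_scheduler(cls="local", db="dbname=softwareheritage-scheduler")

    created = scheduler.create_tasks(
        [
            {
                "type": "load-git",
                "arguments": {
                    "args": [],
                    "kwargs": {"url": "https://example.com/repo.git"},
                },
                "next_run": datetime.now(tz=timezone.utc),
            }
        ],
        policy="oneshot",
    )
    # each created task comes back as a dict; disable them again by id
    scheduler.set_status_tasks([task["id"] for task in created], status="disabled")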
@remote_api_endpoint("task/disable") def disable_tasks(self, task_ids): """Disable the tasks whose ids are listed.""" ... @remote_api_endpoint("task/search") def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, ): """Search tasks from selected criterions""" ... @remote_api_endpoint("task/get") def get_tasks(self, task_ids): """Retrieve the info of tasks whose ids are listed.""" ... @remote_api_endpoint("task/peek_ready") def peek_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, ): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ ... @remote_api_endpoint("task/grab_ready") def grab_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, ): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ ... @remote_api_endpoint("task_run/schedule_one") def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ ... @remote_api_endpoint("task_run/schedule") def mass_schedule_task_runs(self, task_runs): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ ... @remote_api_endpoint("task_run/start") def start_task_run(self, backend_id, metadata=None, timestamp=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... @remote_api_endpoint("task_run/end") def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ ... 
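The run-tracking endpoints above are meant to be chained by the runner and listener daemons; a rough sketch of that loop, assuming a scheduler instance obtained as in the sketch above and a "load-git" task type (the backend ids are generated locally for the example)::

    import uuid
    from datetime import datetime, timezone

    now = datetime.now(tz=timezone.utc)

    # runner side: pick the tasks that are due and record their submission
    tasks = scheduler.grab_ready_tasks("load-git", timestamp=now, num_tasks=10)
    runs = [
        {"task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": now}
        for task in tasks
    ]
    scheduler.mass_schedule_task_runs(runs)

    # listener side: mirror backend events into the corresponding task_run rows
    for run in runs:
        scheduler.start_task_run(run["backend_id"], timestamp=now)
        scheduler.end_task_run(run["backend_id"], status="eventful", timestamp=now)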
@remote_api_endpoint("task/filter_for_archive") def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ ... @remote_api_endpoint("task/delete_archived") def delete_archived_tasks(self, task_ids): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ ... @remote_api_endpoint("task_run/get") def get_task_runs(self, task_ids, limit=None): """Search task run for a task id""" ... + @remote_api_endpoint("lister/get") + def get_lister( + self, name: str, instance_name: Optional[str] = None + ) -> Optional[Lister]: + """Retrieve information about the given instance of the lister from the + database. + """ + ... + @remote_api_endpoint("lister/get_or_create") def get_or_create_lister( self, name: str, instance_name: Optional[str] = None ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ ... @remote_api_endpoint("lister/update") def update_lister(self, lister: Lister) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ ... @remote_api_endpoint("origins/record") def record_listed_origins( self, listed_origins: Iterable[ListedOrigin] ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ ... @remote_api_endpoint("origins/get") def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, ) -> PaginatedListedOriginList: """Get information on the listed origins matching either the `url` or `lister_id`, or both arguments. Use the `limit` and `page_token` arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object. """ ... + @remote_api_endpoint("origins/grab_next") + def grab_next_visits( + self, visit_type: str, count: int, policy: str + ) -> List[ListedOrigin]: + """Get at most the `count` next origins that need to be visited with + the `visit_type` loader according to the given scheduling `policy`. + + This will mark the origins as "being visited" in the listed_origins + table, to avoid scheduling multiple visits to the same origin. + """ + ... + @remote_api_endpoint("priority_ratios/get") def get_priority_ratios(self): ... + + @remote_api_endpoint("visit_stats/upsert") + def origin_visit_stats_upsert( + self, origin_visit_stats: Iterable[OriginVisitStats] + ) -> None: + """Create a new origin visit stats + """ + ... 
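Together, the lister and origin endpoints above support the new origin-centric scheduling flow; a rough sketch, again assuming an already-instantiated scheduler and using a hypothetical lister name and origin URL::

    from swh.scheduler.model import ListedOrigin

    lister = scheduler.get_or_create_lister(
        name="example", instance_name="example.com"
    )
    # record what the lister found; repeated calls upsert on (lister_id, url, visit_type)
    scheduler.record_listed_origins(
        [
            ListedOrigin(
                lister_id=lister.id,
                url="https://example.com/repo.git",
                visit_type="git",
            )
        ]
    )
    # hand the ten "oldest scheduled first" git origins to a loader
    origins = scheduler.grab_next_visits("git", 10, policy="oldest_scheduled_first")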
+ + @remote_api_endpoint("visit_stats/get") + def origin_visit_stats_get( + self, ids: Iterable[Tuple[str, str]] + ) -> List[OriginVisitStats]: + """Retrieve the stats for an origin with a given visit type + + If some visit_stats are not found, they are filtered out of the result. So the + output list may be shorter than the input list. + + """ + ... + + @remote_api_endpoint("scheduler_metrics/update") + def update_metrics( + self, + lister_id: Optional[UUID] = None, + timestamp: Optional[datetime.datetime] = None, + ) -> List[SchedulerMetrics]: + """Update the performance metrics of this scheduler instance. + + Returns the updated metrics. + + Args: + lister_id: if passed, update the metrics only for this lister instance + timestamp: if passed, the date at which we're updating the metrics, + defaults to the database NOW() + """ + ... + + @remote_api_endpoint("scheduler_metrics/get") + def get_metrics( + self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None + ) -> List[SchedulerMetrics]: + """Retrieve the performance metrics of this scheduler instance. + + Args: + lister_id: filter the metrics for this lister instance only + visit_type: filter the metrics for this visit type only + """ + ... diff --git a/swh/scheduler/journal_client.py b/swh/scheduler/journal_client.py new file mode 100644 index 0000000..f270737 --- /dev/null +++ b/swh/scheduler/journal_client.py @@ -0,0 +1,131 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime +from typing import Dict, List, Optional, Tuple + +import attr + +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.model import OriginVisitStats + +msg_type = "origin_visit_status" + + +def max_date(*dates: Optional[datetime]) -> datetime: + """Return the max date of given (possibly None) dates + + At least one date must not be None. + """ + datesok: Tuple[datetime, ...] = tuple(d for d in dates if d is not None) + if not datesok: + raise ValueError("At least one date should be a valid datetime") + + maxdate = datesok[0] + if len(datesok) == 1: + return maxdate + + for d in datesok[1:]: + maxdate = max(d, maxdate) + + return maxdate + + +def process_journal_objects( + messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface +) -> None: + """Read messages from origin_visit_status journal topics, then insert them into the + scheduler "origin_visit_stats" table. + + Worker function for `JournalClient.process(worker_fn)`, after + partial application of `scheduler`.
+ + """ + assert set(messages) <= { + msg_type + }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types" + assert msg_type in messages, f"Expected {msg_type} messages" + + interesting_messages = [ + msg for msg in messages[msg_type] if msg["status"] not in ("created", "ongoing") + ] + + origin_visit_stats: Dict[Tuple[str, str], Dict] = { + (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats) + for visit_stats in scheduler.origin_visit_stats_get( + list(set((vs["origin"], vs["type"]) for vs in interesting_messages)) + ) + } + + for msg_dict in interesting_messages: + origin = msg_dict["origin"] + visit_type = msg_dict["type"] + empty_object = { + "url": origin, + "visit_type": visit_type, + "last_uneventful": None, + "last_eventful": None, + "last_failed": None, + "last_notfound": None, + "last_snapshot": None, + } + pk = origin, visit_type + if pk not in origin_visit_stats: + origin_visit_stats[pk] = empty_object + visit_stats_d = origin_visit_stats[pk] + + if msg_dict["status"] == "not_found": + visit_stats_d["last_notfound"] = max_date( + msg_dict["date"], visit_stats_d.get("last_notfound") + ) + elif msg_dict["snapshot"] is None: + visit_stats_d["last_failed"] = max_date( + msg_dict["date"], visit_stats_d.get("last_failed") + ) + else: # visit with snapshot, something happened + if visit_stats_d["last_snapshot"] is None: + # first time visit with snapshot, we keep relevant information + visit_stats_d["last_eventful"] = msg_dict["date"] + visit_stats_d["last_snapshot"] = msg_dict["snapshot"] + else: + # visit with snapshot already stored, last_eventful should already be + # stored + assert visit_stats_d["last_eventful"] is not None + latest_recorded_visit_date = max_date( + visit_stats_d["last_eventful"], visit_stats_d["last_uneventful"] + ) + current_status_date = msg_dict["date"] + previous_snapshot = visit_stats_d["last_snapshot"] + if msg_dict["snapshot"] != previous_snapshot: + if ( + latest_recorded_visit_date + and current_status_date < latest_recorded_visit_date + ): + # out of order message so ignored + continue + # new eventful visit (new snapshot) + visit_stats_d["last_eventful"] = current_status_date + visit_stats_d["last_snapshot"] = msg_dict["snapshot"] + else: + # same snapshot as before + if ( + latest_recorded_visit_date + and current_status_date < latest_recorded_visit_date + ): + # we receive an old message which is an earlier "eventful" event + # than what we had, we consider the last_eventful event as + # actually an uneventful event. 
The true eventful message is the + # current one + visit_stats_d["last_uneventful"] = visit_stats_d[ + "last_eventful" + ] + visit_stats_d["last_eventful"] = current_status_date + else: + # uneventful event + visit_stats_d["last_uneventful"] = current_status_date + + scheduler.origin_visit_stats_upsert( + OriginVisitStats(**ovs) for ovs in origin_visit_stats.values() + ) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py index e3827ff..1889b23 100644 --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -1,193 +1,285 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Any, Dict, List, Optional, Tuple, Union from uuid import UUID import attr import attr.converters from attrs_strict import type_validator +def check_timestamptz(value) -> None: + """Checks the date has a timezone.""" + if value is not None and value.tzinfo is None: + raise ValueError("date must be a timezone-aware datetime.") + + @attr.s class BaseSchedulerModel: """Base class for database-backed objects. These database-backed objects are defined through attrs-based attributes that match the columns of the database 1:1. This is a (very) lightweight ORM. These attrs-based attributes have metadata specific to the functionality expected from these fields in the database: - `primary_key`: the column is a primary key; it should be filtered out when doing an `update` of the object - `auto_primary_key`: the column is a primary key, which is automatically handled by the database. It will not be inserted to. This must be matched with a database-side default value. - `auto_now_add`: the column is a timestamp that is set to the current time when the object is inserted, and never updated afterwards. This must be matched with a database-side default value. - `auto_now`: the column is a timestamp that is set to the current time when the object is inserted or updated. """ _pk_cols: Optional[Tuple[str, ...]] = None _select_cols: Optional[Tuple[str, ...]] = None _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None @classmethod def primary_key_columns(cls) -> Tuple[str, ...]: """Get the primary key columns for this object type""" if cls._pk_cols is None: columns: List[str] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_primary_key", "primary_key") ): columns.append(field.name) cls._pk_cols = tuple(sorted(columns)) return cls._pk_cols @classmethod def select_columns(cls) -> Tuple[str, ...]: """Get all the database columns needed for a `select` on this object type""" if cls._select_cols is None: columns: List[str] = [] for field in attr.fields(cls): columns.append(field.name) cls._select_cols = tuple(sorted(columns)) return cls._select_cols @classmethod def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: """Get the database columns and metavars needed for an `insert` or `update` on this object type. This implements support for the `auto_*` field metadata attributes. 
""" if cls._insert_cols_and_metavars is None: zipped_cols_and_metavars: List[Tuple[str, str]] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_now_add", "auto_primary_key") ): continue elif field.metadata.get("auto_now"): zipped_cols_and_metavars.append((field.name, "now()")) else: zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) zipped_cols_and_metavars.sort() cols, metavars = zip(*zipped_cols_and_metavars) cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars @attr.s class Lister(BaseSchedulerModel): name = attr.ib(type=str, validator=[type_validator()]) instance_name = attr.ib(type=str, validator=[type_validator()]) # Populated by database id = attr.ib( type=Optional[UUID], validator=type_validator(), default=None, metadata={"auto_primary_key": True}, ) current_state = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) created = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) updated = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) @attr.s class ListedOrigin(BaseSchedulerModel): """Basic information about a listed origin, output by a lister""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) extra_loader_arguments = attr.ib( type=Dict[str, str], validator=[type_validator()], factory=dict ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) enabled = attr.ib(type=bool, validator=[type_validator()], default=True) first_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) last_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) + def as_task_dict(self): + return { + "type": f"load-{self.visit_type}", + "arguments": { + "args": [], + "kwargs": {"url": self.url, **self.extra_loader_arguments}, + }, + } + ListedOriginPageToken = Tuple[UUID, str] def convert_listed_origin_page_token( input: Union[None, ListedOriginPageToken, List[Union[UUID, str]]] ) -> Optional[ListedOriginPageToken]: if input is None: return None if isinstance(input, tuple): return input x, y = input assert isinstance(x, UUID) assert isinstance(y, str) return (x, y) @attr.s class PaginatedListedOriginList(BaseSchedulerModel): """A list of listed origins, with a continuation token""" origins = attr.ib(type=List[ListedOrigin], validator=[type_validator()]) next_page_token = attr.ib( type=Optional[ListedOriginPageToken], validator=[type_validator()], converter=convert_listed_origin_page_token, default=None, ) + + +@attr.s(frozen=True, slots=True) +class OriginVisitStats(BaseSchedulerModel): + """Represents an aggregated origin visits view. 
+ """ + + url = attr.ib( + type=str, validator=[type_validator()], metadata={"primary_key": True} + ) + visit_type = attr.ib( + type=str, validator=[type_validator()], metadata={"primary_key": True} + ) + last_eventful = attr.ib( + type=Optional[datetime.datetime], validator=type_validator() + ) + last_uneventful = attr.ib( + type=Optional[datetime.datetime], validator=type_validator() + ) + last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator()) + last_notfound = attr.ib( + type=Optional[datetime.datetime], validator=type_validator() + ) + last_scheduled = attr.ib( + type=Optional[datetime.datetime], validator=[type_validator()], default=None, + ) + last_snapshot = attr.ib( + type=Optional[bytes], validator=type_validator(), default=None + ) + + @last_eventful.validator + def check_last_eventful(self, attribute, value): + check_timestamptz(value) + + @last_uneventful.validator + def check_last_uneventful(self, attribute, value): + check_timestamptz(value) + + @last_failed.validator + def check_last_failed(self, attribute, value): + check_timestamptz(value) + + @last_notfound.validator + def check_last_notfound(self, attribute, value): + check_timestamptz(value) + + +@attr.s(frozen=True, slots=True) +class SchedulerMetrics(BaseSchedulerModel): + """Metrics for the scheduler, aggregated by (lister_id, visit_type)""" + + lister_id = attr.ib( + type=UUID, validator=[type_validator()], metadata={"primary_key": True} + ) + visit_type = attr.ib( + type=str, validator=[type_validator()], metadata={"primary_key": True} + ) + + last_update = attr.ib( + type=Optional[datetime.datetime], validator=[type_validator()], default=None, + ) + + origins_known = attr.ib(type=int, validator=[type_validator()], default=0) + """Number of known (enabled or disabled) origins""" + + origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0) + """Number of origins that were present in the latest listings""" + + origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0) + """Number of enabled origins that have never been visited + (according to the visit cache)""" + + origins_with_pending_changes = attr.ib( + type=int, validator=[type_validator()], default=0 + ) + """Number of enabled origins with known activity (recorded by a lister) + since our last visit""" diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py new file mode 100644 index 0000000..aa83267 --- /dev/null +++ b/swh/scheduler/simulator/__init__.py @@ -0,0 +1,163 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This package runs the scheduler in a simulated environment, to evaluate +various metrics. See :ref:`swh-scheduler-simulator`. + +This module orchestrates the simulator by initializing processes and connecting +them together; these processes are defined in modules in the package and +simulate/call specific components.""" + +from datetime import datetime, timedelta, timezone +import logging +from typing import Dict, Generator, Optional + +from simpy import Event + +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.model import ListedOrigin + +from .
import origin_scheduler, task_scheduler +from .common import Environment, Queue, SimulationReport, Task +from .origins import load_task_process + +logger = logging.getLogger(__name__) + + +def update_metrics_process( + env: Environment, update_interval: int +) -> Generator[Event, None, None]: + """Update the scheduler metrics every `update_interval` seconds, and add + them to the SimulationReport""" + while True: + metrics = env.scheduler.update_metrics(timestamp=env.time) + env.report.record_metrics(env.time, metrics) + yield env.timeout(update_interval) + + +def worker_process( + env: Environment, name: str, task_queue: Queue, status_queue: Queue +) -> Generator[Event, Task, None]: + """A worker which consumes tasks from the input task_queue. Tasks + themselves send OriginVisitStatus objects to the status_queue.""" + logger.debug("%s worker %s: Start", env.time, name) + while True: + task = yield task_queue.get() + logger.debug( + "%s worker %s: Run task %s origin=%s", + env.time, + name, + task.visit_type, + task.origin, + ) + yield env.process(load_task_process(env, task, status_queue=status_queue)) + + +def setup( + env: Environment, + scheduler_type: str, + policy: Optional[str], + workers_per_type: Dict[str, int], + task_queue_capacity: int, + min_batch_size: int, + metrics_update_interval: int, +): + task_queues = { + visit_type: Queue(env, capacity=task_queue_capacity) + for visit_type in workers_per_type + } + status_queue = Queue(env) + + if scheduler_type == "origin_scheduler": + if policy is None: + raise ValueError("origin_scheduler needs a scheduling policy") + env.process( + origin_scheduler.scheduler_runner_process( + env, task_queues, policy, min_batch_size=min_batch_size + ) + ) + env.process( + origin_scheduler.scheduler_journal_client_process(env, status_queue) + ) + elif scheduler_type == "task_scheduler": + if policy is not None: + raise ValueError("task_scheduler doesn't support a scheduling policy") + env.process( + task_scheduler.scheduler_runner_process( + env, task_queues, min_batch_size=min_batch_size + ) + ) + env.process(task_scheduler.scheduler_listener_process(env, status_queue)) + else: + raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}") + + env.process(update_metrics_process(env, metrics_update_interval)) + + for visit_type, num_workers in workers_per_type.items(): + task_queue = task_queues[visit_type] + for i in range(num_workers): + worker_name = f"worker-{visit_type}-{i}" + env.process(worker_process(env, worker_name, task_queue, status_queue)) + + +def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000): + """Fills the database with mock data to test the simulator.""" + stored_lister = scheduler.get_or_create_lister(name="example") + assert stored_lister.id is not None + + origins = [ + ListedOrigin( + lister_id=stored_lister.id, + url=f"https://example.com/{i:04d}.git", + visit_type="git", + last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), + ) + for i in range(num_origins) + ] + scheduler.record_listed_origins(origins) + + scheduler.create_tasks( + [ + { + **origin.as_task_dict(), + "policy": "recurring", + "next_run": origin.last_update, + "interval": timedelta(days=64), + } + for origin in origins + ] + ) + + +def run( + scheduler: SchedulerInterface, + scheduler_type: str, + policy: Optional[str], + runtime: Optional[int], +): + NUM_WORKERS = 48 + start_time = datetime.now(tz=timezone.utc) + env = Environment(start_time=start_time) + env.scheduler = scheduler + env.report = 
SimulationReport() + setup( + env, + scheduler_type=scheduler_type, + policy=policy, + workers_per_type={"git": NUM_WORKERS}, + task_queue_capacity=10000, + min_batch_size=1000, + metrics_update_interval=3600, + ) + try: + env.run(until=runtime) + except KeyboardInterrupt: + pass + finally: + end_time = env.time + print("total simulated time:", end_time - start_time) + metrics = env.scheduler.update_metrics(timestamp=end_time) + env.report.record_metrics(end_time, metrics) + print(env.report.format()) diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py new file mode 100644 index 0000000..7a26341 --- /dev/null +++ b/swh/scheduler/simulator/common.py @@ -0,0 +1,132 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from dataclasses import dataclass, field +from datetime import datetime, timedelta +import textwrap +from typing import Dict, List, Tuple +import uuid + +import plotille +from simpy import Environment as _Environment +from simpy import Store + +from swh.model.model import OriginVisitStatus +from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.model import SchedulerMetrics + + +@dataclass +class SimulationReport: + DURATION_THRESHOLD = 3600 + """Max duration for histograms""" + + total_visits: int = 0 + """Total count of finished visits""" + + visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict) + """Collected visit runtimes for each (status, eventful) tuple""" + + metrics: List[Tuple[datetime, List[SchedulerMetrics]]] = field(default_factory=list) + """Collected scheduler metrics for every timestamp""" + + def record_visit(self, duration: float, eventful: bool, status: str) -> None: + self.total_visits += 1 + self.visit_runtimes.setdefault((status, eventful), []).append(duration) + + def record_metrics(self, timestamp: datetime, metrics: List[SchedulerMetrics]): + self.metrics.append((timestamp, metrics)) + + @property + def useless_visits(self): + """Number of uneventful, full visits""" + return len(self.visit_runtimes.get(("full", False), [])) + + def runtime_histogram(self, status: str, eventful: bool) -> str: + runtimes = self.visit_runtimes.get((status, eventful), []) + return plotille.hist( + [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD] + ) + + def metrics_plot(self) -> str: + timestamps, metric_lists = zip(*self.metrics) + known = [sum(m.origins_known for m in metrics) for metrics in metric_lists] + never_visited = [ + sum(m.origins_never_visited for m in metrics) for metrics in metric_lists + ] + + figure = plotille.Figure() + figure.x_label = "simulated time" + figure.y_label = "origins" + figure.scatter(timestamps, known, label="Known origins") + figure.scatter(timestamps, never_visited, label="Origins never visited") + + return figure.show(legend=True) + + def format(self): + full_visits = self.visit_runtimes.get(("full", True), []) + histogram = self.runtime_histogram("full", True) + plot = self.metrics_plot() + long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits) + + return ( + textwrap.dedent( + f"""\ + Total visits: {self.total_visits} + Useless visits: {self.useless_visits} + Eventful visits: {len(full_visits)} + Very long running tasks: {long_tasks} + Visit time histogram for eventful visits: + """ + ) + + histogram + + "\n" + + 
textwrap.dedent( + """\ + Metrics over time: + """ + ) + + plot + ) + + +@dataclass +class Task: + visit_type: str + origin: str + backend_id: uuid.UUID = field(default_factory=uuid.uuid4) + + +@dataclass +class TaskEvent: + task: Task + status: OriginVisitStatus + eventful: bool = field(default=False) + + +class Queue(Store): + """Model a queue of objects to be passed between processes.""" + + def __len__(self): + return len(self.items or []) + + def slots_remaining(self): + return self.capacity - len(self) + + +class Environment(_Environment): + report: SimulationReport + scheduler: SchedulerInterface + + def __init__(self, start_time: datetime): + if start_time.tzinfo is None: + raise ValueError("start_time must have timezone information") + self.start_time = start_time + super().__init__() + + @property + def time(self): + """Get the current simulated wall clock time""" + return self.start_time + timedelta(seconds=self.now) diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py new file mode 100644 index 0000000..3b9d59a --- /dev/null +++ b/swh/scheduler/simulator/origin_scheduler.py @@ -0,0 +1,68 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Agents using the new origin-aware scheduler.""" + +import logging +from typing import Any, Dict, Generator, Iterator, List + +from simpy import Event + +from swh.scheduler.journal_client import process_journal_objects + +from .common import Environment, Queue, Task, TaskEvent + +logger = logging.getLogger(__name__) + + +def scheduler_runner_process( + env: Environment, task_queues: Dict[str, Queue], policy: str, min_batch_size: int +) -> Iterator[Event]: + """Scheduler runner. Grabs next visits from the database according to the + scheduling policy, and fills the task_queues accordingly.""" + + while True: + for visit_type, queue in task_queues.items(): + remaining = queue.slots_remaining() + if remaining < min_batch_size: + continue + next_origins = env.scheduler.grab_next_visits( + visit_type, remaining, policy=policy + ) + logger.debug( + "%s runner: running %s %s tasks", + env.time, + visit_type, + len(next_origins), + ) + for origin in next_origins: + yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url)) + + yield env.timeout(10.0) + + +def scheduler_journal_client_process( + env: Environment, status_queue: Queue +) -> Generator[Event, TaskEvent, None]: + """Scheduler journal client. 
Every once in a while, pulls + `OriginVisitStatus`es from the status_queue to update the scheduler + origin_visit_stats table.""" + BATCH_SIZE = 100 + + statuses: List[Dict[str, Any]] = [] + while True: + task_event = yield status_queue.get() + statuses.append(task_event.status.to_dict()) + if len(statuses) < BATCH_SIZE: + continue + + logger.debug( + "%s journal client: processing %s statuses", env.time, len(statuses) + ) + process_journal_objects( + {"origin_visit_status": statuses}, scheduler=env.scheduler + ) + + statuses = [] diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py new file mode 100644 index 0000000..3320790 --- /dev/null +++ b/swh/scheduler/simulator/origins.py @@ -0,0 +1,130 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""This module implements a model of the frequency of updates of an origin +and how long it takes to load it.""" + +from datetime import timedelta +import hashlib +import logging +import os +from typing import Iterator, Optional, Tuple + +import attr +from simpy import Event + +from swh.model.model import OriginVisitStatus +from swh.scheduler.model import OriginVisitStats + +from .common import Environment, Queue, Task, TaskEvent + +logger = logging.getLogger(__name__) + + +class OriginModel: + MIN_RUN_TIME = 0.5 + """Minimal run time for a visit (retrieved from production data)""" + + MAX_RUN_TIME = 7200 + """Max run time for a visit""" + + PER_COMMIT_RUN_TIME = 0.1 + """Run time per commit""" + + def __init__(self, type: str, origin: str): + self.type = type + self.origin = origin + + def seconds_between_commits(self): + """Returns a random 'average time between two commits' of this origin, + used to estimate the run time of a load task, and how much the loading + architecture is lagging behind origin updates.""" + n_bytes = 2 + num_buckets = 2 ** (8 * n_bytes) + + # Deterministic seed to generate "random" characteristics of this origin + bucket = int.from_bytes( + hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little" + ) + + # minimum: 1 second (bucket == 0) + # max: 10 years (bucket == num_buckets - 1) + ten_y = 10 * 365 * 24 * 3600 + + return ten_y ** (bucket / num_buckets) + # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1)) + + def load_task_characteristics( + self, env: Environment, stats: Optional[OriginVisitStats] + ) -> Tuple[float, bool, str]: + """Returns the (run_time, eventfulness, end_status) of the next + origin visit.""" + if stats and stats.last_eventful: + time_since_last_successful_run = env.time - stats.last_eventful + else: + time_since_last_successful_run = timedelta(days=365) + + seconds_between_commits = self.seconds_between_commits() + logger.debug( + "Interval between commits: %s", timedelta(seconds=seconds_between_commits) + ) + + seconds_since_last_successful = time_since_last_successful_run.total_seconds() + if seconds_since_last_successful < seconds_between_commits: + # No commits since last visit => uneventful + return (self.MIN_RUN_TIME, False, "full") + else: + n_commits = seconds_since_last_successful / seconds_between_commits + run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits + if run_time > self.MAX_RUN_TIME: + return (self.MAX_RUN_TIME, False, "partial") + else: + return (run_time, True, "full") + + +def load_task_process( + env: Environment, 
task: Task, status_queue: Queue +) -> Iterator[Event]: + """A loading task. This pushes OriginVisitStatus objects to the + status_queue to simulate the visible outcomes of the task. + + Uses `OriginModel.load_task_characteristics` to determine its run time. + """ + # This is cheating; actual tasks access the state from the storage, not the + # scheduler + pk = task.origin, task.visit_type + visit_stats = env.scheduler.origin_visit_stats_get([pk]) + stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None + last_snapshot = stats.last_snapshot if stats else None + + status = OriginVisitStatus( + origin=task.origin, + visit=42, + type=task.visit_type, + status="created", + date=env.time, + snapshot=None, + ) + origin_model = OriginModel(task.visit_type, task.origin) + (run_time, eventful, end_status) = origin_model.load_task_characteristics( + env, stats + ) + logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) + yield status_queue.put(TaskEvent(task=task, status=status)) + yield env.timeout(run_time) + logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) + + new_snapshot = os.urandom(20) if eventful else last_snapshot + yield status_queue.put( + TaskEvent( + task=task, + status=attr.evolve( + status, status=end_status, date=env.time, snapshot=new_snapshot + ), + eventful=eventful, + ) + ) + + env.report.record_visit(run_time, eventful, end_status) diff --git a/swh/scheduler/simulator/task_scheduler.py b/swh/scheduler/simulator/task_scheduler.py new file mode 100644 index 0000000..a86a962 --- /dev/null +++ b/swh/scheduler/simulator/task_scheduler.py @@ -0,0 +1,76 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Agents using the "old" task-based scheduler.""" + +import logging +from typing import Dict, Generator, Iterator + +from simpy import Event + +from .common import Environment, Queue, Task, TaskEvent + +logger = logging.getLogger(__name__) + + +def scheduler_runner_process( + env: Environment, task_queues: Dict[str, Queue], min_batch_size: int, +) -> Iterator[Event]: + """Scheduler runner. Grabs ready tasks from the database and fills the + task_queues accordingly.""" + + while True: + for visit_type, queue in task_queues.items(): + remaining = queue.slots_remaining() + if remaining < min_batch_size: + continue + next_tasks = env.scheduler.grab_ready_tasks( + f"load-{visit_type}", num_tasks=remaining, timestamp=env.time + ) + logger.debug( + "%s runner: running %s %s tasks", env.time, visit_type, len(next_tasks), + ) + + sim_tasks = [ + Task(visit_type=visit_type, origin=task["arguments"]["kwargs"]["url"]) + for task in next_tasks + ] + + env.scheduler.mass_schedule_task_runs( + [ + { + "task": task["id"], + "scheduled": env.time, + "backend_id": str(sim_task.backend_id), + } + for task, sim_task in zip(next_tasks, sim_tasks) + ] + ) + + for sim_task in sim_tasks: + yield queue.put(sim_task) + + yield env.timeout(10.0) + + +def scheduler_listener_process( + env: Environment, status_queue: Queue +) -> Generator[Event, TaskEvent, None]: + """Scheduler listener.
In the real world this would listen to celery + events, but we listen to the status_queue and simulate celery events from + that.""" + while True: + event = yield status_queue.get() + if event.status.status == "ongoing": + env.scheduler.start_task_run(event.task.backend_id, timestamp=env.time) + else: + if event.status.status == "full": + status = "eventful" if event.eventful else "uneventful" + else: + status = "failed" + + env.scheduler.end_task_run( + str(event.task.backend_id), status=status, timestamp=env.time + ) diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql index 118744c..6a17d51 100644 --- a/swh/scheduler/sql/30-schema.sql +++ b/swh/scheduler/sql/30-schema.sql @@ -1,161 +1,208 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (17, now(), 'Work In Progress'); + values (25, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run timestamptz not null, current_interval interval, status task_status 
not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Policy of the given task'; comment on column task.id is 'Task Identifier'; comment on column task.type is 'References task_type table'; comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.id is 'Task run identifier'; comment on column task_run.task is 'References task table'; comment on column task_run.scheduled is 'Scheduled run time for task'; comment on column task_run.started is 'Task starting time'; comment on column task_run.ended is 'Task ending time'; create table if not exists listers ( id uuid primary key default uuid_generate_v4(), name text not null, instance_name text not null, created timestamptz not null default now(), -- auto_now_add in the model current_state jsonb not null, updated timestamptz not null ); comment on table listers is 'Lister instances known to the origin visit scheduler'; comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. 
framagit, bitbucket, ...)'; comment on column listers.created is 'Timestamp at which the lister was originally created'; comment on column listers.current_state is 'Known current state of this lister'; comment on column listers.updated is 'Timestamp at which the lister state was last updated'; create table if not exists listed_origins ( -- Basic information lister_id uuid not null references listers(id), url text not null, visit_type text not null, extra_loader_arguments jsonb not null, -- Whether this origin still exists or not enabled boolean not null, -- time-based information first_seen timestamptz not null default now(), last_seen timestamptz not null, -- potentially provided by the lister last_update timestamptz, primary key (lister_id, url, visit_type) ); comment on table listed_origins is 'Origins known to the origin visit scheduler'; comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; comment on column listed_origins.url is 'URL of the origin listed'; comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; + + +create table origin_visit_stats ( + url text not null, + visit_type text not null, + last_eventful timestamptz, + last_uneventful timestamptz, + last_failed timestamptz, + last_notfound timestamptz, + -- visit scheduling information + last_scheduled timestamptz, + -- last snapshot resulting from an eventful visit + last_snapshot bytea, + + primary key (url, visit_type) +); + +comment on column origin_visit_stats.url is 'Origin URL'; +comment on column origin_visit_stats.visit_type is 'Type of the visit for the given url'; +comment on column origin_visit_stats.last_eventful is 'Date of the last eventful event'; +comment on column origin_visit_stats.last_uneventful is 'Date of the last uneventful event'; +comment on column origin_visit_stats.last_failed is 'Date of the last failed event'; +comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event'; +comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last'; +comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot'; + + +create table scheduler_metrics ( + lister_id uuid not null references listers(id), + visit_type text not null, + last_update timestamptz not null, + origins_known int not null default 0, + origins_enabled int not null default 0, + origins_never_visited int not null default 0, + origins_with_pending_changes int not null default 0, + + primary key (lister_id, visit_type) +); + +comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.'; +comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated'; +comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been 
aggregated'; +comment on column scheduler_metrics.last_update is 'Last update of these metrics'; +comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins'; +comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing'; +comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited'; +comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit'; diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql index 684aebc..13ca599 100644 --- a/swh/scheduler/sql/40-func.sql +++ b/swh/scheduler/sql/40-func.sql @@ -1,408 +1,448 @@ create or replace function swh_scheduler_mktemp_task () returns void language sql as $$ create temporary table tmp_task ( like task excluding indexes ) on commit drop; alter table tmp_task alter column retries_left drop not null, drop column id; $$; comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; create or replace function swh_scheduler_create_tasks_from_temp () returns setof task language plpgsql as $$ begin -- update the default values in one go -- this is separated from the insert/select to avoid too much -- juggling update tmp_task t set current_interval = tt.default_interval, retries_left = coalesce(retries_left, tt.num_retries, 0) from task_type tt where tt.type=t.type; insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority) select type, arguments, next_run, status, current_interval, policy, retries_left, priority from tmp_task t where not exists(select 1 from task where type = t.type and md5(arguments::text) = md5(t.arguments::text) and arguments = t.arguments and policy = t.policy and priority is not distinct from t.priority and status = t.status); return query select distinct t.* from tmp_task tt inner join task t on ( tt.type = t.type and md5(tt.arguments::text) = md5(t.arguments::text) and tt.arguments = t.arguments and tt.policy = t.policy and tt.priority is not distinct from t.priority and tt.status = t.status ); end; $$; comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL) returns setof task language sql stable as $$ select * from task where next_run <= ts and type = task_type and status = 'next_run_not_scheduled' and priority is null order by next_run limit num_tasks for update skip locked; $$; comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint) is 'Retrieve tasks without priority'; create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority) returns numeric language sql stable as $$ select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric $$; comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority) is 'Given a priority task and a total number, compute the number of tasks to read'; create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL, task_priority task_priority default 'normal') returns setof task language sql stable as $$ select * from task t where t.next_run <= ts 
and t.type = task_type and t.status = 'next_run_not_scheduled' and t.priority = task_priority order by t.next_run limit num_tasks_priority for update skip locked; $$; comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority) is 'Retrieve tasks with a given priority'; create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(), num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_high bigint; nb_normal bigint; nb_low bigint; begin -- expected values to fetch select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal; select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low; nb_diff := 0; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_high then nb_normal := nb_normal + nb_high - count_row; end if; count_row := 0; for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal') loop count_row := count_row + 1; return next r; end loop; if count_row < nb_normal then nb_low := nb_low + nb_normal - count_row; end if; return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low'); end $$; comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint) is 'Retrieve priority tasks'; create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint default NULL) returns setof task language plpgsql as $$ declare r record; count_row bigint; nb_diff bigint; nb_tasks bigint; begin count_row := 0; for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority) order by priority, next_run loop count_row := count_row + 1; return next r; end loop; if count_row < num_tasks_priority then nb_tasks := num_tasks + num_tasks_priority - count_row; else nb_tasks := num_tasks; end if; for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks) order by priority, next_run loop return next r; end loop; return; end $$; comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint) is 'Retrieve tasks with/without priority in order'; create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), num_tasks bigint default NULL, num_tasks_priority bigint default NULL) returns setof task language sql as $$ update task set status='next_run_scheduled' from ( select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority) ) next_tasks where task.id = next_tasks.id returning task.*; $$; comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint) is 'Grab tasks ready for scheduling and change their status'; create or replace function swh_scheduler_schedule_task_run (task_id bigint, backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ insert into task_run (task, backend_id, metadata, scheduled, status) values (task_id, backend_id, metadata, ts, 'scheduled') returning *; $$; create or replace function swh_scheduler_mktemp_task_run () returns void language sql as $$ create temporary table 
tmp_task_run ( like task_run excluding indexes ) on commit drop; alter table tmp_task_run drop column id, drop column status; $$; comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling'; create or replace function swh_scheduler_schedule_task_run_from_temp () returns void language plpgsql as $$ begin insert into task_run (task, backend_id, metadata, scheduled, status) select task, backend_id, metadata, scheduled, 'scheduled' from tmp_task_run; return; end; $$; create or replace function swh_scheduler_start_task_run (backend_id text, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set started = ts, status = 'started', metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata where task_run.backend_id = swh_scheduler_start_task_run.backend_id returning *; $$; create or replace function swh_scheduler_end_task_run (backend_id text, status task_run_status, metadata jsonb default '{}'::jsonb, ts timestamptz default now()) returns task_run language sql as $$ update task_run set ended = ts, status = swh_scheduler_end_task_run.status, metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata where task_run.backend_id = swh_scheduler_end_task_run.backend_id returning *; $$; create type task_record as ( task_id bigint, task_policy task_policy, task_status task_status, task_run_id bigint, arguments jsonb, type text, backend_id text, metadata jsonb, scheduled timestamptz, started timestamptz, ended timestamptz, task_run_status task_run_status ); create or replace function swh_scheduler_task_to_archive( ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, lim bigint default 10) returns setof task_record language sql stable as $$ select t.id as task_id, t.policy as task_policy, t.status as task_status, tr.id as task_run_id, t.arguments, t.type, tr.backend_id, tr.metadata, tr.scheduled, tr.started, tr.ended, tr.status as task_run_status from task_run tr inner join task t on tr.task=t.id where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or (t.policy = 'recurring' and t.status = 'disabled')) and ((ts_after <= tr.started and tr.started < ts_before) or (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and t.id >= last_id order by tr.task, tr.started limit lim; $$; comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function'; create or replace function swh_scheduler_delete_archived_tasks( task_ids bigint[], task_run_ids bigint[]) returns void language sql as $$ -- clean up task_run_ids delete from task_run where id in (select * from unnest(task_run_ids)); -- clean up only tasks whose associated task_run are all cleaned up. 
-- Remaining tasks will stay there and will be cleaned up when -- remaining data have been indexed delete from task where id in (select t.id from task t left outer join task_run tr on t.id=tr.task where t.id in (select * from unnest(task_ids)) and tr.task is null); $$; comment on function swh_scheduler_delete_archived_tasks(bigint[], bigint[]) is 'Clean up archived tasks function'; create or replace function swh_scheduler_update_task_on_task_end () returns trigger language plpgsql as $$ declare cur_task task%rowtype; cur_task_type task_type%rowtype; adjustment_factor float; new_interval interval; begin select * from task where id = new.task into cur_task; select * from task_type where type = cur_task.type into cur_task_type; case when new.status = 'permfailed' then update task set status = 'disabled' where id = cur_task.id; when new.status in ('eventful', 'uneventful') then case when cur_task.policy = 'oneshot' then update task set status = 'completed' where id = cur_task.id; when cur_task.policy = 'recurring' then if new.status = 'uneventful' then adjustment_factor := 1/cur_task_type.backoff_factor; else adjustment_factor := 1/cur_task_type.backoff_factor; end if; new_interval := greatest( cur_task_type.min_interval, least( cur_task_type.max_interval, adjustment_factor * cur_task.current_interval)); update task set status = 'next_run_not_scheduled', - next_run = now() + new_interval, + next_run = new.ended + new_interval, current_interval = new_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; else -- new.status in 'failed', 'lost' if cur_task.retries_left > 0 then update task set status = 'next_run_not_scheduled', - next_run = now() + coalesce(cur_task_type.retry_delay, interval '1 hour'), + next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'), retries_left = cur_task.retries_left - 1 where id = cur_task.id; else -- no retries left case when cur_task.policy = 'oneshot' then update task set status = 'disabled' where id = cur_task.id; when cur_task.policy = 'recurring' then update task set status = 'next_run_not_scheduled', - next_run = now() + cur_task.current_interval, + next_run = new.ended + cur_task.current_interval, retries_left = coalesce(cur_task_type.num_retries, 0) where id = cur_task.id; end case; end if; -- retries end case; return null; end; $$; create trigger update_task_on_task_end after update of status on task_run for each row when (new.status NOT IN ('scheduled', 'started')) execute procedure swh_scheduler_update_task_on_task_end (); + + +create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now()) + returns setof scheduler_metrics + language sql +as $$ + insert into scheduler_metrics ( + lister_id, visit_type, last_update, + origins_known, origins_enabled, + origins_never_visited, origins_with_pending_changes + ) + select + lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update, + count(*) as origins_known, + count(*) filter (where enabled) as origins_enabled, + count(*) filter (where + enabled and last_snapshot is NULL + ) as origins_never_visited, + count(*) filter (where + enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful) + ) as origins_with_pending_changes + from listed_origins lo + left join origin_visit_stats ovs using (url, visit_type) + where + -- update only for the requested lister + update_metrics.lister_id = lo.lister_id + -- or for all listers if the function argument is null + or update_metrics.lister_id is 
null + group by (lister_id, visit_type) + on conflict (lister_id, visit_type) do update + set + last_update = EXCLUDED.last_update, + origins_known = EXCLUDED.origins_known, + origins_enabled = EXCLUDED.origins_enabled, + origins_never_visited = EXCLUDED.origins_never_visited, + origins_with_pending_changes = EXCLUDED.origins_with_pending_changes + returning * +$$; + +comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id'; diff --git a/swh/scheduler/sql/60-indexes.sql b/swh/scheduler/sql/60-indexes.sql index 1812c3e..cbe8642 100644 --- a/swh/scheduler/sql/60-indexes.sql +++ b/swh/scheduler/sql/60-indexes.sql @@ -1,19 +1,23 @@ create index on task(type); create index on task(next_run); -- used for quick equality checking create index on task using btree(type, md5(arguments::text)); create index on task(priority); create index on task_run(task); create index on task_run(backend_id); create index task_run_id_asc_idx on task_run(task asc, started asc); -- lister schema create unique index on listers (name, instance_name); -- listed origins -create index on listed_origins (url); +create index on listed_origins (url, visit_type); + +-- visit stats +create index on origin_visit_stats (url, visit_type); +-- XXX probably add indexes on most (visit_type, last_xxx) couples diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py index 13c38d3..3b783c5 100644 --- a/swh/scheduler/tests/common.py +++ b/swh/scheduler/tests/common.py @@ -1,103 +1,103 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime TEMPLATES = { "git": { - "type": "update-git", + "type": "load-git", "arguments": {"args": [], "kwargs": {},}, "next_run": None, }, "hg": { - "type": "update-hg", + "type": "load-hg", "arguments": {"args": [], "kwargs": {},}, "next_run": None, "policy": "oneshot", }, } TASK_TYPES = { "git": { - "type": "update-git", + "type": "load-git", "description": "Update a git repository", "backend_name": "swh.loader.git.tasks.UpdateGitRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, "hg": { - "type": "update-hg", + "type": "load-hg", "description": "Update a mercurial repository", "backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, } def tasks_from_template(template, max_timestamp, num, num_priority=0, priorities=None): """Build tasks from template """ def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret["next_run"] = next_run if priority: ret["priority"] = priority if args: ret["arguments"]["args"] = list(args) if kwargs: ret["arguments"]["kwargs"] = kwargs return ret def _pop_priority(priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return 
None if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = _pop_priority(priorities) tasks.append( _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, **{"kwarg%03d" % i: "bogus-kwarg"}, ) ) return tasks LISTERS = ( {"name": "github"}, {"name": "gitlab", "instance_name": "gitlab"}, {"name": "gitlab", "instance_name": "freedesktop"}, {"name": "npm"}, {"name": "pypi"}, ) diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 91e92bc..775b38a 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,41 +1,62 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os -from typing import List +from typing import Dict, List import pytest from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" return swh_scheduler.get_or_create_lister(**LISTERS[0]) @pytest.fixture -def listed_origins(stored_lister) -> List[ListedOrigin]: - """Return a (fixed) set of 1000 listed origins""" - return [ - ListedOrigin( - lister_id=stored_lister.id, - url=f"https://example.com/{i:04d}.git", - visit_type="git", - last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), - ) - for i in range(1000) - ] +def visit_types() -> List[str]: + """Possible visit types in `ListedOrigin`s""" + return ["git", "svn"] + + +@pytest.fixture +def listed_origins_by_type( + stored_lister: Lister, visit_types: List[str] +) -> Dict[str, List[ListedOrigin]]: + """A fixed list of `ListedOrigin`s, for each `visit_type`.""" + count_per_type = 1000 + assert stored_lister.id + return { + visit_type: [ + ListedOrigin( + lister_id=stored_lister.id, + url=f"https://{visit_type}.example.com/{i:04d}", + visit_type=visit_type, + last_update=datetime( + 2020, 6, 15, 16, 0, 0, j * count_per_type + i, tzinfo=timezone.utc + ), + ) + for i in range(count_per_type) + ] + for j, visit_type in enumerate(visit_types) + } + + +@pytest.fixture +def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: + """Return a (fixed) set of listed origins""" + return sum(listed_origins_by_type.values(), []) diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index eae6b0c..ff0e6d2 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,75 +1,81 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import url_for import pytest from swh.scheduler.api.client import RemoteScheduler import swh.scheduler.api.server as server from swh.scheduler.tests.test_scheduler import TestScheduler # 
noqa # tests are executed using imported class (TestScheduler) using overloaded # swh_scheduler fixture below # the Flask app used as server in these tests @pytest.fixture def app(swh_db_scheduler): assert hasattr(server, "scheduler") server.scheduler = swh_db_scheduler yield server.app # the RPCClient class used as client used in these tests @pytest.fixture def swh_rpc_client_class(): return RemoteScheduler @pytest.fixture def swh_scheduler(swh_rpc_client, app): yield swh_rpc_client def test_site_map(flask_app_client): sitemap = flask_app_client.get(url_for("site_map")) assert sitemap.headers["Content-Type"] == "application/json" rules = set(x["rule"] for x in sitemap.json) # we expect at least these rules expected_rules = set( "/" + rule for rule in ( + "lister/get", "lister/get_or_create", "lister/update", "origins/get", + "origins/grab_next", "origins/record", "priority_ratios/get", + "scheduler_metrics/get", + "scheduler_metrics/update", "task/create", "task/delete_archived", "task/disable", "task/filter_for_archive", "task/get", "task/grab_ready", "task/peek_ready", "task/search", "task/set_status", "task_run/end", "task_run/get", "task_run/schedule", "task_run/schedule_one", "task_run/start", "task_type/create", "task_type/get", "task_type/get_all", + "visit_stats/get", + "visit_stats/upsert", ) ) assert rules == expected_rules def test_root(flask_app_client): root = flask_app_client.get("/") assert root.status_code == 200 assert b"Software Heritage scheduler RPC server" in root.data diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py new file mode 100644 index 0000000..666c5a6 --- /dev/null +++ b/swh/scheduler/tests/test_cli_journal.py @@ -0,0 +1,115 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Dict, List + +from click.testing import CliRunner, Result +from confluent_kafka import Producer +import pytest +import yaml + +from swh.journal.serializers import value_to_kafka +from swh.scheduler import get_scheduler +from swh.scheduler.cli import cli +from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1 + + +@pytest.fixture +def swh_scheduler_cfg(postgresql_scheduler, kafka_server): + """Journal client configuration ready""" + return { + "scheduler": {"cls": "local", "db": postgresql_scheduler.dsn,}, + "journal": { + "brokers": [kafka_server], + "group_id": "test-consume-visit-status", + }, + } + + +def _write_configuration_path(config: Dict, tmp_path: str) -> str: + config_path = os.path.join(str(tmp_path), "scheduler.yml") + with open(config_path, "w") as f: + f.write(yaml.dump(config)) + return config_path + + +@pytest.fixture +def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path): + """Write scheduler configuration in temporary path and returns such path""" + return _write_configuration_path(swh_scheduler_cfg, tmp_path) + + +def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: + """Invoke swh scheduler journal subcommands + + """ + runner = CliRunner() + result = runner.invoke(cli, ["-C" + config_path] + args) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler( + swh_scheduler_cfg, tmp_path +): + config = 
swh_scheduler_cfg.copy() + config["scheduler"] = {"cls": "foo"} + config_path = _write_configuration_path(config, tmp_path) + with pytest.raises(ValueError, match="must be instantiated"): + invoke( + ["journal-client", "--stop-after-objects", "1",], config_path, + ) + + +def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf( + swh_scheduler_cfg, tmp_path +): + config = swh_scheduler_cfg.copy() + config.pop("journal", None) + config_path = _write_configuration_path(config, tmp_path) + + with pytest.raises(ValueError, match="Missing 'journal'"): + invoke( + ["journal-client", "--stop-after-objects", "1",], config_path, + ) + + +def test_cli_journal_client_origin_visit_status( + swh_scheduler_cfg, swh_scheduler_cfg_path, +): + kafka_server = swh_scheduler_cfg["journal"]["brokers"][0] + swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test visit-stats producer", + "acks": "all", + } + ) + visit_status = VISIT_STATUSES_1[0] + + value = value_to_kafka(visit_status) + topic = "swh.journal.objects.origin_visit_status" + producer.produce(topic=topic, key=b"bogus-origin", value=value) + producer.flush() + + result = invoke( + ["journal-client", "--stop-after-objects", "1",], swh_scheduler_cfg_path, + ) + + # Check the output + expected_output = "Processed 1 message(s).\nDone.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + [(visit_status["origin"], visit_status["type"])] + ) + + assert actual_visit_stats + assert len(actual_visit_stats) == 1 diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py new file mode 100644 index 0000000..339260f --- /dev/null +++ b/swh/scheduler/tests/test_cli_origin.py @@ -0,0 +1,123 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Tuple + +import pytest + +from swh.scheduler.cli.origin import format_origins +from swh.scheduler.tests.common import TASK_TYPES +from swh.scheduler.tests.test_cli import invoke as basic_invoke + + +def invoke(scheduler, args: Tuple[str, ...] 
= (), catch_exceptions: bool = False): + return basic_invoke( + scheduler, args=["origin", *args], catch_exceptions=catch_exceptions + ) + + +def test_cli_origin(swh_scheduler): + """Check that swh scheduler origin returns its help text""" + + result = invoke(swh_scheduler) + + assert "Commands:" in result.stdout + + +def test_format_origins_basic(listed_origins): + listed_origins = listed_origins[:100] + + basic_output = list(format_origins(listed_origins)) + # 1 header line + all origins + assert len(basic_output) == len(listed_origins) + 1 + + no_header_output = list(format_origins(listed_origins, with_header=False)) + assert basic_output[1:] == no_header_output + + +def test_format_origins_fields_unknown(listed_origins): + listed_origins = listed_origins[:10] + + it = format_origins(listed_origins, fields=["unknown_field"]) + + with pytest.raises(ValueError, match="unknown_field"): + next(it) + + +def test_format_origins_fields(listed_origins): + listed_origins = listed_origins[:10] + fields = ["lister_id", "url", "visit_type"] + + output = list(format_origins(listed_origins, fields=fields)) + assert output[0] == ",".join(fields) + for i, origin in enumerate(listed_origins): + assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}" + + +def test_grab_next(swh_scheduler, listed_origins_by_type): + NUM_RESULTS = 10 + # Strict inequality to check that grab_next_visits doesn't return more + # results than requested + visit_type = next(iter(listed_origins_by_type)) + assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins(origins) + + result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS))) + assert result.exit_code == 0 + + out_lines = result.stdout.splitlines() + assert len(out_lines) == NUM_RESULTS + 1 + + fields = out_lines[0].split(",") + returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]] + + # Check that we've received origins we had listed in the first place + assert set(origin["url"] for origin in returned_origins) <= set( + origin.url for origin in listed_origins_by_type[visit_type] + ) + + +def test_schedule_next(swh_scheduler, listed_origins_by_type): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + NUM_RESULTS = 10 + # Strict inequality to check that grab_next_visits doesn't return more + # results than requested + visit_type = next(iter(listed_origins_by_type)) + assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS + + for origins in listed_origins_by_type.values(): + swh_scheduler.record_listed_origins(origins) + + result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS))) + assert result.exit_code == 0 + + # pull all tasks out of the scheduler + tasks = swh_scheduler.search_tasks() + assert len(tasks) == NUM_RESULTS + + scheduled_tasks = { + (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks + } + all_possible_tasks = { + (f"load-{origin.visit_type}", origin.url) + for origin in listed_origins_by_type[visit_type] + } + + assert scheduled_tasks <= all_possible_tasks + + +def test_update_metrics(swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + assert swh_scheduler.get_metrics() == [] + + result = invoke(swh_scheduler, args=("update-metrics",)) + + assert result.exit_code == 0 + assert swh_scheduler.get_metrics() != [] diff --git a/swh/scheduler/tests/test_journal_client.py 
b/swh/scheduler/tests/test_journal_client.py new file mode 100644 index 0000000..c0576b7 --- /dev/null +++ b/swh/scheduler/tests/test_journal_client.py @@ -0,0 +1,541 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import functools +from itertools import permutations + +import pytest + +from swh.model.hashutil import hash_to_bytes +from swh.scheduler.journal_client import max_date, process_journal_objects +from swh.scheduler.model import ListedOrigin, OriginVisitStats +from swh.scheduler.utils import utcnow + + +def test_journal_client_origin_visit_status_from_journal_fail(swh_scheduler): + process_fn = functools.partial(process_journal_objects, scheduler=swh_scheduler,) + + with pytest.raises(AssertionError, match="Got unexpected origin_visit"): + process_fn({"origin_visit": [{"url": "http://foobar.baz"},]}) + + with pytest.raises(AssertionError, match="Expected origin_visit_status"): + process_fn({}) + + +ONE_DAY = datetime.timedelta(days=1) + +DATE3 = utcnow() +DATE2 = DATE3 - ONE_DAY +DATE1 = DATE2 - ONE_DAY + + +assert DATE1 < DATE2 < DATE3 + + +@pytest.mark.parametrize( + "dates,expected_max_date", + [ + ((DATE1,), DATE1), + ((None, DATE2), DATE2), + ((DATE1, None), DATE1), + ((DATE1, DATE2), DATE2), + ((DATE2, DATE1), DATE2), + ((DATE1, DATE2, DATE3), DATE3), + ((None, DATE2, DATE3), DATE3), + ((None, None, DATE3), DATE3), + ((DATE1, None, DATE3), DATE3), + ], +) +def test_max_date(dates, expected_max_date): + assert max_date(*dates) == expected_max_date + + +def test_max_date_raise(): + with pytest.raises(ValueError, match="valid datetime"): + max_date() + with pytest.raises(ValueError, match="valid datetime"): + max_date(None) + with pytest.raises(ValueError, match="valid datetime"): + max_date(None, None) + + +def test_journal_client_origin_visit_status_from_journal_ignored_status(swh_scheduler): + """Only final statuses (full, partial) are important, the rest remain ignored. 
+ + """ + visit_statuses = [ + { + "origin": "foo", + "visit": 1, + "status": "created", + "date": utcnow(), + "type": "git", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 1, + "status": "ongoing", + "date": utcnow(), + "type": "svn", + "snapshot": None, + }, + ] + + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + # Ensure those visit status are ignored + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( + [(vs["origin"], vs["type"]) for vs in visit_statuses] + ) + assert actual_origin_visit_stats == [] + + +def test_journal_client_origin_visit_status_from_journal_last_notfound(swh_scheduler): + visit_status = { + "origin": "foo", + "visit": 1, + "status": "not_found", + "date": DATE1, + "type": "git", + "snapshot": None, + } + + process_journal_objects( + {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert actual_origin_visit_stats == [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=visit_status["date"], + last_snapshot=None, + ) + ] + + visit_statuses = [ + { + "origin": "foo", + "visit": 3, + "status": "not_found", + "date": DATE2, + "type": "git", + "snapshot": None, + }, + { + "origin": "foo", + "visit": 4, + "status": "not_found", + "date": DATE3, + "type": "git", + "snapshot": None, + }, + ] + + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert actual_origin_visit_stats == [OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=None, + last_notfound=DATE3, + last_snapshot=None, + )] + + +def test_journal_client_origin_visit_status_from_journal_last_failed(swh_scheduler): + visit_statuses = [ + { + "origin": "foo", + "visit": 1, + "status": "partial", + "date": utcnow(), + "type": "git", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 1, + "status": "full", + "date": DATE1, + "type": "git", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 2, + "status": "full", + "date": DATE2, + "type": "git", + "snapshot": None, + }, + { + "origin": "bar", + "visit": 3, + "status": "full", + "date": DATE3, + "type": "git", + "snapshot": None, + }, + ] + + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("bar", "git")]) + assert actual_origin_visit_stats == [OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=DATE3, + last_notfound=None, + last_snapshot=None, + )] + + +def test_journal_client_origin_visit_status_from_journal_last_eventful(swh_scheduler): + visit_statuses = [ + { + "origin": "bar", + "visit": 1, + "status": "partial", + "date": utcnow(), + "type": "git", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "foo", + "visit": 1, + "status": "full", + "date": DATE1, + "type": "git", + "snapshot": hash_to_bytes("eeecc0710eb6cf9efd5b920a8453e1e07157bfff"), + }, + { + "origin": "foo", + "visit": 2, + "status": "partial", + "date": DATE2, + "type": "git", + "snapshot": hash_to_bytes("aaacc0710eb6cf9efd5b920a8453e1e07157baaa"), + }, + { + "origin": "foo", + "visit": 3, + "status": 
"full", + "date": DATE3, + "type": "git", + "snapshot": hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + }, + ] + + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get([("foo", "git")]) + assert actual_origin_visit_stats == [OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE3, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("dddcc0710eb6cf9efd5b920a8453e1e07157bddd"), + )] + + +def test_journal_client_origin_visit_status_from_journal_last_uneventful(swh_scheduler): + visit_status = { + "origin": "foo", + "visit": 1, + "status": "full", + "date": DATE3 + ONE_DAY, + "type": "git", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + } + + # Let's insert some visit stats with some previous visit information + swh_scheduler.origin_visit_stats_upsert( + [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=DATE1, + last_uneventful=DATE3, + last_failed=DATE2, + last_notfound=DATE1, + last_snapshot=visit_status["snapshot"], + ) + ] + ) + + process_journal_objects( + {"origin_visit_status": [visit_status]}, scheduler=swh_scheduler + ) + + actual_origin_visit_stats = swh_scheduler.origin_visit_stats_get( + [(visit_status["origin"], visit_status["type"])] + ) + assert actual_origin_visit_stats == [ + OriginVisitStats( + url=visit_status["origin"], + visit_type=visit_status["type"], + last_eventful=DATE1, + last_uneventful=visit_status["date"], # most recent date but uneventful + last_failed=DATE2, + last_notfound=DATE1, + last_snapshot=visit_status["snapshot"], + ) + ] + + +VISIT_STATUSES = [ + {**ovs, "date": DATE1 + n * ONE_DAY} + for n, ovs in enumerate( + [ + { + "origin": "foo", + "type": "git", + "visit": 1, + "status": "created", + "snapshot": None, + }, + { + "origin": "foo", + "type": "git", + "visit": 1, + "status": "full", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "foo", + "type": "git", + "visit": 2, + "status": "created", + "snapshot": None, + }, + { + "origin": "foo", + "type": "git", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + ] + ) +] + + +@pytest.mark.parametrize( + "visit_statuses", permutations(VISIT_STATUSES, len(VISIT_STATUSES)) +) +def test_journal_client_origin_visit_status_permutation0(visit_statuses, swh_scheduler): + """Ensure out of order topic subscription ends up in the same final state + + """ + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + expected_visit_stats = OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=DATE1 + ONE_DAY, + last_uneventful=DATE1 + 3 * ONE_DAY, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ) + + assert swh_scheduler.origin_visit_stats_get([("foo", "git")]) == [ + expected_visit_stats + ] + + +VISIT_STATUSES_1 = [ + {**ovs, "date": DATE1 + n * ONE_DAY} + for n, ovs in enumerate( + [ + { + "origin": "cavabarder", + "type": "hg", + "visit": 1, + "status": "partial", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + 
"type": "hg", + "visit": 3, + "status": "full", + "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 4, + "status": "full", + "snapshot": hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + }, + ] + ) +] + + +@pytest.mark.parametrize( + "visit_statuses", permutations(VISIT_STATUSES_1, len(VISIT_STATUSES_1)) +) +def test_journal_client_origin_visit_status_permutation1(visit_statuses, swh_scheduler): + """Ensure out of order topic subscription ends up in the same final state + + """ + process_journal_objects( + {"origin_visit_status": visit_statuses}, scheduler=swh_scheduler + ) + + expected_visit_stats = OriginVisitStats( + url="cavabarder", + visit_type="hg", + last_eventful=DATE1 + 2 * ONE_DAY, + last_uneventful=DATE1 + 3 * ONE_DAY, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("aaaaaabbbeb6cf9efd5b920a8453e1e07157b6cd"), + ) + + assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [ + expected_visit_stats + ] + + +VISIT_STATUSES_2 = [ + {**ovs, "date": DATE1 + n * ONE_DAY} + for n, ovs in enumerate( + [ + { + "origin": "cavabarder", + "type": "hg", + "visit": 1, + "status": "full", + "snapshot": hash_to_bytes("0000000000000000000000000000000000000000"), + }, + { + "origin": "cavabarder", + "type": "hg", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("1111111111111111111111111111111111111111"), + }, + { + "origin": "iciaussi", + "type": "hg", + "visit": 1, + "status": "full", + "snapshot": hash_to_bytes("2222222222222222222222222222222222222222"), + }, + { + "origin": "iciaussi", + "type": "hg", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("3333333333333333333333333333333333333333"), + }, + { + "origin": "cavabarder", + "type": "git", + "visit": 1, + "status": "full", + "snapshot": hash_to_bytes("4444444444444444444444444444444444444444"), + }, + { + "origin": "cavabarder", + "type": "git", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("5555555555555555555555555555555555555555"), + }, + { + "origin": "iciaussi", + "type": "git", + "visit": 1, + "status": "full", + "snapshot": hash_to_bytes("6666666666666666666666666666666666666666"), + }, + { + "origin": "iciaussi", + "type": "git", + "visit": 2, + "status": "full", + "snapshot": hash_to_bytes("7777777777777777777777777777777777777777"), + }, + ] + ) +] + + +def test_journal_client_origin_visit_status_after_grab_next_visits( + swh_scheduler, stored_lister +): + """Ensure OriginVisitStat entries created in the db as a result of calling + grab_next_visits() do not mess the OriginVisitStats upsert mechanism. 
+ + """ + + listed_origins = [ + ListedOrigin(lister_id=stored_lister.id, url=url, visit_type=visit_type) + for (url, visit_type) in set((v["origin"], v["type"]) for v in VISIT_STATUSES_2) + ] + swh_scheduler.record_listed_origins(listed_origins) + before = utcnow() + swh_scheduler.grab_next_visits( + visit_type="git", count=10, policy="oldest_scheduled_first" + ) + after = utcnow() + + assert swh_scheduler.origin_visit_stats_get([("cavabarder", "hg")]) == [] + assert swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] is not None + + process_journal_objects( + {"origin_visit_status": VISIT_STATUSES_2}, scheduler=swh_scheduler + ) + + for url in ("cavabarder", "iciaussi"): + ovs = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] + assert before <= ovs.last_scheduled <= after + + ovs = swh_scheduler.origin_visit_stats_get([(url, "hg")])[0] + assert ovs.last_scheduled is None + + ovs = swh_scheduler.origin_visit_stats_get([("cavabarder", "git")])[0] + assert ovs.last_eventful == DATE1 + 5 * ONE_DAY + assert ovs.last_uneventful is None + assert ovs.last_failed is None + assert ovs.last_notfound is None + assert ovs.last_snapshot == hash_to_bytes( + "5555555555555555555555555555555555555555" + ) diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py index 47bb618..1293e64 100644 --- a/swh/scheduler/tests/test_model.py +++ b/swh/scheduler/tests/test_model.py @@ -1,94 +1,123 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import uuid import attr from swh.scheduler import model def test_select_columns(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) a_first_attr = attr.ib(type=str) @property def test2(self): """This property should not show up in the extracted columns""" return self.test1 assert TestModel.select_columns() == ("a_first_attr", "id", "test1") def test_insert_columns(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) @property def test2(self): """This property should not show up in the extracted columns""" return self.test1 assert TestModel.insert_columns_and_metavars() == ( ("id", "test1"), ("%(id)s", "%(test1)s"), ) def test_insert_columns_auto_now_add(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True}) assert TestModel.insert_columns_and_metavars() == ( ("id", "test1"), ("%(id)s", "%(test1)s"), ) def test_insert_columns_auto_now(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True}) assert TestModel.insert_columns_and_metavars() == ( ("id", "test1", "updated"), ("%(id)s", "%(test1)s", "now()"), ) def test_insert_columns_primary_key(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str, metadata={"auto_primary_key": True}) test1 = attr.ib(type=str) assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",)) def test_insert_primary_key(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str, metadata={"auto_primary_key": True}) test1 = attr.ib(type=str) assert 
TestModel.primary_key_columns() == ("id",) @attr.s class TestModel2(model.BaseSchedulerModel): col1 = attr.ib(type=str, metadata={"primary_key": True}) col2 = attr.ib(type=str, metadata={"primary_key": True}) test1 = attr.ib(type=str) assert TestModel2.primary_key_columns() == ("col1", "col2") + + +def test_listed_origin_as_task_dict(): + origin = model.ListedOrigin( + lister_id=uuid.uuid4(), url="http://example.com/", visit_type="git", + ) + + task = origin.as_task_dict() + assert task == { + "type": "load-git", + "arguments": {"args": [], "kwargs": {"url": "http://example.com/"}}, + } + + origin_w_args = model.ListedOrigin( + lister_id=uuid.uuid4(), + url="http://example.com/svn/", + visit_type="svn", + extra_loader_arguments={"foo": "bar"}, + ) + + task_w_args = origin_w_args.as_task_dict() + assert task_w_args == { + "type": "load-svn", + "arguments": { + "args": [], + "kwargs": {"url": "http://example.com/svn/", "foo": "bar"}, + }, + } diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 9c8f6b7..ad78cff 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,731 +1,1159 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import copy import datetime import inspect import random -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import uuid import attr import pytest -from swh.scheduler.exc import StaleData +from swh.model.hashutil import hash_to_bytes +from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import SchedulerInterface -from swh.scheduler.model import ListedOrigin, ListedOriginPageToken +from swh.scheduler.model import ( + ListedOrigin, + ListedOriginPageToken, + OriginVisitStats, + SchedulerMetrics, +) from swh.scheduler.utils import utcnow from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template ONEDAY = datetime.timedelta(days=1) def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} +def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: + return (m.lister_id, m.visit_type) + + +def assert_metrics_equal(left, right): + assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key) + + class TestScheduler: def test_interface(self, swh_scheduler): """Checks all methods of SchedulerInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SchedulerInterface,), {})() assert "create_task_type" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(swh_scheduler, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert 
expected_signature == actual_signature, meth_name assert missing_methods == [] def test_get_priority_ratios(self, swh_scheduler): assert swh_scheduler.get_priority_ratios() == { "high": 0.5, "normal": 0.3, "low": 0.2, } def test_add_task_type(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) tt2 = TASK_TYPES["hg"] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt["type"]) assert tt2 == swh_scheduler.get_task_type(tt2["type"]) def test_create_task_type_idempotence(self, swh_scheduler): tt = TASK_TYPES["git"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) def test_get_task_types(self, swh_scheduler): tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) num_tasks_priority = 100 tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100) tasks_2 = tasks_from_template( TEMPLATES["hg"], utcnow(), 100, num_tasks_priority, priorities=priority_ratio, ) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2) set_ret1 = set([t["id"] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t["id"] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) task_type = TASK_TYPES[orig_task["type"].split("-")[-1]] assert task["id"] not in ids assert task["status"] == "next_run_not_scheduled" assert task["current_interval"] == task_type["default_interval"] assert task["policy"] == orig_task.get("policy", "recurring") priority = task.get("priority") if priority: actual_priorities[priority] += 1 assert task["retries_left"] == (task_type["num_retries"] or 0) ids.add(task["id"]) del task["id"] del task["status"] del task["current_interval"] del task["retries_left"] if "policy" not in orig_task: del task["policy"] if "priority" not in orig_task: del task["priority"] assert task == orig_task assert dict(actual_priorities) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] tasks = tasks_from_template(TEMPLATES["git"], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks) // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit - 1]["next_run"] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts ) for ready_task in ready_tasks_timestamped: assert 
ready_task["next_run"] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit // 3 ) assert len(ready_tasks_both) <= limit // 3 for ready_task in ready_tasks_both: assert ready_task["next_run"] <= max_ts assert ready_task in ready_tasks[: limit // 3] def _priority_ratio(self, swh_scheduler): return swh_scheduler.get_priority_ratios() def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio, ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks) count_tasks_per_priority = defaultdict(int) for task in ready_tasks: priority = task.get("priority") if priority: count_tasks_per_priority[priority] += 1 assert dict(count_tasks_per_priority) == { priority: int(ratio * num_tasks_priority) for priority, ratio in priority_ratio.items() } # Only get some ready tasks num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2) num_tasks_priority = random.randrange(5, num_tasks_priority // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks( task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority ) count_tasks_per_priority = defaultdict(int) for task in ready_tasks_limited: priority = task.get("priority") count_tasks_per_priority[priority] += 1 import math for priority, ratio in priority_ratio.items(): expected_count = math.ceil(ratio * num_tasks_priority) actual_prio = count_tasks_per_priority[priority] assert actual_prio == expected_count or actual_prio == expected_count + 1 assert count_tasks_per_priority[None] == num_tasks def test_grab_ready_tasks(self, swh_scheduler): priority_ratio = self._priority_ratio(swh_scheduler) self._create_task_types(swh_scheduler) t = utcnow() task_type = TEMPLATES["git"]["type"] num_tasks_priority = 100 num_tasks_no_priority = 100 # Create tasks with and without priorities tasks = tasks_from_template( TEMPLATES["git"], t, num=num_tasks_no_priority, num_priority=num_tasks_priority, priorities=priority_ratio, ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10 ) grabbed_tasks = swh_scheduler.grab_ready_tasks( task_type, num_tasks=10, num_tasks_priority=10 ) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed assert peeked["priority"] == grabbed["priority"] def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x["id"]) tasks[:length] = [] ret 
= swh_scheduler.get_tasks(task["id"] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x["id"]) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(lst): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in lst] self._create_task_types(swh_scheduler) t = utcnow() tasks = tasks_from_template(TEMPLATES["git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks) def assert_filtered_task_ok( self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime ) -> None: """Ensure filtered tasks have the right expected properties (within the range, recurring disabled, etc..) """ started = task["started"] date = started if started is not None else task["scheduled"] assert after <= date and date <= before if task["task_policy"] == "oneshot": assert task["task_status"] in ["completed", "disabled"] if task["task_policy"] == "recurring": assert task["task_status"] in ["disabled"] def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") _tasks.append(t) # Randomly update task's status per policy status_per_policy = {"recurring": 0, "oneshot": 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] "recurring": [ (1, "disabled"), (0, "completed"), (0, "next_run_not_scheduled"), ], "oneshot": [ (0, "next_run_not_scheduled"), (1, "disabled"), (1, "completed"), ], } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task["policy"] _task_ids[policy].append(task["id"]) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task["id"]) swh_scheduler.disable_tasks(tasks_to_update["recurring"]) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( _task_ids["oneshot"], status="next_run_not_scheduled" ) # complete the tasks to update swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed") total_tasks_filtered = ( status_per_policy["recurring"] + status_per_policy["oneshot"] ) # no pagination scenario # retrieve tasks to archive after = _time - ONEDAY after_ts = after.strftime("%Y-%m-%d") before = utcnow() + ONEDAY before_ts = before.strftime("%Y-%m-%d") tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks ) tasks_to_archive = tasks_result["tasks"] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get("next_page_token") is None actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in tasks_to_archive: 
self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario nb_tasks = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) == nb_tasks next_page_token = tasks_result["next_page_token"] assert next_page_token is not None all_tasks = tasks_to_archive2 while next_page_token is not None: # Retrieve paginated results tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks, page_token=next_page_token, ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) <= nb_tasks all_tasks.extend(tasks_to_archive2) next_page_token = tasks_result.get("next_page_token") actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in all_tasks: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") c = random.randint(0, 100) if c <= percent: _tasks.append({"task_id": t["task"], "task_run_id": t["id"]}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task["id"] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): """No task exist in the scheduler's db, get_task_runs() should always return an empty list. """ assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): """No task has been executed yet, get_task_runs() should always return an empty list. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): """Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. 
""" self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } def test_get_or_create_lister(self, swh_scheduler): db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): assert lister.name == lister_args["name"] assert lister.instance_name == lister_args.get("instance_name", "") lister_get_again = swh_scheduler.get_or_create_lister( lister.name, lister.instance_name ) assert lister == lister_get_again + def test_get_lister(self, swh_scheduler): + for lister_args in LISTERS: + assert swh_scheduler.get_lister(**lister_args) is None + + db_listers = [] + for lister_args in LISTERS: + db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) + + for lister, lister_args in zip(db_listers, LISTERS): + lister_get_again = swh_scheduler.get_lister( + lister.name, 
lister.instance_name + ) + + assert lister == lister_get_again + def test_update_lister(self, swh_scheduler, stored_lister): lister = attr.evolve(stored_lister, current_state={"updated": "now"}) updated_lister = swh_scheduler.update_lister(lister) assert updated_lister.updated > lister.updated assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) def test_update_lister_stale(self, swh_scheduler, stored_lister): swh_scheduler.update_lister(stored_lister) with pytest.raises(StaleData) as exc: swh_scheduler.update_lister(stored_lister) assert "state not updated" in exc.value.args[0] def test_record_listed_origins(self, swh_scheduler, listed_origins): ret = swh_scheduler.record_listed_origins(listed_origins) assert set(returned.url for returned in ret) == set( origin.url for origin in listed_origins ) assert all(origin.first_seen == origin.last_seen for origin in ret) def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins): # First, insert `cutoff` origins cutoff = 100 assert cutoff < len(listed_origins) ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff]) assert len(ret) == cutoff # Then, insert all origins, including the `cutoff` first. ret = swh_scheduler.record_listed_origins(listed_origins) assert len(ret) == len(listed_origins) # Two different "first seen" values assert len(set(origin.first_seen for origin in ret)) == 2 # But a single "last seen" value assert len(set(origin.last_seen for origin in ret)) == 1 def test_get_listed_origins_exact(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) for i, origin in enumerate(listed_origins): ret = swh_scheduler.get_listed_origins( lister_id=origin.lister_id, url=origin.url ) assert ret.next_page_token is None assert len(ret.origins) == 1 assert ret.origins[0].lister_id == origin.lister_id assert ret.origins[0].url == origin.url @pytest.mark.parametrize("num_origins,limit", [(20, 6), (5, 42), (20, 20)]) def test_get_listed_origins_limit( self, swh_scheduler, listed_origins, num_origins, limit ) -> None: added_origins = sorted( listed_origins[:num_origins], key=lambda o: (o.lister_id, o.url) ) swh_scheduler.record_listed_origins(added_origins) returned_origins: List[ListedOrigin] = [] call_count = 0 next_page_token: Optional[ListedOriginPageToken] = None while True: call_count += 1 ret = swh_scheduler.get_listed_origins( lister_id=listed_origins[0].lister_id, limit=limit, page_token=next_page_token, ) returned_origins.extend(ret.origins) next_page_token = ret.next_page_token if next_page_token is None: break assert call_count == (num_origins // limit) + 1 assert len(returned_origins) == num_origins assert [(origin.lister_id, origin.url) for origin in returned_origins] == [ (origin.lister_id, origin.url) for origin in added_origins ] def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) assert ret.next_page_token is None assert len(ret.origins) == len(listed_origins) + @pytest.mark.parametrize("policy", ["oldest_scheduled_first"]) + def test_grab_next_visits(self, swh_scheduler, listed_origins_by_type, policy): + NUM_RESULTS = 5 + # Strict inequality to check that grab_next_visits doesn't return more + # results than requested + visit_type = next(iter(listed_origins_by_type)) + assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS + + for origins in listed_origins_by_type.values(): + 
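            # record every listed origin so grab_next_visits below has candidates
+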
swh_scheduler.record_listed_origins(origins) + + before = utcnow() + ret = swh_scheduler.grab_next_visits(visit_type, NUM_RESULTS, policy=policy) + after = utcnow() + + assert len(ret) == NUM_RESULTS + for origin in ret: + pk = (origin.url, origin.visit_type) + visit_stats = swh_scheduler.origin_visit_stats_get([pk])[0] + assert visit_stats is not None + assert before <= visit_stats.last_scheduled <= after + + @pytest.mark.parametrize("policy", ["oldest_scheduled_first"]) + def test_grab_next_visits_underflow( + self, swh_scheduler, listed_origins_by_type, policy + ): + NUM_RESULTS = 5 + visit_type = next(iter(listed_origins_by_type)) + assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS + + swh_scheduler.record_listed_origins( + listed_origins_by_type[visit_type][:NUM_RESULTS] + ) + + ret = swh_scheduler.grab_next_visits(visit_type, NUM_RESULTS + 2, policy=policy) + + assert len(ret) == NUM_RESULTS + + def test_grab_next_visits_unknown_policy(self, swh_scheduler): + NUM_RESULTS = 5 + with pytest.raises(UnknownPolicy, match="non_existing_policy"): + swh_scheduler.grab_next_visits( + "type", NUM_RESULTS, policy="non_existing_policy" + ) + def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt) + + def test_origin_visit_stats_get_empty(self, swh_scheduler) -> None: + assert swh_scheduler.origin_visit_stats_get([]) == [] + + def test_origin_visit_stats_upsert(self, swh_scheduler) -> None: + eventful_date = utcnow() + url = "https://github.com/test" + + visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=eventful_date, + last_uneventful=None, + last_failed=None, + last_notfound=None, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + + assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] + assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] + + uneventful_date = utcnow() + visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=None, + last_uneventful=uneventful_date, + last_failed=None, + last_notfound=None, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + + uneventful_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) + + expected_visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=eventful_date, + last_uneventful=uneventful_date, + last_failed=None, + last_notfound=None, + ) + + assert uneventful_visits == [expected_visit_stats] + + failed_date = utcnow() + visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=None, + last_uneventful=None, + last_failed=failed_date, + last_notfound=None, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + + failed_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) + + expected_visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=eventful_date, + last_uneventful=uneventful_date, + last_failed=failed_date, + last_notfound=None, + ) + + assert failed_visits == [expected_visit_stats] + + def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None: + eventful_date = utcnow() + url = "https://github.com/666/test" + + visit_stats = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=eventful_date, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats]) + + 
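        # visit stats are keyed by (origin URL, visit type): the "git" entry
+        # upserted above is returned unchanged, while the same URL queried
+        # with another visit type ("svn") matches nothing
+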
assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] + assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] + + def test_origin_visit_stats_upsert_messing_with_time(self, swh_scheduler) -> None: + url = "interesting-origin" + + # Let's play with dates... + date2 = utcnow() + date1 = date2 - ONEDAY + date0 = date1 - ONEDAY + assert date0 < date1 < date2 + + snapshot2 = hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd") + snapshot0 = hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff") + visit_stats0 = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=date2, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=snapshot2, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats0]) + + actual_visit_stats0 = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] + assert actual_visit_stats0 == visit_stats0 + + visit_stats2 = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=None, + last_uneventful=date1, + last_notfound=None, + last_failed=None, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats2]) + + actual_visit_stats2 = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] + assert actual_visit_stats2 == attr.evolve( + actual_visit_stats0, last_uneventful=date1 + ) + + # a past date, what happens? + # date0 < date2 so this ovs should be dismissed + # the "eventful" associated snapshot should be dismissed as well + visit_stats1 = OriginVisitStats( + url=url, + visit_type="git", + last_eventful=date0, + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=snapshot0, + ) + swh_scheduler.origin_visit_stats_upsert([visit_stats1]) + + actual_visit_stats1 = swh_scheduler.origin_visit_stats_get([(url, "git")])[0] + + assert actual_visit_stats1 == attr.evolve( + actual_visit_stats2, last_eventful=date2 + ) + + def test_origin_visit_stats_upsert_batch(self, swh_scheduler) -> None: + """Batch upsert is ok""" + visit_stats = [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), + ), + OriginVisitStats( + url="bar", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), + ), + ] + + swh_scheduler.origin_visit_stats_upsert(visit_stats) + + for visit_stat in swh_scheduler.origin_visit_stats_get( + [(vs.url, vs.visit_type) for vs in visit_stats] + ): + assert visit_stat is not None + + def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: + """Batch upsert does not support altering multiple times the same origin-visit-status + + """ + with pytest.raises(SchedulerException, match="CardinalityViolation"): + swh_scheduler.origin_visit_stats_upsert( + [ + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=None, + ), + OriginVisitStats( + url="foo", + visit_type="git", + last_eventful=None, + last_uneventful=utcnow(), + last_notfound=None, + last_failed=None, + last_snapshot=None, + ), + ] + ) + + def test_metrics_origins_known(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + ret = swh_scheduler.update_metrics() + + assert sum(metric.origins_known for metric in ret) == len(listed_origins) 
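+
+    # Illustrative sketch only (not part of the test suite): assuming the
+    # metric objects returned by update_metrics()/get_metrics() expose the
+    # attributes exercised in the surrounding tests (visit_type,
+    # origins_known, origins_never_visited, ...), a caller could aggregate
+    # them per visit type roughly like this; the helper name is hypothetical.
+    #
+    #     from collections import Counter
+    #
+    #     def never_visited_by_type(metrics):
+    #         counts = Counter()
+    #         for metric in metrics:
+    #             counts[metric.visit_type] += metric.origins_never_visited
+    #         return dict(counts)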
+ + def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + disabled_origin = attr.evolve(listed_origins[0], enabled=False) + swh_scheduler.record_listed_origins([disabled_origin]) + + ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id) + for metric in ret: + if metric.visit_type == disabled_origin.visit_type: + # We disabled one of these origins + assert metric.origins_known - metric.origins_enabled == 1 + else: + # But these are still all enabled + assert metric.origins_known == metric.origins_enabled + + def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + # Pretend that we've recorded a visit on one origin + visited_origin = listed_origins[0] + swh_scheduler.origin_visit_stats_upsert( + [ + OriginVisitStats( + url=visited_origin.url, + visit_type=visited_origin.visit_type, + last_eventful=utcnow(), + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes( + "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" + ), + ), + ] + ) + + ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) + for metric in ret: + if metric.visit_type == visited_origin.visit_type: + # We visited one of these origins + assert metric.origins_known - metric.origins_never_visited == 1 + else: + # But none of these have been visited + assert metric.origins_known == metric.origins_never_visited + + def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + # Pretend that we've recorded a visit on one origin, in the past with + # respect to the "last update" time for the origin + visited_origin = listed_origins[0] + assert visited_origin.last_update is not None + swh_scheduler.origin_visit_stats_upsert( + [ + OriginVisitStats( + url=visited_origin.url, + visit_type=visited_origin.visit_type, + last_eventful=visited_origin.last_update + - datetime.timedelta(days=1), + last_uneventful=None, + last_failed=None, + last_notfound=None, + last_snapshot=hash_to_bytes( + "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" + ), + ), + ] + ) + + ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) + for metric in ret: + if metric.visit_type == visited_origin.visit_type: + # We visited one of these origins, in the past + assert metric.origins_with_pending_changes == 1 + else: + # But none of these have been visited + assert metric.origins_with_pending_changes == 0 + + def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + fake_uuid = uuid.uuid4() + assert all(fake_uuid != origin.lister_id for origin in listed_origins) + + ret = swh_scheduler.update_metrics(lister_id=fake_uuid) + + assert len(ret) == 0 + + def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + + ret = swh_scheduler.update_metrics(timestamp=ts) + + assert all(metric.last_update == ts for metric in ret) + + def test_update_metrics_twice(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + + ts = utcnow() + ret = swh_scheduler.update_metrics(timestamp=ts) + assert all(metric.last_update == ts for metric in ret) + + second_ts = ts + datetime.timedelta(seconds=1) + ret = 
swh_scheduler.update_metrics(timestamp=second_ts) + assert all(metric.last_update == second_ts for metric in ret) + + def test_get_metrics(self, swh_scheduler, listed_origins): + swh_scheduler.record_listed_origins(listed_origins) + updated = swh_scheduler.update_metrics() + + retrieved = swh_scheduler.get_metrics() + assert_metrics_equal(updated, retrieved) + + def test_get_metrics_by_lister(self, swh_scheduler, listed_origins): + lister_id = listed_origins[0].lister_id + assert lister_id is not None + + swh_scheduler.record_listed_origins(listed_origins) + updated = swh_scheduler.update_metrics() + + retrieved = swh_scheduler.get_metrics(lister_id=lister_id) + assert len(retrieved) > 0 + + assert_metrics_equal( + [metric for metric in updated if metric.lister_id == lister_id], retrieved + ) + + def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins): + visit_type = listed_origins[0].visit_type + assert visit_type is not None + + swh_scheduler.record_listed_origins(listed_origins) + updated = swh_scheduler.update_metrics() + + retrieved = swh_scheduler.get_metrics(visit_type=visit_type) + assert len(retrieved) > 0 + + assert_metrics_equal( + [metric for metric in updated if metric.visit_type == visit_type], retrieved + ) diff --git a/swh/scheduler/tests/test_simulator.py b/swh/scheduler/tests/test_simulator.py new file mode 100644 index 0000000..a93542e --- /dev/null +++ b/swh/scheduler/tests/test_simulator.py @@ -0,0 +1,53 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +import swh.scheduler.simulator as simulator +from swh.scheduler.tests.common import TASK_TYPES + +NUM_ORIGINS = 42 +TEST_RUNTIME = 1000 + + +def test_fill_test_data(swh_scheduler): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) + + res = swh_scheduler.get_listed_origins() + assert len(res.origins) == NUM_ORIGINS + assert res.next_page_token is None + + res = swh_scheduler.search_tasks() + assert len(res) == NUM_ORIGINS + + +@pytest.mark.parametrize("policy", ("oldest_scheduled_first",)) +def test_run_origin_scheduler(swh_scheduler, policy): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) + simulator.run( + swh_scheduler, + scheduler_type="origin_scheduler", + policy=policy, + runtime=TEST_RUNTIME, + ) + + +def test_run_task_scheduler(swh_scheduler): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) + simulator.run( + swh_scheduler, + scheduler_type="task_scheduler", + policy=None, + runtime=TEST_RUNTIME, + )
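+
+
+# Illustrative sketch only: outside of pytest, the simulator can be driven
+# with the same calls exercised above, assuming a scheduler instance `sched`
+# obtained elsewhere (e.g. via swh.scheduler.get_scheduler) and the task
+# types created beforehand:
+#
+#     simulator.fill_test_data(sched, num_origins=NUM_ORIGINS)
+#     simulator.run(
+#         sched,
+#         scheduler_type="origin_scheduler",
+#         policy="oldest_scheduled_first",
+#         runtime=TEST_RUNTIME,
+#     )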