diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3cc45b3..9552535 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,49 +1,50 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
  rev: v2.4.0
  hooks:
  - id: trailing-whitespace
  - id: check-json
  - id: check-yaml

- repo: https://gitlab.com/pycqa/flake8
  rev: 3.8.3
  hooks:
  - id: flake8

- repo: https://github.com/codespell-project/codespell
  rev: v1.16.0
  hooks:
  - id: codespell
+    args: [-L simpy]

- repo: local
  hooks:
  - id: mypy
    name: mypy
    entry: mypy
    args: [swh]
    pass_filenames: false
    language: system
    types: [python]

- repo: https://github.com/PyCQA/isort
  rev: 5.5.2
  hooks:
  - id: isort

- repo: https://github.com/python/black
  rev: 19.10b0
  hooks:
  - id: black

# unfortunately, we are far from being able to enable this...
# - repo: https://github.com/PyCQA/pydocstyle.git
#   rev: 4.0.0
#   hooks:
#   - id: pydocstyle
#     name: pydocstyle
#     description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions.
#     entry: pydocstyle --convention=google
#     language: python
#     types: [python]
diff --git a/docs/index.rst b/docs/index.rst
index dd7b1b1..bf185cb 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,169 +1,170 @@
.. _swh-scheduler:

Software Heritage - Job scheduler
=================================

Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
listing a forge, loading new stuff from a Git repository) and one-off
activities (e.g., loading a specific version of a source package).

Description
-----------

This module provides a scheduler service for the Software Heritage platform.
It allows one to define tasks with a number of properties. In this
documentation, we will call these swh-tasks to prevent confusion. These
swh-tasks are stored in a database, and an HTTP-based RPC service is provided
to create or find existing swh-task declarations.

The execution model for these swh-tasks relies on Celery. Thus, each swh-task
type defined in the database must have one (or a series of) Celery worker(s)
capable of executing such swh-tasks.

Then a number of services are also provided to manage the scheduling of these
swh-tasks as Celery tasks.

The `scheduler-runner` service is a daemon that regularly looks for swh-tasks
in the database that should be scheduled. For each selected swh-task, a Celery
task is instantiated.

The `scheduler-listener` service is a daemon that listens to the Celery event
bus and maintains the workflow status of scheduled swh-tasks.

SWH Task Model
~~~~~~~~~~~~~~

Each swh-task-type is the declaration of a type of swh-task.
Each swh-task-type has the following fields:

- `type`: Name of the swh-task type; can be anything but must be unique,
- `description`: Human-readable task description,
- `backend_name`: Name of the task in the job-running backend,
- `default_interval`: Default interval for newly scheduled tasks,
- `min_interval`: Minimum interval between two runs of a task,
- `max_interval`: Maximum interval between two runs of a task,
- `backoff_factor`: Adjustment factor for the backoff between two task runs,
- `max_queue_length`: Maximum length of the queue for this type of task,
- `num_retries`: Default number of retries on transient failures,
- `retry_delay`: Retry delay for the task.

Existing swh-task-types can be listed using the `swh scheduler` command line
tool::

  $ swh scheduler task-type list
  Known task types:
  check-deposit:
    Pre-checking deposit step before loading into swh archive
  index-fossology-license:
    Fossology license indexer task
  load-git:
    Update an origin of type git
  load-hg:
    Update an origin of type mercurial

You can see the details of a swh-task-type::

  $ swh scheduler task-type list -v -t load-git
  Known task types:
  load-git: swh.loader.git.tasks.UpdateGitRepository
    Update an origin of type git
    interval: 64 days, 0:00:00 [12:00:00, 64 days, 0:00:00]
    backoff_factor: 2.0
    max_queue_length: 5000
    num_retries: None
    retry_delay: None

A swh-task is an 'instance' of such a swh-task-type, and consists of:

- `arguments`: Arguments passed to the underlying job scheduler,
- `next_run`: Date on or after which the next run of this task should happen,
- `current_interval`: Interval between two runs of this task, taking into
  account the backoff factor,
- `policy`: Whether the task is "one-shot" or "recurring",
- `retries_left`: Number of "short delay" retries of the task in case of
  transient failure,
- `priority`: Priority of the task,
- `id`: Internal task identifier,
- `type`: References the task_type table,
- `status`: Task status (among "next_run_not_scheduled", "next_run_scheduled",
  "completed", "disabled").

So a swh-task basically consists of:

- a set of parameters defining how the scheduling of the swh-task is handled,
- a set of parameters to specify the retry policy in case of transient failure
  upon execution,
- a set of parameters that defines the job to be done (`backend_name` +
  `arguments`).

You can list pending swh-tasks (tasks that are to be scheduled ASAP)::

  $ swh scheduler task list-pending load-git --limit 2
  Found 1 load-git tasks

  Task 9052257
    Next run: 15 days ago (2019-06-25 10:35:10+00:00)
    Interval: 2 days, 0:00:00
    Type: load-git
    Policy: recurring
    Args:
      'https://github.com/turtl/mobile'
    Keyword args:

Looking for existing swh-tasks can be done via the command line tool::

  $ swh scheduler task list -t load-hg --limit 2
  Found 2 tasks

  Task 168802702
    Next run: in 4 hours (2019-07-10 17:56:48+00:00)
    Interval: 1 day, 0:00:00
    Type: load-hg
    Policy: recurring
    Status: next_run_not_scheduled
    Priority:
    Args:
      'https://bitbucket.org/kepung/pypy'
    Keyword args:

  Task 169800445
    Next run: in a month (2019-08-10 17:54:24+00:00)
    Interval: 32 days, 0:00:00
    Type: load-hg
    Policy: recurring
    Status: next_run_not_scheduled
    Priority:
    Args:
      'https://bitbucket.org/lunixbochs/pypy-1'
    Keyword args:

Writing a new worker for a new swh-task-type
--------------------------------------------

When you want to add a new swh-task-type, you need a Celery worker backend
capable of executing instances of this new task-type.

Celery workers for swh-scheduler based tasks should be started using the
Celery app in `swh.scheduler.celery_config`. This latter, among other things,
provides a loading mechanism for task types based on plugins declared via
pkg_resources under the `[swh.workers]` entry point.

TODO: add a fully working example of a dumb task.
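
In the meantime, here is a rough, hypothetical sketch of what such a "dumb"
task could look like; the module name, task name and return value below are
illustrative placeholders, not an existing swh API:

.. code-block:: python

   # Hypothetical module swh/dumb/tasks.py: a "dumb" task that pretends to
   # process an origin URL and reports an eventful visit.
   from celery import shared_task


   @shared_task(name=__name__ + ".DumbTask")
   def dumb_task(url: str) -> dict:
       """Pretend to load ``url`` and report a successful, eventful visit."""
       return {"status": "eventful", "url": url}

The package shipping this module would also have to advertise it through the
`[swh.workers]` entry point mentioned above (mirroring what existing swh
loaders do in their setup.py), and a matching swh-task-type, with
`backend_name` pointing to the Celery task name, would have to be registered
in the scheduler database.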

Reference Documentation
-----------------------

.. toctree::
   :maxdepth: 2

   cli
+   simulator
   /apidoc/swh.scheduler
diff --git a/docs/simulator.rst b/docs/simulator.rst
new file mode 100644
index 0000000..1405ea3
--- /dev/null
+++ b/docs/simulator.rst
@@ -0,0 +1,55 @@
+Software Heritage Scheduler Simulator
+=====================================
+
+This component simulates the interaction between the scheduling and loading
+infrastructure of Software Heritage. This allows quick(er) development of new
+task scheduling policies without having to wait for the actual infrastructure
+to perform (heavy) loading tasks.
+
+Simulator components
+--------------------
+
+- real instance of the scheduler database
+- simulated task queues
+- simulated workers
+- simulated load tasks
+- simulated archive -> scheduler feedback loop
+
+Installing the simulator
+------------------------
+
+The simulator depends on SimPy and other specific libraries. To install them,
+please use:
+
+.. code-block:: bash
+
+   pip install 'swh.scheduler[simulator]'
+
+Running the simulator
+---------------------
+
+The simulator uses a real instance of the scheduler database, which is (at
+least for now) persistent across runs of the simulator. You need to set that up
+beforehand:
+
+.. code-block:: bash
+
+   # if you want to use a temporary instance of postgresql
+   eval `pifpaf run postgresql`
+
+   # Set this variable for the simulator to know which db to connect to. pifpaf
+   # sets other variables like PGPORT, PGHOST, ...
+   export PGDATABASE=swh-scheduler
+
+   # Create/initialize the scheduler database
+   swh db create scheduler -d $PGDATABASE
+   swh db init scheduler -d $PGDATABASE
+
+   # This generates some data in the scheduler database. You can also feed the
+   # database with more realistic data, e.g. from a lister or from a dump of the
+   # production database.
+   python3 -m swh.scheduler.simulator fill_test_data
+
+   # Run the simulator itself, interacting with the scheduler database you've
+   # just seeded.
+   python3 -m swh.scheduler.simulator run
diff --git a/mypy.ini b/mypy.ini
index e0fc4ad..da7806d 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,39 +1,42 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True

# 3rd party libraries without stubs (yet)

[mypy-arrow.*]
ignore_missing_imports = True

[mypy-celery.*]
ignore_missing_imports = True

[mypy-confluent_kafka.*]
ignore_missing_imports = True

[mypy-elasticsearch.*]
ignore_missing_imports = True

[mypy-humanize.*]
ignore_missing_imports = True

[mypy-kombu.*]
ignore_missing_imports = True

[mypy-pika.*]
ignore_missing_imports = True

[mypy-pkg_resources.*]
ignore_missing_imports = True

[mypy-psycopg2.*]
ignore_missing_imports = True

[mypy-pytest.*]
ignore_missing_imports = True

[mypy-pytest_postgresql.*]
ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
index 0000000..00f0ddd
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1 @@
+simpy>=3,<4
diff --git a/setup.py b/setup.py
index df08cbc..1d1e5e4 100755
--- a/setup.py
+++ b/setup.py
@@ -1,74 +1,76 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from io import open
from os import path

from setuptools import find_packages, setup

here = path.abspath(path.dirname(__file__))

# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
    long_description = f.read()


-def parse_requirements(name=None):
-    if name:
-        reqf = "requirements-%s.txt" % name
-    else:
-        reqf = "requirements.txt"
-
+def parse_requirements(*names):
    requirements = []
-    if not path.exists(reqf):
-        return requirements
+    for name in names:
+        if name:
+            reqf = "requirements-%s.txt" % name
+        else:
+            reqf = "requirements.txt"
+
+        if not path.exists(reqf):
+            return requirements

-    with open(reqf) as f:
-        for line in f.readlines():
-            line = line.strip()
-            if not line or line.startswith("#"):
-                continue
-            requirements.append(line)
+        with open(reqf) as f:
+            for line in f.readlines():
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                requirements.append(line)

    return requirements


setup(
    name="swh.scheduler",
    description="Software Heritage Scheduler",
    long_description=long_description,
    long_description_content_type="text/markdown",
    python_requires=">=3.7",
    author="Software Heritage developers",
    author_email="swh-devel@inria.fr",
    url="https://forge.softwareheritage.org/diffusion/DSCH/",
    packages=find_packages(),
    setup_requires=["setuptools-scm"],
    use_scm_version=True,
-    install_requires=parse_requirements() + parse_requirements("swh"),
+    install_requires=parse_requirements(None, "swh"),
    extras_require={
-        "testing": parse_requirements("test") + parse_requirements("journal"),
+        "testing": parse_requirements("test", "journal", "simulator"),
        "journal": parse_requirements("journal"),
+        "simulator": parse_requirements("simulator"),
    },
    include_package_data=True,
    entry_points="""
        [swh.cli.subcommands]
        scheduler=swh.scheduler.cli
        scheduler-journal=swh.scheduler.cli.journal
    """,
    classifiers=[
        "Programming Language :: Python :: 3",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
        "Operating System :: OS Independent",
        "Development Status :: 5 - Production/Stable",
    ],
    project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-scheduler", "Documentation": "https://docs.softwareheritage.org/devel/swh-scheduler/", }, ) diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py new file mode 100644 index 0000000..3265cf7 --- /dev/null +++ b/swh/scheduler/simulator/__init__.py @@ -0,0 +1,144 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime, timezone +import logging +from typing import Dict, Generator, Iterator, Tuple + +from simpy import Environment, Event, Store + +from swh.scheduler import get_scheduler +from swh.scheduler.model import ListedOrigin + +logger = logging.getLogger(__name__) + + +class Queue(Store): + """Model a queue of objects to be passed between processes.""" + + def __len__(self): + return len(self.items or []) + + def slots_remaining(self): + return self.capacity - len(self) + + +def load_task_duration(visit_type: str, origin: str) -> float: + return 101.0 + + +def worker_process( + env: Environment, name: str, task_queue: Queue, status_queue: Queue +) -> Generator[Event, Tuple[str, str], None]: + """A worker which consumes tasks from the input task_queue. Tasks + themselves send OriginVisitStatus objects to the status_queue.""" + logger.debug("%s worker %s: Start", env.now, name) + while True: + visit_type, origin = yield task_queue.get() + logger.debug( + "%s worker %s: Run task %s origin=%s", env.now, name, visit_type, origin + ) + yield env.process( + load_task_process(env, visit_type, origin, status_queue=status_queue) + ) + + +def load_task_process( + env: Environment, visit_type: str, origin: str, status_queue: Queue +) -> Iterator[Event]: + """A loading task. This pushes OriginVisitStatus objects to the + status_queue to simulate the visible outcomes of the task. + + Uses the `load_task_duration` function to determine its run time. + """ + # TODO: OVS generation + task_duration = load_task_duration(visit_type, origin) + logger.debug("%s task %s origin=%s: Start", env.now, visit_type, origin) + yield env.timeout(task_duration) + logger.debug("%s task %s origin=%s: End", env.now, visit_type, origin) + + +def scheduler_runner_process( + env: Environment, scheduler, task_queues: Dict[str, Queue], policy: str, +) -> Iterator[Event]: + """Scheduler runner. Grabs next visits from the database according to the + scheduling policy, and fills the task_queues accordingly.""" + + while True: + for visit_type, queue in task_queues.items(): + logger.debug("%s runner: processing %s", env.now, visit_type) + min_batch_size = max(queue.capacity // 10, 1) + remaining = queue.slots_remaining() + if remaining < min_batch_size: + logger.debug( + "%s runner: not enough slots in %s: %s", + env.now, + visit_type, + remaining, + ) + continue + next_origins = scheduler.grab_next_visits( + visit_type, remaining, policy=policy + ) + logger.debug( + "%s runner: running %s %s tasks", env.now, visit_type, len(next_origins) + ) + for origin in next_origins: + yield queue.put((origin.visit_type, origin.url)) + + yield env.timeout(10.0) + + +def setup( + env: Environment, + workers_per_type: Dict[str, int], + task_queue_capacity: int, + policy: str, +): + # We expect PGHOST, PGPORT, ... 
+    scheduler = get_scheduler(cls="local", db="")
+
+    task_queues = {
+        visit_type: Queue(env, capacity=task_queue_capacity)
+        for visit_type in workers_per_type
+    }
+    status_queue = Queue(env)
+
+    env.process(scheduler_runner_process(env, scheduler, task_queues, policy))
+
+    for visit_type, num_workers in workers_per_type.items():
+        task_queue = task_queues[visit_type]
+        for i in range(num_workers):
+            worker_name = f"worker-{visit_type}-{i}"
+            env.process(worker_process(env, worker_name, task_queue, status_queue))
+
+
+def fill_test_data():
+    scheduler = get_scheduler(cls="local", db="")
+    stored_lister = scheduler.get_or_create_lister(name="example")
+    scheduler.record_listed_origins(
+        [
+            ListedOrigin(
+                lister_id=stored_lister.id,
+                url=f"https://example.com/{i:04d}.git",
+                visit_type="git",
+                last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+            )
+            for i in range(1000)
+        ]
+    )
+
+
+def run():
+    logging.basicConfig(level=logging.DEBUG)
+    NUM_WORKERS = 2
+    env = Environment()
+    setup(
+        env,
+        workers_per_type={"git": NUM_WORKERS},
+        task_queue_capacity=100,
+        policy="oldest_scheduled_first",
+    )
+    env.run(until=10000)
diff --git a/swh/scheduler/simulator/__main__.py b/swh/scheduler/simulator/__main__.py
new file mode 100644
index 0000000..af1fc53
--- /dev/null
+++ b/swh/scheduler/simulator/__main__.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import sys
+
+from . import fill_test_data, run
+
+
+def usage():
+    print(f"Usage: {sys.argv[0]} fill_test_data/run")
+    sys.exit(2)
+
+
+def main(argv):
+    try:
+        myself, arg = argv
+    except ValueError:
+        usage()
+
+    if arg == "run":
+        run()
+    elif arg == "fill_test_data":
+        fill_test_data()
+    else:
+        usage()
+
+
+if __name__ == "__main__":
+    main(sys.argv)
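
For readers unfamiliar with SimPy, the simulator above boils down to the
following pattern: an `Environment` drives generator-based processes that
exchange work through a `Store` (the `Queue` subclass above). The sketch below
is independent of the scheduler database and of the swh code in this diff; it
only needs `pip install simpy`, and all names in it are illustrative:

.. code-block:: python

   import simpy


   def producer(env, queue):
       """Periodically push origins to load, like scheduler_runner_process."""
       for i in range(5):
           yield queue.put(("git", f"https://example.com/{i:04d}.git"))
           yield env.timeout(10.0)


   def worker(env, name, queue):
       """Consume tasks and simulate their duration, like worker_process."""
       while True:
           visit_type, origin = yield queue.get()
           print(f"{env.now:7.1f} {name}: loading {visit_type} {origin}")
           yield env.timeout(101.0)  # stand-in for load_task_duration()


   env = simpy.Environment()
   queue = simpy.Store(env, capacity=100)
   env.process(producer(env, queue))
   for i in range(2):
       env.process(worker(env, f"worker-{i}", queue))
   env.run(until=1000)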