diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py index 0ec6110..62389f1 100644 --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -1,365 +1,369 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This schedules the recurrent visits, for listed origins, in Celery. For "oneshot" (save code now, lister) tasks, check the :mod:`swh.scheduler.celery_backend.runner` and :mod:`swh.scheduler.celery_backend.pika_listener` modules. """ from __future__ import annotations from itertools import chain import logging from queue import Empty, Queue import random from threading import Thread import time from typing import TYPE_CHECKING, Any, Dict, List, Tuple from kombu.utils.uuid import uuid from swh.scheduler.celery_backend.config import get_available_slots from swh.scheduler.utils import create_origin_task_dicts if TYPE_CHECKING: from ..interface import SchedulerInterface from ..model import ListedOrigin logger = logging.getLogger(__name__) DEFAULT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 50}, {"policy": "never_visited_oldest_update_first", "weight": 50}, ] DEFAULT_DVCS_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49}, {"policy": "never_visited_oldest_update_first", "weight": 49}, {"policy": "origins_without_last_update", "weight": 2}, ] DEFAULT_GIT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49, "tablesample": 0.1}, {"policy": "never_visited_oldest_update_first", "weight": 49, "tablesample": 0.1}, {"policy": "origins_without_last_update", "weight": 2, "tablesample": 0.1}, ] DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = { "default": DEFAULT_POLICY, "hg": DEFAULT_DVCS_POLICY, "svn": DEFAULT_DVCS_POLICY, "cvs": DEFAULT_DVCS_POLICY, "bzr": DEFAULT_DVCS_POLICY, "git": DEFAULT_GIT_POLICY, } """Scheduling policies to use to retrieve visits for the given visit types, with their relative weights""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" QUEUE_FULL_BACKOFF = 60 """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue.""" -NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 +DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current iteration""" BACKOFF_SPLAY = 5.0 """Amplitude of the fuzziness between backoffs""" TERMINATE = object() """Termination request received from command queue (singleton used for identity comparison)""" def grab_next_visits_policy_weights( scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]], ) -> List[ListedOrigin]: """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. The :py:data:`POLICY_CFG` list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list should at least have a 'policy' (policy name) and a 'weight' entry. For each policy in this policy_cfg list, the number of returned origins to visit will be weighted using this weight config option so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration will be passed to the scheduler.grab_next_visit() method. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%. Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ policies = [cfg["policy"] for cfg in policy_cfg] if len(set(policies)) != len(policies): raise ValueError( "A policy weights can only appear once; check your policy " f"configuration for visit type {visit_type}" ) weights = [cfg["weight"] for cfg in policy_cfg] total_weight = sum(weights) if not total_weight: raise ValueError(f"No policy weights set for visit type {visit_type}") ratios = [weight / total_weight for weight in weights] extra_kws = [ {k: v for k, v in cfg.items() if k not in ("weight", "policy")} for cfg in policy_cfg ] fetched_origins: Dict[str, List[ListedOrigin]] = {} for policy, ratio, extra_kw in zip(policies, ratios, extra_kws): num_tasks_to_send = int(num_visits * ratio) fetched_origins[policy] = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy, **extra_kw, ) all_origins: List[ListedOrigin] = list( chain.from_iterable(fetched_origins.values()) ) if not all_origins: return [] # Check whether the ratios of origins fetched are skewed with respect to the # ones we requested fetched_origin_ratios = { policy: len(origins) / len(all_origins) for policy, origins in fetched_origins.items() } for policy, expected_ratio in zip(policies, ratios): # 5% of skew with respect to request if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: logger.info( "Skewed fetch for visit type %s with policy %s: fetched %s, " "requested %s", visit_type, policy, fetched_origin_ratios[policy], expected_ratio, ) return all_origins def splay(): """Return a random short interval by which to vary the backoffs for the visit scheduling threads""" return random.uniform(0, BACKOFF_SPLAY) def send_visits_for_visit_type( scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, policy_cfg: List[Dict[str, Any]], + no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF, ) -> float: """Schedule the next batch of visits for the given ``visit_type``. First, we determine the number of available slots by introspecting the RabbitMQ queue. If there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when there's not many jobs to queue. Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits to schedule, and we send them to celery. - If the last scheduling attempt didn't return any origins, we sleep for - :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive + If the last scheduling attempt didn't return any origins, we sleep by default for + :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too often if there's nothing left to schedule. The :py:data:`POLICY_CFG` argument is the policy configuration used to choose the next origins to visit. It is passed directly to the :py:func:`grab_next_visits_policy_weights()` function. Returns: the earliest :py:func:`time.monotonic` value at which to run the next iteration of the loop. """ queue_name = task_type["backend_name"] max_queue_length = task_type.get("max_queue_length") or 0 min_available_slots = max_queue_length * MIN_SLOTS_RATIO current_iteration_start = time.monotonic() # Check queue level available_slots = get_available_slots(app, queue_name, max_queue_length) logger.debug( "%s available slots for visit type %s in queue %s", available_slots, visit_type, queue_name, ) if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF origins = grab_next_visits_policy_weights( scheduler, visit_type, available_slots, policy_cfg ) if not origins: logger.debug("No origins to visit for type %s", visit_type) - return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF + return current_iteration_start + no_origins_scheduled_backoff # Try to smooth the ingestion load, origins pulled by different # scheduling policies have different resource usage patterns random.shuffle(origins) for task_dict in create_origin_task_dicts(origins, scheduler): app.send_task( queue_name, task_id=uuid(), args=task_dict["arguments"]["args"], kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) logger.info( "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, ) # When everything worked, we can try to schedule origins again ASAP. return time.monotonic() def visit_scheduler_thread( config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]], ): """Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.""" from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import build_app try: # We need to reinitialize these connections because they're not generally # thread-safe app = build_app(config.get("celery")) scheduler = get_scheduler(**config["scheduler"]) task_type = scheduler.get_task_type(f"load-{visit_type}") if task_type is None: raise ValueError(f"Unknown task type: load-{visit_type}") policy_cfg = config.get("scheduling_policy", DEFAULT_POLICY_CONFIG) for policies in policy_cfg.values(): for policy in policies: if "weight" not in policy or "policy" not in policy: raise ValueError( "Each policy configuration needs at least a 'policy' " "and a 'weight' entry" ) policy_cfg = {**DEFAULT_POLICY_CONFIG, **policy_cfg} next_iteration = time.monotonic() while True: # vary the next iteration time a little bit next_iteration = next_iteration + splay() while time.monotonic() < next_iteration: # Wait for next iteration to start. Listen for termination message. try: msg = command_queue.get(block=True, timeout=1) except Empty: continue if msg is TERMINATE: return else: logger.warn("Received unexpected message %s in command queue", msg) next_iteration = send_visits_for_visit_type( scheduler, app, visit_type, task_type, policy_cfg.get(visit_type, policy_cfg["default"]), + config.get( + "no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF + ), ) except BaseException as e: exc_queue.put((visit_type, e)) VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] """Dict storing the visit scheduler threads and their command queues""" def spawn_visit_scheduler_thread( threads: VisitSchedulerThreads, exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str, ): """Spawn a new thread to schedule the visits of type ``visit_type``.""" command_queue: Queue[object] = Queue() thread = Thread( target=visit_scheduler_thread, kwargs={ "config": config, "visit_type": visit_type, "command_queue": command_queue, "exc_queue": exc_queue, }, ) threads[visit_type] = (thread, command_queue) thread.start() def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: """Terminate all visit scheduler threads""" logger.info("Termination requested...") for _, command_queue in threads.values(): command_queue.put(TERMINATE) loops = 0 while threads and loops < 10: logger.info( "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) ) loops += 1 for visit_type, (thread, _) in list(threads.items()): thread.join(timeout=1) if not thread.is_alive(): logger.debug("Thread %s terminated", visit_type) del threads[visit_type] if threads: logger.warn( "Could not reap the following threads after 10 attempts: %s", ", ".join(sorted(threads)), ) return list(sorted(threads)) diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py index 5152656..6a55071 100644 --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -1,946 +1,946 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from itertools import islice import logging import random import re import tempfile from unittest.mock import patch from click.testing import CliRunner import pytest from swh.core.api.classes import stream_results from swh.model.model import Origin from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict, utcnow CLI_CONFIG = """ scheduler: cls: foo args: {} """ -def invoke(scheduler, catch_exceptions, args): +def invoke(scheduler, catch_exceptions, args, config=CLI_CONFIG): runner = CliRunner() with patch( "swh.scheduler.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: - config_fd.write(CLI_CONFIG) + config_fd.write(config) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = [ "-C" + config_fd.name, ] + args result = runner.invoke(cli, args, obj={"log_level": logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_schedule_tasks(swh_scheduler): csv_data = ( b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' + utcnow().isoformat().encode() + b"\n" ) with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(csv_data) csv_fd.seek(0) result = invoke( swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name] ) expected = r""" Created 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg1', 'arg2'\] Keyword args: key: 'value' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg3', 'arg4'\] Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_tasks_columns(swh_scheduler): with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd: csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n') csv_fd.seek(0) result = invoke( swh_scheduler, False, [ "task", "schedule", "-c", "type", "-c", "policy", "-c", "args", "-c", "kwargs", "-d", ";", csv_fd.name, ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_task(swh_scheduler): result = invoke( swh_scheduler, False, [ "task", "add", "swh-test-ping", "arg1", "arg2", "key=value", ], ) expected = r""" Created 1 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: 'arg1' 'arg2' Keyword args: key: 'value' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_none(swh_scheduler): result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 1 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter(swh_scheduler): task = create_task_dict("swh-test-multiping", "oneshot", key="value") swh_scheduler.create_tasks([task]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 0 swh-test-ping tasks """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter_2(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-multiping", "oneshot", key="value"), create_task_dict("swh-test-ping", "oneshot", key="value2"), ] ) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # Fails because "task list-pending --limit 3" only returns 2 tasks, because # of how compute_nb_tasks_from works. @pytest.mark.xfail def test_list_pending_tasks_limit(swh_scheduler): swh_scheduler.create_tasks( [ create_task_dict("swh-test-ping", "oneshot", key="value%d" % i) for i in range(10) ] ) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--limit", "3", ], ) expected = r""" Found 2 swh-test-ping tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value0' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3) task2["next_run"] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke( swh_scheduler, False, [ "task", "list-pending", "swh-test-ping", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 swh-test-ping tasks Task 2 Next run: tomorrow \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", ], ) expected = r""" Found 2 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, [ "task", "list", "--task-id", "2", ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id_2(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"] ) expected = r""" Found 2 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_type(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"] ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 3 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_limit(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task3 = create_task_dict("swh-test-ping", "oneshot", key="value3") swh_scheduler.create_tasks([task1, task2, task3]) result = invoke( swh_scheduler, False, [ "task", "list", "--limit", "2", ], ) expected = r""" Found 2 tasks Task 1 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_before(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--before", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 2 Next run: today \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_after(swh_scheduler): task1 = create_task_dict("swh-test-ping", "oneshot", key="value1") task2 = create_task_dict("swh-test-ping", "oneshot", key="value2") task1["next_run"] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks("swh-test-ping") result = invoke( swh_scheduler, False, [ "task", "list", "--after", (datetime.date.today() + datetime.timedelta(days=2)).isoformat(), ], ) expected = r""" Found 1 tasks Task 1 Next run: .+ \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def _fill_storage_with_origins(storage, nb_origins): origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)] storage.origin_add(origins) return origins @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins_dry_run(swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a divisor of nb_origins.""" _fill_storage_with_origins(storage, 90) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "--dry-run", "swh-test-ping", ], ) # Check the output expected = r""" Scheduled 3 tasks \(30 origins\). Scheduled 6 tasks \(60 origins\). Scheduled 9 tasks \(90 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check scheduled tasks tasks = swh_scheduler.search_tasks() assert len(tasks) == 0 def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins): # check there are not too many tasks assert len(tasks) <= max_tasks # check tasks are not too large assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks) # check the tasks are exhaustive assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len( expected_origins ) assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == { origin.url for origin in expected_origins } @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" origins = _fill_storage_with_origins(storage, 70) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", ], ) # Check the output expected = r""" Scheduled 3 tasks \(60 origins\). Scheduled 4 tasks \(70 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 4, 20, origins) assert all(task["arguments"]["kwargs"] == {} for task in tasks) def test_task_schedule_origins_kwargs(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" origins = _fill_storage_with_origins(storage, 30) result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", "20", 'key1="value1"', 'key2="value2"', ], ) # Check the output expected = r""" Scheduled 2 tasks \(30 origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all( task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} for task in tasks ) def test_task_schedule_origins_with_limit(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" _fill_storage_with_origins(storage, 50) limit = 20 expected_origins = list(islice(stream_results(storage.origin_list), limit)) nb_origins = len(expected_origins) assert nb_origins == limit max_task_size = 5 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 # made the numbers go round result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--limit", limit, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_task_schedule_origins_with_page_token(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" nb_total_origins = 50 origins = _fill_storage_with_origins(storage, nb_total_origins) # prepare page_token and origins result expectancy page_result = storage.origin_list(limit=10) assert len(page_result.results) == 10 page_token = page_result.next_page_token assert page_token is not None # remove the first 10 origins listed as we won't see those in tasks expected_origins = [o for o in origins if o not in page_result.results] nb_origins = len(expected_origins) assert nb_origins == nb_total_origins - len(page_result.results) max_task_size = 10 nb_tasks, remainder = divmod(nb_origins, max_task_size) assert remainder == 0 result = invoke( swh_scheduler, False, [ "task", "schedule_origins", "swh-test-ping", "--batch-size", max_task_size, "--page-token", page_token, ], ) # Check the output expected = rf""" Scheduled {nb_tasks} tasks \({nb_origins} origins\). Done. """.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) def test_cli_task_runner_unknown_task_types(swh_scheduler, storage): """When passing at least one unknown task type, the runner should fail.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] known_task_type = random.choice(task_type_names) unknown_task_type = "unknown-task-type" assert unknown_task_type not in task_type_names with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "start-runner", "--task-type", known_task_type, "--task-type", unknown_task_type, ], ) @pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"]) def test_cli_task_runner_with_known_tasks( swh_scheduler, storage, caplog, flag_priority ): """Trigger runner with known tasks runs smoothly.""" task_types = swh_scheduler.get_task_types() task_type_names = [t["type"] for t in task_types] task_type_name = random.choice(task_type_names) task_type_name2 = random.choice(task_type_names) # The runner will just iterate over the following known tasks and do noop. We are # just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", flag_priority, "--task-type", task_type_name, "--task-type", task_type_name2, ], ) assert result.exit_code == 0, result.output def test_cli_task_runner_no_task(swh_scheduler, storage): """Trigger runner with no parameter should run as before.""" # The runner will just iterate over the existing tasks from the scheduler and do # noop. We are just checking the runner does not explode here. result = invoke( swh_scheduler, False, [ "start-runner", ], ) assert result.exit_code == 0, result.output diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py index ad3cfd2..484176f 100644 --- a/swh/scheduler/tests/test_recurrent_visits.py +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -1,216 +1,271 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import logging from queue import Queue -from unittest.mock import MagicMock +import time import pytest +import yaml from swh.scheduler.celery_backend.recurrent_visits import ( DEFAULT_DVCS_POLICY, VisitSchedulerThreads, grab_next_visits_policy_weights, send_visits_for_visit_type, spawn_visit_scheduler_thread, terminate_visit_scheduler_threads, visit_scheduler_thread, ) from .test_cli import invoke TEST_MAX_QUEUE = 10000 MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" def _compute_backend_name(visit_type: str) -> str: "Build a dummy reproducible backend name" return f"swh.loader.{visit_type}.tasks" @pytest.fixture def swh_scheduler(swh_scheduler): """Override default fixture of the scheduler to install some more task types.""" for visit_type in ["test-git", "test-hg", "test-svn"]: task_type = f"load-{visit_type}" swh_scheduler.create_task_type( { "type": task_type, "max_queue_length": TEST_MAX_QUEUE, "description": "The {} testing task".format(task_type), "backend_name": _compute_backend_name(visit_type), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return swh_scheduler +@pytest.fixture +def all_task_types(swh_scheduler): + return { + task_type_d["type"]: task_type_d + for task_type_d in swh_scheduler.get_task_types() + } + + def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): """When passed an unknown visit type, the recurrent visit scheduler should refuse to start.""" with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "schedule-recurrent", "--visit-type", "unknown", "--visit-type", "test-git", ], ) def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): """When passing no visit types, the recurrent visit scheduler should start.""" spawn_visit_scheduler_thread = mocker.patch( f"{MODULE_NAME}.spawn_visit_scheduler_thread" ) spawn_visit_scheduler_thread.side_effect = SystemExit # The actual scheduling threads won't spawn, they'll immediately terminate. This # only exercises the logic to pull task types out of the database result = invoke(swh_scheduler, False, ["schedule-recurrent"]) assert result.exit_code == 0, result.output +def test_send_visits_for_type_no_origin_scheduled_backoff( + swh_scheduler, all_task_types, mocker +): + visit_type = "test-git" + task_type = f"load-{visit_type}" + mocker.patch.object(time, "monotonic", lambda: 0) + no_origins_scheduled_backoff = 60 + assert ( + send_visits_for_visit_type( + swh_scheduler, + mocker.MagicMock(), + visit_type, + all_task_types[task_type], + DEFAULT_DVCS_POLICY, + no_origins_scheduled_backoff, + ) + == no_origins_scheduled_backoff + ) + + +def test_cli_schedule_recurrent_no_origins_scheduled_backoff_in_config( + swh_scheduler, mocker +): + """When passing no visit types, the recurrent visit scheduler should start.""" + + config = """ + scheduler: + cls: foo + args: {} + no_origins_scheduled_backoff: 60 + """ + + spawn_visit_scheduler_thread = mocker.patch( + f"{MODULE_NAME}.spawn_visit_scheduler_thread" + ) + spawn_visit_scheduler_thread.side_effect = SystemExit + # The actual scheduling threads won't spawn, they'll immediately terminate. This + # only exercises the logic to pull task types out of the database + + result = invoke( + swh_scheduler, + False, + ["schedule-recurrent", "--visit-type", "test-git"], + config=config, + ) + assert result.exit_code == 0, result.output + + assert yaml.safe_load(config) in spawn_visit_scheduler_thread.call_args[0] + + def test_recurrent_visit_scheduling( swh_scheduler, caplog, listed_origins_by_type, + all_task_types, mocker, ): """Scheduling known tasks is ok.""" caplog.set_level(logging.DEBUG, MODULE_NAME) nb_origins = 1000 - mock_celery_app = MagicMock() + mock_celery_app = mocker.MagicMock() mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") mock_available_slots.return_value = nb_origins # Slots available in queue # Make sure the scheduler is properly configured in terms of visit/task types - all_task_types = { - task_type_d["type"]: task_type_d - for task_type_d in swh_scheduler.get_task_types() - } - visit_types = list(listed_origins_by_type.keys()) assert len(visit_types) > 0 task_types = [] origins = [] for visit_type, _origins in listed_origins_by_type.items(): origins.extend(swh_scheduler.record_listed_origins(_origins)) task_type_name = f"load-{visit_type}" assert task_type_name in all_task_types.keys() task_type = all_task_types[task_type_name] task_type["visit_type"] = visit_type # we'll limit the orchestrator to the origins' type we know task_types.append(task_type) for visit_type in ["test-git", "test-svn"]: task_type = f"load-{visit_type}" send_visits_for_visit_type( swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type], DEFAULT_DVCS_POLICY, ) assert mock_available_slots.called, "The available slots functions should be called" records = [record.message for record in caplog.records] # Mapping over the dict ratio/policies entries can change overall order so let's # check the set of records expected_records = set() for task_type in task_types: visit_type = task_type["visit_type"] queue_name = task_type["backend_name"] msg = ( f"{nb_origins} available slots for visit type {visit_type} " f"in queue {queue_name}" ) expected_records.add(msg) for expected_record in expected_records: assert expected_record in set(records) @pytest.mark.parametrize( "visit_type, extras", [("test-hg", {}), ("test-git", {"tablesample": 0.1})], ) def test_recurrent_visit_additional_parameters( swh_scheduler, mocker, visit_type, extras ): """Testing additional policy parameters""" mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits") mock_grab_next_visits.return_value = [] policy_cfg = DEFAULT_DVCS_POLICY[:] for policy in policy_cfg: policy.update(extras) grab_next_visits_policy_weights(swh_scheduler, visit_type, 10, policy_cfg) for call in mock_grab_next_visits.call_args_list: assert call[1].get("tablesample") == extras.get("tablesample") @pytest.fixture def scheduler_config(swh_scheduler_config): return {"scheduler": {"cls": "postgresql", **swh_scheduler_config}, "celery": {}} def test_visit_scheduler_thread_unknown_task( swh_scheduler, scheduler_config, ): """Starting a thread with unknown task type reports the error""" unknown_visit_type = "unknown" command_queue = Queue() exc_queue = Queue() visit_scheduler_thread( scheduler_config, unknown_visit_type, command_queue, exc_queue ) assert command_queue.empty() is True assert exc_queue.empty() is False assert len(exc_queue.queue) == 1 result = exc_queue.queue.pop() assert result[0] == unknown_visit_type assert isinstance(result[1], ValueError) def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker): """Spawning and terminating threads runs smoothly""" threads: VisitSchedulerThreads = {} exc_queue = Queue() mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") - mock_build_app.return_value = MagicMock() + mock_build_app.return_value = mocker.MagicMock() assert len(threads) == 0 for visit_type in visit_types: spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) # This actually only checks the spawning and terminating logic is sound assert len(threads) == len(visit_types) actual_threads = terminate_visit_scheduler_threads(threads) assert not len(actual_threads) assert mock_build_app.called