diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
index 0ec6110..62389f1 100644
--- a/swh/scheduler/celery_backend/recurrent_visits.py
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -1,365 +1,369 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This schedules the recurrent visits, for listed origins, in Celery.
For "oneshot" (save code now, lister) tasks, check the
:mod:`swh.scheduler.celery_backend.runner` and
:mod:`swh.scheduler.celery_backend.pika_listener` modules.
"""
from __future__ import annotations
from itertools import chain
import logging
from queue import Empty, Queue
import random
from threading import Thread
import time
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from kombu.utils.uuid import uuid
from swh.scheduler.celery_backend.config import get_available_slots
from swh.scheduler.utils import create_origin_task_dicts
if TYPE_CHECKING:
from ..interface import SchedulerInterface
from ..model import ListedOrigin
logger = logging.getLogger(__name__)
DEFAULT_POLICY = [
{"policy": "already_visited_order_by_lag", "weight": 50},
{"policy": "never_visited_oldest_update_first", "weight": 50},
]
DEFAULT_DVCS_POLICY = [
{"policy": "already_visited_order_by_lag", "weight": 49},
{"policy": "never_visited_oldest_update_first", "weight": 49},
{"policy": "origins_without_last_update", "weight": 2},
]
DEFAULT_GIT_POLICY = [
{"policy": "already_visited_order_by_lag", "weight": 49, "tablesample": 0.1},
{"policy": "never_visited_oldest_update_first", "weight": 49, "tablesample": 0.1},
{"policy": "origins_without_last_update", "weight": 2, "tablesample": 0.1},
]
DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {
"default": DEFAULT_POLICY,
"hg": DEFAULT_DVCS_POLICY,
"svn": DEFAULT_DVCS_POLICY,
"cvs": DEFAULT_DVCS_POLICY,
"bzr": DEFAULT_DVCS_POLICY,
"git": DEFAULT_GIT_POLICY,
}
"""Scheduling policies to use to retrieve visits for the given visit types, with their
relative weights"""
MIN_SLOTS_RATIO = 0.05
"""Quantity of slots that need to be available (with respect to max_queue_length) for
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger"""
QUEUE_FULL_BACKOFF = 60
"""Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots
available in the queue."""
-NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
+DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60
"""Backoff time (in seconds) if no origins have been scheduled in the current
iteration"""
BACKOFF_SPLAY = 5.0
"""Amplitude of the fuzziness between backoffs"""
TERMINATE = object()
"""Termination request received from command queue (singleton used for identity
comparison)"""
def grab_next_visits_policy_weights(
scheduler: SchedulerInterface,
visit_type: str,
num_visits: int,
policy_cfg: List[Dict[str, Any]],
) -> List[ListedOrigin]:
"""Get the next ``num_visits`` for the given ``visit_type`` using the corresponding
set of scheduling policies.
The ``policy_cfg`` list sets, for the given visit type, the scheduling
policies used to pull the next tasks. Each policy config entry in the
list must at least have a 'policy' (policy name) and a 'weight' entry.
The number of origins fetched by each policy is weighted by its 'weight'
so that the total number of returned origins is around ``num_visits``.
Any other key/value entry in a policy configuration is passed to the
scheduler.grab_next_visits() method.
This function logs an info message if the ratio of retrieved origins
deviates from the requested ratio by more than 5%.
Returns:
at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects
"""
policies = [cfg["policy"] for cfg in policy_cfg]
if len(set(policies)) != len(policies):
raise ValueError(
"A policy weights can only appear once; check your policy "
f"configuration for visit type {visit_type}"
)
weights = [cfg["weight"] for cfg in policy_cfg]
total_weight = sum(weights)
if not total_weight:
raise ValueError(f"No policy weights set for visit type {visit_type}")
ratios = [weight / total_weight for weight in weights]
extra_kws = [
{k: v for k, v in cfg.items() if k not in ("weight", "policy")}
for cfg in policy_cfg
]
fetched_origins: Dict[str, List[ListedOrigin]] = {}
for policy, ratio, extra_kw in zip(policies, ratios, extra_kws):
num_tasks_to_send = int(num_visits * ratio)
fetched_origins[policy] = scheduler.grab_next_visits(
visit_type,
num_tasks_to_send,
policy=policy,
**extra_kw,
)
all_origins: List[ListedOrigin] = list(
chain.from_iterable(fetched_origins.values())
)
if not all_origins:
return []
# Check whether the ratios of origins fetched are skewed with respect to the
# ones we requested
fetched_origin_ratios = {
policy: len(origins) / len(all_origins)
for policy, origins in fetched_origins.items()
}
for policy, expected_ratio in zip(policies, ratios):
# 5% of skew with respect to request
if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
logger.info(
"Skewed fetch for visit type %s with policy %s: fetched %s, "
"requested %s",
visit_type,
policy,
fetched_origin_ratios[policy],
expected_ratio,
)
return all_origins
def splay():
"""Return a random short interval by which to vary the backoffs for the visit
scheduling threads"""
return random.uniform(0, BACKOFF_SPLAY)
def send_visits_for_visit_type(
scheduler: SchedulerInterface,
app,
visit_type: str,
task_type: Dict,
policy_cfg: List[Dict[str, Any]],
+ no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF,
) -> float:
"""Schedule the next batch of visits for the given ``visit_type``.
First, we determine the number of available slots by introspecting the RabbitMQ
queue.
If there are fewer than :py:data:`MIN_SLOTS_RATIO` of the queue slots available, we
wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when
there is little room left in the queue.
Once there's more than :py:data:`MIN_SLOTS_RATIO` slots available, we run
:py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits
to schedule, and we send them to celery.
- If the last scheduling attempt didn't return any origins, we sleep for
- :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive
+ If the last scheduling attempt didn't return any origins, we sleep for
+ ``no_origins_scheduled_backoff`` seconds (by default
+ :py:data:`DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF`). This avoids running the expensive
:py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too
often if there's nothing left to schedule.
The ``policy_cfg`` argument is the policy configuration used to
choose the next origins to visit. It is passed directly to the
:py:func:`grab_next_visits_policy_weights()` function.
Returns:
the earliest :py:func:`time.monotonic` value at which to run the next iteration
of the loop.
"""
queue_name = task_type["backend_name"]
max_queue_length = task_type.get("max_queue_length") or 0
min_available_slots = max_queue_length * MIN_SLOTS_RATIO
current_iteration_start = time.monotonic()
# Check queue level
available_slots = get_available_slots(app, queue_name, max_queue_length)
logger.debug(
"%s available slots for visit type %s in queue %s",
available_slots,
visit_type,
queue_name,
)
if available_slots < min_available_slots:
return current_iteration_start + QUEUE_FULL_BACKOFF
origins = grab_next_visits_policy_weights(
scheduler, visit_type, available_slots, policy_cfg
)
if not origins:
logger.debug("No origins to visit for type %s", visit_type)
- return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF
+ return current_iteration_start + no_origins_scheduled_backoff
# Try to smooth the ingestion load: origins pulled by different
# scheduling policies have different resource usage patterns
random.shuffle(origins)
for task_dict in create_origin_task_dicts(origins, scheduler):
app.send_task(
queue_name,
task_id=uuid(),
args=task_dict["arguments"]["args"],
kwargs=task_dict["arguments"]["kwargs"],
queue=queue_name,
)
logger.info(
"%s: %s visits scheduled in queue %s",
visit_type,
len(origins),
queue_name,
)
# When everything worked, we can try to schedule origins again ASAP.
return time.monotonic()
def visit_scheduler_thread(
config: Dict,
visit_type: str,
command_queue: Queue[object],
exc_queue: Queue[Tuple[str, BaseException]],
):
"""Target function for the visit sending thread, which initializes local connections
and handles exceptions by sending them back to the main thread."""
from swh.scheduler import get_scheduler
from swh.scheduler.celery_backend.config import build_app
try:
# We need to reinitialize these connections because they're not generally
# thread-safe
app = build_app(config.get("celery"))
scheduler = get_scheduler(**config["scheduler"])
task_type = scheduler.get_task_type(f"load-{visit_type}")
if task_type is None:
raise ValueError(f"Unknown task type: load-{visit_type}")
policy_cfg = config.get("scheduling_policy", DEFAULT_POLICY_CONFIG)
for policies in policy_cfg.values():
for policy in policies:
if "weight" not in policy or "policy" not in policy:
raise ValueError(
"Each policy configuration needs at least a 'policy' "
"and a 'weight' entry"
)
policy_cfg = {**DEFAULT_POLICY_CONFIG, **policy_cfg}
next_iteration = time.monotonic()
while True:
# vary the next iteration time a little bit
next_iteration = next_iteration + splay()
while time.monotonic() < next_iteration:
# Wait for next iteration to start. Listen for termination message.
try:
msg = command_queue.get(block=True, timeout=1)
except Empty:
continue
if msg is TERMINATE:
return
else:
logger.warn("Received unexpected message %s in command queue", msg)
next_iteration = send_visits_for_visit_type(
scheduler,
app,
visit_type,
task_type,
policy_cfg.get(visit_type, policy_cfg["default"]),
+ config.get(
+ "no_origins_scheduled_backoff", DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF
+ ),
)
except BaseException as e:
exc_queue.put((visit_type, e))
VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]]
"""Dict storing the visit scheduler threads and their command queues"""
def spawn_visit_scheduler_thread(
threads: VisitSchedulerThreads,
exc_queue: Queue[Tuple[str, BaseException]],
config: Dict[str, Any],
visit_type: str,
):
"""Spawn a new thread to schedule the visits of type ``visit_type``."""
command_queue: Queue[object] = Queue()
thread = Thread(
target=visit_scheduler_thread,
kwargs={
"config": config,
"visit_type": visit_type,
"command_queue": command_queue,
"exc_queue": exc_queue,
},
)
threads[visit_type] = (thread, command_queue)
thread.start()
def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]:
"""Terminate all visit scheduler threads"""
logger.info("Termination requested...")
for _, command_queue in threads.values():
command_queue.put(TERMINATE)
loops = 0
while threads and loops < 10:
logger.info(
"Terminating visit scheduling threads: %s", ", ".join(sorted(threads))
)
loops += 1
for visit_type, (thread, _) in list(threads.items()):
thread.join(timeout=1)
if not thread.is_alive():
logger.debug("Thread %s terminated", visit_type)
del threads[visit_type]
if threads:
logger.warning(
"Could not reap the following threads after 10 attempts: %s",
", ".join(sorted(threads)),
)
return sorted(threads)
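
To make the weight mechanics above concrete, here is a minimal standalone sketch (not part of the patch) of how grab_next_visits_policy_weights splits a slot budget across policies: each policy is asked for num_visits * weight / total_weight origins, so the default git policy (weights 49/49/2) splits 1000 available slots into roughly 490/490/20. The split_visits helper below is hypothetical, written only to illustrate the arithmetic.

from typing import Any, Dict, List

# Weights mirroring DEFAULT_GIT_POLICY above (tablesample omitted).
POLICY_CFG: List[Dict[str, Any]] = [
    {"policy": "already_visited_order_by_lag", "weight": 49},
    {"policy": "never_visited_oldest_update_first", "weight": 49},
    {"policy": "origins_without_last_update", "weight": 2},
]


def split_visits(num_visits: int, policy_cfg: List[Dict[str, Any]]) -> Dict[str, int]:
    """Return the number of origins each policy is asked for, following the
    weighting logic of grab_next_visits_policy_weights (illustrative only)."""
    total_weight = sum(cfg["weight"] for cfg in policy_cfg)
    return {
        cfg["policy"]: int(num_visits * cfg["weight"] / total_weight)
        for cfg in policy_cfg
    }


print(split_visits(1000, POLICY_CFG))
# -> {'already_visited_order_by_lag': 490,
#     'never_visited_oldest_update_first': 490,
#     'origins_without_last_update': 20}
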
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
index 5152656..6a55071 100644
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -1,946 +1,946 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from itertools import islice
import logging
import random
import re
import tempfile
from unittest.mock import patch
from click.testing import CliRunner
import pytest
from swh.core.api.classes import stream_results
from swh.model.model import Origin
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict, utcnow
CLI_CONFIG = """
scheduler:
cls: foo
args: {}
"""
-def invoke(scheduler, catch_exceptions, args):
+def invoke(scheduler, catch_exceptions, args, config=CLI_CONFIG):
runner = CliRunner()
with patch(
"swh.scheduler.get_scheduler"
) as get_scheduler_mock, tempfile.NamedTemporaryFile(
"a", suffix=".yml"
) as config_fd:
- config_fd.write(CLI_CONFIG)
+ config_fd.write(config)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
args = [
"-C" + config_fd.name,
] + args
result = runner.invoke(cli, args, obj={"log_level": logging.WARNING})
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_schedule_tasks(swh_scheduler):
csv_data = (
b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
+ utcnow().isoformat().encode()
+ b"\n"
+ b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
+ utcnow().isoformat().encode()
+ b"\n"
)
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
csv_fd.write(csv_data)
csv_fd.seek(0)
result = invoke(
swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name]
)
expected = r"""
Created 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg1', 'arg2'\]
Keyword args:
key: 'value'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg3', 'arg4'\]
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_schedule_tasks_columns(swh_scheduler):
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
csv_fd.seek(0)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule",
"-c",
"type",
"-c",
"policy",
"-c",
"args",
"-c",
"kwargs",
"-d",
";",
csv_fd.name,
],
)
expected = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_schedule_task(swh_scheduler):
result = invoke(
swh_scheduler,
False,
[
"task",
"add",
"swh-test-ping",
"arg1",
"arg2",
"key=value",
],
)
expected = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_none(swh_scheduler):
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
],
)
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task2["next_run"] += datetime.timedelta(days=1)
swh_scheduler.create_tasks([task1, task2])
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
],
)
expected = r"""
Found 1 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
],
)
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_filter(swh_scheduler):
task = create_task_dict("swh-test-multiping", "oneshot", key="value")
swh_scheduler.create_tasks([task])
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
],
)
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_filter_2(swh_scheduler):
swh_scheduler.create_tasks(
[
create_task_dict("swh-test-multiping", "oneshot", key="value"),
create_task_dict("swh-test-ping", "oneshot", key="value2"),
]
)
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
],
)
expected = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
# Fails because "task list-pending --limit 3" only returns 2 tasks, because
# of how compute_nb_tasks_from works.
@pytest.mark.xfail
def test_list_pending_tasks_limit(swh_scheduler):
swh_scheduler.create_tasks(
[
create_task_dict("swh-test-ping", "oneshot", key="value%d" % i)
for i in range(10)
]
)
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
"--limit",
"3",
],
)
expected = r"""
Found 2 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value0'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_before(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3)
task2["next_run"] += datetime.timedelta(days=1)
swh_scheduler.create_tasks([task1, task2])
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
"--before",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: tomorrow \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
],
)
expected = r"""
Found 2 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_id(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--task-id",
"2",
],
)
expected = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_id_2(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"]
)
expected = r"""
Found 2 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_type(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"]
)
expected = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_limit(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--limit",
"2",
],
)
expected = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_before(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--before",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_after(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--after",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def _fill_storage_with_origins(storage, nb_origins):
origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)]
storage.origin_add(origins)
return origins
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
"""Tests the scheduling when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
_fill_storage_with_origins(storage, 90)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"--dry-run",
"swh-test-ping",
],
)
# Check the output
expected = r"""
Scheduled 3 tasks \(30 origins\).
Scheduled 6 tasks \(60 origins\).
Scheduled 9 tasks \(90 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check scheduled tasks
tasks = swh_scheduler.search_tasks()
assert len(tasks) == 0
def _assert_origin_tasks_constraints(tasks, max_tasks, max_task_size, expected_origins):
# check there are not too many tasks
assert len(tasks) <= max_tasks
# check tasks are not too large
assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks)
# check the tasks are exhaustive
assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len(
expected_origins
)
assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == {
origin.url for origin in expected_origins
}
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins(swh_scheduler, storage):
"""Tests the scheduling when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
origins = _fill_storage_with_origins(storage, 70)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
"20",
],
)
# Check the output
expected = r"""
Scheduled 3 tasks \(60 origins\).
Scheduled 4 tasks \(70 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_constraints(tasks, 4, 20, origins)
assert all(task["arguments"]["kwargs"] == {} for task in tasks)
def test_task_schedule_origins_kwargs(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
origins = _fill_storage_with_origins(storage, 30)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
"20",
'key1="value1"',
'key2="value2"',
],
)
# Check the output
expected = r"""
Scheduled 2 tasks \(30 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_constraints(tasks, 2, 20, origins)
assert all(
task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"}
for task in tasks
)
def test_task_schedule_origins_with_limit(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
_fill_storage_with_origins(storage, 50)
limit = 20
expected_origins = list(islice(stream_results(storage.origin_list), limit))
nb_origins = len(expected_origins)
assert nb_origins == limit
max_task_size = 5
nb_tasks, remainder = divmod(nb_origins, max_task_size)
assert remainder == 0 # made the numbers go round
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
max_task_size,
"--limit",
limit,
],
)
# Check the output
expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_constraints(tasks, max_task_size, nb_origins, expected_origins)
def test_task_schedule_origins_with_page_token(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
nb_total_origins = 50
origins = _fill_storage_with_origins(storage, nb_total_origins)
# prepare the page_token and the expected origins
page_result = storage.origin_list(limit=10)
assert len(page_result.results) == 10
page_token = page_result.next_page_token
assert page_token is not None
# remove the first 10 origins listed as we won't see those in tasks
expected_origins = [o for o in origins if o not in page_result.results]
nb_origins = len(expected_origins)
assert nb_origins == nb_total_origins - len(page_result.results)
max_task_size = 10
nb_tasks, remainder = divmod(nb_origins, max_task_size)
assert remainder == 0
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
max_task_size,
"--page-token",
page_token,
],
)
# Check the output
expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_constraints(tasks, max_task_size, nb_origins, expected_origins)
def test_cli_task_runner_unknown_task_types(swh_scheduler, storage):
"""When passing at least one unknown task type, the runner should fail."""
task_types = swh_scheduler.get_task_types()
task_type_names = [t["type"] for t in task_types]
known_task_type = random.choice(task_type_names)
unknown_task_type = "unknown-task-type"
assert unknown_task_type not in task_type_names
with pytest.raises(ValueError, match="Unknown"):
invoke(
swh_scheduler,
False,
[
"start-runner",
"--task-type",
known_task_type,
"--task-type",
unknown_task_type,
],
)
@pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"])
def test_cli_task_runner_with_known_tasks(
swh_scheduler, storage, caplog, flag_priority
):
"""Trigger runner with known tasks runs smoothly."""
task_types = swh_scheduler.get_task_types()
task_type_names = [t["type"] for t in task_types]
task_type_name = random.choice(task_type_names)
task_type_name2 = random.choice(task_type_names)
# The runner will just iterate over the following known tasks and do nothing. We
# are just checking the runner does not explode here.
result = invoke(
swh_scheduler,
False,
[
"start-runner",
flag_priority,
"--task-type",
task_type_name,
"--task-type",
task_type_name2,
],
)
assert result.exit_code == 0, result.output
def test_cli_task_runner_no_task(swh_scheduler, storage):
"""Trigger runner with no parameter should run as before."""
# The runner will just iterate over the existing tasks from the scheduler and do
# nothing. We are just checking the runner does not explode here.
result = invoke(
swh_scheduler,
False,
[
"start-runner",
],
)
assert result.exit_code == 0, result.output
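
The invoke change above is what lets a test substitute its own YAML configuration for CLI_CONFIG. Below is a distilled, standalone sketch of the same temp-file pattern, assuming click and PyYAML are installed; the cli command and its echo body are stand-ins for the real swh CLI, and the get_scheduler patching is omitted.

import tempfile

import click
from click.testing import CliRunner
import yaml


@click.command()
@click.option("-C", "--config-file", type=click.Path(exists=True))
def cli(config_file):
    # Stand-in for the real swh CLI: parse the config file and echo it back.
    with open(config_file) as f:
        click.echo(yaml.safe_load(f))


def invoke(args, config="scheduler: {cls: foo, args: {}}"):
    """Write ``config`` to a temporary file and pass it via -C, mirroring
    the reworked test helper (scheduler mocking omitted)."""
    runner = CliRunner()
    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
        config_fd.write(config)
        config_fd.seek(0)
        return runner.invoke(cli, ["-C" + config_fd.name] + args)


result = invoke([], config="scheduler: {cls: foo}\nno_origins_scheduled_backoff: 60")
assert result.exit_code == 0, result.output
assert "no_origins_scheduled_backoff" in result.output
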
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
index ad3cfd2..484176f 100644
--- a/swh/scheduler/tests/test_recurrent_visits.py
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -1,216 +1,271 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import logging
from queue import Queue
-from unittest.mock import MagicMock
+import time
import pytest
+import yaml
from swh.scheduler.celery_backend.recurrent_visits import (
DEFAULT_DVCS_POLICY,
VisitSchedulerThreads,
grab_next_visits_policy_weights,
send_visits_for_visit_type,
spawn_visit_scheduler_thread,
terminate_visit_scheduler_threads,
visit_scheduler_thread,
)
from .test_cli import invoke
TEST_MAX_QUEUE = 10000
MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
def _compute_backend_name(visit_type: str) -> str:
"Build a dummy reproducible backend name"
return f"swh.loader.{visit_type}.tasks"
@pytest.fixture
def swh_scheduler(swh_scheduler):
"""Override default fixture of the scheduler to install some more task types."""
for visit_type in ["test-git", "test-hg", "test-svn"]:
task_type = f"load-{visit_type}"
swh_scheduler.create_task_type(
{
"type": task_type,
"max_queue_length": TEST_MAX_QUEUE,
"description": "The {} testing task".format(task_type),
"backend_name": _compute_backend_name(visit_type),
"default_interval": timedelta(days=1),
"min_interval": timedelta(hours=6),
"max_interval": timedelta(days=12),
}
)
return swh_scheduler
+@pytest.fixture
+def all_task_types(swh_scheduler):
+ return {
+ task_type_d["type"]: task_type_d
+ for task_type_d in swh_scheduler.get_task_types()
+ }
+
+
def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
"""When passed an unknown visit type, the recurrent visit scheduler should refuse
to start."""
with pytest.raises(ValueError, match="Unknown"):
invoke(
swh_scheduler,
False,
[
"schedule-recurrent",
"--visit-type",
"unknown",
"--visit-type",
"test-git",
],
)
def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
"""When passing no visit types, the recurrent visit scheduler should start."""
spawn_visit_scheduler_thread = mocker.patch(
f"{MODULE_NAME}.spawn_visit_scheduler_thread"
)
spawn_visit_scheduler_thread.side_effect = SystemExit
# The actual scheduling threads won't spawn; they'll immediately terminate. This
# only exercises the logic to pull task types out of the database
result = invoke(swh_scheduler, False, ["schedule-recurrent"])
assert result.exit_code == 0, result.output
+def test_send_visits_for_type_no_origin_scheduled_backoff(
+ swh_scheduler, all_task_types, mocker
+):
+ visit_type = "test-git"
+ task_type = f"load-{visit_type}"
+ mocker.patch.object(time, "monotonic", lambda: 0)
+ no_origins_scheduled_backoff = 60
+ assert (
+ send_visits_for_visit_type(
+ swh_scheduler,
+ mocker.MagicMock(),
+ visit_type,
+ all_task_types[task_type],
+ DEFAULT_DVCS_POLICY,
+ no_origins_scheduled_backoff,
+ )
+ == no_origins_scheduled_backoff
+ )
+
+
+def test_cli_schedule_recurrent_no_origins_scheduled_backoff_in_config(
+ swh_scheduler, mocker
+):
+ """When passing no visit types, the recurrent visit scheduler should start."""
+
+ config = """
+ scheduler:
+ cls: foo
+ args: {}
+ no_origins_scheduled_backoff: 60
+ """
+
+ spawn_visit_scheduler_thread = mocker.patch(
+ f"{MODULE_NAME}.spawn_visit_scheduler_thread"
+ )
+ spawn_visit_scheduler_thread.side_effect = SystemExit
+ # The actual scheduling threads won't spawn; they'll immediately terminate. This
+ # only exercises the logic to pull task types out of the database
+
+ result = invoke(
+ swh_scheduler,
+ False,
+ ["schedule-recurrent", "--visit-type", "test-git"],
+ config=config,
+ )
+ assert result.exit_code == 0, result.output
+
+ assert yaml.safe_load(config) in spawn_visit_scheduler_thread.call_args[0]
+
+
def test_recurrent_visit_scheduling(
swh_scheduler,
caplog,
listed_origins_by_type,
+ all_task_types,
mocker,
):
"""Scheduling known tasks is ok."""
caplog.set_level(logging.DEBUG, MODULE_NAME)
nb_origins = 1000
- mock_celery_app = MagicMock()
+ mock_celery_app = mocker.MagicMock()
mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
mock_available_slots.return_value = nb_origins # Slots available in queue
# Make sure the scheduler is properly configured in terms of visit/task types
- all_task_types = {
- task_type_d["type"]: task_type_d
- for task_type_d in swh_scheduler.get_task_types()
- }
-
visit_types = list(listed_origins_by_type.keys())
assert len(visit_types) > 0
task_types = []
origins = []
for visit_type, _origins in listed_origins_by_type.items():
origins.extend(swh_scheduler.record_listed_origins(_origins))
task_type_name = f"load-{visit_type}"
assert task_type_name in all_task_types.keys()
task_type = all_task_types[task_type_name]
task_type["visit_type"] = visit_type
# we'll limit the orchestrator to the origin types we know
task_types.append(task_type)
for visit_type in ["test-git", "test-svn"]:
task_type = f"load-{visit_type}"
send_visits_for_visit_type(
swh_scheduler,
mock_celery_app,
visit_type,
all_task_types[task_type],
DEFAULT_DVCS_POLICY,
)
assert mock_available_slots.called, "The available slots functions should be called"
records = [record.message for record in caplog.records]
# Mapping over the dict ratio/policies entries can change overall order so let's
# check the set of records
expected_records = set()
for task_type in task_types:
visit_type = task_type["visit_type"]
queue_name = task_type["backend_name"]
msg = (
f"{nb_origins} available slots for visit type {visit_type} "
f"in queue {queue_name}"
)
expected_records.add(msg)
for expected_record in expected_records:
assert expected_record in set(records)
@pytest.mark.parametrize(
"visit_type, extras",
[("test-hg", {}), ("test-git", {"tablesample": 0.1})],
)
def test_recurrent_visit_additional_parameters(
swh_scheduler, mocker, visit_type, extras
):
"""Testing additional policy parameters"""
mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits")
mock_grab_next_visits.return_value = []
policy_cfg = DEFAULT_DVCS_POLICY[:]
for policy in policy_cfg:
policy.update(extras)
grab_next_visits_policy_weights(swh_scheduler, visit_type, 10, policy_cfg)
for call in mock_grab_next_visits.call_args_list:
assert call[1].get("tablesample") == extras.get("tablesample")
@pytest.fixture
def scheduler_config(swh_scheduler_config):
return {"scheduler": {"cls": "postgresql", **swh_scheduler_config}, "celery": {}}
def test_visit_scheduler_thread_unknown_task(
swh_scheduler,
scheduler_config,
):
"""Starting a thread with unknown task type reports the error"""
unknown_visit_type = "unknown"
command_queue = Queue()
exc_queue = Queue()
visit_scheduler_thread(
scheduler_config, unknown_visit_type, command_queue, exc_queue
)
assert command_queue.empty() is True
assert exc_queue.empty() is False
assert len(exc_queue.queue) == 1
result = exc_queue.queue.pop()
assert result[0] == unknown_visit_type
assert isinstance(result[1], ValueError)
def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
"""Spawning and terminating threads runs smoothly"""
threads: VisitSchedulerThreads = {}
exc_queue = Queue()
mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
- mock_build_app.return_value = MagicMock()
+ mock_build_app.return_value = mocker.MagicMock()
assert len(threads) == 0
for visit_type in visit_types:
spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)
# This actually only checks the spawning and terminating logic is sound
assert len(threads) == len(visit_types)
actual_threads = terminate_visit_scheduler_threads(threads)
assert not len(actual_threads)
assert mock_build_app.called
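
The spawn/terminate tests above exercise the command-queue protocol from recurrent_visits.py. Here is a minimal standalone model of that protocol, assuming nothing beyond the standard library: the loop waits for the monotonic deadline returned by the previous iteration while polling its command queue for the TERMINATE sentinel. The backoff value and the stubbed scheduling step are placeholders, not the real scheduling logic.

import time
from queue import Empty, Queue
from threading import Thread

TERMINATE = object()  # sentinel compared by identity, as in recurrent_visits.py


def scheduler_loop(command_queue: Queue, backoff: float = 0.2) -> None:
    """Minimal model of the visit_scheduler_thread wait loop: each iteration
    yields a monotonic deadline, and the thread polls its command queue for
    TERMINATE while waiting for that deadline."""
    next_iteration = time.monotonic()
    while True:
        while time.monotonic() < next_iteration:
            try:
                msg = command_queue.get(block=True, timeout=0.05)
            except Empty:
                continue
            if msg is TERMINATE:
                return
        # Stubbed scheduling iteration: pretend nothing was scheduled, so
        # back off before the next attempt.
        next_iteration = time.monotonic() + backoff


commands: Queue = Queue()
thread = Thread(target=scheduler_loop, args=(commands,))
thread.start()
time.sleep(0.5)          # let a few iterations run
commands.put(TERMINATE)  # same request terminate_visit_scheduler_threads sends
thread.join(timeout=1)
assert not thread.is_alive()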