Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163540
D8940.id32243.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D8940.id32243.diff
View Options
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,6 +1,5 @@
pytest
pytest-mock
-celery >= 4.3
hypothesis >= 3.11.0
swh.lister
swh.storage[testing]
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -85,7 +85,16 @@
ctx.obj["config"] = conf
-from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa
+from . import ( # noqa
+ add_forge_now,
+ admin,
+ celery_monitor,
+ journal,
+ origin,
+ simulator,
+ task,
+ task_type,
+)
def main():
diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py
--- a/swh/scheduler/cli/add_forge_now.py
+++ b/swh/scheduler/cli/add_forge_now.py
@@ -19,12 +19,90 @@
@cli.group("add-forge-now")
+@click.option(
+ "-p",
+ "--preset",
+ "preset",
+ default="production",
+ type=click.Choice(["production", "staging"]),
+ help='Determine preset to use, "production" by default.',
+)
@click.pass_context
-def add_forge_now(ctx):
- """Manipulate listed origins."""
+def add_forge_now(ctx, preset):
+ """Manipulate add-forge-now requests."""
if not ctx.obj["scheduler"]:
raise ValueError("Scheduler class (local/remote) must be instantiated")
+ ctx.obj["preset"] = preset
+
+
+@add_forge_now.command("register-lister")
+@click.argument("lister_name", nargs=1, required=True)
+@click.argument("options", nargs=-1)
+@click.pass_context
+def register_lister_cli(
+ ctx,
+ lister_name,
+ options,
+):
+ """Register the lister tasks in the scheduler.
+
+ Depending on the preset, policies vary:
+ - staging: only "full" but limited listing (as "oneshot" task) of disabled
+ origins
+ - production: both "full" and "incremental" (if that exists) listing (as "recurring"
+ task). The "full" will be triggered asap, the "incremental" will be triggered the
+ next day.
+
+ """
+ from .utils import lister_task_type, parse_options, task_add
+
+ scheduler = ctx.obj["scheduler"]
+ preset = ctx.obj["preset"]
+
+ # Map the associated task types for the lister
+ task_type_names: Dict[str, str] = {
+ listing_type: lister_task_type(lister_name, listing_type)
+ for listing_type in ["full", "incremental"]
+ }
+
+ task_types: Dict[str, Dict] = {}
+ for listing_type, task_type_name in task_type_names.items():
+ task_type = scheduler.get_task_type(task_type_name)
+ if task_type:
+ task_types[listing_type] = task_type
+
+ if not task_types:
+ raise ValueError(f"Unknown lister type {lister_name}.")
+
+ (args, kw) = parse_options(options)
+
+ # Recurring policy on production
+ if preset == "production":
+ policy = "recurring"
+ else: # staging, "full" but limited listing as a oneshot
+ policy = "oneshot"
+ kw.update({"max_pages": 3, "max_origins_per_page": 10, "enable_origins": False})
+ # In staging, keep only the "full" listing when both "full" and "incremental" exist
+ if "full" in task_types:
+ task_types.pop("incremental", None)
+
+ from datetime import timedelta
+
+ from swh.scheduler.utils import utcnow
+
+ for listing_type, task_type in task_types.items():
+ now = utcnow()
+ next_run = now if listing_type == "full" else now + timedelta(days=1)
+ task_add(
+ scheduler,
+ task_type_name=task_type["type"],
+ args=args,
+ kw=kw,
+ policy=policy,
+ next_run=next_run,
+ )
+
@add_forge_now.command("schedule-first-visits")
@click.option(
@@ -35,14 +113,6 @@
type=str,
multiple=True,
)
-@click.option(
- "--production/--staging",
- "enabled",
- is_flag=True,
- default=True,
- help="""Determine whether we want to scheduled enabled origins (on production) or
- disabled ones (on staging).""",
-)
@click.option(
"--lister-name",
default=None,
@@ -57,7 +127,6 @@
def schedule_first_visits_cli(
ctx,
visit_type_names: List[str],
- enabled: bool,
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
):
@@ -68,6 +137,7 @@
from .utils import get_task_type, send_to_celery
scheduler = ctx.obj["scheduler"]
+ preset = ctx.obj["preset"]
visit_type_to_queue: Dict[str, str] = {}
unknown_task_types = []
@@ -85,7 +155,7 @@
send_to_celery(
scheduler,
visit_type_to_queue=visit_type_to_queue,
- enabled=enabled,
+ enabled=preset == "production",
lister_name=lister_name,
lister_instance_name=lister_instance_name,
)
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -122,7 +122,7 @@
"""Send the next COUNT origin visits of the TYPE loader to the scheduler as
one-shot tasks."""
from ..utils import utcnow
- from .task import pretty_print_task
+ from .utils import pretty_print_task
scheduler = ctx.obj["scheduler"]
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -27,128 +27,6 @@
DATETIME = click.DateTime()
-def format_dict(d):
- """Recursively format date objects in the dict passed as argument"""
- import datetime
-
- ret = {}
- for k, v in d.items():
- if isinstance(v, (datetime.date, datetime.datetime)):
- v = v.isoformat()
- elif isinstance(v, dict):
- v = format_dict(v)
- ret[k] = v
- return ret
-
-
-def pretty_print_list(list, indent=0):
- """Pretty-print a list"""
- return "".join("%s%r\n" % (" " * indent, item) for item in list)
-
-
-def pretty_print_dict(dict, indent=0):
- """Pretty-print a list"""
- return "".join(
- "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value)
- for key, value in sorted(dict.items())
- )
-
-
-def pretty_print_run(run, indent=4):
- fmt = (
- "{indent}{backend_id} [{status}]\n"
- "{indent} scheduled: {scheduled} [{started}:{ended}]"
- )
- return fmt.format(indent=" " * indent, **format_dict(run))
-
-
-def pretty_print_task(task, full=False):
- """Pretty-print a task
-
- If 'full' is True, also print the status and priority fields.
-
- >>> import datetime
- >>> task = {
- ... 'id': 1234,
- ... 'arguments': {
- ... 'args': ['foo', 'bar', True],
- ... 'kwargs': {'key': 'value', 'key2': 42},
- ... },
- ... 'current_interval': datetime.timedelta(hours=1),
- ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
- ... 'policy': 'oneshot',
- ... 'priority': None,
- ... 'status': 'next_run_not_scheduled',
- ... 'type': 'test_task',
- ... }
- >>> print(click.unstyle(pretty_print_task(task)))
- Task 1234
- Next run: ... (2019-02-21T13:52:35.407818)
- Interval: 1:00:00
- Type: test_task
- Policy: oneshot
- Args:
- 'foo'
- 'bar'
- True
- Keyword args:
- key: 'value'
- key2: 42
- <BLANKLINE>
- >>> print(click.unstyle(pretty_print_task(task, full=True)))
- Task 1234
- Next run: ... (2019-02-21T13:52:35.407818)
- Interval: 1:00:00
- Type: test_task
- Policy: oneshot
- Status: next_run_not_scheduled
- Priority:\x20
- Args:
- 'foo'
- 'bar'
- True
- Keyword args:
- key: 'value'
- key2: 42
- <BLANKLINE>
- """
- import humanize
-
- next_run = task["next_run"]
- lines = [
- "%s %s\n" % (click.style("Task", bold=True), task["id"]),
- click.style(" Next run: ", bold=True),
- "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
- "\n",
- click.style(" Interval: ", bold=True),
- str(task["current_interval"]),
- "\n",
- click.style(" Type: ", bold=True),
- task["type"] or "",
- "\n",
- click.style(" Policy: ", bold=True),
- task["policy"] or "",
- "\n",
- ]
- if full:
- lines += [
- click.style(" Status: ", bold=True),
- task["status"] or "",
- "\n",
- click.style(" Priority: ", bold=True),
- task["priority"] or "",
- "\n",
- ]
- lines += [
- click.style(" Args:\n", bold=True),
- pretty_print_list(task["arguments"]["args"], indent=4),
- click.style(" Keyword args:\n", bold=True),
- pretty_print_dict(task["arguments"]["kwargs"], indent=4),
- ]
-
- return "".join(lines)
-
-
@cli.group("task")
@click.pass_context
def task(ctx):
@@ -202,6 +80,8 @@
from swh.scheduler.utils import utcnow
+ from .utils import pretty_print_task
+
tasks = []
now = utcnow()
scheduler = ctx.obj["scheduler"]
@@ -261,39 +141,26 @@
which is considered as the lowest priority level.
"""
- from swh.scheduler.utils import utcnow
-
- from .utils import parse_options
+ from .utils import parse_options, task_add
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
- if scheduler.get_task_type(task_type_name) is None:
- raise ValueError(f"Unknown task type {task_type_name}.")
-
- now = utcnow()
+ task_type = scheduler.get_task_type(task_type_name)
+ if not task_type:
+ raise ValueError(f"Unknown task name {task_type_name}.")
(args, kw) = parse_options(options)
- task = {
- "type": task_type_name,
- "policy": policy,
- "priority": priority,
- "arguments": {
- "args": args,
- "kwargs": kw,
- },
- "next_run": next_run or now,
- }
- created = scheduler.create_tasks([task])
-
- output = [
- "Created %d tasks\n" % len(created),
- ]
- for task in created:
- output.append(pretty_print_task(task))
-
- click.echo("\n".join(output))
+ task_add(
+ scheduler,
+ task_type_name=task_type_name,
+ policy=policy,
+ priority=priority,
+ next_run=next_run,
+ args=args,
+ kw=kw,
+ )
def iter_origins( # use string annotations to prevent some pkg loading
@@ -412,6 +279,8 @@
You can override the number of tasks to fetch with the --limit flag.
"""
+ from .utils import pretty_print_task
+
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -511,6 +380,8 @@
"""List tasks."""
from operator import itemgetter
+ from .utils import pretty_print_run, pretty_print_task
+
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
diff --git a/swh/scheduler/cli/test_cli_utils.py b/swh/scheduler/cli/test_cli_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/test_cli_utils.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.cli.utils import lister_task_type
+
+
+@pytest.mark.parametrize(
+ "lister_name,listing_type", [("foo", "full"), ("bar", "incremental")]
+)
+def test_lister_task_type(lister_name, listing_type):
+ assert lister_task_type(lister_name, listing_type) == (
+ f"list-{lister_name}-{listing_type}"
+ )
diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py
--- a/swh/scheduler/cli/utils.py
+++ b/swh/scheduler/cli/utils.py
@@ -13,7 +13,7 @@
import click
if TYPE_CHECKING:
- from typing import Dict, Optional
+ from typing import Dict, List, Optional, Tuple
from swh.scheduler.interface import SchedulerInterface
@@ -73,7 +73,7 @@
raise click.ClickException("Invalid argument: {}".format(option))
-def parse_options(options):
+def parse_options(options: List[str]) -> Tuple[List[str], Dict]:
"""Parses options from a CLI as YAML and turns it into Python
args and kwargs.
@@ -177,3 +177,160 @@
kwargs=task_dict["arguments"]["kwargs"],
queue=queue_name,
)
+
+
+def pretty_print_list(list, indent=0):
+ """Pretty-print a list"""
+ return "".join("%s%r\n" % (" " * indent, item) for item in list)
+
+
+def pretty_print_dict(dict, indent=0):
+ """Pretty-print a dict, one "key: value" line per entry, sorted by key"""
+ return "".join(
+ "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value)
+ for key, value in sorted(dict.items())
+ )
+
+
+def format_dict(d):
+ """Recursively format date objects in the dict passed as argument"""
+ import datetime
+
+ ret = {}
+ for k, v in d.items():
+ if isinstance(v, (datetime.date, datetime.datetime)):
+ v = v.isoformat()
+ elif isinstance(v, dict):
+ v = format_dict(v)
+ ret[k] = v
+ return ret
+
+
+def pretty_print_run(run, indent=4):
+ fmt = (
+ "{indent}{backend_id} [{status}]\n"
+ "{indent} scheduled: {scheduled} [{started}:{ended}]"
+ )
+ return fmt.format(indent=" " * indent, **format_dict(run))
+
+
+def pretty_print_task(task, full=False):
+ """Pretty-print a task
+
+ If 'full' is True, also print the status and priority fields.
+
+ >>> import datetime
+ >>> task = {
+ ... 'id': 1234,
+ ... 'arguments': {
+ ... 'args': ['foo', 'bar', True],
+ ... 'kwargs': {'key': 'value', 'key2': 42},
+ ... },
+ ... 'current_interval': datetime.timedelta(hours=1),
+ ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
+ ... 'policy': 'oneshot',
+ ... 'priority': None,
+ ... 'status': 'next_run_not_scheduled',
+ ... 'type': 'test_task',
+ ... }
+ >>> print(click.unstyle(pretty_print_task(task)))
+ Task 1234
+ Next run: ... (2019-02-21T13:52:35.407818)
+ Interval: 1:00:00
+ Type: test_task
+ Policy: oneshot
+ Args:
+ 'foo'
+ 'bar'
+ True
+ Keyword args:
+ key: 'value'
+ key2: 42
+ <BLANKLINE>
+ >>> print(click.unstyle(pretty_print_task(task, full=True)))
+ Task 1234
+ Next run: ... (2019-02-21T13:52:35.407818)
+ Interval: 1:00:00
+ Type: test_task
+ Policy: oneshot
+ Status: next_run_not_scheduled
+ Priority:\x20
+ Args:
+ 'foo'
+ 'bar'
+ True
+ Keyword args:
+ key: 'value'
+ key2: 42
+ <BLANKLINE>
+ """
+ import humanize
+
+ next_run = task["next_run"]
+ lines = [
+ "%s %s\n" % (click.style("Task", bold=True), task["id"]),
+ click.style(" Next run: ", bold=True),
+ "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
+ "\n",
+ click.style(" Interval: ", bold=True),
+ str(task["current_interval"]),
+ "\n",
+ click.style(" Type: ", bold=True),
+ task["type"] or "",
+ "\n",
+ click.style(" Policy: ", bold=True),
+ task["policy"] or "",
+ "\n",
+ ]
+ if full:
+ lines += [
+ click.style(" Status: ", bold=True),
+ task["status"] or "",
+ "\n",
+ click.style(" Priority: ", bold=True),
+ task["priority"] or "",
+ "\n",
+ ]
+ lines += [
+ click.style(" Args:\n", bold=True),
+ pretty_print_list(task["arguments"]["args"], indent=4),
+ click.style(" Keyword args:\n", bold=True),
+ pretty_print_dict(task["arguments"]["kwargs"], indent=4),
+ ]
+
+ return "".join(lines)
+
+
+def task_add(
+ scheduler: SchedulerInterface,
+ task_type_name: str,
+ args: List[str],
+ kw: Dict,
+ policy: str,
+ priority: Optional[str] = None,
+ next_run: Optional[str] = None,
+):
+ """Create a task of type task_type_name in the scheduler and echo the created task(s)."""
+ from swh.scheduler.utils import utcnow
+
+ task = {
+ "type": task_type_name,
+ "policy": policy,
+ "priority": priority,
+ "arguments": {
+ "args": args,
+ "kwargs": kw,
+ },
+ "next_run": next_run or utcnow(),
+ }
+ created = scheduler.create_tasks([task])
+
+ output = [f"Created {len(created)} tasks\n"]
+ for task in created:
+ output.append(pretty_print_task(task))
+
+ click.echo("\n".join(output))
+
+
+def lister_task_type(lister_name: str, lister_type: str) -> str:
+ return f"list-{lister_name}-{lister_type}"
diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py
--- a/swh/scheduler/tests/test_cli_add_forge_now.py
+++ b/swh/scheduler/tests/test_cli_add_forge_now.py
@@ -3,11 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Tuple
+from typing import Dict, Tuple
import attr
import pytest
+from swh.scheduler.cli.utils import lister_task_type
+from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.tests.common import TASK_TYPES
from swh.scheduler.tests.test_cli import invoke as basic_invoke
@@ -36,11 +38,11 @@
@pytest.mark.parametrize(
- "extra_cmd_args",
+ "cmd_args, subcmd_args",
[
- [],
- ["--lister-name", "github", "--lister-instance-name", "github"],
- ["--staging"],
+ ([], []),
+ ([], ["--lister-name", "github", "--lister-instance-name", "github"]),
+ (["--preset", "staging"], []),
],
)
def test_schedule_first_visits_cli(
@@ -48,7 +50,8 @@
swh_scheduler,
swh_scheduler_celery_app,
listed_origins_by_type,
- extra_cmd_args,
+ cmd_args,
+ subcmd_args,
):
for task_type in TASK_TYPES.values():
swh_scheduler.create_task_type(task_type)
@@ -56,7 +59,7 @@
visit_type = next(iter(listed_origins_by_type))
# enabled origins by default except when --staging flag is provided
- enabled = "--staging" not in extra_cmd_args
+ enabled = "staging" not in cmd_args
for origins in listed_origins_by_type.values():
swh_scheduler.record_listed_origins(
@@ -71,9 +74,11 @@
send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
send_task.return_value = None
- cmd_args = ["schedule-first-visits", "--type-name", visit_type] + extra_cmd_args
+ command_args = (
+ cmd_args + ["schedule-first-visits", "--type-name", visit_type] + subcmd_args
+ )
- result = invoke(swh_scheduler, args=tuple(cmd_args))
+ result = invoke(swh_scheduler, args=tuple(command_args))
assert result.exit_code == 0
scheduled_tasks = {
@@ -85,4 +90,75 @@
for origin in listed_origins_by_type[visit_type]
}
- assert expected_tasks == scheduled_tasks
+ assert scheduled_tasks == expected_tasks
+
+
+def _create_task_type(
+ swh_scheduler: SchedulerInterface, lister_name: str, listing_type: str = "full"
+) -> Dict:
+ task_type = {
+ "type": lister_task_type(lister_name, listing_type), # only relevant bit
+ "description": f"{listing_type} listing",
+ "backend_name": "swh.example.backend",
+ "default_interval": "1 day",
+ "min_interval": "1 day",
+ "max_interval": "1 day",
+ "backoff_factor": "1",
+ "max_queue_length": "100",
+ "num_retries": 3,
+ }
+ swh_scheduler.create_task_type(task_type)
+ task_type = swh_scheduler.get_task_type(task_type["type"])
+ assert task_type is not None
+ return task_type
+
+
+@pytest.mark.parametrize("preset", ["staging", "production"])
+def test_schedule_register_lister(swh_scheduler, stored_lister, preset):
+ # given
+ assert stored_lister is not None
+ lister_name = stored_lister.name
+ # Let's create all possible associated lister task types
+ full = _create_task_type(swh_scheduler, lister_name, "full")
+ incremental = _create_task_type(swh_scheduler, lister_name, "incremental")
+
+ # Let's trigger the registering of that lister
+ result = invoke(
+ swh_scheduler,
+ [
+ "--preset",
+ preset,
+ "register-lister",
+ lister_name,
+ "url=https://example.org",
+ ],
+ )
+
+ output = result.output.lstrip()
+
+ expected_msgs = []
+ if preset == "production":
+ # 2 tasks: 1 full + 1 incremental (tomorrow) with recurring policy
+ expected_msgs = ["Policy: recurring", incremental["type"], "Next run: tomorrow"]
+ else:
+ # 1 task full with policy oneshot
+ expected_msgs = ["Policy: oneshot"]
+
+ # In any case, there is the full listing type too
+ expected_msgs.append(full["type"])
+
+ assert len(expected_msgs) > 0
+ for msg in expected_msgs:
+ assert msg in output
+
+
+def test_register_lister_unknown_task_type(swh_scheduler):
+ """When scheduling unknown task type, the cli should raise."""
+ with pytest.raises(ValueError, match="Unknown"):
+ invoke(
+ swh_scheduler,
+ [
+ "register-lister",
+ "unknown-lister-type-should-raise",
+ ],
+ )
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 10:03 AM (18 h, 57 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214898
Attached To
D8940: cli.add_forge_now: Open `register-lister` with sensible defaults
Event Timeline
Log In to Comment