Page Menu · Home · Software Heritage

D8940.id32243.diff
No One · Temporary

D8940.id32243.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,6 +1,5 @@
pytest
pytest-mock
-celery >= 4.3
hypothesis >= 3.11.0
swh.lister
swh.storage[testing]
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -85,7 +85,16 @@
ctx.obj["config"] = conf
-from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa
+from . import ( # noqa
+ add_forge_now,
+ admin,
+ celery_monitor,
+ journal,
+ origin,
+ simulator,
+ task,
+ task_type,
+)
def main():
diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py
--- a/swh/scheduler/cli/add_forge_now.py
+++ b/swh/scheduler/cli/add_forge_now.py
@@ -19,12 +19,90 @@
@cli.group("add-forge-now")
+# --preset is an option of the group itself, so it is given before the
+# subcommand name (e.g. "add-forge-now --preset staging register-lister ...")
+@click.option(
+ "-p",
+ "--preset",
+ "preset",
+ default="production",
+ type=click.Choice(["production", "staging"]),
+ help='Determine preset to use, "production" by default.',
+)
@click.pass_context
-def add_forge_now(ctx):
- """Manipulate listed origins."""
+def add_forge_now(ctx, preset):
+ """Manipulate add-forge-now requests."""
if not ctx.obj["scheduler"]:
raise ValueError("Scheduler class (local/remote) must be instantiated")
+ # Subcommands read the selected preset back from ctx.obj["preset"] to pick
+ # their behavior (scheduling policy, enabled or disabled origins)
+ ctx.obj["preset"] = preset
+
+
+@add_forge_now.command("register-lister")
+@click.argument("lister_name", nargs=1, required=True)
+@click.argument("options", nargs=-1)
+@click.pass_context
+def register_lister_cli(
+    ctx,
+    lister_name,
+    options,
+):
+    """Register the lister tasks in the scheduler.
+
+    Depending on the preset, policies vary:
+
+    \b
+    - staging: only "full" but limited listing (as "oneshot" task) of disabled
+      origins
+    - production: both "full" and "incremental" (if that exists) listing (as
+      "recurring" task). The "full" will be triggered asap, the "incremental"
+      will be triggered the next day.
+
+    """
+    # Imports are done locally, like in the other commands of this module
+    from datetime import timedelta
+
+    from swh.scheduler.utils import utcnow
+
+    from .utils import lister_task_type, parse_options, task_add
+
+    scheduler = ctx.obj["scheduler"]
+    preset = ctx.obj["preset"]
+
+    # Map each listing type to the name of the lister task type implementing it
+    task_type_names: Dict[str, str] = {
+        listing_type: lister_task_type(lister_name, listing_type)
+        for listing_type in ["full", "incremental"]
+    }
+
+    # Only keep the listing types actually registered in the scheduler
+    task_types: Dict[str, Dict] = {}
+    for listing_type, task_type_name in task_type_names.items():
+        task_type = scheduler.get_task_type(task_type_name)
+        if task_type:
+            task_types[listing_type] = task_type
+
+    if not task_types:
+        raise ValueError(f"Unknown lister type {lister_name}.")
+
+    (args, kw) = parse_options(options)
+
+    if preset == "production":
+        # Production: recurring tasks for every available listing type
+        policy = "recurring"
+    else:
+        # Staging: a single "full" but limited listing as a oneshot task, whose
+        # listed origins are left disabled
+        policy = "oneshot"
+        kw.update({"max_pages": 3, "max_origins_per_page": 10, "enable_origins": False})
+        # In staging, keep only the "full" listing when both full and
+        # incremental listings exist
+        if "full" in task_types:
+            task_types.pop("incremental", None)
+
+    # A single reference time for all the tasks registered by this command
+    now = utcnow()
+    # The incremental listing is postponed to the next day only when a full
+    # listing is scheduled right now; on its own, it is the first listing of
+    # the forge and must run asap.
+    incremental_delay = timedelta(days=1) if "full" in task_types else timedelta(0)
+
+    for listing_type, task_type in task_types.items():
+        if listing_type == "incremental":
+            next_run = now + incremental_delay
+        else:
+            next_run = now
+        task_add(
+            scheduler,
+            task_type_name=task_type["type"],
+            args=args,
+            kw=kw,
+            policy=policy,
+            next_run=next_run,
+        )
+
@add_forge_now.command("schedule-first-visits")
@click.option(
@@ -35,14 +113,6 @@
type=str,
multiple=True,
)
-@click.option(
- "--production/--staging",
- "enabled",
- is_flag=True,
- default=True,
- help="""Determine whether we want to scheduled enabled origins (on production) or
- disabled ones (on staging).""",
-)
@click.option(
"--lister-name",
default=None,
@@ -57,7 +127,6 @@
def schedule_first_visits_cli(
ctx,
visit_type_names: List[str],
- enabled: bool,
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
):
@@ -68,6 +137,7 @@
from .utils import get_task_type, send_to_celery
scheduler = ctx.obj["scheduler"]
+ preset = ctx.obj["preset"]
visit_type_to_queue: Dict[str, str] = {}
unknown_task_types = []
@@ -85,7 +155,7 @@
send_to_celery(
scheduler,
visit_type_to_queue=visit_type_to_queue,
- enabled=enabled,
+ enabled=preset == "production",
lister_name=lister_name,
lister_instance_name=lister_instance_name,
)
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -122,7 +122,7 @@
"""Send the next COUNT origin visits of the TYPE loader to the scheduler as
one-shot tasks."""
from ..utils import utcnow
- from .task import pretty_print_task
+ from .utils import pretty_print_task
scheduler = ctx.obj["scheduler"]
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -27,128 +27,6 @@
DATETIME = click.DateTime()
-def format_dict(d):
- """Recursively format date objects in the dict passed as argument"""
- import datetime
-
- ret = {}
- for k, v in d.items():
- if isinstance(v, (datetime.date, datetime.datetime)):
- v = v.isoformat()
- elif isinstance(v, dict):
- v = format_dict(v)
- ret[k] = v
- return ret
-
-
-def pretty_print_list(list, indent=0):
- """Pretty-print a list"""
- return "".join("%s%r\n" % (" " * indent, item) for item in list)
-
-
-def pretty_print_dict(dict, indent=0):
- """Pretty-print a list"""
- return "".join(
- "%s%s: %r\n" % (" " * indent, click.style(key, bold=True), value)
- for key, value in sorted(dict.items())
- )
-
-
-def pretty_print_run(run, indent=4):
- fmt = (
- "{indent}{backend_id} [{status}]\n"
- "{indent} scheduled: {scheduled} [{started}:{ended}]"
- )
- return fmt.format(indent=" " * indent, **format_dict(run))
-
-
-def pretty_print_task(task, full=False):
- """Pretty-print a task
-
- If 'full' is True, also print the status and priority fields.
-
- >>> import datetime
- >>> task = {
- ... 'id': 1234,
- ... 'arguments': {
- ... 'args': ['foo', 'bar', True],
- ... 'kwargs': {'key': 'value', 'key2': 42},
- ... },
- ... 'current_interval': datetime.timedelta(hours=1),
- ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
- ... 'policy': 'oneshot',
- ... 'priority': None,
- ... 'status': 'next_run_not_scheduled',
- ... 'type': 'test_task',
- ... }
- >>> print(click.unstyle(pretty_print_task(task)))
- Task 1234
- Next run: ... (2019-02-21T13:52:35.407818)
- Interval: 1:00:00
- Type: test_task
- Policy: oneshot
- Args:
- 'foo'
- 'bar'
- True
- Keyword args:
- key: 'value'
- key2: 42
- <BLANKLINE>
- >>> print(click.unstyle(pretty_print_task(task, full=True)))
- Task 1234
- Next run: ... (2019-02-21T13:52:35.407818)
- Interval: 1:00:00
- Type: test_task
- Policy: oneshot
- Status: next_run_not_scheduled
- Priority:\x20
- Args:
- 'foo'
- 'bar'
- True
- Keyword args:
- key: 'value'
- key2: 42
- <BLANKLINE>
- """
- import humanize
-
- next_run = task["next_run"]
- lines = [
- "%s %s\n" % (click.style("Task", bold=True), task["id"]),
- click.style(" Next run: ", bold=True),
- "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
- "\n",
- click.style(" Interval: ", bold=True),
- str(task["current_interval"]),
- "\n",
- click.style(" Type: ", bold=True),
- task["type"] or "",
- "\n",
- click.style(" Policy: ", bold=True),
- task["policy"] or "",
- "\n",
- ]
- if full:
- lines += [
- click.style(" Status: ", bold=True),
- task["status"] or "",
- "\n",
- click.style(" Priority: ", bold=True),
- task["priority"] or "",
- "\n",
- ]
- lines += [
- click.style(" Args:\n", bold=True),
- pretty_print_list(task["arguments"]["args"], indent=4),
- click.style(" Keyword args:\n", bold=True),
- pretty_print_dict(task["arguments"]["kwargs"], indent=4),
- ]
-
- return "".join(lines)
-
-
@cli.group("task")
@click.pass_context
def task(ctx):
@@ -202,6 +80,8 @@
from swh.scheduler.utils import utcnow
+ from .utils import pretty_print_task
+
tasks = []
now = utcnow()
scheduler = ctx.obj["scheduler"]
@@ -261,39 +141,26 @@
which is considered as the lowest priority level.
"""
- from swh.scheduler.utils import utcnow
-
- from .utils import parse_options
+ from .utils import parse_options, task_add
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
- if scheduler.get_task_type(task_type_name) is None:
- raise ValueError(f"Unknown task type {task_type_name}.")
-
- now = utcnow()
+ task_type = scheduler.get_task_type(task_type_name)
+ if not task_type:
+ raise ValueError(f"Unknown task name {task_type_name}.")
(args, kw) = parse_options(options)
- task = {
- "type": task_type_name,
- "policy": policy,
- "priority": priority,
- "arguments": {
- "args": args,
- "kwargs": kw,
- },
- "next_run": next_run or now,
- }
- created = scheduler.create_tasks([task])
-
- output = [
- "Created %d tasks\n" % len(created),
- ]
- for task in created:
- output.append(pretty_print_task(task))
-
- click.echo("\n".join(output))
+ task_add(
+ scheduler,
+ task_type_name=task_type_name,
+ policy=policy,
+ priority=priority,
+ next_run=next_run,
+ args=args,
+ kw=kw,
+ )
def iter_origins( # use string annotations to prevent some pkg loading
@@ -412,6 +279,8 @@
You can override the number of tasks to fetch with the --limit flag.
"""
+ from .utils import pretty_print_task
+
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -511,6 +380,8 @@
"""List tasks."""
from operator import itemgetter
+ from .utils import pretty_print_run, pretty_print_task
+
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
diff --git a/swh/scheduler/cli/test_cli_utils.py b/swh/scheduler/cli/test_cli_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/test_cli_utils.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.cli.utils import lister_task_type
+
+
+@pytest.mark.parametrize(
+    "lister_name,listing_type,expected",
+    [
+        ("foo", "full", "list-foo-full"),
+        ("bar", "incremental", "list-bar-incremental"),
+        ("gitlab", "full", "list-gitlab-full"),
+    ],
+)
+def test_lister_task_type(lister_name, listing_type, expected):
+    """Task type names follow the ``list-<lister name>-<listing type>`` scheme.
+
+    Expected names are spelled out literally instead of being rebuilt with the
+    same f-string as the implementation, so that a change of scheme is caught.
+    """
+    assert lister_task_type(lister_name, listing_type) == expected
diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py
--- a/swh/scheduler/cli/utils.py
+++ b/swh/scheduler/cli/utils.py
@@ -13,7 +13,7 @@
import click
if TYPE_CHECKING:
- from typing import Dict, Optional
+ from datetime import datetime
+ from typing import Dict, List, Optional, Tuple
from swh.scheduler.interface import SchedulerInterface
@@ -73,7 +73,7 @@
raise click.ClickException("Invalid argument: {}".format(option))
-def parse_options(options):
+def parse_options(options: List[str]) -> Tuple[List[str], Dict]:
"""Parses options from a CLI as YAML and turns it into Python
args and kwargs.
@@ -177,3 +177,160 @@
kwargs=task_dict["arguments"]["kwargs"],
queue=queue_name,
)
+
+
+def pretty_print_list(list, indent=0):
+    """Render every item of ``list`` with ``repr()`` on its own line, each line
+    being prefixed with ``indent`` spaces."""
+    prefix = " " * indent
+    rendered = [f"{prefix}{item!r}\n" for item in list]
+    return "".join(rendered)
+
+
+def pretty_print_dict(dict, indent=0):
+    """Pretty-print a dict, one ``key: repr(value)`` line per item.
+
+    Items are sorted by key, keys are rendered in bold and every line is
+    prefixed with ``indent`` spaces.
+    """
+    prefix = " " * indent
+    return "".join(
+        "%s%s: %r\n" % (prefix, click.style(key, bold=True), value)
+        for key, value in sorted(dict.items())
+    )
+
+
+def format_dict(d):
+    """Return a copy of ``d`` in which date and datetime values, including the
+    ones of nested dicts, are replaced by their ISO 8601 string."""
+    import datetime
+
+    def _format_value(value):
+        # Dates become ISO strings, nested dicts are formatted recursively and
+        # any other value is kept as is.
+        if isinstance(value, (datetime.date, datetime.datetime)):
+            return value.isoformat()
+        if isinstance(value, dict):
+            return format_dict(value)
+        return value
+
+    return {key: _format_value(value) for key, value in d.items()}
+
+
+def pretty_print_run(run, indent=4):
+ """Format a task run on two lines, each prefixed with ``indent`` spaces.
+
+ The first line shows the backend id and the status of the run, the second one
+ its scheduled date and its [started:ended] range. Date values are rendered as
+ ISO 8601 strings by ``format_dict``; ``run`` must provide all the keys used in
+ the format string (otherwise ``KeyError`` is raised).
+ """
+ fmt = (
+ "{indent}{backend_id} [{status}]\n"
+ "{indent} scheduled: {scheduled} [{started}:{ended}]"
+ )
+ return fmt.format(indent=" " * indent, **format_dict(run))
+
+
+def pretty_print_task(task, full=False):
+ """Pretty-print a task
+
+ If 'full' is True, also print the status and priority fields.
+
+ >>> import datetime
+ >>> task = {
+ ... 'id': 1234,
+ ... 'arguments': {
+ ... 'args': ['foo', 'bar', True],
+ ... 'kwargs': {'key': 'value', 'key2': 42},
+ ... },
+ ... 'current_interval': datetime.timedelta(hours=1),
+ ... 'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
+ ... 'policy': 'oneshot',
+ ... 'priority': None,
+ ... 'status': 'next_run_not_scheduled',
+ ... 'type': 'test_task',
+ ... }
+ >>> print(click.unstyle(pretty_print_task(task)))
+ Task 1234
+ Next run: ... (2019-02-21T13:52:35.407818)
+ Interval: 1:00:00
+ Type: test_task
+ Policy: oneshot
+ Args:
+ 'foo'
+ 'bar'
+ True
+ Keyword args:
+ key: 'value'
+ key2: 42
+ <BLANKLINE>
+ >>> print(click.unstyle(pretty_print_task(task, full=True)))
+ Task 1234
+ Next run: ... (2019-02-21T13:52:35.407818)
+ Interval: 1:00:00
+ Type: test_task
+ Policy: oneshot
+ Status: next_run_not_scheduled
+ Priority:\x20
+ Args:
+ 'foo'
+ 'bar'
+ True
+ Keyword args:
+ key: 'value'
+ key2: 42
+ <BLANKLINE>
+ """
+ # Imported locally like the other imports of this module (presumably to keep
+ # it off the CLI import path; confirm)
+ import humanize
+
+ # task["next_run"] must be a date/datetime: it goes through both
+ # humanize.naturaldate() and .isoformat()
+ next_run = task["next_run"]
+ lines = [
+ "%s %s\n" % (click.style("Task", bold=True), task["id"]),
+ click.style(" Next run: ", bold=True),
+ "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
+ "\n",
+ click.style(" Interval: ", bold=True),
+ str(task["current_interval"]),
+ "\n",
+ click.style(" Type: ", bold=True),
+ task["type"] or "",
+ "\n",
+ click.style(" Policy: ", bold=True),
+ task["policy"] or "",
+ "\n",
+ ]
+ # Status and priority are only shown on request; missing (None) values are
+ # rendered as empty strings
+ if full:
+ lines += [
+ click.style(" Status: ", bold=True),
+ task["status"] or "",
+ "\n",
+ click.style(" Priority: ", bold=True),
+ task["priority"] or "",
+ "\n",
+ ]
+ lines += [
+ click.style(" Args:\n", bold=True),
+ pretty_print_list(task["arguments"]["args"], indent=4),
+ click.style(" Keyword args:\n", bold=True),
+ pretty_print_dict(task["arguments"]["kwargs"], indent=4),
+ ]
+
+ return "".join(lines)
+
+
+def task_add(
+    scheduler: SchedulerInterface,
+    task_type_name: str,
+    args: List[str],
+    kw: Dict,
+    policy: str,
+    priority: Optional[str] = None,
+    next_run: Optional[datetime] = None,
+) -> None:
+    """Create one task of type ``task_type_name`` in the scheduler and print it.
+
+    Args:
+        scheduler: scheduler instance in which the task is created
+        task_type_name: name of the task type of the new task
+        args: positional arguments of the task
+        kw: keyword arguments of the task
+        policy: task policy, e.g. "recurring" or "oneshot"
+        priority: optional priority of the task
+        next_run: first time the task should run; defaults to ``utcnow()``
+    """
+    from swh.scheduler.utils import utcnow
+
+    task = {
+        "type": task_type_name,
+        "policy": policy,
+        "priority": priority,
+        "arguments": {
+            "args": args,
+            "kwargs": kw,
+        },
+        "next_run": next_run or utcnow(),
+    }
+    created = scheduler.create_tasks([task])
+
+    # One summary line, then every created task as stored by the scheduler
+    output = [f"Created {len(created)} tasks\n"]
+    output.extend(pretty_print_task(created_task) for created_task in created)
+
+    click.echo("\n".join(output))
+
+
+def lister_task_type(lister_name: str, lister_type: str) -> str:
+    """Return the name of the task type running the ``lister_type`` listing
+    (e.g. "full" or "incremental") of the lister ``lister_name``."""
+    return "list-%s-%s" % (lister_name, lister_type)
diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py
--- a/swh/scheduler/tests/test_cli_add_forge_now.py
+++ b/swh/scheduler/tests/test_cli_add_forge_now.py
@@ -3,11 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Tuple
+from typing import Dict, Tuple
import attr
import pytest
+from swh.scheduler.cli.utils import lister_task_type
+from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.tests.common import TASK_TYPES
from swh.scheduler.tests.test_cli import invoke as basic_invoke
@@ -36,11 +38,11 @@
@pytest.mark.parametrize(
- "extra_cmd_args",
+ "cmd_args, subcmd_args",
[
- [],
- ["--lister-name", "github", "--lister-instance-name", "github"],
- ["--staging"],
+ ([], []),
+ ([], ["--lister-name", "github", "--lister-instance-name", "github"]),
+ (["--preset", "staging"], []),
],
)
def test_schedule_first_visits_cli(
@@ -48,7 +50,8 @@
swh_scheduler,
swh_scheduler_celery_app,
listed_origins_by_type,
- extra_cmd_args,
+ cmd_args,
+ subcmd_args,
):
for task_type in TASK_TYPES.values():
swh_scheduler.create_task_type(task_type)
@@ -56,7 +59,7 @@
visit_type = next(iter(listed_origins_by_type))
# enabled origins by default except when --staging flag is provided
- enabled = "--staging" not in extra_cmd_args
+ enabled = "staging" not in cmd_args
for origins in listed_origins_by_type.values():
swh_scheduler.record_listed_origins(
@@ -71,9 +74,11 @@
send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
send_task.return_value = None
- cmd_args = ["schedule-first-visits", "--type-name", visit_type] + extra_cmd_args
+ command_args = (
+ cmd_args + ["schedule-first-visits", "--type-name", visit_type] + subcmd_args
+ )
- result = invoke(swh_scheduler, args=tuple(cmd_args))
+ result = invoke(swh_scheduler, args=tuple(command_args))
assert result.exit_code == 0
scheduled_tasks = {
@@ -85,4 +90,75 @@
for origin in listed_origins_by_type[visit_type]
}
- assert expected_tasks == scheduled_tasks
+ assert scheduled_tasks == expected_tasks
+
+
+def _create_task_type(
+    swh_scheduler: SchedulerInterface, lister_name: str, listing_type: str = "full"
+) -> Dict:
+    """Register the ``listing_type`` task type of ``lister_name`` and return it as
+    stored by the scheduler.
+
+    Only the task type name matters to the tests; the other fields are mere
+    placeholders.
+    """
+    task_type_name = lister_task_type(lister_name, listing_type)
+    swh_scheduler.create_task_type(
+        {
+            "type": task_type_name,
+            "description": f"{listing_type} listing",
+            "backend_name": "swh.example.backend",
+            "default_interval": "1 day",
+            "min_interval": "1 day",
+            "max_interval": "1 day",
+            "backoff_factor": "1",
+            "max_queue_length": "100",
+            "num_retries": 3,
+        }
+    )
+    stored = swh_scheduler.get_task_type(task_type_name)
+    assert stored is not None
+    return stored
+
+
+@pytest.mark.parametrize("preset", ["staging", "production"])
+def test_schedule_register_lister(swh_scheduler, stored_lister, preset):
+    """register-lister schedules the lister tasks according to the preset."""
+    # given
+    assert stored_lister is not None
+    lister_name = stored_lister.name
+    # Let's create all possible associated lister task types
+    full = _create_task_type(swh_scheduler, lister_name, "full")
+    incremental = _create_task_type(swh_scheduler, lister_name, "incremental")
+
+    # when: trigger the registering of that lister
+    result = invoke(
+        swh_scheduler,
+        [
+            "--preset",
+            preset,
+            "register-lister",
+            lister_name,
+            "url=https://example.org",
+        ],
+    )
+
+    # then
+    assert result.exit_code == 0
+    output = result.output
+
+    # In any case, the full listing is registered
+    assert full["type"] in output
+
+    if preset == "production":
+        # 2 tasks: 1 full + 1 incremental (tomorrow), both with recurring policy
+        for msg in ["Policy: recurring", incremental["type"], "Next run: tomorrow"]:
+            assert msg in output
+    else:
+        # 1 full but limited task with oneshot policy, and no incremental one
+        assert "Policy: oneshot" in output
+        assert incremental["type"] not in output
+
+
+def test_register_lister_unknown_task_type(swh_scheduler):
+    """Registering a lister without any associated task type should raise."""
+    with pytest.raises(ValueError, match="Unknown lister type"):
+        invoke(
+            swh_scheduler,
+            [
+                "register-lister",
+                "unknown-lister-type-should-raise",
+            ],
+        )

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 10:03 AM (18 h, 57 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214898

Event Timeline