Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696122
D8936.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D8936.id.diff
View Options
diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/add_forge_now.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import click
+
+from . import cli
+
+if TYPE_CHECKING:
+ from typing import Dict, List, Optional
+
+
+@cli.group("add-forge-now")
+@click.pass_context
+def add_forge_now(ctx):
+ """Manipulate listed origins."""
+ if not ctx.obj["scheduler"]:
+ raise ValueError("Scheduler class (local/remote) must be instantiated")
+
+
+@add_forge_now.command("schedule-first-visits")
+@click.option(
+ "--type-name",
+ "-t",
+ "visit_type_names",
+ help="Visit/loader type (can be provided multiple times)",
+ type=str,
+ multiple=True,
+)
+@click.option(
+ "--production/--staging",
+ "enabled",
+ is_flag=True,
+ default=True,
+ help="""Determine whether we want to schedule enabled origins (on production) or
+ disabled ones (on staging).""",
+)
+@click.option(
+ "--lister-name",
+ default=None,
+ help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+ "--lister-instance-name",
+ default=None,
+ help="Limit origins to those listed from lister with instance name",
+)
+@click.pass_context
+def schedule_first_visits_cli(
+ ctx,
+ visit_type_names: List[str],
+ enabled: bool,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
+):
+ """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the
+ associated add_forge_now queue(s).
+
+ """
+ from .utils import get_task_type, send_to_celery
+
+ scheduler = ctx.obj["scheduler"]
+
+ visit_type_to_queue: Dict[str, str] = {}
+ unknown_task_types = []
+ for visit_type_name in visit_type_names:
+ task_type = get_task_type(scheduler, visit_type_name)
+ if not task_type:
+ unknown_task_types.append(visit_type_name)
+ continue
+ queue_name = task_type["backend_name"]
+ visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}"
+
+ if unknown_task_types:
+ raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.")
+
+ send_to_celery(
+ scheduler,
+ visit_type_to_queue=visit_type_to_queue,
+ enabled=enabled,
+ lister_name=lister_name,
+ lister_instance_name=lister_instance_name,
+ )
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -1,11 +1,11 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
-from typing import TYPE_CHECKING, Iterable, List, Optional
+from typing import TYPE_CHECKING
import click
@@ -13,6 +13,7 @@
from ..utils import create_origin_task_dicts
if TYPE_CHECKING:
+ from typing import Iterable, List, Optional
from uuid import UUID
from ..interface import SchedulerInterface
@@ -180,59 +181,40 @@
default=None,
help="Limit origins to those listed from lister with instance name",
)
-@click.argument("type", type=str)
+@click.argument("visit_type_name", type=str)
@click.pass_context
-def send_to_celery(
+def send_to_celery_cli(
ctx,
policy: str,
queue: Optional[str],
tablesample: Optional[float],
- type: str,
+ visit_type_name: str,
enabled: bool,
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
):
- """Send the next origin visits of the TYPE loader to celery, filling the queue."""
- from kombu.utils.uuid import uuid
+ """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue."""
- from swh.scheduler.celery_backend.config import app, get_available_slots
+ from .utils import get_task_type, send_to_celery
scheduler = ctx.obj["scheduler"]
- task_type = scheduler.get_task_type(f"load-{type}")
-
- task_name = task_type["backend_name"]
- queue_name = queue or task_name
-
- num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+ task_type = get_task_type(scheduler, visit_type_name)
+ if not task_type:
+ raise ValueError(f"Unknown task type {visit_type_name}.")
- click.echo(f"{num_tasks} slots available in celery queue")
+ queue_name = queue or task_type["backend_name"]
- lister_uuid: Optional[str] = None
- if lister_name and lister_instance_name:
- lister = scheduler.get_lister(lister_name, lister_instance_name)
- if lister:
- lister_uuid = lister.id
-
- origins = scheduler.grab_next_visits(
- type,
- num_tasks,
+ send_to_celery(
+ scheduler,
+ visit_type_to_queue={visit_type_name: queue_name},
policy=policy,
tablesample=tablesample,
enabled=enabled,
- lister_uuid=lister_uuid,
+ lister_name=lister_name,
+ lister_instance_name=lister_instance_name,
)
- click.echo(f"{len(origins)} visits to send to celery")
- for task_dict in create_origin_task_dicts(origins, scheduler):
- app.send_task(
- task_name,
- task_id=uuid(),
- args=task_dict["arguments"]["args"],
- kwargs=task_dict["arguments"]["kwargs"],
- queue=queue_name,
- )
-
@origin.command("update-metrics")
@click.option("--lister", default=None, help="Only update metrics for this lister")
diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py
--- a/swh/scheduler/cli/utils.py
+++ b/swh/scheduler/cli/utils.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,8 +6,18 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
import click
+if TYPE_CHECKING:
+ from typing import Dict, Optional
+
+ from swh.scheduler.interface import SchedulerInterface
+
+
TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler
@@ -100,3 +110,70 @@
args = [parse_argument(x) for x in options if "=" not in x]
kw = {k: parse_argument(v) for (k, v) in kw_pairs}
return (args, kw)
+
+
+def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]:
+ "Given a visit type, return its associated task type."
+ return scheduler.get_task_type(f"load-{visit_type}")
+
+
+def send_to_celery(
+ scheduler: SchedulerInterface,
+ visit_type_to_queue: Dict[str, str],
+ enabled: bool = True,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
+ policy: str = "oldest_scheduled_first",
+ tablesample: Optional[float] = None,
+):
+ """Utility function to read tasks from the scheduler and send those directly to
+ celery.
+
+ Args:
+ visit_type_to_queue: Mapping of visit/loader type (e.g. git, svn, ...)
+ to queue to send task to.
+ enabled: Determine whether we want to list enabled or disabled origins. By
+ default, we want enabled origins. In some edge cases, we might want the
+ disabled ones.
+ lister_name: Determine the list of origins listed from the lister with name
+ lister_instance_name: Determine the list of origins listed from the lister
+ with instance name
+ policy: the scheduling policy used to select which visits to schedule
+ tablesample: the percentage of the table on which we run the query
+ (None: no sampling)
+
+ """
+
+ from kombu.utils.uuid import uuid
+
+ from swh.scheduler.celery_backend.config import app, get_available_slots
+
+ from ..utils import create_origin_task_dicts
+
+ for visit_type_name, queue_name in visit_type_to_queue.items():
+ task_type = get_task_type(scheduler, visit_type_name)
+ assert task_type is not None
+ task_name = task_type["backend_name"]
+ num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+ click.echo(f"{num_tasks} slots available in celery queue")
+
+ origins = scheduler.grab_next_visits(
+ visit_type_name,
+ num_tasks,
+ policy=policy,
+ tablesample=tablesample,
+ enabled=enabled,
+ lister_name=lister_name,
+ lister_instance_name=lister_instance_name,
+ )
+
+ click.echo(f"{len(origins)} visits to send to celery")
+ for task_dict in create_origin_task_dicts(origins, scheduler):
+ app.send_task(
+ task_name,
+ task_id=uuid(),
+ args=task_dict["arguments"]["args"],
+ kwargs=task_dict["arguments"]["kwargs"],
+ queue=queue_name,
+ )
diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_add_forge_now.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Tuple
+
+import attr
+import pytest
+
+from swh.scheduler.tests.common import TASK_TYPES
+from swh.scheduler.tests.test_cli import invoke as basic_invoke
+
+
+def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False):
+ return basic_invoke(
+ scheduler, args=["add-forge-now", *args], catch_exceptions=catch_exceptions
+ )
+
+
+def test_schedule_first_visits_cli_unknown_visit_type(
+ swh_scheduler,
+):
+ "Calling cli without a known visit type should raise"
+ with pytest.raises(ValueError, match="Unknown"):
+ invoke(
+ swh_scheduler,
+ args=(
+ "schedule-first-visits",
+ "-t",
+ "unknown-vt0",
+ "--type-name",
+ "unknown-visit-type1",
+ ),
+ )
+
+
+@pytest.mark.parametrize(
+ "extra_cmd_args",
+ [
+ [],
+ ["--lister-name", "github", "--lister-instance-name", "github"],
+ ["--staging"],
+ ],
+)
+def test_schedule_first_visits_cli(
+ mocker,
+ swh_scheduler,
+ swh_scheduler_celery_app,
+ listed_origins_by_type,
+ extra_cmd_args,
+):
+ for task_type in TASK_TYPES.values():
+ swh_scheduler.create_task_type(task_type)
+
+ visit_type = next(iter(listed_origins_by_type))
+
+ # enabled origins by default except when --staging flag is provided
+ enabled = "--staging" not in extra_cmd_args
+
+ for origins in listed_origins_by_type.values():
+ swh_scheduler.record_listed_origins(
+ (attr.evolve(o, enabled=enabled) for o in origins)
+ )
+
+ get_queue_length = mocker.patch(
+ "swh.scheduler.celery_backend.config.get_queue_length"
+ )
+ get_queue_length.return_value = None
+
+ send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task")
+ send_task.return_value = None
+
+ cmd_args = ["schedule-first-visits", "--type-name", visit_type] + extra_cmd_args
+
+ result = invoke(swh_scheduler, args=tuple(cmd_args))
+ assert result.exit_code == 0
+
+ scheduled_tasks = {
+ (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list
+ }
+
+ expected_tasks = {
+ (TASK_TYPES[origin.visit_type]["backend_name"], origin.url)
+ for origin in listed_origins_by_type[visit_type]
+ }
+
+ assert expected_tasks == scheduled_tasks
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -114,6 +114,14 @@
assert scheduled_tasks <= all_possible_tasks
+def test_send_to_celery_unknown_visit_type(
+ swh_scheduler,
+):
+ "Calling cli without a known visit type should raise"
+ with pytest.raises(ValueError, match="Unknown"):
+ invoke(swh_scheduler, args=("send-to-celery", "unknown-visit-type"))
+
+
@pytest.mark.parametrize(
"extra_cmd_args",
[[], ["--lister-name", "github", "--lister-instance-name", "github"]],
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 7:00 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214909
Attached To
D8936: cli.add_forge_now: Open `schedule-first-visits` with sensible defaults
Event Timeline
Log In to Comment