Page MenuHomeSoftware Heritage

D8936.id.diff
No OneTemporary

D8936.id.diff

diff --git a/swh/scheduler/cli/add_forge_now.py b/swh/scheduler/cli/add_forge_now.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/add_forge_now.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import click
+
+from . import cli
+
+if TYPE_CHECKING:
+ from typing import Dict, List, Optional
+
+
+@cli.group("add-forge-now")
+@click.pass_context
+def add_forge_now(ctx):
+    """Manipulate add-forge-now requests."""
+    # Subcommands read the scheduler from the context; fail early if it is unset.
+    if not ctx.obj["scheduler"]:
+        raise ValueError("Scheduler class (local/remote) must be instantiated")
+
+
+@add_forge_now.command("schedule-first-visits")
+@click.option(
+    "--type-name",
+    "-t",
+    "visit_type_names",
+    help="Visit/loader type (can be provided multiple times)",
+    type=str,
+    multiple=True,
+)
+@click.option(
+    "--production/--staging",
+    "enabled",
+    is_flag=True,
+    default=True,
+    help="""Determine whether we want to schedule enabled origins (on production) or
+    disabled ones (on staging).""",
+)
+@click.option(
+    "--lister-name",
+    default=None,
+    help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+    "--lister-instance-name",
+    default=None,
+    help="Limit origins to those listed from lister with instance name",
+)
+@click.pass_context
+def schedule_first_visits_cli(
+    ctx,
+    visit_type_names: List[str],
+    enabled: bool,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+):
+    """Send next origin visits of VISIT_TYPE_NAME(S) loader to celery, filling the
+    associated add_forge_now queue(s).
+
+    """
+    # Imported lazily to keep cli startup time under control.
+    from .utils import get_task_type, send_to_celery
+
+    scheduler = ctx.obj["scheduler"]
+
+    # Resolve every requested visit type before sending anything, so that all
+    # unknown names are reported together in a single error.
+    visit_type_to_queue: Dict[str, str] = {}
+    unknown_task_types = []
+    for visit_type_name in visit_type_names:
+        task_type = get_task_type(scheduler, visit_type_name)
+        if not task_type:
+            unknown_task_types.append(visit_type_name)
+            continue
+        queue_name = task_type["backend_name"]
+        # Tasks go to the loader's queue name prefixed with "add_forge_now:".
+        visit_type_to_queue[visit_type_name] = f"add_forge_now:{queue_name}"
+
+    if unknown_task_types:
+        raise ValueError(f"Unknown task types {','.join(unknown_task_types)}.")
+
+    send_to_celery(
+        scheduler,
+        visit_type_to_queue=visit_type_to_queue,
+        enabled=enabled,
+        lister_name=lister_name,
+        lister_instance_name=lister_instance_name,
+    )
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -1,11 +1,11 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
-from typing import TYPE_CHECKING, Iterable, List, Optional
+from typing import TYPE_CHECKING
import click
@@ -13,6 +13,7 @@
from ..utils import create_origin_task_dicts
if TYPE_CHECKING:
+ from typing import Iterable, List, Optional
from uuid import UUID
from ..interface import SchedulerInterface
@@ -180,59 +181,40 @@
default=None,
help="Limit origins to those listed from lister with instance name",
)
-@click.argument("type", type=str)
+@click.argument("visit_type_name", type=str)
@click.pass_context
-def send_to_celery(
+def send_to_celery_cli(
ctx,
policy: str,
queue: Optional[str],
tablesample: Optional[float],
- type: str,
+ visit_type_name: str,
enabled: bool,
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
):
- """Send the next origin visits of the TYPE loader to celery, filling the queue."""
- from kombu.utils.uuid import uuid
+ """Send next origin visits of VISIT_TYPE_NAME to celery, filling the queue."""
- from swh.scheduler.celery_backend.config import app, get_available_slots
+ from .utils import get_task_type, send_to_celery
scheduler = ctx.obj["scheduler"]
- task_type = scheduler.get_task_type(f"load-{type}")
-
- task_name = task_type["backend_name"]
- queue_name = queue or task_name
-
- num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+ task_type = get_task_type(scheduler, visit_type_name)
+ if not task_type:
+ # task_type is falsy on this branch; name the visit type the user asked for.
+ raise ValueError(f"Unknown task type {visit_type_name}.")
- click.echo(f"{num_tasks} slots available in celery queue")
+ queue_name = queue or task_type["backend_name"]
- lister_uuid: Optional[str] = None
- if lister_name and lister_instance_name:
- lister = scheduler.get_lister(lister_name, lister_instance_name)
- if lister:
- lister_uuid = lister.id
-
- origins = scheduler.grab_next_visits(
- type,
- num_tasks,
+ send_to_celery(
+ scheduler,
+ visit_type_to_queue={visit_type_name: queue_name},
policy=policy,
tablesample=tablesample,
enabled=enabled,
- lister_uuid=lister_uuid,
+ lister_name=lister_name,
+ lister_instance_name=lister_instance_name,
)
- click.echo(f"{len(origins)} visits to send to celery")
- for task_dict in create_origin_task_dicts(origins, scheduler):
- app.send_task(
- task_name,
- task_id=uuid(),
- args=task_dict["arguments"]["args"],
- kwargs=task_dict["arguments"]["kwargs"],
- queue=queue_name,
- )
-
@origin.command("update-metrics")
@click.option("--lister", default=None, help="Only update metrics for this lister")
diff --git a/swh/scheduler/cli/utils.py b/swh/scheduler/cli/utils.py
--- a/swh/scheduler/cli/utils.py
+++ b/swh/scheduler/cli/utils.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,8 +6,18 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
import click
+if TYPE_CHECKING:
+ from typing import Dict, Optional
+
+ from swh.scheduler.interface import SchedulerInterface
+
+
TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler
@@ -100,3 +110,70 @@
args = [parse_argument(x) for x in options if "=" not in x]
kw = {k: parse_argument(v) for (k, v) in kw_pairs}
return (args, kw)
+
+
+def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]:
+    """Look up the ``load-<visit_type>`` task type in the scheduler."""
+    task_type_name = f"load-{visit_type}"
+    return scheduler.get_task_type(task_type_name)
+
+
+def send_to_celery(
+    scheduler: SchedulerInterface,
+    visit_type_to_queue: Dict[str, str],
+    enabled: bool = True,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+    policy: str = "oldest_scheduled_first",
+    tablesample: Optional[float] = None,
+):
+    """Utility function to read tasks from the scheduler and send those directly to
+    celery.
+
+    Args:
+        scheduler: scheduler used to look up task types and grab the next visits
+        visit_type_to_queue: Mapping of visit/loader type (e.g git, svn, ...)
+          to queue to send task to.
+        enabled: Determine whether we want to list enabled or disabled origins. As
+          default, we want reasonably enabled origins. For some edge case, we might
+          want the others.
+        lister_name: Determine the list of origins listed from the lister with name
+        lister_instance_name: Determine the list of origins listed from the lister
+          with instance name
+        policy: the scheduling policy used to select which visits to schedule
+        tablesample: the percentage of the table on which we run the query
+          (None: no sampling)
+
+    Raises:
+        ValueError: if a visit type has no associated task type in the scheduler
+
+    """
+
+    # Imported lazily to keep cli startup time under control.
+    from kombu.utils.uuid import uuid
+
+    from swh.scheduler.celery_backend.config import app, get_available_slots
+
+    from ..utils import create_origin_task_dicts
+
+    for visit_type_name, queue_name in visit_type_to_queue.items():
+        task_type = get_task_type(scheduler, visit_type_name)
+        # Raise explicitly rather than assert: asserts are stripped under -O,
+        # which would turn this into an opaque TypeError on the next line.
+        if task_type is None:
+            raise ValueError(f"Unknown task type for visit type {visit_type_name}.")
+        task_name = task_type["backend_name"]
+        # Only grab as many visits as the target queue still has room for.
+        num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+        click.echo(f"{num_tasks} slots available in celery queue")
+
+        origins = scheduler.grab_next_visits(
+            visit_type_name,
+            num_tasks,
+            policy=policy,
+            tablesample=tablesample,
+            enabled=enabled,
+            lister_name=lister_name,
+            lister_instance_name=lister_instance_name,
+        )
+
+        click.echo(f"{len(origins)} visits to send to celery")
+        for task_dict in create_origin_task_dicts(origins, scheduler):
+            app.send_task(
+                task_name,
+                task_id=uuid(),
+                args=task_dict["arguments"]["args"],
+                kwargs=task_dict["arguments"]["kwargs"],
+                queue=queue_name,
+            )
diff --git a/swh/scheduler/tests/test_cli_add_forge_now.py b/swh/scheduler/tests/test_cli_add_forge_now.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_add_forge_now.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Tuple
+
+import attr
+import pytest
+
+from swh.scheduler.tests.common import TASK_TYPES
+from swh.scheduler.tests.test_cli import invoke as basic_invoke
+
+
+def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False):
+    """Invoke the ``add-forge-now`` CLI group with the given arguments."""
+    full_args = ["add-forge-now", *args]
+    return basic_invoke(scheduler, args=full_args, catch_exceptions=catch_exceptions)
+
+
+def test_schedule_first_visits_cli_unknown_visit_type(
+    swh_scheduler,
+):
+    """Unknown visit types must make the command raise a ValueError."""
+    # Use both the short and the long option spelling.
+    cmd_args = (
+        "schedule-first-visits",
+        "-t",
+        "unknown-vt0",
+        "--type-name",
+        "unknown-visit-type1",
+    )
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(swh_scheduler, args=cmd_args)
+
+
+@pytest.mark.parametrize(
+    "extra_cmd_args",
+    [
+        [],
+        ["--lister-name", "github", "--lister-instance-name", "github"],
+        ["--staging"],
+    ],
+)
+def test_schedule_first_visits_cli(
+    mocker,
+    swh_scheduler,
+    swh_scheduler_celery_app,
+    listed_origins_by_type,
+    extra_cmd_args,
+):
+    """Tasks sent to celery match the listed origins of the requested visit type."""
+    for known_task_type in TASK_TYPES.values():
+        swh_scheduler.create_task_type(known_task_type)
+
+    # Exercise the command with the first visit type the fixture provides.
+    visit_type = next(iter(listed_origins_by_type))
+
+    # Origins are recorded as enabled unless the --staging flag is passed.
+    enabled = "--staging" not in extra_cmd_args
+
+    for listed_origins in listed_origins_by_type.values():
+        swh_scheduler.record_listed_origins(
+            (attr.evolve(origin, enabled=enabled) for origin in listed_origins)
+        )
+
+    mocker.patch(
+        "swh.scheduler.celery_backend.config.get_queue_length",
+        return_value=None,
+    )
+    send_task = mocker.patch.object(
+        swh_scheduler_celery_app, "send_task", return_value=None
+    )
+
+    cmd_args = ("schedule-first-visits", "--type-name", visit_type, *extra_cmd_args)
+
+    result = invoke(swh_scheduler, args=cmd_args)
+    assert result.exit_code == 0
+
+    # Each recorded call is a (positional args, keyword args) pair.
+    scheduled_tasks = {
+        (call_args[0], call_kwargs["kwargs"]["url"])
+        for call_args, call_kwargs in send_task.call_args_list
+    }
+
+    expected_tasks = {
+        (TASK_TYPES[origin.visit_type]["backend_name"], origin.url)
+        for origin in listed_origins_by_type[visit_type]
+    }
+
+    assert expected_tasks == scheduled_tasks
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -114,6 +114,14 @@
assert scheduled_tasks <= all_possible_tasks
+def test_send_to_celery_unknown_visit_type(
+    swh_scheduler,
+):
+    """send-to-celery must raise a ValueError for an unknown visit type."""
+    cmd_args = ("send-to-celery", "unknown-visit-type")
+    with pytest.raises(ValueError, match="Unknown"):
+        invoke(swh_scheduler, args=cmd_args)
+
+
@pytest.mark.parametrize(
"extra_cmd_args",
[[], ["--lister-name", "github", "--lister-instance-name", "github"]],

File Metadata

Mime Type
text/plain
Expires
Sun, Aug 17, 7:00 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214909

Event Timeline