Page MenuHomeSoftware Heritage

D8922.id32146.diff
No OneTemporary

D8922.id32146.diff

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -376,6 +376,8 @@
policy: str,
enabled: bool = True,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
timestamp: Optional[datetime.datetime] = None,
absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
@@ -390,6 +392,10 @@
origin_select_cols = ", ".join(ListedOrigin.select_columns())
+ joins: Dict[str, str] = {
+ "origin_visit_stats": "USING (url, visit_type)",
+ }
+
query_args: List[Any] = []
where_clauses = []
@@ -511,14 +517,27 @@
where_clauses.append("lister_id = %s")
query_args.append(lister_uuid)
+ if lister_name:
+ joins["listers"] = "on listed_origins.lister_id=listers.id"
+ where_clauses.append("listers.name = %s")
+ query_args.append(lister_name)
+
+ if lister_instance_name:
+ joins["listers"] = "on listed_origins.lister_id=listers.id"
+ where_clauses.append("listers.instance_name = %s")
+ query_args.append(lister_instance_name)
+
+ join_clause = "\n".join(
+ f"left join {table} {clause}" for table, clause in joins.items()
+ )
+
# fmt: off
common_table_expressions.insert(0, ("selected_origins", f"""
SELECT
{origin_select_cols}, next_visit_queue_position
FROM
{table}
- LEFT JOIN
- origin_visit_stats USING (url, visit_type)
+ {join_clause}
WHERE
({") AND (".join(where_clauses)})
ORDER BY
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -175,6 +175,16 @@
default=None,
help="Limit origins to those listed from such lister",
)
+@click.option(
+ "--lister-name",
+ default=None,
+ help="Limit origins to those listed from such lister",
+)
+@click.option(
+ "--lister-instance-name",
+ default=None,
+ help="Limit origins to those listed from such lister",
+)
@click.argument("type", type=str)
@click.pass_context
def send_to_celery(
@@ -185,6 +195,8 @@
type: str,
enabled: bool,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
):
"""Send the next origin visits of the TYPE loader to celery, filling the queue."""
from kombu.utils.uuid import uuid
@@ -201,6 +213,12 @@
num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
click.echo(f"{num_tasks} slots available in celery queue")
+
+ if not lister_uuid and lister_name and lister_instance_name:
+ lister = scheduler.get_lister(lister_name, lister_uuid)
+ if lister:
+ lister_uuid = lister.id
+
origins = scheduler.grab_next_visits(
type,
num_tasks,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -413,6 +413,8 @@
policy: str,
enabled: bool = True,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
timestamp: Optional[datetime.datetime] = None,
absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
@@ -434,6 +436,9 @@
default, we want reasonably enabled origins. For some edge case, we might
want the others.
lister_uuid: Determine the list of origins listed from the lister with uuid
+ lister_name: Determine the list of origins listed from the lister with name
+ lister_instance_name: Determine the list of origins listed from the lister
+ with instance name
timestamp: the mocked timestamp at which we're recording that the visits are
being scheduled (defaults to the current time)
absolute_cooldown: the minimal interval between two visits of the same origin
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -827,15 +827,15 @@
assert ret.next_page_token is None
assert len(ret.results) == len(listed_origins_with_non_enabled)
- def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type):
+ def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type, limit=100):
"""Basic origins setup for scheduling policy tests"""
visit_type = next(iter(listed_origins_by_type))
- origins = listed_origins_by_type[visit_type][:100]
- assert len(origins) > 0
- recorded_origins = swh_scheduler.record_listed_origins(origins)
+ all_origins = listed_origins_by_type[visit_type]
+ origins = all_origins[:limit] if limit else all_origins
+ assert len(origins) > 0
- return visit_type, recorded_origins
+ return visit_type, swh_scheduler.record_listed_origins(origins)
def _check_grab_next_visit_basic(
self, swh_scheduler, visit_type, policy, expected, **kwargs
@@ -1303,6 +1303,30 @@
expected=expected_origins,
)
+ def test_grab_next_visit_for_specific_lister(
+ self, swh_scheduler, listed_origins_by_type, stored_lister
+ ):
+ """Checks grab_next_visits filters on the given lister {name, instance name}"""
+
+ visit_type, origins = self._grab_next_visits_setup(
+ swh_scheduler, listed_origins_by_type, limit=None
+ )
+
+ expected_origins = [origin for origin in listed_origins_by_type[visit_type]]
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type=visit_type,
+ count=len(expected_origins),
+ policy="never_visited_oldest_update_first",
+ # Filtering on lister id
+ lister_name=stored_lister.name,
+ lister_instance_name=stored_lister.instance_name,
+ )
+
+ assert len(ret) == len(expected_origins)
+ for origin in ret:
+ assert origin.lister_id == stored_lister.id
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 12:05 PM (18 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218602

Event Timeline