Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123593
D8922.id32146.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D8922.id32146.diff
View Options
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -376,6 +376,8 @@
policy: str,
enabled: bool = True,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
timestamp: Optional[datetime.datetime] = None,
absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
@@ -390,6 +392,10 @@
origin_select_cols = ", ".join(ListedOrigin.select_columns())
+ joins: Dict[str, str] = {
+ "origin_visit_stats": "USING (url, visit_type)",
+ }
+
query_args: List[Any] = []
where_clauses = []
@@ -511,14 +517,27 @@
where_clauses.append("lister_id = %s")
query_args.append(lister_uuid)
+ if lister_name:
+ joins["listers"] = "on listed_origins.lister_id=listers.id"
+ where_clauses.append("listers.name = %s")
+ query_args.append(lister_name)
+
+ if lister_instance_name:
+ joins["listers"] = "on listed_origins.lister_id=listers.id"
+ where_clauses.append("listers.instance_name = %s")
+ query_args.append(lister_instance_name)
+
+ join_clause = "\n".join(
+ f"left join {table} {clause}" for table, clause in joins.items()
+ )
+
# fmt: off
common_table_expressions.insert(0, ("selected_origins", f"""
SELECT
{origin_select_cols}, next_visit_queue_position
FROM
{table}
- LEFT JOIN
- origin_visit_stats USING (url, visit_type)
+ {join_clause}
WHERE
({") AND (".join(where_clauses)})
ORDER BY
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -175,6 +175,16 @@
default=None,
help="Limit origins to those listed from such lister",
)
+@click.option(
+ "--lister-name",
+ default=None,
+ help="Limit origins to those listed from such lister",
+)
+@click.option(
+ "--lister-instance-name",
+ default=None,
+ help="Limit origins to those listed from such lister",
+)
@click.argument("type", type=str)
@click.pass_context
def send_to_celery(
@@ -185,6 +195,8 @@
type: str,
enabled: bool,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
):
"""Send the next origin visits of the TYPE loader to celery, filling the queue."""
from kombu.utils.uuid import uuid
@@ -201,6 +213,12 @@
num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
click.echo(f"{num_tasks} slots available in celery queue")
+
+ if not lister_uuid and lister_name and lister_instance_name:
+ lister = scheduler.get_lister(lister_name, lister_uuid)
+ if lister:
+ lister_uuid = lister.id
+
origins = scheduler.grab_next_visits(
type,
num_tasks,
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -413,6 +413,8 @@
policy: str,
enabled: bool = True,
lister_uuid: Optional[str] = None,
+ lister_name: Optional[str] = None,
+ lister_instance_name: Optional[str] = None,
timestamp: Optional[datetime.datetime] = None,
absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
@@ -434,6 +436,9 @@
default, we want reasonably enabled origins. For some edge case, we might
want the others.
lister_uuid: Determine the list of origins listed from the lister with uuid
+ lister_name: Determine the list of origins listed from the lister with name
+ lister_instance_name: Determine the list of origins listed from the lister
+ with instance name
timestamp: the mocked timestamp at which we're recording that the visits are
being scheduled (defaults to the current time)
absolute_cooldown: the minimal interval between two visits of the same origin
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -827,15 +827,15 @@
assert ret.next_page_token is None
assert len(ret.results) == len(listed_origins_with_non_enabled)
- def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type):
+ def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type, limit=100):
"""Basic origins setup for scheduling policy tests"""
visit_type = next(iter(listed_origins_by_type))
- origins = listed_origins_by_type[visit_type][:100]
- assert len(origins) > 0
- recorded_origins = swh_scheduler.record_listed_origins(origins)
+ all_origins = listed_origins_by_type[visit_type]
+ origins = all_origins[:limit] if limit else all_origins
+ assert len(origins) > 0
- return visit_type, recorded_origins
+ return visit_type, swh_scheduler.record_listed_origins(origins)
def _check_grab_next_visit_basic(
self, swh_scheduler, visit_type, policy, expected, **kwargs
@@ -1303,6 +1303,30 @@
expected=expected_origins,
)
+ def test_grab_next_visit_for_specific_lister(
+ self, swh_scheduler, listed_origins_by_type, stored_lister
+ ):
+ """Checks grab_next_visits filters on the given lister {name, instance name}"""
+
+ visit_type, origins = self._grab_next_visits_setup(
+ swh_scheduler, listed_origins_by_type, limit=None
+ )
+
+ expected_origins = [origin for origin in listed_origins_by_type[visit_type]]
+
+ ret = swh_scheduler.grab_next_visits(
+ visit_type=visit_type,
+ count=len(expected_origins),
+ policy="never_visited_oldest_update_first",
+ # Filtering on lister name and instance name
+ lister_name=stored_lister.name,
+ lister_instance_name=stored_lister.instance_name,
+ )
+
+ assert len(ret) == len(expected_origins)
+ for origin in ret:
+ assert origin.lister_id == stored_lister.id
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 12:05 PM (14 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218602
Attached To
D8922: send-to-celery: Adapt to schedule from lister name & instance_name
Event Timeline
Log In to Comment