Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 591 Lines • ▼ Show 20 Lines | def get_tasks(self, task_ids, db=None, cur=None): | ||||
"""Retrieve the info of tasks whose ids are listed.""" | """Retrieve the info of tasks whose ids are listed.""" | ||||
query = format_query("select {keys} from task where id in %s", self.task_keys) | query = format_query("select {keys} from task where id in %s", self.task_keys) | ||||
cur.execute(query, (tuple(task_ids),)) | cur.execute(query, (tuple(task_ids),)) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | @db_transaction() | ||||
def peek_ready_tasks( | def peek_ready_tasks( | ||||
self, | self, | ||||
task_type, | task_type: str, | ||||
timestamp=None, | timestamp: Optional[datetime.datetime] = None, | ||||
num_tasks=None, | num_tasks: Optional[int] = None, | ||||
num_tasks_priority=None, | |||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
): | ) -> List[Dict]: | ||||
"""Fetch the list of ready tasks | |||||
Args: | |||||
task_type (str): filtering task per their type | |||||
timestamp (datetime.datetime): peek tasks that need to be executed | |||||
before that timestamp | |||||
num_tasks (int): only peek at num_tasks tasks (with no priority) | |||||
num_tasks_priority (int): only peek at num_tasks_priority | |||||
tasks (with priority) | |||||
Returns: | |||||
a list of tasks | |||||
""" | |||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cur.execute( | cur.execute( | ||||
"""select * from swh_scheduler_peek_ready_tasks( | """select * from swh_scheduler_peek_no_priority_tasks( | ||||
%s, %s, %s :: bigint, %s :: bigint)""", | %s, %s, %s :: bigint)""", | ||||
(task_type, timestamp, num_tasks, num_tasks_priority), | (task_type, timestamp, num_tasks), | ||||
) | ) | ||||
logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) | logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | @db_transaction() | ||||
def grab_ready_tasks( | def grab_ready_tasks( | ||||
self, | self, | ||||
task_type, | task_type: str, | ||||
timestamp=None, | timestamp: Optional[datetime.datetime] = None, | ||||
num_tasks=None, | num_tasks: Optional[int] = None, | ||||
num_tasks_priority=None, | |||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
): | ) -> List[Dict]: | ||||
"""Fetch the list of ready tasks, and mark them as scheduled | |||||
Args: | |||||
task_type (str): filtering task per their type | |||||
timestamp (datetime.datetime): grab tasks that need to be executed | |||||
before that timestamp | |||||
num_tasks (int): only grab num_tasks tasks (with no priority) | |||||
num_tasks_priority (int): only grab oneshot num_tasks tasks (with | |||||
priorities) | |||||
Returns: | |||||
a list of tasks | |||||
""" | |||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cur.execute( | cur.execute( | ||||
"""select * from swh_scheduler_grab_ready_tasks( | """select * from swh_scheduler_grab_ready_tasks( | ||||
%s, %s, %s :: bigint, %s :: bigint)""", | %s, %s, %s :: bigint)""", | ||||
(task_type, timestamp, num_tasks, num_tasks_priority), | (task_type, timestamp, num_tasks), | ||||
) | ) | ||||
logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) | logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | @db_transaction() | ||||
def peek_ready_priority_tasks( | def peek_ready_priority_tasks( | ||||
self, | self, | ||||
task_type: str, | task_type: str, | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | def get_task_runs(self, task_ids, limit=None, db=None, cur=None): | ||||
query = "select * from task_run where " + " and ".join(where) | query = "select * from task_run where " + " and ".join(where) | ||||
if limit: | if limit: | ||||
query += " limit %s :: bigint" | query += " limit %s :: bigint" | ||||
args.append(limit) | args.append(limit) | ||||
cur.execute(query, args) | cur.execute(query, args) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | @db_transaction() | ||||
def get_priority_ratios(self, db=None, cur=None): | |||||
cur.execute("select id, ratio from priority_ratio") | |||||
return {row["id"]: row["ratio"] for row in cur.fetchall()} | |||||
@db_transaction() | |||||
def origin_visit_stats_upsert( | def origin_visit_stats_upsert( | ||||
self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None | self, origin_visit_stats: Iterable[OriginVisitStats], db=None, cur=None | ||||
) -> None: | ) -> None: | ||||
pk_cols = OriginVisitStats.primary_key_columns() | pk_cols = OriginVisitStats.primary_key_columns() | ||||
insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() | insert_cols, insert_meta = OriginVisitStats.insert_columns_and_metavars() | ||||
query = f""" | query = f""" | ||||
INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) | INSERT into origin_visit_stats AS ovi ({", ".join(insert_cols)}) | ||||
▲ Show 20 Lines • Show All 102 Lines • Show Last 20 Lines |