Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
Show First 20 Lines • Show All 126 Lines • ▼ Show 20 Lines | class SchedulerBackend: | ||||
@db_transaction() | @db_transaction() | ||||
def get_task_types(self, db=None, cur=None): | def get_task_types(self, db=None, cur=None): | ||||
"""Retrieve all registered task types""" | """Retrieve all registered task types""" | ||||
query = format_query("select {keys} from task_type", self.task_type_keys,) | query = format_query("select {keys} from task_type", self.task_type_keys,) | ||||
cur.execute(query) | cur.execute(query) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | @db_transaction() | ||||
def get_listers(self, db=None, cur=None) -> List[Lister]: | |||||
"""Retrieve information about all listers from the database. | |||||
""" | |||||
select_cols = ", ".join(Lister.select_columns()) | |||||
query = f""" | |||||
select {select_cols} from listers | |||||
""" | |||||
cur.execute(query) | |||||
return [Lister(**ret) for ret in cur.fetchall()] | |||||
@db_transaction() | |||||
def get_lister( | def get_lister( | ||||
self, name: str, instance_name: Optional[str] = None, db=None, cur=None | self, name: str, instance_name: Optional[str] = None, db=None, cur=None | ||||
) -> Optional[Lister]: | ) -> Optional[Lister]: | ||||
"""Retrieve information about the given instance of the lister from the | """Retrieve information about the given instance of the lister from the | ||||
database. | database. | ||||
""" | """ | ||||
if instance_name is None: | if instance_name is None: | ||||
instance_name = "" | instance_name = "" | ||||
▲ Show 20 Lines • Show All 854 Lines • Show Last 20 Lines |