Changeset View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
from arrow import Arrow, utcnow | from arrow import Arrow, utcnow | ||||
import attr | |||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Optional | ||||
from psycopg2.extensions import AsIs | from psycopg2.extensions import AsIs | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from .model import Lister | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def adapt_arrow(arrow): | def adapt_arrow(arrow): | ||||
return AsIs("'%s'::timestamptz" % arrow.isoformat()) | return AsIs("'%s'::timestamptz" % arrow.isoformat()) | ||||
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | ||||
psycopg2.extensions.register_adapter(Arrow, adapt_arrow) | psycopg2.extensions.register_adapter(Arrow, adapt_arrow) | ||||
psycopg2.extras.register_uuid() | |||||
def format_query(query, keys): | def format_query(query, keys): | ||||
"""Format a query with the given keys""" | """Format a query with the given keys""" | ||||
query_keys = ", ".join(keys) | query_keys = ", ".join(keys) | ||||
placeholders = ", ".join(["%s"] * len(keys)) | placeholders = ", ".join(["%s"] * len(keys)) | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | class SchedulerBackend: | ||||
@db_transaction() | @db_transaction() | ||||
def get_task_types(self, db=None, cur=None): | def get_task_types(self, db=None, cur=None): | ||||
"""Retrieve all registered task types""" | """Retrieve all registered task types""" | ||||
query = format_query("select {keys} from task_type", self.task_type_keys,) | query = format_query("select {keys} from task_type", self.task_type_keys,) | ||||
cur.execute(query) | cur.execute(query) | ||||
return cur.fetchall() | return cur.fetchall() | ||||
@db_transaction() | |||||
def get_or_create_lister( | |||||
douardda: why a single 'get_or_create' function here?
At first sight, this looks a weird API design. I… | |||||
olasdAuthorUnsubmitted Done Inline ActionsThe intent is to let listers call this function once on every run so they can retrieve their UUID and "stored state", from their name and relevant arguments. In the case of github / gitlab, the "stored state" would be, for instance, the numeric id of the latest repository lister (so an incremental lister could pick it up). Having two separate columns for "lister name" and "lister arguments" felt cleaner, as it makes it easier to, e.g., count the instances of a given lister "family". The long(er) term goal is to get rid of the lister-specific databases, at least for listers that only send simple loading tasks to the scheduler. What alternative API design would you be proposing? I'm not sure I understand the gitlab lister question, but no, we don't expect several instances of listers with the same (name, arguments) tuple. olasd: The intent is to let listers call this function once on every run so they can retrieve their… | |||||
olasdAuthorUnsubmitted Done Inline Actions(basically, the API would have one "get or create" called at the start of listing, and one "update" called at the end of listing) olasd: (basically, the API would have one "get or create" called at the start of listing, and one… | |||||
self, name: str, arguments: Optional[Dict[str, str]] = None, db=None, cur=None | |||||
) -> Lister: | |||||
"""Retrieve (or create) the lister with the given name and given arguments.""" | |||||
if arguments is None: | |||||
arguments = {} | |||||
select_cols = ", ".join(Lister.select_columns()) | |||||
insert_cols, insert_meta = ( | |||||
", ".join(tup) for tup in Lister.insert_columns_and_metavars() | |||||
) | |||||
query = f""" | |||||
with added as ( | |||||
insert into listers ({insert_cols}) values ({insert_meta}) | |||||
on conflict do nothing | |||||
returning {select_cols} | |||||
) | |||||
select {select_cols} from added | |||||
union all | |||||
select {select_cols} from listers | |||||
where (name, arguments) = (%(name)s, %(arguments)s); | |||||
""" | |||||
cur.execute(query, attr.asdict(Lister(name=name, arguments=arguments))) | |||||
return Lister(**cur.fetchone()) | |||||
task_create_keys = [ | task_create_keys = [ | ||||
"type", | "type", | ||||
"arguments", | "arguments", | ||||
"next_run", | "next_run", | ||||
"policy", | "policy", | ||||
"status", | "status", | ||||
"retries_left", | "retries_left", | ||||
"priority", | "priority", | ||||
▲ Show 20 Lines • Show All 431 Lines • Show Last 20 Lines |
why a single 'get_or_create' function here?
At first sight, this looks a weird API design. I understand we need to cover a variety of cases (especially github-style v.s. gitlab-style, i.e. singleton vs. multiple instance), but it looks weird to me to handle this like that.
Do we expect to have several gitlab listers with the same "human readable identifier" (namely name + arguments["instance"])?