Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
from arrow import Arrow, utcnow | from arrow import Arrow, utcnow | ||||
import attr | import attr | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Iterable, List, Optional | ||||
from psycopg2.extensions import AsIs | from psycopg2.extensions import AsIs | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from .exc import StaleData | from .exc import StaleData | ||||
from .model import Lister | from .model import Lister, ListedOrigin | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def adapt_arrow(arrow):
    """Adapt an Arrow value into a SQL ``timestamptz`` literal.

    The value's ISO 8601 rendering (``arrow.isoformat()``) is single-quoted,
    cast to ``timestamptz`` and wrapped in psycopg2's ``AsIs``.
    """
    iso_timestamp = arrow.isoformat()
    sql_literal = "'%s'::timestamptz" % iso_timestamp
    return AsIs(sql_literal)
▲ Show 20 Lines • Show All 162 Lines • ▼ Show 20 Lines | def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: | ||||
cur.execute(query, attr.asdict(lister)) | cur.execute(query, attr.asdict(lister)) | ||||
updated = cur.fetchone() | updated = cur.fetchone() | ||||
if not updated: | if not updated: | ||||
raise StaleData("Stale data; Lister state not updated") | raise StaleData("Stale data; Lister state not updated") | ||||
return Lister(**updated) | return Lister(**updated) | ||||
@db_transaction()
def record_listed_origins(
    self, listed_origins: Iterable[ListedOrigin], db=None, cur=None
) -> List[ListedOrigin]:
    """Record a set of origins that a lister has listed.

    This performs an "upsert": origins with the same (lister_id, url,
    visit_type) values are updated with new values for
    extra_loader_arguments, last_update and last_seen.

    When several entries of ``listed_origins`` share the same primary key,
    only the last one is sent to the database. PostgreSQL refuses a single
    ``INSERT ... ON CONFLICT DO UPDATE`` statement that would touch the same
    row twice, so un-deduplicated input would make the whole batch fail.

    Args:
        listed_origins: the origins to insert or update.
        db: database handle (defaults to None; presumably supplied by the
            ``db_transaction`` decorator -- confirm against swh.core.db).
        cur: cursor used to run the statement (same caveat as ``db``).

    Returns:
        One ``ListedOrigin`` per row returned by the ``RETURNING`` clause.
    """
    pk_cols = ListedOrigin.primary_key_columns()
    select_cols = ListedOrigin.select_columns()
    insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars()

    # Every inserted column that is not part of the primary key gets
    # overwritten with the incoming value on conflict.
    upsert_cols = [col for col in insert_cols if col not in pk_cols]
    upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols)

    query = f"""INSERT into listed_origins ({", ".join(insert_cols)})
                VALUES %s
                ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE
                SET {upsert_set}
                RETURNING {", ".join(select_cols)}
    """

    # Deduplicate on the primary key, last occurrence wins (consistent with
    # the upsert semantics). A dict keeps the first-seen ordering of keys.
    # NOTE(review): assumes the primary key column names are also attribute
    # names of ListedOrigin, i.e. keys of attr.asdict(origin) -- confirm.
    rows_by_pk = {}
    for origin in listed_origins:
        row = attr.asdict(origin)
        rows_by_pk[tuple(row[col] for col in pk_cols)] = row

    ret = psycopg2.extras.execute_values(
        cur=cur,
        sql=query,
        argslist=rows_by_pk.values(),
        template=f"({', '.join(insert_meta)})",
        page_size=1000,
        fetch=True,
    )

    return [ListedOrigin(**d) for d in ret]
task_create_keys = [ | task_create_keys = [ | ||||
"type", | "type", | ||||
"arguments", | "arguments", | ||||
"next_run", | "next_run", | ||||
"policy", | "policy", | ||||
"status", | "status", | ||||
"retries_left", | "retries_left", | ||||
"priority", | "priority", | ||||
▲ Show 20 Lines • Show All 431 Lines • Show Last 20 Lines |