Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | |||||
import json | import json | ||||
import logging | import logging | ||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | ||||
from uuid import UUID | from uuid import UUID | ||||
import attr | import attr | ||||
from psycopg2.errors import CardinalityViolation | from psycopg2.errors import CardinalityViolation | ||||
import psycopg2.extras | import psycopg2.extras | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction | from swh.core.db.common import db_transaction | ||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
from .exc import SchedulerException, StaleData, UnknownPolicy | from .exc import SchedulerException, StaleData, UnknownPolicy | ||||
from .model import ( | from .model import ( | ||||
ListedOrigin, | ListedOrigin, | ||||
ListedOriginPageToken, | ListedOriginPageToken, | ||||
Lister, | Lister, | ||||
OriginVisitStats, | OriginVisitStats, | ||||
PaginatedListedOriginList, | PaginatedListedOriginList, | ||||
SchedulerMetrics, | |||||
) | ) | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) | ||||
psycopg2.extras.register_uuid() | psycopg2.extras.register_uuid() | ||||
▲ Show 20 Lines • Show All 793 Lines • ▼ Show 20 Lines | ) -> Optional[OriginVisitStats]: | ||||
) | ) | ||||
cur.execute(query, (url, visit_type)) | cur.execute(query, (url, visit_type)) | ||||
row = cur.fetchone() | row = cur.fetchone() | ||||
if row: | if row: | ||||
return OriginVisitStats(**row) | return OriginVisitStats(**row) | ||||
else: | else: | ||||
return None | return None | ||||
@db_transaction() | |||||
def update_metrics( | |||||
self, | |||||
lister_id: Optional[UUID] = None, | |||||
timestamp: Optional[datetime.datetime] = None, | |||||
db=None, | |||||
cur=None, | |||||
) -> List[SchedulerMetrics]: | |||||
"""Update the performance metrics of this scheduler instance. | |||||
Returns the updated metrics. | |||||
Args: | |||||
lister_id: if passed, update the metrics only for this lister instance | |||||
timestamp: if passed, the date at which we're updating the metrics, | |||||
defaults to the database NOW() | |||||
""" | |||||
query = format_query( | |||||
"SELECT {keys} FROM update_metrics(%s, %s)", | |||||
SchedulerMetrics.select_columns(), | |||||
) | |||||
cur.execute(query, (lister_id, timestamp)) | |||||
return [SchedulerMetrics(**row) for row in cur.fetchall()] | |||||
@db_transaction() | |||||
def get_metrics( | |||||
self, | |||||
lister_id: Optional[UUID] = None, | |||||
visit_type: Optional[str] = None, | |||||
db=None, | |||||
cur=None, | |||||
) -> List[SchedulerMetrics]: | |||||
"""Retrieve the performance metrics of this scheduler instance. | |||||
Args: | |||||
lister_id: filter the metrics for this lister instance only | |||||
visit_type: filter the metrics for this visit type only | |||||
""" | |||||
where_filters = [] | |||||
where_args = [] | |||||
if lister_id: | |||||
where_filters.append("lister_id = %s") | |||||
where_args.append(str(lister_id)) | |||||
if visit_type: | |||||
where_filters.append("visit_type = %s") | |||||
where_args.append(visit_type) | |||||
where_clause = "" | |||||
if where_filters: | |||||
where_clause = f"where {' and '.join(where_filters)}" | |||||
query = format_query( | |||||
"SELECT {keys} FROM scheduler_metrics %s" % where_clause, | |||||
SchedulerMetrics.select_columns(), | |||||
) | |||||
cur.execute(query, tuple(where_args)) | |||||
return [SchedulerMetrics(**row) for row in cur.fetchall()] |