diff --git a/sql/updates/25.sql b/sql/updates/25.sql
new file mode 100644
--- /dev/null
+++ b/sql/updates/25.sql
@@ -0,0 +1,64 @@
+insert into dbversion (version, release, description)
+       values (25, now(), 'Work In Progress');
+
+create table scheduler_metrics (
+  lister_id uuid not null references listers(id),
+  visit_type text not null,
+  last_update timestamptz not null,
+  origins_known int not null default 0,
+  origins_enabled int not null default 0,
+  origins_never_visited int not null default 0,
+  origins_with_pending_changes int not null default 0,
+
+  primary key (lister_id, visit_type)
+);
+
+comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.';
+comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated';
+comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated';
+comment on column scheduler_metrics.last_update is 'Last update of these metrics';
+comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins';
+comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing';
+comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited';
+comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit';
+
+
+create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now())
+  returns setof scheduler_metrics
+  language sql
+as $$
+  insert into scheduler_metrics (
+    lister_id, visit_type, last_update,
+    origins_known, origins_enabled,
+    origins_never_visited, origins_with_pending_changes
+  )
+  select
+    lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update,
+    count(*) as origins_known,
+    count(*) filter (where enabled) as origins_enabled,
+    count(*) filter (where
+      enabled and last_snapshot is NULL
+    ) as origins_never_visited,
+    count(*) filter (where
+      enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful)
+    ) as origins_with_pending_changes
+  from listed_origins lo
+  left join origin_visit_stats ovs using (url, visit_type)
+  where
+    -- update only for the requested lister
+    update_metrics.lister_id = lo.lister_id
+    -- or for all listers if the function argument is null
+    or update_metrics.lister_id is null
+  group by (lister_id, visit_type)
+  on conflict (lister_id, visit_type) do update
+    set
+      last_update = EXCLUDED.last_update,
+      origins_known = EXCLUDED.origins_known,
+      origins_enabled = EXCLUDED.origins_enabled,
+      origins_never_visited = EXCLUDED.origins_never_visited,
+      origins_with_pending_changes = EXCLUDED.origins_with_pending_changes
+  returning *
+$$;
+
+
+comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id';
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 import json
 import logging
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
@@ -24,6 +25,7 @@
     Lister,
     OriginVisitStats,
     PaginatedListedOriginList,
+    SchedulerMetrics,
 )
 
 logger = logging.getLogger(__name__)
@@ -850,3 +852,70 @@
             return OriginVisitStats(**row)
         else:
             return None
+
+    @db_transaction()
+    def update_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        timestamp: Optional[datetime.datetime] = None,
+        db=None,
+        cur=None,
+    ) -> List[SchedulerMetrics]:
+        """Update the performance metrics of this scheduler instance.
+
+        Returns the updated metrics.
+
+        Args:
+            lister_id: if passed, update the metrics only for this lister instance
+            timestamp: if passed, the date at which we're updating the metrics,
+                defaults to the database NOW()
+        """
+        query = format_query(
+            "SELECT {keys} FROM update_metrics(%s, %s)",
+            SchedulerMetrics.select_columns(),
+        )
+        cur.execute(query, (lister_id, timestamp))
+        return [SchedulerMetrics(**row) for row in cur.fetchall()]
+
+    @db_transaction()
+    def get_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        visit_type: Optional[str] = None,
+        db=None,
+        cur=None,
+    ) -> List[SchedulerMetrics]:
+        """Retrieve the performance metrics of this scheduler instance.
+
+        Args:
+            lister_id: if not None, only return metrics for this lister instance
+            visit_type: if not None, only return metrics for this visit type
+
+        Returns:
+            the ``scheduler_metrics`` rows matching all the given filters (every
+            row when no filter is given), as SchedulerMetrics objects
+        """
+        where_filters = []
+        where_args = []
+        # Test against None rather than truthiness, so that an explicitly
+        # passed falsy value (e.g. an empty visit type) still filters.
+        if lister_id is not None:
+            where_filters.append("lister_id = %s")
+            where_args.append(str(lister_id))
+        if visit_type is not None:
+            where_filters.append("visit_type = %s")
+            where_args.append(visit_type)
+
+        where_clause = ""
+        if where_filters:
+            where_clause = f"where {' and '.join(where_filters)}"
+
+        # The %s placeholders above are bound by cur.execute, not by Python
+        # string formatting: build the statement by plain concatenation.
+        query = format_query(
+            "SELECT {keys} FROM scheduler_metrics " + where_clause,
+            SchedulerMetrics.select_columns(),
+        )
+
+        cur.execute(query, tuple(where_args))
+        return [SchedulerMetrics(**row) for row in cur.fetchall()]
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from typing import Any, Dict, Iterable, List, Optional
 from uuid import UUID
 
@@ -15,6 +16,7 @@
     Lister,
     OriginVisitStats,
     PaginatedListedOriginList,
+    SchedulerMetrics,
 )
 
 
@@ -340,3 +342,32 @@
     ) -> Optional[OriginVisitStats]:
         """Retrieve the stats for an origin with a given visit type"""
         ...
+
+    @remote_api_endpoint("scheduler_metrics/update")
+    def update_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        timestamp: Optional[datetime.datetime] = None,
+    ) -> List[SchedulerMetrics]:
+        """Update the performance metrics of this scheduler instance.
+
+        Returns the updated metrics.
+
+        Args:
+            lister_id: if passed, update the metrics only for this lister instance
+            timestamp: if passed, the date at which we're updating the metrics,
+                defaults to the database NOW()
+        """
+        ...
+
+    @remote_api_endpoint("scheduler_metrics/get")
+    def get_metrics(
+        self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None
+    ) -> List[SchedulerMetrics]:
+        """Retrieve the performance metrics of this scheduler instance.
+
+        Args:
+            lister_id: filter the metrics for this lister instance only
+            visit_type: filter the metrics for this visit type only
+        """
+        ...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -251,3 +251,35 @@
     @last_notfound.validator
     def check_last_notfound(self, attribute, value):
         check_timestamptz(value)
+
+
+@attr.s(frozen=True, slots=True)
+class SchedulerMetrics(BaseSchedulerModel):
+    """Metrics for the scheduler, aggregated by (lister_id, visit_type)"""
+
+    lister_id = attr.ib(  # (lister_id, visit_type) is the primary key
+        type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+    )
+    visit_type = attr.ib(
+        type=str, validator=[type_validator()], metadata={"primary_key": True}
+    )
+
+    last_update = attr.ib(  # time at which these metrics were last updated
+        type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+    )
+
+    origins_known = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of known (enabled or disabled) origins"""
+
+    origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of origins that were present in the latest listings"""
+
+    origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of enabled origins that have never been successfully visited
+    (i.e. with no snapshot recorded in the visit cache)"""
+
+    origins_with_pending_changes = attr.ib(
+        type=int, validator=[type_validator()], default=0
+    )
+    """Number of enabled origins with known activity (recorded by a lister)
+    since our last visit"""
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -11,7 +11,7 @@
 comment on column dbversion.description is 'Version description';
 
 insert into dbversion (version, release, description)
-       values (24, now(), 'Work In Progress');
+       values (25, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
@@ -184,3 +184,25 @@
 comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event';
 comment on column origin_visit_stats.last_scheduled is 'Time when this origin was scheduled to be visited last';
 comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
+
+
+create table scheduler_metrics (  -- one row per (lister_id, visit_type), upserted by update_metrics()
+  lister_id uuid not null references listers(id),
+  visit_type text not null,
+  last_update timestamptz not null,
+  origins_known int not null default 0,
+  origins_enabled int not null default 0,
+  origins_never_visited int not null default 0,
+  origins_with_pending_changes int not null default 0,
+
+  primary key (lister_id, visit_type)
+);
+
+comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.';
+comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated';
+comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated';
+comment on column scheduler_metrics.last_update is 'Last update of these metrics';
+comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins';
+comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing';
+comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been successfully visited';
+comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit';
diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql
--- a/swh/scheduler/sql/40-func.sql
+++ b/swh/scheduler/sql/40-func.sql
@@ -406,3 +406,43 @@
   for each row
   when (new.status NOT IN ('scheduled', 'started'))
   execute procedure swh_scheduler_update_task_on_task_end ();
+
+
+create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now())
+  returns setof scheduler_metrics  -- the rows inserted or refreshed by this call
+  language sql  -- an explicit NULL ts bypasses the now() default, hence coalesce() below
+as $$
+  insert into scheduler_metrics (
+    lister_id, visit_type, last_update,
+    origins_known, origins_enabled,
+    origins_never_visited, origins_with_pending_changes
+  )
+  select
+    lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update,
+    count(*) as origins_known,
+    count(*) filter (where enabled) as origins_enabled,
+    count(*) filter (where
+      enabled and last_snapshot is NULL
+    ) as origins_never_visited,
+    count(*) filter (where
+      enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful)
+    ) as origins_with_pending_changes
+  from listed_origins lo
+  left join origin_visit_stats ovs using (url, visit_type)
+  where
+    -- update only for the requested lister
+    update_metrics.lister_id = lo.lister_id
+    -- or for all listers if the function argument is null
+    or update_metrics.lister_id is null
+  group by (lister_id, visit_type)
+  on conflict (lister_id, visit_type) do update
+    set
+      last_update = EXCLUDED.last_update,
+      origins_known = EXCLUDED.origins_known,
+      origins_enabled = EXCLUDED.origins_enabled,
+      origins_never_visited = EXCLUDED.origins_never_visited,
+      origins_with_pending_changes = EXCLUDED.origins_with_pending_changes
+  returning *
+$$;
+
+comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id';
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -48,6 +48,8 @@
         "origins/grab_next",
         "origins/record",
         "priority_ratios/get",
+        "scheduler_metrics/get",
+        "scheduler_metrics/update",
         "task/create",
         "task/delete_archived",
         "task/disable",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -8,7 +8,7 @@
 import datetime
 import inspect
 import random
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 import uuid
 
 import attr
@@ -17,7 +17,12 @@
 from swh.model.hashutil import hash_to_bytes
 from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy
 from swh.scheduler.interface import SchedulerInterface
-from swh.scheduler.model import ListedOrigin, ListedOriginPageToken, OriginVisitStats
+from swh.scheduler.model import (
+    ListedOrigin,
+    ListedOriginPageToken,
+    OriginVisitStats,
+    SchedulerMetrics,
+)
 from swh.scheduler.utils import utcnow
 
 from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template
@@ -31,6 +36,14 @@
     return {k: d[k] for k in keys if k not in excl}
 
 
+def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]:
+    return (m.lister_id, m.visit_type)  # primary key of scheduler_metrics
+
+
+def assert_metrics_equal(left, right):  # order-insensitive comparison
+    assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key)
+
+
 class TestScheduler:
     def test_interface(self, swh_scheduler):
         """Checks all methods of SchedulerInterface are implemented by this
@@ -981,3 +994,152 @@
                 ),
             ]
         )
+
+    def test_metrics_origins_known(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        ret = swh_scheduler.update_metrics()
+
+        assert sum(metric.origins_known for metric in ret) == len(listed_origins)
+
+    def test_metrics_origins_enabled(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+        disabled_origin = attr.evolve(listed_origins[0], enabled=False)
+        swh_scheduler.record_listed_origins([disabled_origin])
+
+        ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == disabled_origin.visit_type:
+                # We disabled one of these origins
+                assert metric.origins_known - metric.origins_enabled == 1
+            else:
+                # But these are still all enabled
+                assert metric.origins_known == metric.origins_enabled
+
+    def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        # Pretend that we've recorded a visit on one origin
+        visited_origin = listed_origins[0]
+        swh_scheduler.origin_visit_stats_upsert(
+            [
+                OriginVisitStats(
+                    url=visited_origin.url,
+                    visit_type=visited_origin.visit_type,
+                    last_eventful=utcnow(),
+                    last_uneventful=None,
+                    last_failed=None,
+                    last_notfound=None,
+                    last_snapshot=hash_to_bytes(
+                        "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
+                    ),
+                ),
+            ]
+        )
+
+        ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == visited_origin.visit_type:
+                # We visited one of these origins
+                assert metric.origins_known - metric.origins_never_visited == 1
+            else:
+                # But none of these have been visited
+                assert metric.origins_known == metric.origins_never_visited
+
+    def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        # Pretend that we've recorded a visit on one origin, in the past with
+        # respect to the "last update" time for the origin
+        visited_origin = listed_origins[0]
+        assert visited_origin.last_update is not None
+        swh_scheduler.origin_visit_stats_upsert(
+            [
+                OriginVisitStats(
+                    url=visited_origin.url,
+                    visit_type=visited_origin.visit_type,
+                    last_eventful=visited_origin.last_update
+                    - datetime.timedelta(days=1),
+                    last_uneventful=None,
+                    last_failed=None,
+                    last_notfound=None,
+                    last_snapshot=hash_to_bytes(
+                        "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
+                    ),
+                ),
+            ]
+        )
+
+        ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == visited_origin.visit_type:
+                # We visited one of these origins before its last listed update
+                assert metric.origins_with_pending_changes == 1
+            else:
+                # Never-visited origins have no visit date to compare against
+                assert metric.origins_with_pending_changes == 0
+
+    def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        fake_uuid = uuid.uuid4()
+        assert all(fake_uuid != origin.lister_id for origin in listed_origins)
+
+        ret = swh_scheduler.update_metrics(lister_id=fake_uuid)
+
+        assert len(ret) == 0
+
+    def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
+
+        ret = swh_scheduler.update_metrics(timestamp=ts)
+
+        assert all(metric.last_update == ts for metric in ret)
+
+    def test_update_metrics_twice(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        ts = utcnow()
+        ret = swh_scheduler.update_metrics(timestamp=ts)
+        assert all(metric.last_update == ts for metric in ret)
+
+        second_ts = ts + datetime.timedelta(seconds=1)
+        ret = swh_scheduler.update_metrics(timestamp=second_ts)
+        assert all(metric.last_update == second_ts for metric in ret)
+
+    def test_get_metrics(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics()
+        assert_metrics_equal(updated, retrieved)
+
+    def test_get_metrics_by_lister(self, swh_scheduler, listed_origins):
+        lister_id = listed_origins[0].lister_id
+        assert lister_id is not None
+
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics(lister_id=lister_id)
+        assert len(retrieved) > 0
+
+        assert_metrics_equal(
+            [metric for metric in updated if metric.lister_id == lister_id], retrieved
+        )
+
+    def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins):
+        visit_type = listed_origins[0].visit_type
+        assert visit_type is not None
+
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics(visit_type=visit_type)
+        assert len(retrieved) > 0
+
+        assert_metrics_equal(
+            [metric for metric in updated if metric.visit_type == visit_type], retrieved
+        )