diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -11,14 +11,14 @@
 import psycopg2.pool
 import psycopg2.extras
 
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
 from psycopg2.extensions import AsIs
 
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 
 from .exc import StaleData
-from .model import Lister
+from .model import Lister, ListedOrigin
 
 logger = logging.getLogger(__name__)
 
@@ -197,6 +197,62 @@
 
         return Lister(**updated)
 
+    @db_transaction()
+    def record_listed_origins(
+        self, listed_origins: Iterable[ListedOrigin], db=None, cur=None
+    ) -> List[ListedOrigin]:
+        """Record a set of origins that a lister has listed.
+
+        This performs an "upsert": origins with the same (lister_id, url,
+        visit_type) values are updated with the new values of every other
+        inserted column (e.g. extra_loader_arguments, last_update, enabled
+        and last_seen).
+
+        When several entries of `listed_origins` share the same (lister_id,
+        url, visit_type) key, only the last one is recorded: PostgreSQL
+        refuses to update the same row twice in a single INSERT ... ON
+        CONFLICT DO UPDATE statement.
+
+        Returns:
+            the recorded origins as stored in the database, one per distinct
+            (lister_id, url, visit_type) key.
+        """
+
+        pk_cols = ListedOrigin.primary_key_columns()
+        select_cols = ListedOrigin.select_columns()
+        insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars()
+
+        upsert_cols = [col for col in insert_cols if col not in pk_cols]
+        upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols)
+
+        # Deduplicate on the primary key (the last occurrence wins): a page of
+        # execute_values containing the same key twice would otherwise make
+        # the whole statement fail with a cardinality violation.
+        deduplicated_origins = {
+            tuple(getattr(origin, col) for col in pk_cols): origin
+            for origin in listed_origins
+        }
+
+        query = f"""INSERT into listed_origins ({", ".join(insert_cols)})
+            VALUES %s
+            ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE
+            SET {upsert_set}
+            RETURNING {", ".join(select_cols)}
+        """
+
+        ret = psycopg2.extras.execute_values(
+            cur=cur,
+            sql=query,
+            argslist=(
+                attr.asdict(origin) for origin in deduplicated_origins.values()
+            ),
+            template=f"({', '.join(insert_meta)})",
+            page_size=1000,
+            fetch=True,
+        )
+
+        return [ListedOrigin(**d) for d in ret]
+
     task_create_keys = [
         "type",
         "arguments",
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -3,11 +3,11 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level
LICENSE file for more information
 
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 from swh.core.api import remote_api_endpoint
 
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
 
 
 class SchedulerInterface:
@@ -273,6 +273,18 @@
         """
         ...
 
+    @remote_api_endpoint("origins/record")
+    def record_listed_origins(
+        self, listed_origins: Iterable[ListedOrigin]
+    ) -> List[ListedOrigin]:
+        """Record a set of origins that a lister has listed.
+
+        This performs an "upsert": origins with the same (lister_id, url,
+        visit_type) values are updated with new values for
+        extra_loader_arguments, last_update, enabled and last_seen.
+        """
+        ...
+
     @remote_api_endpoint("priority_ratios/get")
     def get_priority_ratios(self):
         ...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -123,3 +123,40 @@
         default=None,
         metadata={"auto_now": True},
     )
+
+
+@attr.s
+class ListedOrigin(BaseSchedulerModel):
+    """Basic information about a listed origin, output by a lister"""
+
+    lister_id = attr.ib(
+        type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+    )
+    url = attr.ib(
+        type=str, validator=[type_validator()], metadata={"primary_key": True}
+    )
+    visit_type = attr.ib(
+        type=str, validator=[type_validator()], metadata={"primary_key": True}
+    )
+    extra_loader_arguments = attr.ib(  # NOTE(review): jsonb column may hold non-str values
+        type=Dict[str, str], validator=[type_validator()], factory=dict
+    )
+    # Time of the last update of the origin, when the lister can provide it
+    last_update = attr.ib(
+        type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+    )
+    # Whether visits of this origin should be scheduled
+    enabled = attr.ib(type=bool, validator=[type_validator()], default=True)
+    # None by default; presumably set on record via auto_now_add / auto_now
+    first_seen = attr.ib(
+        type=Optional[datetime.datetime],
+        validator=[type_validator()],
+        default=None,
+        metadata={"auto_now_add": True},
+    )
+    last_seen = attr.ib(
+        type=Optional[datetime.datetime],
+        validator=[type_validator()],
+        default=None,
+        metadata={"auto_now": True},
+    )
diff --git
a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -126,3 +126,36 @@
 comment on column listers.created is 'Timestamp at which the lister was originally created';
 comment on column listers.current_state is 'Known current state of this lister';
 comment on column listers.updated is 'Timestamp at which the lister state was last updated';
+
+
+create table if not exists listed_origins (
+  -- Basic information
+  lister_id uuid not null references listers(id),
+  url text not null,
+  visit_type text not null,
+  extra_loader_arguments jsonb not null,  -- no SQL default: the ListedOrigin model defaults it to {}
+
+  -- Whether this origin still exists or not
+  enabled boolean not null,  -- no SQL default: the ListedOrigin model defaults it to true
+
+  -- time-based information
+  first_seen timestamptz not null default now(),
+  last_seen timestamptz not null,  -- no SQL default, unlike first_seen
+
+  -- potentially provided by the lister
+  last_update timestamptz,
+
+  primary key (lister_id, url, visit_type)  -- ON CONFLICT target of record_listed_origins
+);
+
+comment on table listed_origins is 'Origins known to the origin visit scheduler';
+comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
+comment on column listed_origins.url is 'URL of the origin listed';
+comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
+comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
+
+comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
+comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
+comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
+
+comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- 
a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -3,17 +3,17 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-
 import os
 import pytest
 import glob
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 import pkg_resources
+from typing import List
 
 from swh.core.utils import numfile_sortkey as sortkey
 from swh.scheduler import get_scheduler
 from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
 from swh.scheduler.tests.common import LISTERS
 
 
@@ -118,3 +118,17 @@
 def stored_lister(swh_scheduler) -> Lister:
     """Store a lister in the scheduler and return its information"""
     return swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+
+@pytest.fixture
+def listed_origins(stored_lister) -> List[ListedOrigin]:
+    """Return a fixed list of 1000 distinct origins owned by `stored_lister`"""
+    return [
+        ListedOrigin(
+            lister_id=stored_lister.id,
+            url=f"https://example.com/{i:04d}.git",
+            visit_type="git",
+            last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+        )
+        for i in range(1000)
+    ]
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -44,6 +44,7 @@
     for rule in (
         "lister/get_or_create",
         "lister/update",
+        "origins/record",
         "priority_ratios/get",
         "task/create",
         "task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -645,6 +645,36 @@
             swh_scheduler.update_lister(stored_lister)
         assert "state not updated" in exc.value.args[0]
 
+    def test_record_listed_origins(self, swh_scheduler, listed_origins):
+        ret = swh_scheduler.record_listed_origins(listed_origins)
+        assert len(ret) == len(listed_origins)
+
+        assert {returned.url for returned in ret} == {
+            origin.url for origin in listed_origins
+        }
+
+        # On first insertion, both timestamps hold the same value
+        assert all(origin.first_seen == origin.last_seen for origin in ret)
+
+    def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins):
+        # First, insert `cutoff` origins
+        cutoff = 100
+        assert cutoff < len(listed_origins)
+
+        ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff])
+        assert len(ret) == cutoff
+
+        # Then, insert all origins, including the `cutoff` first.
+        ret = swh_scheduler.record_listed_origins(listed_origins)
+
+        assert len(ret) == len(listed_origins)
+
+        # Two different "first seen" values
+        assert len({origin.first_seen for origin in ret}) == 2
+
+        # But a single "last seen" value
+        assert len({origin.last_seen for origin in ret}) == 1
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)