Page MenuHomeSoftware Heritage

D3289.id.diff
No OneTemporary

D3289.id.diff

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -11,14 +11,14 @@
import psycopg2.pool
import psycopg2.extras
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
from psycopg2.extensions import AsIs
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from .exc import StaleData
-from .model import Lister
+from .model import Lister, ListedOrigin
logger = logging.getLogger(__name__)
@@ -197,6 +197,42 @@
return Lister(**updated)
+ @db_transaction()
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin], db=None, cur=None
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins whose (lister_id, url, visit_type)
+ already exist are updated with the new values of all other inserted
+ columns, e.g. extra_loader_arguments, last_update and last_seen.
+ """
+
+ pk_cols = ListedOrigin.primary_key_columns()
+ select_cols = ListedOrigin.select_columns()
+ insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars()
+
+ upsert_cols = [col for col in insert_cols if col not in pk_cols]
+ upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols)
+
+ query = f"""INSERT into listed_origins ({", ".join(insert_cols)})
+ VALUES %s
+ ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE
+ SET {upsert_set}
+ RETURNING {", ".join(select_cols)}
+ """
+
+ ret = psycopg2.extras.execute_values(
+ cur=cur,
+ sql=query,
+ argslist=(attr.asdict(origin) for origin in listed_origins),
+ template=f"({', '.join(insert_meta)})",
+ page_size=1000,
+ fetch=True,
+ )
+
+ return [ListedOrigin(**d) for d in ret]
+
task_create_keys = [
"type",
"arguments",
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -3,11 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
from swh.core.api import remote_api_endpoint
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
class SchedulerInterface:
@@ -273,6 +273,18 @@
"""
...
+ @remote_api_endpoint("origins/record")
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin]
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins whose (lister_id, url, visit_type)
+ already exist are updated with the new values of all other inserted
+ columns, e.g. extra_loader_arguments, last_update and last_seen.
+ """
+ ...
+
@remote_api_endpoint("priority_ratios/get")
def get_priority_ratios(self):
...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -123,3 +123,40 @@
default=None,
metadata={"auto_now": True},
)
+
+
+@attr.s
+class ListedOrigin(BaseSchedulerModel):
+ """Basic information about a listed origin, output by a lister"""
+
+ lister_id = attr.ib(
+ type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ url = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ visit_type = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ extra_loader_arguments = attr.ib(
+ type=Dict[str, str], validator=[type_validator()], factory=dict
+ )
+
+ last_update = attr.ib(
+ type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+ )
+
+ enabled = attr.ib(type=bool, validator=[type_validator()], default=True)
+
+ first_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ last_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -126,3 +126,36 @@
comment on column listers.created is 'Timestamp at which the lister was originally created';
comment on column listers.current_state is 'Known current state of this lister';
comment on column listers.updated is 'Timestamp at which the lister state was last updated';
+
+
+create table if not exists listed_origins (
+ -- Basic information
+ lister_id uuid not null references listers(id),
+ url text not null,
+ visit_type text not null,
+ extra_loader_arguments jsonb not null,
+
+ -- Whether this origin still exists or not
+ enabled boolean not null,
+
+ -- time-based information
+ first_seen timestamptz not null default now(),
+ last_seen timestamptz not null,
+
+ -- potentially provided by the lister
+ last_update timestamptz,
+
+ primary key (lister_id, url, visit_type)
+);
+
+comment on table listed_origins is 'Origins known to the origin visit scheduler';
+comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
+comment on column listed_origins.url is 'URL of the origin listed';
+comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
+comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
+
+comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
+comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
+comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
+
+comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -3,17 +3,17 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
import os
import pytest
import glob
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
import pkg_resources
+from typing import List
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
from swh.scheduler.tests.common import LISTERS
@@ -118,3 +118,17 @@
def stored_lister(swh_scheduler) -> Lister:
"""Store a lister in the scheduler and return its information"""
return swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+
+@pytest.fixture
+def listed_origins(stored_lister) -> List[ListedOrigin]:
+ """Return a (fixed) set of 1000 listed origins"""
+ return [
+ ListedOrigin(
+ lister_id=stored_lister.id,
+ url=f"https://example.com/{i:04d}.git",
+ visit_type="git",
+ last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+ )
+ for i in range(1000)
+ ]
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -44,6 +44,7 @@
for rule in (
"lister/get_or_create",
"lister/update",
+ "origins/record",
"priority_ratios/get",
"task/create",
"task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -645,6 +645,34 @@
swh_scheduler.update_lister(stored_lister)
assert "state not updated" in exc.value.args[0]
+ def test_record_listed_origins(self, swh_scheduler, listed_origins):
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert set(returned.url for returned in ret) == set(
+ origin.url for origin in listed_origins
+ )
+
+ assert all(origin.first_seen == origin.last_seen for origin in ret)
+
+ def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins):
+ # First, insert `cutoff` origins
+ cutoff = 100
+ assert cutoff < len(listed_origins)
+
+ ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff])
+ assert len(ret) == cutoff
+
+ # Then, insert all origins, including the first `cutoff` ones.
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert len(ret) == len(listed_origins)
+
+ # Two different "first seen" values
+ assert len(set(origin.first_seen for origin in ret)) == 2
+
+ # But a single "last seen" value
+ assert len(set(origin.last_seen for origin in ret)) == 1
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:57 PM (1 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219245

Event Timeline