D3289: Implement storage of listed origins
File: D3289.id.diff
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -11,14 +11,14 @@
import psycopg2.pool
import psycopg2.extras
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
from psycopg2.extensions import AsIs
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from .exc import StaleData
-from .model import Lister
+from .model import Lister, ListedOrigin
logger = logging.getLogger(__name__)
@@ -197,6 +197,42 @@
return Lister(**updated)
+ @db_transaction()
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin], db=None, cur=None
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins with the same (lister_id, url,
+ visit_type) values are updated with new values for
+ extra_loader_arguments, last_update and last_seen.
+ """
+
+ pk_cols = ListedOrigin.primary_key_columns()
+ select_cols = ListedOrigin.select_columns()
+ insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars()
+
+ upsert_cols = [col for col in insert_cols if col not in pk_cols]
+ upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols)
+
+ query = f"""INSERT into listed_origins ({", ".join(insert_cols)})
+ VALUES %s
+ ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE
+ SET {upsert_set}
+ RETURNING {", ".join(select_cols)}
+ """
+
+ ret = psycopg2.extras.execute_values(
+ cur=cur,
+ sql=query,
+ argslist=(attr.asdict(origin) for origin in listed_origins),
+ template=f"({', '.join(insert_meta)})",
+ page_size=1000,
+ fetch=True,
+ )
+
+ return [ListedOrigin(**d) for d in ret]
+
task_create_keys = [
"type",
"arguments",
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -3,11 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
from swh.core.api import remote_api_endpoint
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
class SchedulerInterface:
@@ -273,6 +273,18 @@
"""
...
+ @remote_api_endpoint("origins/record")
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin]
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins with the same (lister_id, url,
+ visit_type) values are updated with new values for
+ extra_loader_arguments, last_update and last_seen.
+ """
+ ...
+
@remote_api_endpoint("priority_ratios/get")
def get_priority_ratios(self):
...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -123,3 +123,40 @@
default=None,
metadata={"auto_now": True},
)
+
+
+@attr.s
+class ListedOrigin(BaseSchedulerModel):
+ """Basic information about a listed origin, output by a lister"""
+
+ lister_id = attr.ib(
+ type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ url = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ visit_type = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ extra_loader_arguments = attr.ib(
+ type=Dict[str, str], validator=[type_validator()], factory=dict
+ )
+
+ last_update = attr.ib(
+ type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+ )
+
+ enabled = attr.ib(type=bool, validator=[type_validator()], default=True)
+
+ first_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ last_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
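
A quick sketch of how these attributes behave when a ListedOrigin is constructed directly (illustrative only; the timestamp fields marked auto_now / auto_now_add are left unset in Python and filled in by the database when the origin is recorded):

    from datetime import datetime, timezone
    from uuid import uuid4

    from swh.scheduler.model import ListedOrigin

    origin = ListedOrigin(
        lister_id=uuid4(),  # would normally come from a stored Lister
        url="https://example.com/project.git",
        visit_type="git",
        last_update=datetime(2020, 6, 15, tzinfo=timezone.utc),
    )
    assert origin.enabled is True               # default
    assert origin.extra_loader_arguments == {}  # factory default
    assert origin.first_seen is None            # set by the database on first insert
    assert origin.last_seen is None             # refreshed by the database on every upsert
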
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -126,3 +126,36 @@
comment on column listers.created is 'Timestamp at which the lister was originally created';
comment on column listers.current_state is 'Known current state of this lister';
comment on column listers.updated is 'Timestamp at which the lister state was last updated';
+
+
+create table if not exists listed_origins (
+ -- Basic information
+ lister_id uuid not null references listers(id),
+ url text not null,
+ visit_type text not null,
+ extra_loader_arguments jsonb not null,
+
+ -- Whether this origin still exists or not
+ enabled boolean not null,
+
+ -- time-based information
+ first_seen timestamptz not null default now(),
+ last_seen timestamptz not null,
+
+ -- potentially provided by the lister
+ last_update timestamptz,
+
+ primary key (lister_id, url, visit_type)
+);
+
+comment on table listed_origins is 'Origins known to the origin visit scheduler';
+comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
+comment on column listed_origins.url is 'URL of the origin listed';
+comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
+comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
+
+comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
+comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
+comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
+
+comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
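
To make the generated statement concrete, here is a minimal hand-written upsert against this table (a sketch only: it assumes a psycopg2 cursor `cur` and an existing lister id, whereas the backend builds the full column list and RETURNING clause from the model metadata):

    # Assumes "cur" is a psycopg2 cursor and "lister_id" references a row in listers.
    cur.execute(
        """
        insert into listed_origins
            (lister_id, url, visit_type, extra_loader_arguments, enabled, last_seen)
        values (%s, %s, %s, '{}', true, now())
        on conflict (lister_id, url, visit_type)
        do update set last_seen = excluded.last_seen
        """,
        (lister_id, "https://example.com/project.git", "git"),
    )

On the first execution the row is inserted with first_seen defaulting to now(); on later executions first_seen is kept and last_seen is refreshed, which is the behaviour exercised by the upsert test below.
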
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -3,17 +3,17 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
import os
import pytest
import glob
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
import pkg_resources
+from typing import List
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.model import Lister
+from swh.scheduler.model import ListedOrigin, Lister
from swh.scheduler.tests.common import LISTERS
@@ -118,3 +118,17 @@
def stored_lister(swh_scheduler) -> Lister:
"""Store a lister in the scheduler and return its information"""
return swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+
+@pytest.fixture
+def listed_origins(stored_lister) -> List[ListedOrigin]:
+ """Return a (fixed) set of 1000 listed origins"""
+ return [
+ ListedOrigin(
+ lister_id=stored_lister.id,
+ url=f"https://example.com/{i:04d}.git",
+ visit_type="git",
+ last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+ )
+ for i in range(1000)
+ ]
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -44,6 +44,7 @@
for rule in (
"lister/get_or_create",
"lister/update",
+ "origins/record",
"priority_ratios/get",
"task/create",
"task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -645,6 +645,34 @@
swh_scheduler.update_lister(stored_lister)
assert "state not updated" in exc.value.args[0]
+ def test_record_listed_origins(self, swh_scheduler, listed_origins):
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert set(returned.url for returned in ret) == set(
+ origin.url for origin in listed_origins
+ )
+
+ assert all(origin.first_seen == origin.last_seen for origin in ret)
+
+ def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins):
+ # First, insert `cutoff` origins
+ cutoff = 100
+ assert cutoff < len(listed_origins)
+
+ ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff])
+ assert len(ret) == cutoff
+
+ # Then, insert all origins, including the `cutoff` first.
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert len(ret) == len(listed_origins)
+
+ # Two different "first seen" values
+ assert len(set(origin.first_seen for origin in ret)) == 2
+
+ # But a single "last seen" value
+ assert len(set(origin.last_seen for origin in ret)) == 1
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)