Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7163653
D3271.id11648.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
D3271.id11648.diff
View Options
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
arrow
attrs
+attrs-strict  # provides type_validator, used by swh.scheduler.model.Lister
celery >= 4
# Not a direct dependency: missing from celery 4.4.4
future >= 0.18.0
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,6 +7,7 @@
import logging
from arrow import Arrow, utcnow
+import attr
import psycopg2.pool
import psycopg2.extras
@@ -16,6 +17,8 @@
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
+from .exc import StaleData
+from .model import Lister
logger = logging.getLogger(__name__)
@@ -26,6 +29,7 @@
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
def format_query(query, keys):
@@ -130,6 +134,91 @@
cur.execute(query)
return cur.fetchall()
+ @db_transaction()
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None, db=None, cur=None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist.
+
+ Args:
+ name: name of the lister (e.g. github, gitlab, debian)
+ instance_name: name of the lister instance; None is stored as ""
+
+ Returns:
+ the Lister object as read back from the listers table
+ """
+
+ if instance_name is None:
+ instance_name = ""
+
+ select_cols = ", ".join(Lister.select_columns())
+ insert_cols, insert_meta = (
+ ", ".join(tup) for tup in Lister.insert_columns_and_metavars()
+ )
+
+ query = f"""
+ with added as (
+ insert into listers ({insert_cols}) values ({insert_meta})
+ on conflict do nothing
+ returning {select_cols}
+ )
+ select {select_cols} from added
+ union all
+ select {select_cols} from listers
+ where (name, instance_name) = (%(name)s, %(instance_name)s);
+ """
+
+ params = attr.asdict(Lister(name=name, instance_name=instance_name))
+ cur.execute(query, params)
+ row = cur.fetchone()
+
+ if row is None:
+ # A concurrent transaction inserted this lister after our statement
+ # snapshot was taken: "on conflict do nothing" returned no row, and
+ # the union'd select cannot see the new row within the same snapshot.
+ # A new statement gets a fresh snapshot under read committed.
+ # NOTE(review): confirm db_transaction's isolation level.
+ cur.execute(
+ f"select {select_cols} from listers "
+ "where (name, instance_name) = (%(name)s, %(instance_name)s)",
+ params,
+ )
+ row = cur.fetchone()
+
+ return Lister(**row)
+
+ @db_transaction()
+ def update_lister(self, lister: Lister, db=None, cur=None) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ database doesn't match the one passed by the user.
+ """
+
+ select_cols = ", ".join(Lister.select_columns())
+ set_vars = ", ".join(
+ f"{col} = {meta}"
+ for col, meta in zip(*Lister.insert_columns_and_metavars())
+ )
+
+ query = f"""update listers
+ set {set_vars}
+ where id=%(id)s and updated=%(updated)s
+ returning {select_cols}"""
+
+ cur.execute(query, attr.asdict(lister))
+ updated = cur.fetchone()
+
+ if not updated:
+ raise StaleData("Stale data; Lister state not updated")
+
+ return Lister(**updated)
+
task_create_keys = [
"type",
"arguments",
diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py
--- a/swh/scheduler/exc.py
+++ b/swh/scheduler/exc.py
@@ -5,8 +5,13 @@
__all__ = [
"SchedulerException",
+ "StaleData",
]
class SchedulerException(Exception):
pass
+
+
+class StaleData(SchedulerException):
+ pass  # raised by update_lister when the stored `updated` timestamp differs
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -7,6 +7,8 @@
from swh.core.api import remote_api_endpoint
+from swh.scheduler.model import Lister
+
class SchedulerInterface:
@remote_api_endpoint("task_type/create")
@@ -249,6 +251,28 @@
"""Search task run for a task id"""
...
+ @remote_api_endpoint("lister/get_or_create")
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist (None instance_name is treated as "").
+ """
+ ...
+
+ @remote_api_endpoint("lister/update")
+ def update_lister(self, lister: Lister) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ database doesn't match the one passed by the user.
+ """
+ ...
+
@remote_api_endpoint("priority_ratios/get")
def get_priority_ratios(self):
...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import List, Optional, Tuple
+import datetime
+from uuid import UUID
+from typing import Any, Dict, List, Optional, Tuple
import attr
import attr.converters
+from attrs_strict import type_validator
@attr.s
@@ -55,3 +58,44 @@
cls._insert_cols_and_metavars = cols, metavars
return cls._insert_cols_and_metavars
+
+
+@attr.s
+class Lister(BaseSchedulerModel):
+ """State of one instance of a lister, as stored in the ``listers`` table.
+
+ A lister instance is identified by its ``(name, instance_name)`` pair.
+ ``id``, ``created`` and ``updated`` default to None and are filled in
+ from the database row returned by the scheduler backend.
+ """
+
+ name = attr.ib(type=str, validator=[type_validator()])
+ instance_name = attr.ib(type=str, validator=[type_validator()], factory=str)
+
+ # Populated by database
+ id = attr.ib(
+ type=Optional[UUID],
+ validator=[type_validator()],
+ default=None,
+ metadata={"primary_key": True},
+ )
+
+ # Known current state of this lister
+ current_state = attr.ib(
+ type=Dict[str, Any], validator=[type_validator()], factory=dict
+ )
+ # Timestamp at which the lister row was originally created
+ created = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ # Timestamp of the last state update; update_lister only writes when this
+ # still matches the database value (optimistic concurrency check)
+ updated = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- provides uuid_generate_v4(), the default value of listers.id
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -110,3 +110,19 @@
comment on column task_run.scheduled is 'Scheduled run time for task';
comment on column task_run.started is 'Task starting time';
comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+ id uuid primary key default uuid_generate_v4(), -- requires the uuid-ossp extension
+ name text not null,
+ instance_name text not null,
+ created timestamptz not null default now(), -- auto_now_add in the model
+ current_state jsonb not null,
+ updated timestamptz not null -- auto_now in the model; no SQL default
+);
+
+comment on table listers is 'Lister instances known to the origin visit scheduler';
+comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)';
+comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -11,3 +11,6 @@
create index task_run_id_asc_idx on task_run(task asc, started asc);
+
+-- lister schema: at most one row per (name, instance_name); get_or_create_lister's "on conflict do nothing" relies on it
+create unique index on listers (name, instance_name);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -93,3 +93,12 @@
)
)
return tasks
+
+
+LISTERS = (  # kwargs for get_or_create_lister; a missing instance_name means ""
+ {"name": "github"},
+ {"name": "gitlab", "instance_name": "gitlab"},
+ {"name": "gitlab", "instance_name": "freedesktop"},
+ {"name": "npm"},
+ {"name": "pypi"},
+)
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -42,6 +42,8 @@
expected_rules = set(
"/" + rule
for rule in (
+ "lister/get_or_create",  # SchedulerInterface.get_or_create_lister
+ "lister/update",  # SchedulerInterface.update_lister
"priority_ratios/get",
"task/create",
"task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -13,10 +13,13 @@
from typing import Any, Dict
from arrow import utcnow
+import attr
+import pytest
+from swh.scheduler.exc import StaleData
from swh.scheduler.interface import SchedulerInterface
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
def subdict(d, keys=None, excl=()):
@@ -612,6 +615,40 @@
"status": "eventful",
}
+ def test_get_or_create_lister(self, swh_scheduler):
+ db_listers = []
+ for lister_args in LISTERS:
+ db_listers.append(swh_scheduler.get_or_create_lister(**lister_args))
+
+ for lister, lister_args in zip(db_listers, LISTERS):
+ assert lister.name == lister_args["name"]
+ assert lister.instance_name == lister_args.get("instance_name", "")
+
+ lister_get_again = swh_scheduler.get_or_create_lister(  # must return the existing row
+ lister.name, lister.instance_name
+ )
+
+ assert lister == lister_get_again
+
+ def test_update_lister(self, swh_scheduler):
+ lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+ lister.current_state = {"updated": "now"}
+
+ updated_lister = swh_scheduler.update_lister(lister)
+
+ assert updated_lister.updated > lister.updated
+ assert updated_lister == attr.evolve(lister, updated=updated_lister.updated)
+
+ def test_update_lister_stale(self, swh_scheduler):
+ lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+ swh_scheduler.update_lister(lister)  # changes the stored `updated`, so `lister` is now stale
+
+ with pytest.raises(StaleData) as exc:
+ swh_scheduler.update_lister(lister)
+ assert "state not updated" in exc.value.args[0]  # NOTE(review): only runs if dedented out of the pytest.raises block; indentation is lost in this view, confirm
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 30, 11:57 AM (1 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219801
Attached To
D3271: Implement basic storage of lister information
Event Timeline
Log In to Comment