Page Menu · Home · Software Heritage

D3271.id11648.diff
No One · Temporary

D3271.id11648.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
arrow
attrs
+attrs-strict
celery >= 4
# Not a direct dependency: missing from celery 4.4.4
future >= 0.18.0
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,6 +7,7 @@
import logging
from arrow import Arrow, utcnow
+import attr
import psycopg2.pool
import psycopg2.extras
@@ -16,6 +17,8 @@
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
+from .exc import StaleData
+from .model import Lister
logger = logging.getLogger(__name__)
@@ -26,6 +29,7 @@
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
def format_query(query, keys):
@@ -130,6 +134,69 @@
cur.execute(query)
return cur.fetchall()
+ @db_transaction()
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None, db=None, cur=None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist.
+ """
+
+ if instance_name is None:
+ instance_name = ""
+
+ select_cols = ", ".join(Lister.select_columns())
+ insert_cols, insert_meta = (
+ ", ".join(tup) for tup in Lister.insert_columns_and_metavars()
+ )
+
+ query = f"""
+ with added as (
+ insert into listers ({insert_cols}) values ({insert_meta})
+ on conflict do nothing
+ returning {select_cols}
+ )
+ select {select_cols} from added
+ union all
+ select {select_cols} from listers
+ where (name, instance_name) = (%(name)s, %(instance_name)s);
+ """
+
+ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name)))
+
+ return Lister(**cur.fetchone())
+
+ @db_transaction()
+ def update_lister(self, lister: Lister, db=None, cur=None) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ database doesn't match the one passed by the user.
+ """
+
+ select_cols = ", ".join(Lister.select_columns())
+ set_vars = ", ".join(
+ f"{col} = {meta}"
+ for col, meta in zip(*Lister.insert_columns_and_metavars())
+ )
+
+ query = f"""update listers
+ set {set_vars}
+ where id=%(id)s and updated=%(updated)s
+ returning {select_cols}"""
+
+ cur.execute(query, attr.asdict(lister))
+ updated = cur.fetchone()
+
+ if not updated:
+ raise StaleData("Stale data; Lister state not updated")
+
+ return Lister(**updated)
+
task_create_keys = [
"type",
"arguments",
diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py
--- a/swh/scheduler/exc.py
+++ b/swh/scheduler/exc.py
@@ -5,8 +5,13 @@
__all__ = [
"SchedulerException",
+ "StaleData",
]
class SchedulerException(Exception):
pass
+
+
+class StaleData(SchedulerException):
+ pass
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -7,6 +7,8 @@
from swh.core.api import remote_api_endpoint
+from swh.scheduler.model import Lister
+
class SchedulerInterface:
@remote_api_endpoint("task_type/create")
@@ -249,6 +251,28 @@
"""Search task run for a task id"""
...
+ @remote_api_endpoint("lister/get_or_create")
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist.
+ """
+ ...
+
+ @remote_api_endpoint("lister/update")
+ def update_lister(self, lister: Lister) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ database doesn't match the one passed by the user.
+ """
+ ...
+
@remote_api_endpoint("priority_ratios/get")
def get_priority_ratios(self):
...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -3,10 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import List, Optional, Tuple
+import datetime
+from uuid import UUID
+from typing import Any, Dict, List, Optional, Tuple
import attr
import attr.converters
+from attrs_strict import type_validator
@attr.s
@@ -55,3 +58,33 @@
cls._insert_cols_and_metavars = cols, metavars
return cls._insert_cols_and_metavars
+
+
+@attr.s
+class Lister(BaseSchedulerModel):
+ name = attr.ib(type=str, validator=[type_validator()])
+ instance_name = attr.ib(type=str, validator=[type_validator()], factory=str)
+
+ # Populated by database
+ id = attr.ib(
+ type=Optional[UUID],
+ validator=type_validator(),
+ default=None,
+ metadata={"primary_key": True},
+ )
+
+ current_state = attr.ib(
+ type=Dict[str, Any], validator=[type_validator()], factory=dict
+ )
+ created = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ updated = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -110,3 +110,19 @@
comment on column task_run.scheduled is 'Scheduled run time for task';
comment on column task_run.started is 'Task starting time';
comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+ id uuid primary key default uuid_generate_v4(),
+ name text not null,
+ instance_name text not null,
+ created timestamptz not null default now(), -- auto_now_add in the model
+ current_state jsonb not null,
+ updated timestamptz not null
+);
+
+comment on table listers is 'Lister instances known to the origin visit scheduler';
+comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)';
+comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -11,3 +11,6 @@
create index task_run_id_asc_idx on task_run(task asc, started asc);
+
+-- lister schema
+create unique index on listers (name, instance_name);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -93,3 +93,12 @@
)
)
return tasks
+
+
+LISTERS = (
+ {"name": "github"},
+ {"name": "gitlab", "instance_name": "gitlab"},
+ {"name": "gitlab", "instance_name": "freedesktop"},
+ {"name": "npm"},
+ {"name": "pypi"},
+)
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -42,6 +42,8 @@
expected_rules = set(
"/" + rule
for rule in (
+ "lister/get_or_create",
+ "lister/update",
"priority_ratios/get",
"task/create",
"task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -13,10 +13,13 @@
from typing import Any, Dict
from arrow import utcnow
+import attr
+import pytest
+from swh.scheduler.exc import StaleData
from swh.scheduler.interface import SchedulerInterface
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
def subdict(d, keys=None, excl=()):
@@ -612,6 +615,40 @@
"status": "eventful",
}
+ def test_get_or_create_lister(self, swh_scheduler):
+ db_listers = []
+ for lister_args in LISTERS:
+ db_listers.append(swh_scheduler.get_or_create_lister(**lister_args))
+
+ for lister, lister_args in zip(db_listers, LISTERS):
+ assert lister.name == lister_args["name"]
+ assert lister.instance_name == lister_args.get("instance_name", "")
+
+ lister_get_again = swh_scheduler.get_or_create_lister(
+ lister.name, lister.instance_name
+ )
+
+ assert lister == lister_get_again
+
+ def test_update_lister(self, swh_scheduler):
+ lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+ lister.current_state = {"updated": "now"}
+
+ updated_lister = swh_scheduler.update_lister(lister)
+
+ assert updated_lister.updated > lister.updated
+ assert updated_lister == attr.evolve(lister, updated=updated_lister.updated)
+
+ def test_update_lister_stale(self, swh_scheduler):
+ lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+ swh_scheduler.update_lister(lister)
+
+ with pytest.raises(StaleData) as exc:
+ swh_scheduler.update_lister(lister)
+ assert "state not updated" in exc.value.args[0]
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 11:57 AM (1 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219801

Event Timeline