diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
 
 arrow
 attrs
+attrs-strict
 celery >= 4
 # Not a direct dependency: missing from celery 4.4.4
 future >= 0.18.0
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -16,6 +16,7 @@
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 
+from .model import Lister
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +27,7 @@
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
 
 
 def format_query(query, keys):
@@ -130,6 +132,45 @@
         cur.execute(query)
         return cur.fetchall()
 
+    @db_transaction()
+    def get_or_create_lister(
+        self, name: str, arguments: Optional[Dict[str, str]] = None, db=None, cur=None
+    ) -> Lister:
+        """Retrieve (or create) the lister with the given name and given arguments.
+
+        Args:
+            name: name of the lister
+            arguments: arguments of the lister; ``None`` is treated as ``{}``
+
+        Returns:
+            the Lister built from the matching row of the ``listers`` table
+        """
+        params = {"name": name, "arguments": {} if arguments is None else arguments}
+        columns = ",".join(Lister.columns())
+
+        # The insert and the lookup are two separate statements on purpose: when
+        # a concurrent transaction inserts the same (name, arguments) pair,
+        # "on conflict do nothing" adds no row, and a select sharing that
+        # statement's snapshot would not see the competing row either, so
+        # fetchone() would return None. A separate select runs on a fresh
+        # snapshot (assuming the default READ COMMITTED isolation level).
+        cur.execute(
+            """
+            insert into listers (name, arguments) values (%(name)s, %(arguments)s)
+            on conflict do nothing
+            """,
+            params,
+        )
+        cur.execute(
+            f"""
+            select {columns} from listers
+            where (name, arguments) = (%(name)s, %(arguments)s)
+            """,
+            params,
+        )
+
+        return Lister(**cur.fetchone())
+
     task_create_keys = [
         "type",
         "arguments",
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -7,6 +7,8 @@
 
 from swh.core.api import remote_api_endpoint
 
+from swh.scheduler.model import Lister
+
 
 class SchedulerInterface:
     @remote_api_endpoint("task_type/create")
@@ -249,6 +251,21 @@
         """Search task run for a task id"""
         ...
 
+    @remote_api_endpoint("lister/get_or_create")
+    def get_or_create_lister(
+        self, name: str, arguments: Optional[Dict[str, str]] = None
+    ) -> Lister:
+        """Retrieve (or create) the lister with the given name and given arguments.
+
+        Args:
+            name: name of the lister
+            arguments: arguments of the lister; ``None`` is treated as ``{}``
+
+        Returns:
+            the matching (possibly newly created) lister
+        """
+        ...
+
     @remote_api_endpoint("priority_ratios/get")
     def get_priority_ratios(self):
         ...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import List, Optional, Tuple
+import datetime
+from typing import Any, Dict, List, Optional, Tuple
+from uuid import UUID
 
 import attr
 import attr.converters
+from attrs_strict import type_validator
 
 
 @attr.s
@@ -24,3 +27,36 @@
             cls._columns = [c[0] for c in sorted(columns, key=lambda c: (c[1], c[0]))]
 
         return cls._columns
+
+
+@attr.s
+class Lister(BaseSchedulerModel):
+    """A lister, identified by its name and its arguments.
+
+    Only ``name`` is mandatory; ``id``, ``current_state`` and ``updated`` are
+    populated by the database and default to ``None``.
+    """
+
+    name = attr.ib(type=str, validator=[type_validator()])
+    arguments = attr.ib(type=Dict[str, str], validator=[type_validator()], factory=dict)
+
+    # Populated by database
+    id = attr.ib(
+        type=Optional[UUID],
+        validator=[type_validator()],
+        default=None,
+        metadata={"primary_key": True},
+    )
+
+    current_state = attr.ib(
+        type=Optional[Dict[str, Any]], validator=[type_validator()], default=None
+    )
+    updated = attr.ib(
+        type=Optional[datetime.datetime], validator=[type_validator()], default=None
+    )
+
+    @name.validator
+    def check_name(self, attribute, value):
+        """Reject empty lister names."""
+        if not value:
+            raise ValueError("Lister name cannot be empty")
diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -110,3 +110,18 @@
 comment on column task_run.scheduled is 'Scheduled run time for task';
 comment on column task_run.started is 'Task starting time';
 comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+  id uuid primary key default uuid_generate_v4(),
+  name text not null,
+  arguments jsonb not null default '{}'::jsonb,
+  current_state jsonb not null default '{}'::jsonb,
+  updated timestamptz not null default now()
+);
+
+comment on table listers is 'Known listers for the origin_tasks mapping';
+comment on column listers.id is 'Unique identifier of the lister';
+comment on column listers.name is 'Name of the lister';
+comment on column listers.arguments is 'Arguments of the lister (e.g. for the GitLab lister, contains the lister instance name)';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -11,3 +11,6 @@
 
 
 create index task_run_id_asc_idx on task_run(task asc, started asc);
+
+-- lister schema
+create unique index listers_name_arguments_idx on listers (name, arguments);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -93,3 +93,23 @@
             )
         )
     return tasks
+
+
+# Keyword arguments for get_or_create_lister(); entries without "arguments"
+# exercise the default (empty) arguments.
+LISTERS = (
+    {"name": "github"},
+    {
+        "name": "gitlab",
+        "arguments": {"url": "https://gitlab.com/api/v4", "instance": "gitlab"},
+    },
+    {
+        "name": "gitlab",
+        "arguments": {
+            "url": "https://gitlab.freedesktop.org/api/v4",
+            "instance": "freedesktop",
+        },
+    },
+    {"name": "npm"},
+    {"name": "pypi"},
+)
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -42,6 +42,7 @@
     expected_rules = set(
         "/" + rule
         for rule in (
+            "lister/get_or_create",
             "priority_ratios/get",
             "task/create",
             "task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -16,7 +16,7 @@
 
 from swh.scheduler.interface import SchedulerInterface
 
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
 
 
 def subdict(d, keys=None, excl=()):
@@ -612,6 +612,23 @@
             "status": "eventful",
         }
 
+    def test_get_or_create_lister(self, swh_scheduler):
+        """Creating each lister, then asking for it again, yields the same id."""
+        db_listers = [
+            swh_scheduler.get_or_create_lister(**lister_args)
+            for lister_args in LISTERS
+        ]
+
+        for lister, lister_args in zip(db_listers, LISTERS):
+            assert lister.name == lister_args["name"]
+            assert lister.arguments == lister_args.get("arguments", {})
+
+            lister_get_again = swh_scheduler.get_or_create_lister(
+                lister.name, lister.arguments
+            )
+
+            assert lister.id == lister_get_again.id
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)