diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
 
 arrow
 attrs
+attrs-strict
 celery >= 4
 # Not a direct dependency: missing from celery 4.4.4
 future >= 0.18.0
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -7,6 +7,7 @@
 import logging
 
 from arrow import Arrow, utcnow
+import attr
 import psycopg2.pool
 import psycopg2.extras
 
@@ -16,6 +17,7 @@
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 
+from .model import Lister
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +28,7 @@
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
 
 
 def format_query(query, keys):
@@ -130,6 +133,71 @@
         cur.execute(query)
         return cur.fetchall()
 
+    @db_transaction()
+    def get_or_create_lister(
+        self, name: str, arguments: Optional[Dict[str, str]] = None, db=None, cur=None
+    ) -> Lister:
+        """Retrieve (or create) the lister with the given name and given arguments.
+
+        Args:
+            name: name of the lister
+            arguments: arguments of the lister; ``None`` means no arguments (``{}``)
+            db: database handle (presumably injected by ``db_transaction``)
+            cur: cursor on which the queries are executed
+
+        Returns:
+            the :class:`Lister` built from the row read back from the database
+
+        Raises:
+            RuntimeError: if no matching row can be read back, even after a retry
+        """
+        if arguments is None:
+            arguments = {}
+
+        select_cols = ", ".join(Lister.select_columns())
+        insert_cols, insert_meta = (
+            ", ".join(tup) for tup in Lister.insert_columns_and_metavars()
+        )
+
+        query = f"""
+            with added as (
+                insert into listers ({insert_cols}) values ({insert_meta})
+                    on conflict do nothing
+                    returning {select_cols}
+            )
+            select {select_cols} from added
+            union all
+            select {select_cols} from listers
+                where (name, arguments) = (%(name)s, %(arguments)s);
+        """
+        params = attr.asdict(Lister(name=name, arguments=arguments))
+
+        cur.execute(query, params)
+        row = cur.fetchone()
+
+        if row is None:
+            # ON CONFLICT DO NOTHING skipped the insert, yet the SELECT branch
+            # found nothing: both run on the same snapshot, so a row committed
+            # by a concurrent transaction after the statement started is
+            # invisible to it. A new statement gets a fresh snapshot under
+            # READ COMMITTED (isolation level assumed here -- TODO confirm),
+            # so look the row up once more.
+            cur.execute(
+                f"""
+                select {select_cols} from listers
+                    where (name, arguments) = (%(name)s, %(arguments)s);
+                """,
+                params,
+            )
+            row = cur.fetchone()
+
+        if row is None:
+            raise RuntimeError(
+                f"Could not get or create lister {name!r} with arguments {arguments!r}"
+            )
+
+        return Lister(**row)
+
     task_create_keys = [
         "type",
         "arguments",
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -7,6 +7,8 @@
 
 from swh.core.api import remote_api_endpoint
 
+from swh.scheduler.model import Lister
+
 
class SchedulerInterface: @remote_api_endpoint("task_type/create") @@ -249,6 +251,13 @@ """Search task run for a task id""" ... + @remote_api_endpoint("lister/get_or_create") + def get_or_create_lister( + self, name: str, arguments: Optional[Dict[str, str]] = None + ) -> Lister: + """Retrieve (or create) the lister with the given name and given arguments.""" + ... + @remote_api_endpoint("priority_ratios/get") def get_priority_ratios(self): ... diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -3,10 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import List, Optional, Tuple +import datetime +from uuid import UUID +from typing import Any, Dict, List, Optional, Tuple import attr import attr.converters +from attrs_strict import type_validator @attr.s @@ -55,3 +58,38 @@ cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars + + +@attr.s +class Lister(BaseSchedulerModel): + name = attr.ib(type=str, validator=[type_validator()]) + arguments = attr.ib(type=Dict[str, str], validator=[type_validator()], factory=dict) + + # Populated by database + id = attr.ib( + type=Optional[UUID], + validator=type_validator(), + default=None, + metadata={"primary_key": True}, + ) + + current_state = attr.ib( + type=Dict[str, Any], validator=[type_validator()], factory=dict + ) + created = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now_add": True}, + ) + updated = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now": True}, + ) + + @name.validator + def check_name(self, attribute, value): + if not value: + raise ValueError("Lister name cannot be empty") diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql new file mode 100644 --- /dev/null 
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -110,3 +110,19 @@
 comment on column task_run.scheduled is 'Scheduled run time for task';
 comment on column task_run.started is 'Task starting time';
 comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+  id uuid primary key default uuid_generate_v4(),
+  name text not null,
+  arguments jsonb not null,
+  created timestamptz not null default now(),  -- auto_now_add in the model
+  current_state jsonb not null,
+  updated timestamptz not null
+);
+
+comment on table listers is 'Known listers for the origin_tasks mapping';
+comment on column listers.name is 'Name of the lister';
+comment on column listers.arguments is 'Arguments of the lister (e.g. for the GitLab lister, contains the lister instance name)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -11,3 +11,6 @@
 
 
 create index task_run_id_asc_idx on task_run(task asc, started asc);
+
+-- lister schema
+create unique index on listers (name, arguments);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -93,3 +93,21 @@
             )
         )
     return tasks
+
+
+LISTERS = (
+    {"name": "github"},
+    {
+        "name": "gitlab",
+        "arguments": {"url": "https://gitlab.com/api/v4", "instance": "gitlab"},
+    },
+    {
+        "name": "gitlab",
+        "arguments": {
+            "url": "https://gitlab.freedesktop.org/api/v4",
+            "instance": "freedesktop",
+        },
+    },
+    {"name": "npm"},
+    {"name": "pypi"},
+)
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -42,6 +42,7 @@
     expected_rules = set(
         "/" + rule
         for rule in (
+            "lister/get_or_create",
             "priority_ratios/get",
             "task/create",
             "task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -16,7 +16,7 @@
 
 from swh.scheduler.interface import SchedulerInterface
 
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
 
 
 def subdict(d, keys=None, excl=()):
@@ -612,6 +612,29 @@
             "status": "eventful",
         }
 
+    def test_get_or_create_lister(self, swh_scheduler):
+        """Listers are created on first request and returned unchanged afterwards."""
+        db_listers = [
+            swh_scheduler.get_or_create_lister(**lister_args)
+            for lister_args in LISTERS
+        ]
+
+        # Every (name, arguments) pair in LISTERS is distinct, so each one must
+        # have been given its own database-generated id.
+        lister_ids = {lister.id for lister in db_listers}
+        assert None not in lister_ids
+        assert len(lister_ids) == len(LISTERS)
+
+        for lister, lister_args in zip(db_listers, LISTERS):
+            assert lister.name == lister_args["name"]
+            assert lister.arguments == lister_args.get("arguments", {})
+
+            lister_get_again = swh_scheduler.get_or_create_lister(
+                lister.name, lister.arguments
+            )
+
+            assert lister == lister_get_again
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)