diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
 
 arrow
 attrs
+attrs-strict
 celery >= 4
 # Not a direct dependency: missing from celery 4.4.4
 future >= 0.18.0
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2019  The Software Heritage developers
+# Copyright (C) 2015-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -7,6 +7,7 @@
 import logging
 
 from arrow import Arrow, utcnow
+import attr
 import psycopg2.pool
 import psycopg2.extras
 
@@ -16,6 +17,8 @@
 
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
+from .exc import StaleData
+from .model import Lister
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +29,7 @@
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
 
 
 def format_query(query, keys):
@@ -130,6 +134,82 @@
         cur.execute(query)
         return cur.fetchall()
 
+    @db_transaction()
+    def get_or_create_lister(
+        self, name: str, arguments: Optional[Dict[str, str]] = None, db=None, cur=None
+    ) -> Lister:
+        """Retrieve (or create) the lister with the given name and arguments in database.
+
+        Returns: the lister with all fields populated from the database
+        """
+
+        if arguments is None:
+            arguments = {}
+
+        select_cols = ", ".join(Lister.select_columns())
+        insert_cols, insert_meta = (
+            ", ".join(tup) for tup in Lister.insert_columns_and_metavars()
+        )
+
+        query = f"""
+            with added as (
+              insert into listers ({insert_cols}) values ({insert_meta})
+              on conflict do nothing
+              returning {select_cols}
+            )
+            select {select_cols} from added
+            union all
+            select {select_cols} from listers
+              where (name, arguments) = (%(name)s, %(arguments)s);
+        """
+
+        cur.execute(query, attr.asdict(Lister(name=name, arguments=arguments)))
+        row = cur.fetchone()
+
+        if row is None:
+            # Another transaction committed the same (name, arguments) after
+            # this statement took its snapshot: "on conflict do nothing"
+            # skipped our insert, and the select above could not see that row
+            # either. A new statement gets a new snapshot under the default
+            # read committed isolation level, so look the row up again.
+            cur.execute(
+                f"select {select_cols} from listers"
+                " where (name, arguments) = (%(name)s, %(arguments)s)",
+                {"name": name, "arguments": arguments},
+            )
+            row = cur.fetchone()
+
+        return Lister(**row)
+
+    @db_transaction()
+    def update_lister(self, lister: Lister, db=None, cur=None) -> Lister:
+        """Update the state for the given lister instance in the database.
+
+        Returns: the lister with all fields updated from the database
+
+        Raises:
+            StaleData: no listers row matches both the lister id and updated value
+        """
+
+        select_cols = ", ".join(Lister.select_columns())
+        set_vars = ", ".join(
+            f"{col} = {meta}"
+            for col, meta in zip(*Lister.insert_columns_and_metavars())
+        )
+
+        query = f"""update listers
+                set {set_vars}
+                where id=%(id)s and updated=%(updated)s
+                returning {select_cols}"""
+
+        cur.execute(query, attr.asdict(lister))
+        updated = cur.fetchone()
+
+        if not updated:
+            raise StaleData("Stale data; Lister state not updated")
+
+        return Lister(**updated)
+
     task_create_keys = [
         "type",
         "arguments",
diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py
--- a/swh/scheduler/exc.py
+++ b/swh/scheduler/exc.py
@@ -5,8 +5,13 @@
 
 __all__ = [
     "SchedulerException",
+    "StaleData",
 ]
 
 
 class SchedulerException(Exception):
     pass
+
+
+class StaleData(SchedulerException):
+    """The row to update was modified in the database since it was read."""
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -7,6 +7,8 @@
 
 from swh.core.api import remote_api_endpoint
 
+from swh.scheduler.model import Lister
+
 
 class SchedulerInterface:
     @remote_api_endpoint("task_type/create")
@@ -249,6 +251,21 @@
         """Search task run for a task id"""
         ...
 
+    @remote_api_endpoint("lister/get_or_create")
+    def get_or_create_lister(
+        self, name: str, arguments: Optional[Dict[str, str]] = None
+    ) -> Lister:
+        """Retrieve (or create) the lister with the given name and given arguments."""
+        ...
+
+    @remote_api_endpoint("lister/update")
+    def update_lister(self, lister: Lister) -> Lister:
+        """Update the state for the given lister instance in the database.
+
+        Returns: the lister with all fields updated from the database
+        """
+        ...
+
     @remote_api_endpoint("priority_ratios/get")
     def get_priority_ratios(self):
         ...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -3,10 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import List, Optional, Tuple
+import datetime
+from uuid import UUID
+from typing import Any, Dict, List, Optional, Tuple
 
 import attr
 import attr.converters
+from attrs_strict import type_validator
 
 
 @attr.s
@@ -55,3 +58,39 @@
             cls._insert_cols_and_metavars = cols, metavars
 
         return cls._insert_cols_and_metavars
+
+
+@attr.s
+class Lister(BaseSchedulerModel):
+    """A lister, identified by its name and arguments, with its current state."""
+    name = attr.ib(type=str, validator=[type_validator()])
+    arguments = attr.ib(type=Dict[str, str], validator=[type_validator()], factory=dict)
+
+    # Populated by database
+    id = attr.ib(
+        type=Optional[UUID],
+        validator=[type_validator()],
+        default=None,
+        metadata={"primary_key": True},
+    )
+
+    current_state = attr.ib(
+        type=Dict[str, Any], validator=[type_validator()], factory=dict
+    )
+    created = attr.ib(
+        type=Optional[datetime.datetime],
+        validator=[type_validator()],
+        default=None,
+        metadata={"auto_now_add": True},
+    )
+    updated = attr.ib(
+        type=Optional[datetime.datetime],
+        validator=[type_validator()],
+        default=None,
+        metadata={"auto_now": True},
+    )
+
+    @name.validator
+    def check_name(self, attribute, value):
+        if not value:
+            raise ValueError("Lister name cannot be empty")
diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -110,3 +110,19 @@
 comment on column task_run.scheduled is 'Scheduled run time for task';
 comment on column task_run.started is 'Task starting time';
 comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+  id uuid primary key default uuid_generate_v4(),
+  name text not null,
+  arguments jsonb not null,
+  created timestamptz not null default now(),  -- auto_now_add in the model
+  current_state jsonb not null,
+  updated timestamptz not null
+);
+
+comment on table listers is 'Known listers for the origin_tasks mapping';
+comment on column listers.name is 'Name of the lister';
+comment on column listers.arguments is 'Arguments of the lister (e.g. for the GitLab lister, contains the lister instance name)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -11,3 +11,6 @@
 
 create index task_run_id_asc_idx on task_run(task asc, started asc);
 
+
+-- lister schema
+create unique index on listers (name, arguments);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -93,3 +93,21 @@
             )
         )
     return tasks
+
+
+LISTERS = (
+    {"name": "github"},
+    {
+        "name": "gitlab",
+        "arguments": {"url": "https://gitlab.com/api/v4", "instance": "gitlab"},
+    },
+    {
+        "name": "gitlab",
+        "arguments": {
+            "url": "https://gitlab.freedesktop.org/api/v4",
+            "instance": "freedesktop",
+        },
+    },
+    {"name": "npm"},
+    {"name": "pypi"},
+)
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -42,6 +42,8 @@
     expected_rules = set(
         "/" + rule
         for rule in (
+            "lister/get_or_create",
+            "lister/update",
             "priority_ratios/get",
             "task/create",
             "task/delete_archived",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -13,10 +13,12 @@
 from typing import Any, Dict
 
 from arrow import utcnow
+import pytest
 
+from swh.scheduler.exc import StaleData
 from swh.scheduler.interface import SchedulerInterface
 
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
 
 
 def subdict(d, keys=None, excl=()):
@@ -612,6 +614,40 @@
             "status": "eventful",
         }
 
+    def test_get_or_create_lister(self, swh_scheduler):
+        db_listers = []
+        for lister_args in LISTERS:
+            db_listers.append(swh_scheduler.get_or_create_lister(**lister_args))
+
+        for lister, lister_args in zip(db_listers, LISTERS):
+            assert lister.name == lister_args["name"]
+            assert lister.arguments == lister_args.get("arguments", {})
+
+            lister_get_again = swh_scheduler.get_or_create_lister(
+                lister.name, lister.arguments
+            )
+
+            assert lister == lister_get_again
+
+    def test_update_lister(self, swh_scheduler):
+        lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+        lister.current_state = {"updated": "now"}
+
+        updated_lister = swh_scheduler.update_lister(lister)
+
+        assert updated_lister.updated > lister.updated
+        assert updated_lister.current_state == lister.current_state
+
+    def test_update_lister_stale(self, swh_scheduler):
+        lister = swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+        swh_scheduler.update_lister(lister)
+
+        with pytest.raises(StaleData) as exc:
+            swh_scheduler.update_lister(lister)
+        assert "state not updated" in exc.value.args[0]
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)