-- SWH DB schema upgrade
-- from_version: 32
-- to_version: 33
-- description: Archive old task/task_run entries

-- Abort whatever transaction may still be open in the session, then open a
-- fresh one for the statements below.
-- NOTE(review): no matching commit is visible in this part of the script;
-- presumably it is issued further down or by the caller -- confirm.
rollback;
begin;

-- Register the schema version this script upgrades the database to.
insert into dbversion
  (version, release, description)
values
  (33, now(), 'Work In Progress');

-- Schema

-- Staging table for the tasks kept by this migration; it is renamed to "task"
-- later in this script. The not null / foreign key / check constraints shown
-- as trailing comments are not declared here: they are added by alter table
-- statements after the data has been copied.
create table new_task (
  id bigserial primary key,
  type text, -- not null references task_type(type),
  arguments jsonb not null,
  next_run timestamptz not null,
  current_interval interval,
  status task_status not null,
  policy task_policy not null default 'recurring',
  retries_left bigint not null default 0,
  priority task_priority -- references priority_ratio(id)
  -- check (policy <> 'recurring' or current_interval is not null)
);

-- Catalog comments follow the table through its rename, so their text refers
-- to the final name ("task") rather than to the temporary "new_task".
comment on table new_task is 'Schedule of recurring tasks';
comment on column new_task.arguments is 'Arguments passed to the underlying job scheduler. '
                                    'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column new_task.next_run is 'The next run of this task should be run on or after that time';
comment on column new_task.current_interval is 'The interval between two runs of this task, '
                                           'taking into account the backoff factor';
comment on column new_task.policy is 'Whether the task is one-shot or recurring';
comment on column new_task.retries_left is 'The number of "short delay" retries of the task in case of '
                                       'transient failure';
comment on column new_task.priority is 'Priority of the given task';
comment on column new_task.id is 'Task Identifier';
comment on column new_task.type is 'References task_type table';
comment on column new_task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';

-- Staging table for the task runs kept by this migration; it is renamed to
-- "task_run" later in this script. The constraints shown as trailing comments
-- are not declared here: they are added by alter table statements after the
-- data has been copied.
create table new_task_run (
  id bigserial primary key,
  task bigint, -- not null references task(id),
  backend_id text,
  scheduled timestamptz,
  started timestamptz,
  ended timestamptz,
  metadata jsonb,
  status task_run_status --  not null default 'scheduled'
);
-- Catalog comments follow the table through its rename, so their text refers
-- to the final names ("task", "task_run") rather than to the temporary ones.
comment on table new_task_run is 'History of task runs sent to the job-running backend';
comment on column new_task_run.backend_id is 'id of the task run in the job-running backend';
comment on column new_task_run.metadata is 'Useful metadata for the given task run. '
                                       'For instance, the worker that took on the job, '
                                       'or the logs for the run.';
comment on column new_task_run.id is 'Task run identifier';
comment on column new_task_run.task is 'References task table';
comment on column new_task_run.scheduled is 'Scheduled run time for task';
comment on column new_task_run.started is 'Task starting time';
comment on column new_task_run.ended is 'Task ending time';

--------------------------
-- Actual migration script
--------------------------

-- Copy the tasks worth keeping into the new table:
--   * recurring tasks of type 'load-nixguix', 'list-*' or 'index-*';
--   * oneshot tasks whose next_run is less than 2 months in the past.
-- Everything else stays only in the old tables, which get archived below.
insert into new_task
  (id, type, arguments, next_run, current_interval, status, policy, retries_left, priority)
select distinct
  id, type, arguments, next_run, current_interval, status, policy, retries_left, priority
from task
where (
    policy = 'recurring'
    and (type = 'load-nixguix' or type like 'list-%' or type like 'index-%')
  ) or (
    policy = 'oneshot'
    and next_run > now() - interval '2 months'
  );

-- Copy only the runs that belong to a task kept above. Selecting against
-- new_task (rather than repeating the retention filter) keeps both copies in
-- sync by construction.
insert into new_task_run
  (id, task, backend_id, scheduled, started, ended, metadata, status)
select id, task, backend_id, scheduled, started, ended, metadata, status
from task_run
where task in (select id from new_task);

-- The rows above were inserted with explicit ids, which does not advance the
-- sequences backing the bigserial columns. Move them past the highest id
-- already handed out by the old tables, so that later inserts relying on the
-- default neither collide with copied rows nor reuse the id of an archived
-- row. With an empty source table the sequence is left at its initial state.
select setval(pg_get_serial_sequence('new_task', 'id'),
              coalesce(max(id), 1),
              max(id) is not null)
from task;

select setval(pg_get_serial_sequence('new_task_run', 'id'),
              coalesce(max(id), 1),
              max(id) is not null)
from task_run;

-----------
-- renaming
-----------

-- Move the current tables out of the way under archive_-prefixed names.
-- This has to run before the renames below, which reuse the names freed here.
-- NOTE(review): objects attached to the old tables (indexes, constraints,
-- triggers) presumably stay attached to archive_task / archive_task_run under
-- their existing names -- confirm whether any of them should be dropped.
alter table task rename to archive_task;
alter table task_run rename to archive_task_run;

-- Give the freshly populated new_* tables the standard table names.
alter table new_task rename to task;
alter table new_task_run rename to task_run;

--------------
-- constraints
--------------

-- Constraints on task, applied in a single statement now that the data is in
-- place: mandatory type, references to task_type and priority_ratio, and the
-- rule that a recurring task must carry an interval.
alter table task
  alter column type set not null,
  add constraint task_type_fk
    foreign key (type) references task_type (type),
  add constraint task_priority_fk
    foreign key (priority) references priority_ratio (id),
  add constraint task_check_policy
    check (policy <> 'recurring' or current_interval is not null);

-- Constraints on task_run: every run must point at an existing task, and its
-- status is mandatory with 'scheduled' as the default.
alter table task_run
  alter column task set not null,
  alter column status set not null,
  alter column status set default 'scheduled',
  add constraint task_id_fk
    foreign key (task) references task (id);

--------
-- index
--------

-- Index names are generated by the server, so the statements are kept in
-- their original order.

-- task: lookups by type and by next run time
create index on task (type);
create index on task (next_run);

-- task: lookup of a task by type and by the md5 of its serialized arguments
create index on task (type, md5(arguments::text));
create index on task (priority);

-- task_run: lookups by parent task and by backend id
create index on task_run (task);
create index on task_run (backend_id);

-- task_run: runs of a task ordered by start time
create index on task_run (task, started);

-- task: partial index restricted to tasks whose next run is not yet scheduled
create index on task (type, next_run)
  where status = 'next_run_not_scheduled'::task_status;

----------
-- trigger
----------

-- Trigger function: propagate the outcome of a task_run row to its task.
--
-- Loads the task referenced by new.task and the task_type row matching that
-- task's type, then updates the task depending on new.status:
--   * 'permfailed': the task is disabled.
--   * 'eventful' / 'uneventful':
--       - oneshot task: marked completed;
--       - recurring task: rescheduled at new.ended + a recomputed interval
--         (current interval times the adjustment factor, bounded below by
--         the type's min_interval and above by its max_interval), and
--         retries_left is reset to the type's num_retries (0 when null).
--   * any other status (expected: 'failed', 'lost'):
--       - retries left: rescheduled at new.ended + the type's retry_delay
--         (1 hour when null) and retries_left is decremented;
--       - no retries left: a oneshot task is disabled; a recurring task is
--         rescheduled at new.ended + its current interval and retries_left
--         is reset.
--
-- Returns null in every case.
create or replace function swh_scheduler_update_task_on_task_end ()
  returns trigger
  language plpgsql
as $$
declare
  cur_task task%rowtype;
  cur_task_type task_type%rowtype;
  adjustment_factor float;
  new_interval interval;
begin
  select * from task where id = new.task into cur_task;
  select * from task_type where type = cur_task.type into cur_task_type;

  -- NOTE(review): every rescheduling branch derives next_run from new.ended;
  -- presumably ended is always set together with a final status -- confirm.
  case
    when new.status = 'permfailed' then
      update task
        set status = 'disabled'
        where id = cur_task.id;
    when new.status in ('eventful', 'uneventful') then
      case
        when cur_task.policy = 'oneshot' then
          update task
            set status = 'completed'
            where id = cur_task.id;
        when cur_task.policy = 'recurring' then
          -- NOTE(review): the same factor is applied to 'eventful' and
          -- 'uneventful' runs, so for a backoff_factor above 1 the interval
          -- only ever shrinks towards min_interval. Confirm that no back-off
          -- (interval growth) is intended for uneventful runs.
          adjustment_factor := 1/cur_task_type.backoff_factor;
          new_interval := greatest(
            cur_task_type.min_interval,
            least(
              cur_task_type.max_interval,
              adjustment_factor * cur_task.current_interval));
          update task
            set status = 'next_run_not_scheduled',
                next_run = new.ended + new_interval,
                current_interval = new_interval,
                retries_left = coalesce(cur_task_type.num_retries, 0)
            where id = cur_task.id;
      end case;
    else -- new.status in 'failed', 'lost'
      if cur_task.retries_left > 0 then
        -- short-delay retry: consume one retry and reschedule soon
        update task
          set status = 'next_run_not_scheduled',
              next_run = new.ended + coalesce(cur_task_type.retry_delay, interval '1 hour'),
              retries_left = cur_task.retries_left - 1
          where id = cur_task.id;
      else -- no retries left
        case
          when cur_task.policy = 'oneshot' then
            update task
              set status = 'disabled'
              where id = cur_task.id;
          when cur_task.policy = 'recurring' then
            -- fall back to the regular schedule and refill the retry budget
            update task
              set status = 'next_run_not_scheduled',
                  next_run = new.ended + cur_task.current_interval,
                  retries_left = coalesce(cur_task_type.num_retries, 0)
              where id = cur_task.id;
        end case;
      end if; -- retries
  end case;
  return null;
end;
$$;

-- After an UPDATE that targets task_run.status, run the task bookkeeping for
-- each row whose new status is neither 'scheduled' nor 'started'.
create trigger update_task_on_task_end
  after update of status
  on task_run
  for each row
  when (new.status <> 'scheduled' and new.status <> 'started')
  execute procedure swh_scheduler_update_task_on_task_end ();