diff --git a/MANIFEST.in b/MANIFEST.in
index e7c46fc..a464dd3 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,6 @@
+include README.md
 include Makefile
 include requirements.txt
 include requirements-swh.txt
 include version.txt
+recursive-include swh/scheduler/tests/data *.sql
diff --git a/swh/scheduler/tests/__init__.py b/swh/scheduler/tests/__init__.py
index e69de29..dc81528 100644
--- a/swh/scheduler/tests/__init__.py
+++ b/swh/scheduler/tests/__init__.py
@@ -0,0 +1,2 @@
+from os import path
+DATA_DIR = path.join(path.dirname(__file__), 'data')
diff --git a/swh/scheduler/tests/data/dumps/swh-scheduler-updater.sql b/swh/scheduler/tests/data/dumps/swh-scheduler-updater.sql
new file mode 100644
index 0000000..06902e5
--- /dev/null
+++ b/swh/scheduler/tests/data/dumps/swh-scheduler-updater.sql
@@ -0,0 +1,252 @@
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 10.4 (Debian 10.4-2.pgdg+1)
+-- Dumped by pg_dump version 10.4 (Debian 10.4-2.pgdg+1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET client_min_messages = warning;
+SET row_security = off;
+
+--
+-- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
+
+
+--
+-- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+
+
+--
+-- Name: btree_gist; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS btree_gist WITH SCHEMA public;
+
+
+--
+-- Name: EXTENSION btree_gist; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION btree_gist IS 'support for indexing common datatypes in GiST';
+
+
+--
+-- Name: pgcrypto; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
+
+
+--
+-- Name: EXTENSION pgcrypto; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION pgcrypto IS 'cryptographic functions';
+
+
+--
+-- Name: origin_type; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.origin_type AS ENUM (
+    'git',
+    'svn',
+    'hg',
+    'deb'
+);
+
+
+--
+-- Name: TYPE origin_type; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.origin_type IS 'URL''s repository type';
+
+
+--
+-- Name: sha1; Type: DOMAIN; Schema: public; Owner: -
+--
+
+CREATE DOMAIN public.sha1 AS bytea
+    CONSTRAINT sha1_check CHECK ((length(VALUE) = 20));
+
+
+--
+-- Name: hash_sha1(text); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.hash_sha1(text) RETURNS public.sha1
+    LANGUAGE sql IMMUTABLE STRICT
+    AS $_$
+   select public.digest($1, 'sha1') :: sha1
+$_$;
+
+
+--
+-- Name: FUNCTION hash_sha1(text); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.hash_sha1(text) IS 'Compute sha1 hash as text';
+
+
+--
+-- Name: swh_cache_put(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_cache_put() RETURNS void
+    LANGUAGE plpgsql
+    AS $$
+begin
+    insert into cache (id, url, origin_type, cnt, last_seen)
+    select hash_sha1(url), url, origin_type, cnt, last_seen
+    from tmp_cache t
+    on conflict(id)
+    do update set cnt = (select cnt from cache where id=excluded.id) + excluded.cnt,
+                  last_seen = excluded.last_seen;
+    return;
+end
+$$;
+
+
+--
+-- Name: FUNCTION swh_cache_put(); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_cache_put() IS 'Write temporary events to the cache';
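For orientation, here is a minimal sketch (not part of the patch) of the upsert semantics of `swh_cache_put` above: rows are staged in `tmp_cache`, and a conflicting id increments `cnt` instead of duplicating the row. The DSN and URL are illustrative assumptions; everything must run in one transaction because `tmp_cache` is created ON COMMIT DROP.

    import psycopg2

    conn = psycopg2.connect('dbname=softwareheritage-scheduler-updater-test')
    with conn:  # one transaction: tmp_cache is ON COMMIT DROP
        with conn.cursor() as cur:
            cur.execute('select swh_mktemp_cache()')
            cur.execute(
                "insert into tmp_cache (url, origin_type, cnt, last_seen) "
                "values (%s, 'git', 1, now())",
                ('https://example.org/some/repo',))
            cur.execute('select swh_cache_put()')  # first put: cnt = 1
            cur.execute('select swh_cache_put()')  # same url again: cnt = 1 + 1
            cur.execute('select url, cnt from swh_cache_read(now(), 10)')
            print(cur.fetchall())  # [('https://example.org/some/repo', 2)]
    conn.close()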
+
+
+SET default_tablespace = '';
+
+SET default_with_oids = false;
+
+--
+-- Name: cache; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.cache (
+    id public.sha1 NOT NULL,
+    url text NOT NULL,
+    origin_type public.origin_type NOT NULL,
+    cnt integer DEFAULT 1,
+    first_seen timestamp with time zone DEFAULT now() NOT NULL,
+    last_seen timestamp with time zone NOT NULL
+);
+
+
+--
+-- Name: swh_cache_read(timestamp with time zone, integer); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_cache_read(ts timestamp with time zone, lim integer) RETURNS SETOF public.cache
+    LANGUAGE sql STABLE
+    AS $$
+   select id, url, origin_type, cnt, first_seen, last_seen
+   from cache
+   where last_seen <= ts
+   limit lim;
+$$;
+
+
+--
+-- Name: FUNCTION swh_cache_read(ts timestamp with time zone, lim integer); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_cache_read(ts timestamp with time zone, lim integer) IS 'Read cache entries';
+
+
+--
+-- Name: swh_mktemp_cache(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_mktemp_cache() RETURNS void
+    LANGUAGE sql
+    AS $$
+  create temporary table tmp_cache (
+    like cache including defaults
+  ) on commit drop;
+  alter table tmp_cache drop column id;
+$$;
+
+
+--
+-- Name: dbversion; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.dbversion (
+    version integer NOT NULL,
+    release timestamp with time zone NOT NULL,
+    description text NOT NULL
+);
+
+
+--
+-- Name: TABLE dbversion; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.dbversion IS 'Schema update tracking';
+
+
+--
+-- Data for Name: cache; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.cache (id, url, origin_type, cnt, first_seen, last_seen) FROM stdin;
+\.
+
+
+--
+-- Data for Name: dbversion; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.dbversion (version, release, description) FROM stdin;
+1	2018-06-05 13:57:29.282695+02	Work In Progress
+\.
+
+
+--
+-- Name: cache cache_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.cache
+    ADD CONSTRAINT cache_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: dbversion dbversion_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.dbversion
+    ADD CONSTRAINT dbversion_pkey PRIMARY KEY (version);
+
+
+--
+-- Name: cache_last_seen_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX cache_last_seen_idx ON public.cache USING btree (last_seen);
+
+
+--
+-- Name: cache_url_idx; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX cache_url_idx ON public.cache USING btree (url);
+
+
+--
+-- PostgreSQL database dump complete
+--
+
diff --git a/swh/scheduler/tests/data/dumps/swh-scheduler.sql b/swh/scheduler/tests/data/dumps/swh-scheduler.sql
new file mode 100644
index 0000000..bde6ad9
--- /dev/null
+++ b/swh/scheduler/tests/data/dumps/swh-scheduler.sql
@@ -0,0 +1,1114 @@
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 10.5 (Debian 10.5-1.pgdg+1)
+-- Dumped by pg_dump version 10.5 (Debian 10.5-1.pgdg+1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET client_min_messages = warning;
+SET row_security = off;
+
+--
+-- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: -
+--
+
+CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
+
+
+--
+-- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: -
+--
+
+COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
+
+
+--
+-- Name: task_policy; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.task_policy AS ENUM (
+    'recurring',
+    'oneshot'
+);
+
+
+--
+-- Name: TYPE task_policy; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.task_policy IS 'Recurrence policy of the given task';
+
+
+--
+-- Name: task_priority; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.task_priority AS ENUM (
+    'high',
+    'normal',
+    'low'
+);
+
+
+--
+-- Name: TYPE task_priority; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.task_priority IS 'Priority of the given task';
+
+
+--
+-- Name: task_run_status; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.task_run_status AS ENUM (
+    'scheduled',
+    'started',
+    'eventful',
+    'uneventful',
+    'failed',
+    'permfailed',
+    'lost'
+);
+
+
+--
+-- Name: TYPE task_run_status; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.task_run_status IS 'Status of a given task run';
+
+
+--
+-- Name: task_status; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.task_status AS ENUM (
+    'next_run_not_scheduled',
+    'next_run_scheduled',
+    'completed',
+    'disabled'
+);
+
+
+--
+-- Name: TYPE task_status; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TYPE public.task_status IS 'Status of a given task';
+
+
+--
+-- Name: task_record; Type: TYPE; Schema: public; Owner: -
+--
+
+CREATE TYPE public.task_record AS (
+    task_id bigint,
+    task_policy public.task_policy,
+    task_status public.task_status,
+    task_run_id bigint,
+    arguments jsonb,
+    type text,
+    backend_id text,
+    metadata jsonb,
+    scheduled timestamp with time zone,
+    started timestamp with time zone,
+    ended timestamp with time zone,
+    task_run_status public.task_run_status
+);
+
+
+SET default_tablespace = '';
+
+SET default_with_oids = false;
+
+--
+-- Name: task; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.task (
+    id bigint NOT NULL,
+    type text NOT NULL,
+    arguments jsonb NOT NULL,
+    next_run timestamp with time zone NOT NULL,
+    current_interval interval,
+    status public.task_status NOT NULL,
+    policy public.task_policy DEFAULT 'recurring'::public.task_policy NOT NULL,
+    retries_left bigint DEFAULT 0 NOT NULL,
+    priority public.task_priority,
+    CONSTRAINT task_check CHECK (((policy <> 'recurring'::public.task_policy) OR (current_interval IS NOT NULL)))
+);
+
+
+--
+-- Name: TABLE task; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.task IS 'Schedule of recurring tasks';
+
+
+--
+-- Name: COLUMN task.arguments; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.arguments IS 'Arguments passed to the underlying job scheduler. Contains two keys, ''args'' (list) and ''kwargs'' (object).';
+
+
+--
+-- Name: COLUMN task.next_run; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.next_run IS 'The next run of this task should be run on or after that time';
+
+
+--
+-- Name: COLUMN task.current_interval; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.current_interval IS 'The interval between two runs of this task, taking into account the backoff factor';
+
+
+--
+-- Name: COLUMN task.policy; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.policy IS 'Whether the task is one-shot or recurring';
+
+
+--
+-- Name: COLUMN task.retries_left; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.retries_left IS 'The number of "short delay" retries of the task in case of transient failure';
+
+
+--
+-- Name: COLUMN task.priority; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task.priority IS 'Priority of the given task';
+
+
+--
+-- Name: swh_scheduler_create_tasks_from_temp(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_create_tasks_from_temp() RETURNS SETOF public.task
+    LANGUAGE plpgsql
+    AS $$
+begin
+  -- update the default values in one go
+  -- this is separated from the insert/select to avoid too much
+  -- juggling
+  update tmp_task t
+  set current_interval = tt.default_interval,
+      retries_left = coalesce(retries_left, tt.num_retries, 0)
+  from task_type tt
+  where tt.type=t.type;
+
+  insert into task (type, arguments, next_run, status, current_interval, policy,
+                    retries_left, priority)
+    select type, arguments, next_run, status, current_interval, policy,
+           retries_left, priority
+    from tmp_task t
+    where not exists(select 1
+                     from task
+                     where type = t.type and
+                           arguments->'args' = t.arguments->'args' and
+                           arguments->'kwargs' = t.arguments->'kwargs' and
+                           policy = t.policy and
+                           priority is not distinct from t.priority and
+                           status = t.status);
+
+  return query
+    select distinct t.*
+    from tmp_task tt inner join task t on (
+      tt.type = t.type and
+      tt.arguments->'args' = t.arguments->'args' and
+      tt.arguments->'kwargs' = t.arguments->'kwargs' and
+      tt.policy = t.policy and
+      tt.priority is not distinct from t.priority and
+      tt.status = t.status
+    );
+end;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_create_tasks_from_temp(); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_create_tasks_from_temp() IS 'Create tasks in bulk from the temporary table';
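A sketch (assumed DSN and task values, not part of the patch) of the bulk-creation path above: rows are staged in `tmp_task`, then `swh_scheduler_create_tasks_from_temp()` fills in per-type defaults and skips (type, args, kwargs, policy, priority, status) tuples that already exist, which is what makes creation idempotent.

    import psycopg2
    from psycopg2.extras import Json

    conn = psycopg2.connect('dbname=softwareheritage-scheduler-test')
    with conn:  # single transaction: tmp_task is ON COMMIT DROP
        with conn.cursor() as cur:
            cur.execute('select swh_scheduler_mktemp_task()')
            cur.execute(
                "insert into tmp_task (type, arguments, next_run, status, policy) "
                "values (%s, %s, now(), 'next_run_not_scheduled', 'recurring')",
                ('origin-update-git',
                 Json({'args': ['https://example.org/repo.git'], 'kwargs': {}})))
            cur.execute('select * from swh_scheduler_create_tasks_from_temp()')
            print(cur.fetchall())  # the created (or pre-existing) task rows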
+
+
+--
+-- Name: swh_scheduler_delete_archived_tasks(bigint[], bigint[]); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_delete_archived_tasks(task_ids bigint[], task_run_ids bigint[]) RETURNS void
+    LANGUAGE sql
+    AS $$
+  -- clean up task_run_ids
+  delete from task_run where id in (select * from unnest(task_run_ids));
+  -- clean up only tasks whose associated task_run are all cleaned up.
+  -- Remaining tasks will stay there and will be cleaned up when
+  -- remaining data have been indexed
+  delete from task
+  where id in (select t.id
+               from task t left outer join task_run tr on t.id=tr.task
+               where t.id in (select * from unnest(task_ids))
+               and tr.task is null);
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_delete_archived_tasks(task_ids bigint[], task_run_ids bigint[]); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_delete_archived_tasks(task_ids bigint[], task_run_ids bigint[]) IS 'Clean up archived tasks function';
+
+
+--
+-- Name: task_run; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.task_run (
+    id bigint NOT NULL,
+    task bigint NOT NULL,
+    backend_id text,
+    scheduled timestamp with time zone,
+    started timestamp with time zone,
+    ended timestamp with time zone,
+    metadata jsonb,
+    status public.task_run_status DEFAULT 'scheduled'::public.task_run_status NOT NULL
+);
+
+
+--
+-- Name: TABLE task_run; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.task_run IS 'History of task runs sent to the job-running backend';
+
+
+--
+-- Name: COLUMN task_run.backend_id; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_run.backend_id IS 'id of the task run in the job-running backend';
+
+
+--
+-- Name: COLUMN task_run.metadata; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_run.metadata IS 'Useful metadata for the given task run. For instance, the worker that took on the job, or the logs for the run.';
+
+
+--
+-- Name: swh_scheduler_end_task_run(text, public.task_run_status, jsonb, timestamp with time zone); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_end_task_run(backend_id text, status public.task_run_status, metadata jsonb DEFAULT '{}'::jsonb, ts timestamp with time zone DEFAULT now()) RETURNS public.task_run
+    LANGUAGE sql
+    AS $$
+  update task_run
+    set ended = ts,
+        status = swh_scheduler_end_task_run.status,
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
+    where task_run.backend_id = swh_scheduler_end_task_run.backend_id
+  returning *;
+$$;
+
+
+--
+-- Name: swh_scheduler_grab_ready_tasks(text, timestamp with time zone, bigint, bigint); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_grab_ready_tasks(task_type text, ts timestamp with time zone DEFAULT now(), num_tasks bigint DEFAULT NULL::bigint, num_tasks_priority bigint DEFAULT NULL::bigint) RETURNS SETOF public.task
+    LANGUAGE sql
+    AS $$
+  update task
+    set status='next_run_scheduled'
+    from (
+      select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority)
+    ) next_tasks
+    where task.id = next_tasks.id
+  returning task.*;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_grab_ready_tasks(task_type text, ts timestamp with time zone, num_tasks bigint, num_tasks_priority bigint); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_grab_ready_tasks(task_type text, ts timestamp with time zone, num_tasks bigint, num_tasks_priority bigint) IS 'Grab tasks ready for scheduling and change their status';
+
+
+--
+-- Name: swh_scheduler_mktemp_task(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_mktemp_task() RETURNS void
+    LANGUAGE sql
+    AS $$
+  create temporary table tmp_task (
+    like task excluding indexes
+  ) on commit drop;
+  alter table tmp_task
+    alter column retries_left drop not null,
+    drop column id;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_mktemp_task(); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_mktemp_task() IS 'Create a temporary table for bulk task creation';
+
+
+--
+-- Name: swh_scheduler_mktemp_task_run(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_mktemp_task_run() RETURNS void
+    LANGUAGE sql
+    AS $$
+  create temporary table tmp_task_run (
+    like task_run excluding indexes
+  ) on commit drop;
+  alter table tmp_task_run
+    drop column id,
+    drop column status;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_mktemp_task_run(); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_mktemp_task_run() IS 'Create a temporary table for bulk task run scheduling';
+
+
+--
+-- Name: swh_scheduler_nb_priority_tasks(bigint, public.task_priority); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority public.task_priority) RETURNS numeric
+    LANGUAGE sql STABLE
+    AS $$
+  select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority public.task_priority); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority public.task_priority) IS 'Given a priority task and a total number, compute the number of tasks to read';
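The peek/grab pair above is the consumer-facing API: peek is a read-only preview, while grab flips the returned tasks to `next_run_scheduled` in the same statement. A hypothetical round trip (same assumed DSN; assumes some `origin-update-git` tasks are ready):

    import psycopg2

    conn = psycopg2.connect('dbname=softwareheritage-scheduler-test')
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                "select id, status from swh_scheduler_peek_ready_tasks("
                "'origin-update-git', now(), 10, 10)")
            peeked = cur.fetchall()   # statuses still next_run_not_scheduled
            cur.execute(
                "select id, status from swh_scheduler_grab_ready_tasks("
                "'origin-update-git', now(), 10, 10)")
            grabbed = cur.fetchall()  # same ids, now next_run_scheduled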
+
+
+--
+-- Name: swh_scheduler_peek_no_priority_tasks(text, timestamp with time zone, bigint); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_peek_no_priority_tasks(task_type text, ts timestamp with time zone DEFAULT now(), num_tasks bigint DEFAULT NULL::bigint) RETURNS SETOF public.task
+    LANGUAGE sql STABLE
+    AS $$
+select * from task
+  where next_run <= ts
+        and type = task_type
+        and status = 'next_run_not_scheduled'
+        and priority is null
+  order by next_run
+  limit num_tasks
+  for update skip locked;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_peek_no_priority_tasks(task_type text, ts timestamp with time zone, num_tasks bigint); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_peek_no_priority_tasks(task_type text, ts timestamp with time zone, num_tasks bigint) IS 'Retrieve tasks without priority';
+
+
+--
+-- Name: swh_scheduler_peek_priority_tasks(text, timestamp with time zone, bigint); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_peek_priority_tasks(task_type text, ts timestamp with time zone DEFAULT now(), num_tasks_priority bigint DEFAULT NULL::bigint) RETURNS SETOF public.task
+    LANGUAGE plpgsql
+    AS $$
+declare
+  r record;
+  count_row bigint;
+  nb_diff bigint;
+  nb_high bigint;
+  nb_normal bigint;
+  nb_low bigint;
+begin
+    -- expected values to fetch
+    select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high;
+    select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal;
+    select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low;
+    nb_diff := 0;
+    count_row := 0;
+
+    for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high')
+    loop
+        count_row := count_row + 1;
+        return next r;
+    end loop;
+
+    if count_row < nb_high then
+        nb_normal := nb_normal + nb_high - count_row;
+    end if;
+
+    count_row := 0;
+    for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal')
+    loop
+        count_row := count_row + 1;
+        return next r;
+    end loop;
+
+    if count_row < nb_normal then
+        nb_low := nb_low + nb_normal - count_row;
+    end if;
+
+    return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low');
+end
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_peek_priority_tasks(task_type text, ts timestamp with time zone, num_tasks_priority bigint); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_peek_priority_tasks(task_type text, ts timestamp with time zone, num_tasks_priority bigint) IS 'Retrieve priority tasks';
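A plain-Python model of the cascade implemented by `swh_scheduler_peek_priority_tasks` above (an explanatory sketch mirroring the plpgsql, not a drop-in replacement): each priority gets a ceil(ratio * budget) share, and any unfilled slots roll down to the next lower priority.

    import math

    def split_priority_budget(num_tasks_priority, ratios, available):
        """ratios: e.g. {'high': 0.5, 'normal': 0.3, 'low': 0.2};
        available: tasks actually ready per priority."""
        taken, carry = {}, 0
        for prio in ('high', 'normal', 'low'):
            budget = math.ceil(num_tasks_priority * ratios[prio]) + carry
            taken[prio] = min(budget, available.get(prio, 0))
            carry = budget - taken[prio]  # unfilled slots roll downward
        return taken

    print(split_priority_budget(100, {'high': .5, 'normal': .3, 'low': .2},
                                {'high': 20, 'normal': 100, 'low': 100}))
    # {'high': 20, 'normal': 60, 'low': 20}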
+
+
+--
+-- Name: swh_scheduler_peek_ready_tasks(text, timestamp with time zone, bigint, bigint); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_peek_ready_tasks(task_type text, ts timestamp with time zone DEFAULT now(), num_tasks bigint DEFAULT NULL::bigint, num_tasks_priority bigint DEFAULT NULL::bigint) RETURNS SETOF public.task
+    LANGUAGE plpgsql
+    AS $$
+declare
+  r record;
+  count_row bigint;
+  nb_diff bigint;
+  nb_tasks bigint;
+begin
+    count_row := 0;
+
+    for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority)
+             order by priority, next_run
+    loop
+        count_row := count_row + 1;
+        return next r;
+    end loop;
+
+    if count_row < num_tasks_priority then
+       nb_tasks := num_tasks + num_tasks_priority - count_row;
+    else
+       nb_tasks := num_tasks;
+    end if;
+
+    for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks)
+             order by priority, next_run
+    loop
+        return next r;
+    end loop;
+
+    return;
+end
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_peek_ready_tasks(task_type text, ts timestamp with time zone, num_tasks bigint, num_tasks_priority bigint); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_peek_ready_tasks(task_type text, ts timestamp with time zone, num_tasks bigint, num_tasks_priority bigint) IS 'Retrieve tasks with/without priority in order';
+
+
+--
+-- Name: swh_scheduler_peek_tasks_with_priority(text, timestamp with time zone, bigint, public.task_priority); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_peek_tasks_with_priority(task_type text, ts timestamp with time zone DEFAULT now(), num_tasks_priority bigint DEFAULT NULL::bigint, task_priority public.task_priority DEFAULT 'normal'::public.task_priority) RETURNS SETOF public.task
+    LANGUAGE sql STABLE
+    AS $$
+  select *
+  from task t
+  where t.next_run <= ts
+        and t.type = task_type
+        and t.status = 'next_run_not_scheduled'
+        and t.priority = task_priority
+  order by t.next_run
+  limit num_tasks_priority
+  for update skip locked;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_peek_tasks_with_priority(task_type text, ts timestamp with time zone, num_tasks_priority bigint, task_priority public.task_priority); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_peek_tasks_with_priority(task_type text, ts timestamp with time zone, num_tasks_priority bigint, task_priority public.task_priority) IS 'Retrieve tasks with a given priority';
+
+
+--
+-- Name: swh_scheduler_schedule_task_run(bigint, text, jsonb, timestamp with time zone); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_schedule_task_run(task_id bigint, backend_id text, metadata jsonb DEFAULT '{}'::jsonb, ts timestamp with time zone DEFAULT now()) RETURNS public.task_run
+    LANGUAGE sql
+    AS $$
+  insert into task_run (task, backend_id, metadata, scheduled, status)
+    values (task_id, backend_id, metadata, ts, 'scheduled')
+  returning *;
+$$;
+
+
+--
+-- Name: swh_scheduler_schedule_task_run_from_temp(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_schedule_task_run_from_temp() RETURNS void
+    LANGUAGE plpgsql
+    AS $$
+begin
+  insert into task_run (task, backend_id, metadata, scheduled, status)
+    select task, backend_id, metadata, scheduled, 'scheduled'
+    from tmp_task_run;
+  return;
+end;
+$$;
+
+
+--
+-- Name: swh_scheduler_start_task_run(text, jsonb, timestamp with time zone); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_start_task_run(backend_id text, metadata jsonb DEFAULT '{}'::jsonb, ts timestamp with time zone DEFAULT now()) RETURNS public.task_run
+    LANGUAGE sql
+    AS $$
+  update task_run
+    set started = ts,
+        status = 'started',
+        metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
+    where task_run.backend_id = swh_scheduler_start_task_run.backend_id
+  returning *;
+$$;
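Sketch of a task-run round trip through the three functions just defined (assumed DSN; assumes a task with id 1 exists). The run is keyed by its `backend_id` throughout, and each step merges any metadata into the existing jsonb.

    import uuid
    import psycopg2

    conn = psycopg2.connect('dbname=softwareheritage-scheduler-test')
    backend_id = str(uuid.uuid4())
    with conn:
        with conn.cursor() as cur:
            cur.execute("select swh_scheduler_schedule_task_run(1, %s)",
                        (backend_id,))
            cur.execute("select swh_scheduler_start_task_run(%s)",
                        (backend_id,))
            # ending with a terminal status fires the
            # update_task_on_task_end trigger defined further down
            cur.execute("select swh_scheduler_end_task_run(%s, 'eventful')",
                        (backend_id,))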
+
+
+--
+-- Name: swh_scheduler_task_to_archive(timestamp with time zone, timestamp with time zone, bigint, bigint); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_task_to_archive(ts_after timestamp with time zone, ts_before timestamp with time zone, last_id bigint DEFAULT '-1'::integer, lim bigint DEFAULT 10) RETURNS SETOF public.task_record
+    LANGUAGE sql STABLE
+    AS $$
+  select t.id as task_id, t.policy as task_policy,
+         t.status as task_status, tr.id as task_run_id,
+         t.arguments, t.type, tr.backend_id, tr.metadata,
+         tr.scheduled, tr.started, tr.ended, tr.status as task_run_status
+  from task_run tr inner join task t on tr.task=t.id
+  where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
+         (t.policy = 'recurring' and t.status = 'disabled')) and
+        ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and
+        t.id > last_id
+  order by tr.task, tr.started
+  limit lim;
+$$;
+
+
+--
+-- Name: FUNCTION swh_scheduler_task_to_archive(ts_after timestamp with time zone, ts_before timestamp with time zone, last_id bigint, lim bigint); Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON FUNCTION public.swh_scheduler_task_to_archive(ts_after timestamp with time zone, ts_before timestamp with time zone, last_id bigint, lim bigint) IS 'Read archivable tasks function';
+
+
+--
+-- Name: swh_scheduler_update_task_on_task_end(); Type: FUNCTION; Schema: public; Owner: -
+--
+
+CREATE FUNCTION public.swh_scheduler_update_task_on_task_end() RETURNS trigger
+    LANGUAGE plpgsql
+    AS $$
+declare
+  cur_task task%rowtype;
+  cur_task_type task_type%rowtype;
+  adjustment_factor float;
+  new_interval interval;
+begin
+  select * from task where id = new.task into cur_task;
+  select * from task_type where type = cur_task.type into cur_task_type;
+
+  case
+    when new.status = 'permfailed' then
+      update task
+        set status = 'disabled'
+        where id = cur_task.id;
+    when new.status in ('eventful', 'uneventful') then
+      case
+        when cur_task.policy = 'oneshot' then
+          update task
+            set status = 'completed'
+            where id = cur_task.id;
+        when cur_task.policy = 'recurring' then
+          -- note: the original dump assigned 1/backoff_factor in both
+          -- branches; repaired here so an uneventful run backs off (longer
+          -- interval) while an eventful one tightens the interval
+          if new.status = 'uneventful' then
+            adjustment_factor := cur_task_type.backoff_factor;
+          else
+            adjustment_factor := 1/cur_task_type.backoff_factor;
+          end if;
+          new_interval := greatest(
+            cur_task_type.min_interval,
+            least(
+              cur_task_type.max_interval,
+              adjustment_factor * cur_task.current_interval));
+          update task
+            set status = 'next_run_not_scheduled',
+                next_run = now() + new_interval,
+                current_interval = new_interval,
+                retries_left = coalesce(cur_task_type.num_retries, 0)
+            where id = cur_task.id;
+      end case;
+    else -- new.status in 'failed', 'lost'
+      if cur_task.retries_left > 0 then
+        update task
+          set status = 'next_run_not_scheduled',
+              next_run = now() + coalesce(cur_task_type.retry_delay, interval '1 hour'),
+              retries_left = cur_task.retries_left - 1
+          where id = cur_task.id;
+      else -- no retries left
+        case
+          when cur_task.policy = 'oneshot' then
+            update task
+              set status = 'disabled'
+              where id = cur_task.id;
+          when cur_task.policy = 'recurring' then
+            update task
+              set status = 'next_run_not_scheduled',
+                  next_run = now() + cur_task.current_interval,
+                  retries_left = coalesce(cur_task_type.num_retries, 0)
+              where id = cur_task.id;
+        end case;
+      end if; -- retries
+  end case;
+  return null;
+end;
+$$;
+
+
+--
+-- Name: dbversion; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.dbversion (
+    version integer NOT NULL,
+    release timestamp with time zone NOT NULL,
+    description text NOT NULL
+);
+
+
+--
+-- Name: TABLE dbversion; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.dbversion IS 'Schema update tracking';
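A plain-Python rendering of the recurring-task interval adjustment performed by the trigger function above (an explanatory model under the repair noted in the function body, not project code): the adjusted interval is clamped between the task type's min and max intervals.

    import datetime

    def next_interval(current, backoff_factor, min_interval, max_interval,
                      eventful):
        # eventful runs tighten the interval; uneventful runs back off
        factor = (1 / backoff_factor) if eventful else backoff_factor
        return max(min_interval, min(max_interval, current * factor))

    print(next_interval(datetime.timedelta(days=64), 2,
                        datetime.timedelta(hours=12),
                        datetime.timedelta(days=64), eventful=True))
    # 32 days, 0:00:00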
+
+
+--
+-- Name: priority_ratio; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.priority_ratio (
+    id public.task_priority NOT NULL,
+    ratio double precision NOT NULL
+);
+
+
+--
+-- Name: TABLE priority_ratio; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.priority_ratio IS 'Oneshot task''s reading ratio per priority';
+
+
+--
+-- Name: COLUMN priority_ratio.id; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.priority_ratio.id IS 'Task priority id';
+
+
+--
+-- Name: COLUMN priority_ratio.ratio; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.priority_ratio.ratio IS 'Percentage of tasks to read per priority';
+
+
+--
+-- Name: task_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.task_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+--
+-- Name: task_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.task_id_seq OWNED BY public.task.id;
+
+
+--
+-- Name: task_run_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.task_run_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+--
+-- Name: task_run_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.task_run_id_seq OWNED BY public.task_run.id;
+
+
+--
+-- Name: task_type; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.task_type (
+    type text NOT NULL,
+    description text NOT NULL,
+    backend_name text NOT NULL,
+    default_interval interval,
+    min_interval interval,
+    max_interval interval,
+    backoff_factor double precision,
+    max_queue_length bigint,
+    num_retries bigint,
+    retry_delay interval
+);
+
+
+--
+-- Name: TABLE task_type; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON TABLE public.task_type IS 'Types of schedulable tasks';
+
+
+--
+-- Name: COLUMN task_type.type; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.type IS 'Short identifier for the task type';
+
+
+--
+-- Name: COLUMN task_type.description; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.description IS 'Human-readable task description';
+
+
+--
+-- Name: COLUMN task_type.backend_name; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.backend_name IS 'Name of the task in the job-running backend';
+
+
+--
+-- Name: COLUMN task_type.default_interval; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.default_interval IS 'Default interval for newly scheduled tasks';
+
+
+--
+-- Name: COLUMN task_type.min_interval; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.min_interval IS 'Minimum interval between two runs of a task';
+
+
+--
+-- Name: COLUMN task_type.max_interval; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.max_interval IS 'Maximum interval between two runs of a task';
+
+
+--
+-- Name: COLUMN task_type.backoff_factor; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.backoff_factor IS 'Adjustment factor for the backoff between two task runs';
+
+
+--
+-- Name: COLUMN task_type.max_queue_length; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.max_queue_length IS 'Maximum length of the queue for this type of tasks';
+
+
+--
+-- Name: COLUMN task_type.num_retries; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.num_retries IS 'Default number of retries on transient failures';
+
+
+--
+-- Name: COLUMN task_type.retry_delay; Type: COMMENT; Schema: public; Owner: -
+--
+
+COMMENT ON COLUMN public.task_type.retry_delay IS 'Retry delay for the task';
+
+
+--
+-- Name: task id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.task ALTER COLUMN id SET DEFAULT nextval('public.task_id_seq'::regclass);
+
+
+--
+-- Name: task_run id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.task_run ALTER COLUMN id SET DEFAULT nextval('public.task_run_id_seq'::regclass);
+
+
+--
+-- Data for Name: dbversion; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.dbversion (version, release, description) FROM stdin;
+12	2018-10-10 13:29:22.932763+02	Work In Progress
+\.
+
+
+--
+-- Data for Name: priority_ratio; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.priority_ratio (id, ratio) FROM stdin;
+high	0.5
+normal	0.299999999999999989
+low	0.200000000000000011
+\.
+
+
+--
+-- Data for Name: task; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.task (id, type, arguments, next_run, current_interval, status, policy, retries_left, priority) FROM stdin;
+\.
+
+
+--
+-- Data for Name: task_run; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.task_run (id, task, backend_id, scheduled, started, ended, metadata, status) FROM stdin;
+\.
+
+
+--
+-- Data for Name: task_type; Type: TABLE DATA; Schema: public; Owner: -
+--
+
+COPY public.task_type (type, description, backend_name, default_interval, min_interval, max_interval, backoff_factor, max_queue_length, num_retries, retry_delay) FROM stdin;
+swh-loader-mount-dump-and-load-svn-repository	Loading svn repositories from svn dump	swh.loader.svn.tasks.MountAndLoadSvnRepository	1 day	1 day	1 day	1	1000	\N	\N
+swh-deposit-archive-loading	Loading deposit archive into swh through swh-loader-tar	swh.deposit.loader.tasks.LoadDepositArchiveTsk	1 day	1 day	1 day	1	1000	3	\N
+swh-deposit-archive-checks	Pre-checking deposit step before loading into swh archive	swh.deposit.loader.tasks.ChecksDepositTsk	1 day	1 day	1 day	1	1000	3	\N
+swh-vault-cooking	Cook a Vault bundle	swh.vault.cooking_tasks.SWHCookingTask	1 day	1 day	1 day	1	10000	\N	\N
+origin-load-hg	Loading mercurial repository swh-loader-mercurial	swh.loader.mercurial.tasks.LoadMercurialTsk	1 day	1 day	1 day	1	1000	\N	\N
+origin-load-archive-hg	Loading archive mercurial repository swh-loader-mercurial	swh.loader.mercurial.tasks.LoadArchiveMercurialTsk	1 day	1 day	1 day	1	1000	\N	\N
+origin-update-git	Update an origin of type git	swh.loader.git.tasks.UpdateGitRepository	64 days	12:00:00	64 days	2	5000	\N	\N
+swh-lister-github-incremental	Incrementally list GitHub	swh.lister.github.tasks.IncrementalGitHubLister	1 day	1 day	1 day	1	\N	\N	\N
+swh-lister-github-full	Full update of GitHub repos list	swh.lister.github.tasks.FullGitHubRelister	90 days	90 days	90 days	1	\N	\N	\N
+swh-lister-debian	List a Debian distribution	swh.lister.debian.tasks.DebianListerTask	1 day	1 day	1 day	1	\N	\N	\N
+swh-lister-gitlab-incremental	Incrementally list a Gitlab instance	swh.lister.gitlab.tasks.IncrementalGitLabLister	1 day	1 day	1 day	1	\N	\N	\N
+swh-lister-gitlab-full	Full update of a Gitlab instance's repos list	swh.lister.gitlab.tasks.FullGitLabRelister	90 days	90 days	90 days	1	\N	\N	\N
+swh-lister-pypi	Full pypi lister	swh.lister.pypi.tasks.PyPIListerTask	1 day	1 day	1 day	1	\N	\N	\N
+origin-update-pypi	Load Pypi origin	swh.loader.pypi.tasks.LoadPyPI	64 days	12:00:00	64 days	2	5000	\N	\N
+\.
\N +\. + + +-- +-- Name: task_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - +-- + +SELECT pg_catalog.setval('public.task_id_seq', 1, false); + + +-- +-- Name: task_run_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - +-- + +SELECT pg_catalog.setval('public.task_run_id_seq', 1, false); + + +-- +-- Name: dbversion dbversion_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.dbversion + ADD CONSTRAINT dbversion_pkey PRIMARY KEY (version); + + +-- +-- Name: priority_ratio priority_ratio_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.priority_ratio + ADD CONSTRAINT priority_ratio_pkey PRIMARY KEY (id); + + +-- +-- Name: task task_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task + ADD CONSTRAINT task_pkey PRIMARY KEY (id); + + +-- +-- Name: task_run task_run_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task_run + ADD CONSTRAINT task_run_pkey PRIMARY KEY (id); + + +-- +-- Name: task_type task_type_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task_type + ADD CONSTRAINT task_type_pkey PRIMARY KEY (type); + + +-- +-- Name: task_args; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_args ON public.task USING btree (((arguments -> 'args'::text))); + + +-- +-- Name: task_kwargs; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_kwargs ON public.task USING gin (((arguments -> 'kwargs'::text))); + + +-- +-- Name: task_next_run_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_next_run_idx ON public.task USING btree (next_run); + + +-- +-- Name: task_priority_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_priority_idx ON public.task USING btree (priority); + + +-- +-- Name: task_run_backend_id_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_run_backend_id_idx ON public.task_run USING btree (backend_id); + + +-- +-- Name: task_run_id_asc_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_run_id_asc_idx ON public.task_run USING btree (task, started); + + +-- +-- Name: task_run_task_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_run_task_idx ON public.task_run USING btree (task); + + +-- +-- Name: task_type_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX task_type_idx ON public.task USING btree (type); + + +-- +-- Name: task_run update_task_on_task_end; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER update_task_on_task_end AFTER UPDATE OF status ON public.task_run FOR EACH ROW WHEN ((new.status <> ALL (ARRAY['scheduled'::public.task_run_status, 'started'::public.task_run_status]))) EXECUTE PROCEDURE public.swh_scheduler_update_task_on_task_end(); + + +-- +-- Name: task task_priority_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task + ADD CONSTRAINT task_priority_fkey FOREIGN KEY (priority) REFERENCES public.priority_ratio(id); + + +-- +-- Name: task_run task_run_task_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task_run + ADD CONSTRAINT task_run_task_fkey FOREIGN KEY (task) REFERENCES public.task(id); + + +-- +-- Name: task task_type_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.task + ADD CONSTRAINT task_type_fkey FOREIGN KEY (type) REFERENCES public.task_type(type); + + +-- +-- PostgreSQL database dump complete +-- + diff --git a/swh/scheduler/tests/test_scheduler.py 
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index 6f41a72..664f704 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,486 +1,484 @@
 # Copyright (C) 2017-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import copy
 import datetime
 import os
 import random
 import unittest
 import uuid
 
 from arrow import utcnow
 from collections import defaultdict
 from nose.plugins.attrib import attr
 from nose.tools import istest
 import psycopg2
 
 from swh.core.tests.db_testing import SingleDbTestFixture
 from swh.scheduler import get_scheduler
-
-
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+from . import DATA_DIR
 
 
 @attr('db')
 class CommonSchedulerTest(SingleDbTestFixture):
     TEST_DB_NAME = 'softwareheritage-scheduler-test'
-    TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump')
+    TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler.sql')
+    TEST_DB_DUMP_TYPE = 'psql'
 
     def setUp(self):
         super().setUp()
         tt = {
             'type': 'update-git',
             'description': 'Update a git repository',
             'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
             'default_interval': datetime.timedelta(days=64),
             'min_interval': datetime.timedelta(hours=12),
             'max_interval': datetime.timedelta(days=64),
             'backoff_factor': 2,
             'max_queue_length': None,
             'num_retries': 7,
             'retry_delay': datetime.timedelta(hours=2),
         }
         tt2 = tt.copy()
         tt2['type'] = 'update-hg'
         tt2['description'] = 'Update a mercurial repository'
         tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository'
         tt2['max_queue_length'] = 42
         tt2['num_retries'] = None
         tt2['retry_delay'] = None
 
         self.task_types = {
             tt['type']: tt,
             tt2['type']: tt2,
         }
 
         self.task1_template = t1_template = {
             'type': tt['type'],
             'arguments': {
                 'args': [],
                 'kwargs': {},
             },
             'next_run': None,
         }
         self.task2_template = t2_template = copy.deepcopy(t1_template)
         t2_template['type'] = tt2['type']
         t2_template['policy'] = 'oneshot'
 
     def tearDown(self):
         self.backend.close_connection()
         self.empty_tables()
         super().tearDown()
 
     def empty_tables(self, whitelist=["priority_ratio"]):
         query = """SELECT table_name FROM information_schema.tables
                    WHERE table_schema = %%s and
                          table_name not in (%s)
                 """ % ','.join(map(lambda t: "'%s'" % t, whitelist))
         self.cursor.execute(query, ('public', ))
 
         tables = set(table for (table,) in self.cursor.fetchall())
 
         for table in tables:
             self.cursor.execute('truncate table %s cascade' % table)
         self.conn.commit()
 
     @istest
     def add_task_type(self):
         tt, tt2 = self.task_types.values()
         self.backend.create_task_type(tt)
         self.assertEqual(tt, self.backend.get_task_type(tt['type']))
         with self.assertRaisesRegex(psycopg2.IntegrityError,
                                     '\(type\)=\(%s\)' % tt['type']):
             self.backend.create_task_type(tt)
         self.backend.create_task_type(tt2)
         self.assertEqual(tt, self.backend.get_task_type(tt['type']))
         self.assertEqual(tt2, self.backend.get_task_type(tt2['type']))
 
     @istest
     def get_task_types(self):
         tt, tt2 = self.task_types.values()
         self.backend.create_task_type(tt)
         self.backend.create_task_type(tt2)
         self.assertCountEqual([tt2, tt], self.backend.get_task_types())
 
     @staticmethod
     def _task_from_template(template, next_run, priority, *args, **kwargs):
         ret = copy.deepcopy(template)
         ret['next_run'] = next_run
         if priority:
             ret['priority'] = priority
         if args:
             ret['arguments']['args'] = list(args)
         if kwargs:
             ret['arguments']['kwargs'] = kwargs
         return ret
 
     def _pop_priority(self, priorities):
         if not priorities:
             return None
         for priority, remains in priorities.items():
             if remains > 0:
                 priorities[priority] = remains - 1
                 return priority
         return None
 
     def _tasks_from_template(self, template, max_timestamp, num,
                              num_priority=0, priorities=None):
         if num_priority and priorities:
             priorities = {
                 priority: ratio * num_priority
                 for priority, ratio in priorities.items()
             }
 
         tasks = []
         for i in range(num + num_priority):
             priority = self._pop_priority(priorities)
             tasks.append(self._task_from_template(
                 template,
                 max_timestamp - datetime.timedelta(microseconds=i),
                 priority,
                 'argument-%03d' % i,
                 **{'kwarg%03d' % i: 'bogus-kwarg'}
             ))
         return tasks
 
     def _create_task_types(self):
         for tt in self.task_types.values():
             self.backend.create_task_type(tt)
 
     @istest
     def create_tasks(self):
         priority_ratio = self._priority_ratio()
         self._create_task_types()
         num_tasks_priority = 100
         tasks_1 = self._tasks_from_template(self.task1_template, utcnow(),
                                             100)
         tasks_2 = self._tasks_from_template(
             self.task2_template, utcnow(), 100,
             num_tasks_priority, priorities=priority_ratio)
         tasks = tasks_1 + tasks_2
 
         # tasks are returned only once with their ids
         ret1 = self.backend.create_tasks(tasks + tasks_1 + tasks_2)
         set_ret1 = set([t['id'] for t in ret1])
 
         # creating the same set results in the same ids
         ret = self.backend.create_tasks(tasks)
         set_ret = set([t['id'] for t in ret])
 
         # Idempotence results
         self.assertEqual(set_ret, set_ret1)
         self.assertEqual(len(ret), len(ret1))
 
         ids = set()
         actual_priorities = defaultdict(int)
 
         for task, orig_task in zip(ret, tasks):
             task = copy.deepcopy(task)
             task_type = self.task_types[orig_task['type']]
             self.assertNotIn(task['id'], ids)
             self.assertEqual(task['status'], 'next_run_not_scheduled')
             self.assertEqual(task['current_interval'],
                              task_type['default_interval'])
             self.assertEqual(task['policy'], orig_task.get('policy',
                                                            'recurring'))
             priority = task.get('priority')
             if priority:
                 actual_priorities[priority] += 1
 
             self.assertEqual(task['retries_left'],
                              task_type['num_retries'] or 0)
             ids.add(task['id'])
             del task['id']
             del task['status']
             del task['current_interval']
             del task['retries_left']
             if 'policy' not in orig_task:
                 del task['policy']
             if 'priority' not in orig_task:
                 del task['priority']
             self.assertEqual(task, orig_task)
 
         self.assertEqual(dict(actual_priorities), {
             priority: int(ratio * num_tasks_priority)
             for priority, ratio in priority_ratio.items()
         })
 
     @istest
     def peek_ready_tasks_no_priority(self):
         self._create_task_types()
         t = utcnow()
         task_type = self.task1_template['type']
         tasks = self._tasks_from_template(self.task1_template, t, 100)
         random.shuffle(tasks)
         self.backend.create_tasks(tasks)
 
         ready_tasks = self.backend.peek_ready_tasks(task_type)
         self.assertEqual(len(ready_tasks), len(tasks))
         for i in range(len(ready_tasks) - 1):
             self.assertLessEqual(ready_tasks[i]['next_run'],
                                  ready_tasks[i+1]['next_run'])
 
         # Only get the first few ready tasks
         limit = random.randrange(5, 5 + len(tasks)//2)
         ready_tasks_limited = self.backend.peek_ready_tasks(
             task_type, num_tasks=limit)
 
         self.assertEqual(len(ready_tasks_limited), limit)
         self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit])
 
         # Limit by timestamp
         max_ts = tasks[limit-1]['next_run']
         ready_tasks_timestamped = self.backend.peek_ready_tasks(
             task_type, timestamp=max_ts)
 
         for ready_task in ready_tasks_timestamped:
             self.assertLessEqual(ready_task['next_run'], max_ts)
 
         # Make sure we get proper behavior for the first ready tasks
         self.assertCountEqual(
             ready_tasks[:len(ready_tasks_timestamped)],
             ready_tasks_timestamped,
         )
 
         # Limit by both
         ready_tasks_both = self.backend.peek_ready_tasks(
             task_type, timestamp=max_ts, num_tasks=limit//3)
         self.assertLessEqual(len(ready_tasks_both), limit//3)
         for ready_task in ready_tasks_both:
             self.assertLessEqual(ready_task['next_run'], max_ts)
             self.assertIn(ready_task, ready_tasks[:limit//3])
 
     def _priority_ratio(self):
         self.cursor.execute('select id, ratio from priority_ratio')
         priority_ratio = {}
         for row in self.cursor.fetchall():
             priority_ratio[row[0]] = row[1]
 
         return priority_ratio
 
     @istest
     def peek_ready_tasks_mixed_priorities(self):
         priority_ratio = self._priority_ratio()
         self._create_task_types()
         t = utcnow()
         task_type = self.task1_template['type']
         num_tasks_priority = 100
         num_tasks_no_priority = 100
 
         # Create tasks with and without priorities
         tasks = self._tasks_from_template(
             self.task1_template, t,
             num=num_tasks_no_priority,
             num_priority=num_tasks_priority,
             priorities=priority_ratio)
 
         random.shuffle(tasks)
         self.backend.create_tasks(tasks)
 
         # take all available tasks
         ready_tasks = self.backend.peek_ready_tasks(
             task_type)
 
         self.assertEqual(len(ready_tasks), len(tasks))
         self.assertEqual(num_tasks_priority + num_tasks_no_priority,
                          len(ready_tasks))
 
         count_tasks_per_priority = defaultdict(int)
         for task in ready_tasks:
             priority = task.get('priority')
             if priority:
                 count_tasks_per_priority[priority] += 1
 
         self.assertEqual(dict(count_tasks_per_priority), {
             priority: int(ratio * num_tasks_priority)
             for priority, ratio in priority_ratio.items()
         })
 
         # Only get some ready tasks
         num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2)
         num_tasks_priority = random.randrange(5, num_tasks_priority//2)
         ready_tasks_limited = self.backend.peek_ready_tasks(
             task_type,
             num_tasks=num_tasks,
             num_tasks_priority=num_tasks_priority)
 
         count_tasks_per_priority = defaultdict(int)
         for task in ready_tasks_limited:
             priority = task.get('priority')
             count_tasks_per_priority[priority] += 1
 
         import math
         for priority, ratio in priority_ratio.items():
             expected_count = math.ceil(ratio * num_tasks_priority)
             actual_prio = count_tasks_per_priority[priority]
             self.assertTrue(
                 actual_prio == expected_count or
                 actual_prio == expected_count + 1)
 
         self.assertEqual(count_tasks_per_priority[None], num_tasks)
 
     @istest
     def grab_ready_tasks(self):
         priority_ratio = self._priority_ratio()
         self._create_task_types()
         t = utcnow()
         task_type = self.task1_template['type']
         num_tasks_priority = 100
         num_tasks_no_priority = 100
         # Create tasks with and without priorities
         tasks = self._tasks_from_template(
             self.task1_template, t,
             num=num_tasks_no_priority,
             num_priority=num_tasks_priority,
             priorities=priority_ratio)
         random.shuffle(tasks)
         self.backend.create_tasks(tasks)
 
         first_ready_tasks = self.backend.peek_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10)
         grabbed_tasks = self.backend.grab_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10)
 
         for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
             self.assertEqual(peeked['status'], 'next_run_not_scheduled')
             del peeked['status']
             self.assertEqual(grabbed['status'], 'next_run_scheduled')
             del grabbed['status']
             self.assertEqual(peeked, grabbed)
             self.assertEqual(peeked['priority'], grabbed['priority'])
 
     @istest
     def get_tasks(self):
         self._create_task_types()
         t = utcnow()
         tasks = self._tasks_from_template(self.task1_template, t, 100)
         tasks = self.backend.create_tasks(tasks)
         random.shuffle(tasks)
         while len(tasks) > 1:
             length = random.randrange(1, len(tasks))
             cur_tasks = tasks[:length]
             tasks[:length] = []
 
             ret = self.backend.get_tasks(task['id'] for task in cur_tasks)
             self.assertCountEqual(ret, cur_tasks)
 
     @istest
     def filter_task_to_archive(self):
         """Filtering only lists disabled recurring or completed oneshot tasks
 
         """
         self._create_task_types()
         _time = utcnow()
         recurring = self._tasks_from_template(self.task1_template, _time, 12)
         oneshots = self._tasks_from_template(self.task2_template, _time, 12)
         total_tasks = len(recurring) + len(oneshots)
 
         # simulate scheduling tasks
         pending_tasks = self.backend.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
         self.backend.mass_schedule_task_runs(backend_tasks)
 
         # simulate the tasks being done
         _tasks = []
         for task in backend_tasks:
             t = self.backend.end_task_run(
                 task['backend_id'], status='eventful')
             _tasks.append(t)
 
         # Randomly update task's status per policy
         status_per_policy = {'recurring': 0, 'oneshot': 0}
         status_choice = {
             # policy: [tuple (1-for-filtering, 'associated-status')]
             'recurring': [(1, 'disabled'),
                           (0, 'completed'),
                           (0, 'next_run_not_scheduled')],
             'oneshot': [(0, 'next_run_not_scheduled'),
                         (1, 'disabled'),
                         (1, 'completed')]
         }
 
         tasks_to_update = defaultdict(list)
         _task_ids = defaultdict(list)
         # randomize 'disabling' recurring task or 'complete' oneshot task
         for task in pending_tasks:
             policy = task['policy']
             _task_ids[policy].append(task['id'])
             status = random.choice(status_choice[policy])
             if status[0] != 1:
                 continue
             # elected for filtering
             status_per_policy[policy] += status[0]
             tasks_to_update[policy].append(task['id'])
 
         self.backend.disable_tasks(tasks_to_update['recurring'])
         # hack: change the status to something else than completed/disabled
         self.backend.set_status_tasks(
             _task_ids['oneshot'], status='next_run_not_scheduled')
         # complete the tasks to update
         self.backend.set_status_tasks(
             tasks_to_update['oneshot'], status='completed')
 
         total_tasks_filtered = (status_per_policy['recurring'] +
                                 status_per_policy['oneshot'])
 
         # retrieve tasks to archive
         after = _time.shift(days=-1).format('YYYY-MM-DD')
         before = utcnow().shift(days=1).format('YYYY-MM-DD')
         tasks_to_archive = list(self.backend.filter_task_to_archive(
             after_ts=after, before_ts=before, limit=total_tasks))
 
         self.assertEqual(len(tasks_to_archive), total_tasks_filtered)
 
         actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
         for task in tasks_to_archive:
             actual_filtered_per_status[task['task_policy']] += 1
 
         self.assertEqual(actual_filtered_per_status, status_per_policy)
 
     @istest
     def delete_archived_tasks(self):
         self._create_task_types()
         _time = utcnow()
         recurring = self._tasks_from_template(
             self.task1_template, _time, 12)
         oneshots = self._tasks_from_template(
             self.task2_template, _time, 12)
         total_tasks = len(recurring) + len(oneshots)
         pending_tasks = self.backend.create_tasks(recurring + oneshots)
         backend_tasks = [{
             'task': task['id'],
             'backend_id': str(uuid.uuid4()),
             'scheduled': utcnow(),
         } for task in pending_tasks]
         self.backend.mass_schedule_task_runs(backend_tasks)
 
         _tasks = []
         percent = random.randint(0, 100)  # random election removal boundary
         for task in backend_tasks:
             t = self.backend.end_task_run(
                 task['backend_id'], status='eventful')
             c = random.randint(0, 100)
             if c <= percent:
                 _tasks.append({'task_id': t['task'], 'task_run_id': t['id']})
 
         self.backend.delete_archived_tasks(_tasks)
 
         self.cursor.execute('select count(*) from task')
         tasks_count = self.cursor.fetchone()
 
         self.cursor.execute('select count(*) from task_run')
         tasks_run_count = self.cursor.fetchone()
 
         self.assertEqual(tasks_count[0], total_tasks - len(_tasks))
         self.assertEqual(tasks_run_count[0], total_tasks - len(_tasks))
 
 
 class LocalSchedulerTest(CommonSchedulerTest, unittest.TestCase):
     def setUp(self):
         super().setUp()
         self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
         self.backend = get_scheduler('local', self.config)
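The `TEST_DB_DUMP_TYPE = 'psql'` attribute introduced above tells the fixture that the dump is plain SQL rather than a binary pg_restore archive. The loading logic lives in swh.core.tests.db_testing; a rough sketch of what loading a 'psql'-type dump presumably amounts to (the function name here is hypothetical):

    import subprocess

    def load_sql_dump(dbname, dump_path):
        # plain-SQL dumps are fed to psql; binary .dump files would
        # instead need pg_restore
        subprocess.check_call([
            'psql', '--quiet', '--no-psqlrc',
            '-v', 'ON_ERROR_STOP=1',
            '-d', dbname, '-f', dump_path,
        ])

    load_sql_dump('softwareheritage-scheduler-test',
                  'swh/scheduler/tests/data/dumps/swh-scheduler.sql')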
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
index c141f6c..2e3f70e 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -1,71 +1,68 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import unittest
 
 from arrow import utcnow
 from nose.plugins.attrib import attr
 from nose.tools import istest
 from hypothesis import given
 from hypothesis.strategies import sets
 
 from swh.core.tests.db_testing import SingleDbTestFixture
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 from swh.scheduler.updater.events import SWHEvent
+from swh.scheduler.tests import DATA_DIR
 
 from . import from_regex
 
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata')
-
-
 @attr('db')
 class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase):
     TEST_DB_NAME = 'softwareheritage-scheduler-updater-test'
-    TEST_DB_DUMP = os.path.join(TEST_DATA_DIR,
-                                'dumps/swh-scheduler-updater.dump')
+    TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps/swh-scheduler-updater.sql')
+    TEST_DB_DUMP_TYPE = 'psql'
 
     def setUp(self):
         super().setUp()
         config = {
             'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
             'cache_read_limit': 1000,
         }
         self.backend = SchedulerUpdaterBackend(**config)
 
     def _empty_tables(self):
         self.cursor.execute(
             """SELECT table_name FROM information_schema.tables
                WHERE table_schema = %s""", ('public', ))
         tables = set(table for (table,) in self.cursor.fetchall())
         for table in tables:
             self.cursor.execute('truncate table %s cascade' % table)
         self.conn.commit()
 
     def tearDown(self):
         self.backend.close_connection()
         self._empty_tables()
         super().tearDown()
 
     @istest
     @given(sets(
         from_regex(
             r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
         min_size=10, max_size=15))
     def cache_read(self, urls):
         def gen_events(urls):
             for url in urls:
                 yield SWHEvent({
                     'url': url,
                     'type': 'create',
                     'origin_type': 'git',
                 })
 
         self.backend.cache_put(gen_events(urls))
         r = self.backend.cache_read(timestamp=utcnow())
 
         self.assertNotEqual(r, [])
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
index 2d98f28..305cd35 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -1,162 +1,163 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import unittest
 
 from nose.plugins.attrib import attr
 from nose.tools import istest
 
 from swh.core.tests.db_testing import DbTestFixture
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.writer import UpdaterWriter
 from swh.scheduler.updater.events import LISTENED_EVENTS
+from swh.scheduler.tests import DATA_DIR
 
 from . import UpdaterTestUtil
 
-TEST_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata')
-
-
 @attr('db')
 class CommonSchedulerTest(DbTestFixture):
     TEST_SCHED_DB = 'softwareheritage-scheduler-test'
-    TEST_SCHED_DUMP = os.path.join(TEST_DATA_DIR,
-                                   'dumps/swh-scheduler.dump')
+    TEST_SCHED_DUMP = os.path.join(
+        DATA_DIR, 'dumps/swh-scheduler.sql')
+    TEST_SCHED_DUMP_TYPE = 'psql'
 
     TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test'
-    TEST_SCHED_UPDATER_DUMP = os.path.join(TEST_DATA_DIR,
-                                           'dumps/swh-scheduler-updater.dump')
+    TEST_SCHED_UPDATER_DUMP = os.path.join(
+        DATA_DIR, 'dumps/swh-scheduler-updater.sql')
+    TEST_SCHED_UPDATER_DUMP_TYPE = 'psql'
 
     @classmethod
     def setUpClass(cls):
-        cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP)
-        cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP)
+        cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP,
+                   cls.TEST_SCHED_DUMP_TYPE)
+        cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP,
+                   cls.TEST_SCHED_UPDATER_DUMP_TYPE)
         super().setUpClass()
 
     def tearDown(self):
         self.reset_db_tables(self.TEST_SCHED_UPDATER_DB)
         self.reset_db_tables(self.TEST_SCHED_DB,
                              excluded=['task_type', 'priority_ratio'])
         super().tearDown()
 
 
 class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest,
                         unittest.TestCase):
     def setUp(self):
         super().setUp()
 
         config = {
             'scheduler': {
                 'cls': 'local',
                 'args': {
                     'scheduling_db':
                         'dbname=softwareheritage-scheduler-test',
                 },
             },
             'scheduler_updater': {
                 'scheduling_updater_db':
                     'dbname=softwareheritage-scheduler-updater-test',
                 'cache_read_limit': 5,
             },
             'pause': 0.1,
             'verbose': False,
         }
         self.writer = UpdaterWriter(**config)
         self.scheduler_backend = self.writer.scheduler_backend
         self.scheduler_updater_backend = self.writer.scheduler_updater_backend
 
     def tearDown(self):
         self.scheduler_backend.close_connection()
         self.scheduler_updater_backend.close_connection()
         super().tearDown()
""" ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) @istest def run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEquals(t['type'], 'origin-update-git') self.assertEquals(t['priority'], 'normal') self.assertEquals(t['policy'], 'oneshot') self.assertEquals(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length)