diff --git a/PKG-INFO b/PKG-INFO
index eddb527..8dd20c8 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.15
+Version: 0.0.16
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/debian/control b/debian/control
index 8835277..83ffa94 100644
--- a/debian/control
+++ b/debian/control
@@ -1,22 +1,22 @@
 Source: swh-scheduler
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python,
                python3-all,
                python3-arrow,
                python3-celery,
                python3-click,
                python3-nose,
                python3-psycopg2,
                python3-setuptools,
-               python3-swh.core (>= 0.0.30),
+               python3-swh.core (>= 0.0.34),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/
 
 Package: python3-swh.scheduler
 Architecture: all
-Depends: python3-swh.core (>= 0.0.30), ${misc:Depends}, ${python3:Depends}
+Depends: python3-swh.core (>= 0.0.34), ${misc:Depends}, ${python3:Depends}
 Description: Software Heritage Scheduler
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 805bdd1..aaeccaa 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1 +1 @@
-swh.core >= 0.0.30
+swh.core >= 0.0.34
diff --git a/sql/.gitignore b/sql/.gitignore
new file mode 100644
index 0000000..d501764
--- /dev/null
+++ b/sql/.gitignore
@@ -0,0 +1,3 @@
+*-stamp
+autodoc/
+swh-scheduler.dump
diff --git a/sql/Makefile b/sql/Makefile
new file mode 100644
index 0000000..ae73de6
--- /dev/null
+++ b/sql/Makefile
@@ -0,0 +1,48 @@
+# Depends: postgresql-client, postgresql-autodoc
+
+DBNAME = softwareheritage-scheduler-dev
+DOCDIR = autodoc
+
+SQL_SCHEMA = swh-scheduler-schema.sql
+SQLS = $(SQL_SCHEMA)
+
+PSQL_BIN = psql
+PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1
+PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
+
+all:
+
+createdb: createdb-stamp
+createdb-stamp: $(SQL_INIT)
+	createdb $(DBNAME)
+	touch $@
+
+filldb: filldb-stamp
+filldb-stamp: createdb-stamp
+	cat $(SQLS) | $(PSQL) $(DBNAME)
+	touch $@
+
+dropdb:
+	-dropdb $(DBNAME)
+
+dumpdb: swh-scheduler.dump
+swh-scheduler.dump: filldb-stamp
+	pg_dump -Fc $(DBNAME) > $@
+
+doc: autodoc-stamp $(DOCDIR)/swh-scheduler.pdf
+autodoc-stamp: filldb-stamp
+	test -d $(DOCDIR)/ || mkdir $(DOCDIR)
+	postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh
+	cp -a $(DOCDIR)/swh-scheduler.dot $(DOCDIR)/swh-scheduler.dot.orig
+	touch $@
+
+$(DOCDIR)/swh-scheduler.pdf: autodoc-stamp
+	dot -T pdf $(DOCDIR)/swh-scheduler.dot > $(DOCDIR)/swh-scheduler.pdf
+
+clean:
+	rm -rf *-stamp $(DOCDIR)/
+
+distclean: clean dropdb
+	rm -f swh-scheduler.dump
+
+.PHONY: all createdb filldb dropdb dumpdb doc clean distclean
diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql
index 697f397..6b1d403 100644
--- a/sql/swh-scheduler-schema.sql
+++ b/sql/swh-scheduler-schema.sql
@@ -1,258 +1,300 @@
 create table dbversion
 (
   version     int primary key,
   release     timestamptz not null,
   description text not null
 );
 
 comment on table dbversion is 'Schema update tracking';
 
 insert into dbversion (version, release, description)
-       values (3, now(), 'Work In Progress');
+       values (5, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
   description text not null,
   backend_name text not null,
   default_interval interval not null,
   min_interval interval not null,
   max_interval interval not null,
-  backoff_factor float not null
+  backoff_factor float not null,
+  max_queue_length bigint,
+  num_retries bigint,
+  retry_delay interval
 );
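
For illustration, a task type using the three new columns could be registered
through the Python backend API added further down in this patch. A minimal
sketch; the 'update-git' values mirror the test fixtures and are illustrative
only:

    import datetime
    from swh.scheduler.backend import SchedulerBackend

    backend = SchedulerBackend(
        scheduling_db='dbname=softwareheritage-scheduler-dev')
    backend.create_task_type({
        'type': 'update-git',
        'description': 'Update a git repository',
        'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
        'default_interval': datetime.timedelta(days=64),
        'min_interval': datetime.timedelta(hours=12),
        'max_interval': datetime.timedelta(days=64),
        'backoff_factor': 2,
        'max_queue_length': 10000,  # new: cap on the backend queue length
        'num_retries': 7,           # new: retries on transient failure
        'retry_delay': datetime.timedelta(hours=2),  # new: delay between retries
    })
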
 
 comment on table task_type is 'Types of schedulable tasks';
 comment on column task_type.type is 'Short identifier for the task type';
 comment on column task_type.description is 'Human-readable task description';
 comment on column task_type.backend_name is 'Name of the task in the job-running backend';
 comment on column task_type.default_interval is 'Default interval for newly scheduled tasks';
 comment on column task_type.min_interval is 'Minimum interval between two runs of a task';
 comment on column task_type.max_interval is 'Maximum interval between two runs of a task';
 comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs';
+comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of task';
+comment on column task_type.num_retries is 'Default number of retries on transient failures';
+comment on column task_type.retry_delay is 'Retry delay for the task';
 
-create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'disabled');
+create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled');
 comment on type task_status is 'Status of a given task';
 
+create type task_policy as enum ('recurring', 'oneshot');
+comment on type task_policy is 'Recurrence policy of the given task';
+
 create table task (
   id bigserial primary key,
   type text not null references task_type(type),
   arguments jsonb not null,
   next_run timestamptz not null,
   current_interval interval not null,
-  status task_status not null
+  status task_status not null,
+  policy task_policy not null default 'recurring',
+  retries_left bigint not null default 0
 );
 
 comment on table task is 'Schedule of recurring tasks';
 comment on column task.arguments is 'Arguments passed to the underlying job scheduler. '
        'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
 comment on column task.next_run is 'The next run of this task should be run on or after that time';
 comment on column task.current_interval is 'The interval between two runs of this task, '
        'taking into account the backoff factor';
+comment on column task.policy is 'Whether the task is one-shot or recurring';
+comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
+       'transient failure';
 
 create index on task(type);
 create index on task(next_run);
 create index task_args on task using btree ((arguments -> 'args'));
 create index task_kwargs on task using gin ((arguments -> 'kwargs'));
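
The new policy and retries_left columns surface in the backend's create_tasks
API below; a sketch of scheduling a one-shot task under those additions
(assuming the backend instance from the earlier sketch; the URL is
hypothetical):

    from arrow import utcnow

    # With policy='oneshot', a successful run flips the task to the new
    # 'completed' status instead of rescheduling it.
    backend.create_tasks([{
        'type': 'update-git',
        'arguments': {'args': ['https://example.org/repo.git'], 'kwargs': {}},
        'next_run': utcnow(),
        'policy': 'oneshot',
        # 'retries_left' may be omitted: it defaults to the type's num_retries
    }])
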
 
-create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'lost');
+create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'lost',
+                                     'permfailed');
 comment on type task_run_status is 'Status of a given task run';
 
 create table task_run (
   id bigserial primary key,
   task bigint not null references task(id),
   backend_id text,
   scheduled timestamptz,
   started timestamptz,
   ended timestamptz,
   metadata jsonb,
   status task_run_status not null default 'scheduled'
 );
 comment on table task_run is 'History of task runs sent to the job-running backend';
 comment on column task_run.backend_id is 'id of the task run in the job-running backend';
 comment on column task_run.metadata is 'Useful metadata for the given task run. '
        'For instance, the worker that took on the job, '
        'or the logs for the run.';
 
 create index on task_run(task);
 create index on task_run(backend_id);
 
 create or replace function swh_scheduler_mktemp_task ()
   returns void
   language sql
 as $$
   create temporary table tmp_task (
     like task excluding indexes
   ) on commit drop;
   alter table tmp_task
     drop column id,
     drop column current_interval,
-    drop column status;
+    drop column status,
+    alter column policy drop not null,
+    alter column retries_left drop not null;
 $$;
 
 comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
 
 create or replace function swh_scheduler_create_tasks_from_temp ()
   returns setof task
   language plpgsql
 as $$
 begin
   return query
-  insert into task (type, arguments, next_run, status, current_interval)
+  insert into task (type, arguments, next_run, status, current_interval, policy, retries_left)
     select type, arguments, next_run, 'next_run_not_scheduled',
-           (select default_interval from task_type tt where tt.type = tmp_task.type)
+           (select default_interval from task_type tt where tt.type = tmp_task.type),
+           coalesce(policy, 'recurring'),
+           coalesce(retries_left,
+                    (select num_retries from task_type tt where tt.type = tmp_task.type),
+                    0)
     from tmp_task
   returning task.*;
 end;
 $$;
 
 comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
 
-create or replace function swh_scheduler_peek_ready_tasks (ts timestamptz default now(),
+create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
                                                            num_tasks bigint default NULL)
   returns setof task
   language sql
   stable
 as $$
 select * from task
   where next_run <= ts
-        and status = 'next_run_not_scheduled'
+        and type = task_type
+        and status = 'next_run_not_scheduled'
   order by next_run
   limit num_tasks;
 $$;
 
-create or replace function swh_scheduler_grab_ready_tasks (ts timestamptz default now(),
+create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
                                                            num_tasks bigint default NULL)
   returns setof task
   language sql
 as $$
   update task
     set status='next_run_scheduled'
   from (
     select id from task
       where next_run <= ts
+            and type = task_type
             and status='next_run_not_scheduled'
       order by next_run
       limit num_tasks
       for update skip locked
   ) next_tasks
   where task.id = next_tasks.id
   returning task.*;
 $$;
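
Both scheduling functions are now scoped to a single task type, passed as
first argument. A sketch of how the Python backend further down in this patch
drives them (the type name is illustrative):

    # Peek at ready 'update-git' tasks without claiming them, then claim up
    # to 100 atomically; the SQL uses FOR UPDATE SKIP LOCKED, so concurrent
    # runners will not grab the same rows.
    ready = backend.peek_ready_tasks('update-git', num_tasks=100)
    claimed = backend.grab_ready_tasks('update-git', num_tasks=100)
    # each claimed task now has status 'next_run_scheduled'
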
 
 create or replace function swh_scheduler_schedule_task_run (task_id bigint,
                                                             backend_id text,
                                                             metadata jsonb default '{}'::jsonb,
                                                             ts timestamptz default now())
   returns task_run
   language sql
 as $$
   insert into task_run (task, backend_id, metadata, scheduled, status)
     values (task_id, backend_id, metadata, ts, 'scheduled')
   returning *;
 $$;
 
 create or replace function swh_scheduler_mktemp_task_run ()
   returns void
   language sql
 as $$
   create temporary table tmp_task_run (
     like task_run excluding indexes
   ) on commit drop;
   alter table tmp_task_run
     drop column id,
     drop column status;
 $$;
 
 comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
 
 create or replace function swh_scheduler_schedule_task_run_from_temp ()
   returns void
   language plpgsql
 as $$
 begin
   insert into task_run (task, backend_id, metadata, scheduled, status)
     select task, backend_id, metadata, scheduled, 'scheduled'
       from tmp_task_run;
   return;
 end;
 $$;
 
 create or replace function swh_scheduler_start_task_run (backend_id text,
                                                          metadata jsonb default '{}'::jsonb,
                                                          ts timestamptz default now())
   returns task_run
   language sql
 as $$
   update task_run
     set started = ts,
         status = 'started',
         metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
     where task_run.backend_id = swh_scheduler_start_task_run.backend_id
   returning *;
 $$;
 
 create or replace function swh_scheduler_end_task_run (backend_id text,
                                                        status task_run_status,
                                                        metadata jsonb default '{}'::jsonb,
                                                        ts timestamptz default now())
   returns task_run
   language sql
 as $$
   update task_run
     set ended = ts,
         status = swh_scheduler_end_task_run.status,
         metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
     where task_run.backend_id = swh_scheduler_end_task_run.backend_id
   returning *;
 $$;
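
A sketch of the run lifecycle these functions record, using the backend
methods defined later in this patch ('some-celery-id' stands in for a real
backend job id):

    # Create the task_run row, mark it started, then ended. Ending the run
    # fires the update_task_on_task_end trigger defined below, which
    # reschedules, completes or disables the parent task.
    run = backend.schedule_task_run(task_id=42, backend_id='some-celery-id')
    backend.start_task_run('some-celery-id', metadata={'worker': 'worker01'})
    backend.end_task_run('some-celery-id', status='eventful')
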
 
-create or replace function swh_scheduler_compute_new_task_interval (task_type text,
-                                                                    current_interval interval,
-                                                                    end_status task_run_status)
-  returns interval
-  language plpgsql
-  stable
-as $$
-declare
-  task_type_row task_type%rowtype;
-  adjustment_factor float;
-begin
-  select *
-    from task_type
-    where type = swh_scheduler_compute_new_task_interval.task_type
-  into task_type_row;
-
-  case end_status
-    when 'eventful' then
-      adjustment_factor := 1/task_type_row.backoff_factor;
-    when 'uneventful' then
-      adjustment_factor := task_type_row.backoff_factor;
-    else
-      -- failed or lost task: no backoff.
-      adjustment_factor := 1;
-  end case;
-
-  return greatest(task_type_row.min_interval,
-                  least(task_type_row.max_interval,
-                        adjustment_factor * current_interval));
-end;
-$$;
-
-create or replace function swh_scheduler_update_task_interval ()
-  returns trigger
-  language plpgsql
-as $$
-begin
-  update task
-    set status = 'next_run_not_scheduled',
-        current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.status),
-        next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.status)
-    where id = new.task;
-  return null;
-end;
-$$;
+create or replace function swh_scheduler_update_task_on_task_end ()
+  returns trigger
+  language plpgsql
+as $$
+declare
+  cur_task task%rowtype;
+  cur_task_type task_type%rowtype;
+  adjustment_factor float;
+  new_interval interval;
+begin
+  select * from task where id = new.task into cur_task;
+  select * from task_type where type = cur_task.type into cur_task_type;
+
+  case
+    when new.status = 'permfailed' then
+      update task
+        set status = 'disabled'
+        where id = cur_task.id;
+    when new.status in ('eventful', 'uneventful') then
+      case
+        when cur_task.policy = 'oneshot' then
+          update task
+            set status = 'completed'
+            where id = cur_task.id;
+        when cur_task.policy = 'recurring' then
+          if new.status = 'uneventful' then
+            adjustment_factor := cur_task_type.backoff_factor;
+          else
+            adjustment_factor := 1/cur_task_type.backoff_factor;
+          end if;
+          new_interval := greatest(
+            cur_task_type.min_interval,
+            least(
+              cur_task_type.max_interval,
+              adjustment_factor * cur_task.current_interval));
+          update task
+            set status = 'next_run_not_scheduled',
+                next_run = now() + new_interval,
+                current_interval = new_interval,
+                retries_left = coalesce(cur_task_type.num_retries, 0)
+            where id = cur_task.id;
+      end case;
+    else -- new.status in 'failed', 'lost'
+      if coalesce(cur_task.retries_left, 0) > 0 then
+        update task
+          set status = 'next_run_not_scheduled',
+              next_run = now() + cur_task_type.retry_delay,
+              retries_left = cur_task.retries_left - 1
+          where id = cur_task.id;
+      else -- no retries left
+        case
+          when cur_task.policy = 'oneshot' then
+            update task
+              set status = 'disabled'
+              where id = cur_task.id;
+          when cur_task.policy = 'recurring' then
+            update task
+              set status = 'next_run_not_scheduled',
+                  next_run = now() + cur_task.current_interval,
+                  retries_left = coalesce(cur_task_type.num_retries, 0)
+              where id = cur_task.id;
+        end case;
+      end if; -- retries
+  end case;
+  return null;
+end;
+$$;
 
-create trigger update_interval_on_task_end
+create trigger update_task_on_task_end
   after update of status on task_run
   for each row
-  when (new.status IN ('eventful', 'uneventful', 'failed', 'lost'))
-  execute procedure swh_scheduler_update_task_interval ();
+  when (new.status NOT IN ('scheduled', 'started'))
+  execute procedure swh_scheduler_update_task_on_task_end ();
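
The recurring branch keeps the interval arithmetic of the removed
swh_scheduler_compute_new_task_interval; in Python terms, roughly:

    # Sketch of the per-run interval adjustment: an uneventful run backs off,
    # an eventful one polls more often, clamped to the type's bounds.
    def next_interval(task_type, current_interval, run_status):
        if run_status == 'uneventful':
            factor = task_type['backoff_factor']      # e.g. 2.0 -> doubles
        else:  # 'eventful'
            factor = 1 / task_type['backoff_factor']  # e.g. 2.0 -> halves
        return max(task_type['min_interval'],
                   min(task_type['max_interval'], factor * current_interval))
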
diff --git a/sql/updates/04.sql b/sql/updates/04.sql
new file mode 100644
index 0000000..a051388
--- /dev/null
+++ b/sql/updates/04.sql
@@ -0,0 +1,53 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 03
+-- to_version: 04
+-- description: Add a maximum queue length to the task types in the scheduler
+
+begin;
+
+insert into dbversion (version, release, description)
+values (4, now(), 'Work In Progress');
+
+alter table task_type add column max_queue_length bigint;
+comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of task';
+
+
+drop function swh_scheduler_peek_ready_tasks (timestamptz, bigint);
+drop function swh_scheduler_grab_ready_tasks (timestamptz, bigint);
+
+create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
+                                                           num_tasks bigint default NULL)
+  returns setof task
+  language sql
+  stable
+as $$
+select * from task
+  where next_run <= ts
+        and type = task_type
+        and status = 'next_run_not_scheduled'
+  order by next_run
+  limit num_tasks;
+$$;
+
+create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
+                                                           num_tasks bigint default NULL)
+  returns setof task
+  language sql
+as $$
+  update task
+    set status='next_run_scheduled'
+  from (
+    select id from task
+      where next_run <= ts
+            and type = task_type
+            and status='next_run_not_scheduled'
+      order by next_run
+      limit num_tasks
+      for update skip locked
+  ) next_tasks
+  where task.id = next_tasks.id
+  returning task.*;
+$$;
+
+
+commit;
diff --git a/sql/updates/05.sql b/sql/updates/05.sql
new file mode 100644
index 0000000..ee40ebb
--- /dev/null
+++ b/sql/updates/05.sql
@@ -0,0 +1,143 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 04
+-- to_version: 05
+-- description: Add recurrence logic for temporary failures and one-shot tasks
+
+-- "alter type ... add value" cannot run inside a transaction block, so add
+-- the new enum values before opening it
+alter type task_status add value if not exists 'completed' before 'disabled';
+alter type task_run_status add value if not exists 'permfailed' after 'lost';
+
+begin;
+
+insert into dbversion (version, release, description)
+values (5, now(), 'Work In Progress');
+
+alter table task_type add column num_retries bigint;
+alter table task_type add column retry_delay interval;
+
+comment on column task_type.num_retries is 'Default number of retries on transient failures';
+comment on column task_type.retry_delay is 'Retry delay for the task';
+
+create type task_policy as enum ('recurring', 'oneshot');
+comment on type task_policy is 'Recurrence policy of the given task';
+
+alter table task add column policy task_policy not null default 'recurring';
+alter table task add column retries_left bigint not null default 0;
+
+comment on column task.policy is 'Whether the task is one-shot or recurring';
+comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
+       'transient failure';
+
+
+create or replace function swh_scheduler_mktemp_task ()
+  returns void
+  language sql
+as $$
+  create temporary table tmp_task (
+    like task excluding indexes
+  ) on commit drop;
+  alter table tmp_task
+    drop column id,
+    drop column current_interval,
+    drop column status,
+    alter column policy drop not null,
+    alter column retries_left drop not null;
+$$;
+
+comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+  returns setof task
+  language plpgsql
+as $$
+begin
+  return query
+  insert into task (type, arguments, next_run, status, current_interval, policy, retries_left)
+    select type, arguments, next_run, 'next_run_not_scheduled',
+           (select default_interval from task_type tt where tt.type = tmp_task.type),
+           coalesce(policy, 'recurring'),
+           coalesce(retries_left,
+                    (select num_retries from task_type tt where tt.type = tmp_task.type),
+                    0)
+    from tmp_task
+  returning task.*;
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
+
+drop trigger update_interval_on_task_end on task_run;
+drop function swh_scheduler_compute_new_task_interval (text, interval, task_run_status) cascade;
+drop function swh_scheduler_update_task_interval () cascade;
+
+create or replace function swh_scheduler_update_task_on_task_end ()
+  returns trigger
+  language plpgsql
+as $$
+declare
+  cur_task task%rowtype;
+  cur_task_type task_type%rowtype;
+  adjustment_factor float;
+  new_interval interval;
+begin
+  select * from task where id = new.task into cur_task;
+  select * from task_type where type = cur_task.type into cur_task_type;
+
+  case
+    when new.status = 'permfailed' then
+      update task
+        set status = 'disabled'
+        where id = cur_task.id;
+    when new.status in ('eventful', 'uneventful') then
+      case
+        when cur_task.policy = 'oneshot' then
+          update task
+            set status = 'completed'
+            where id = cur_task.id;
+        when cur_task.policy = 'recurring' then
+          if new.status = 'uneventful' then
+            adjustment_factor := cur_task_type.backoff_factor;
+          else
+            adjustment_factor := 1/cur_task_type.backoff_factor;
+          end if;
+          new_interval := greatest(
+            cur_task_type.min_interval,
+            least(
+              cur_task_type.max_interval,
+              adjustment_factor * cur_task.current_interval));
+          update task
+            set status = 'next_run_not_scheduled',
+                next_run = now() + new_interval,
+                current_interval = new_interval,
+                retries_left = coalesce(cur_task_type.num_retries, 0)
+            where id = cur_task.id;
+      end case;
+    else -- new.status in 'failed', 'lost'
+      if coalesce(cur_task.retries_left, 0) > 0 then
+        update task
+          set status = 'next_run_not_scheduled',
+              next_run = now() + cur_task_type.retry_delay,
+              retries_left = cur_task.retries_left - 1
+          where id = cur_task.id;
+      else -- no retries left
+        case
+          when cur_task.policy = 'oneshot' then
+            update task
+              set status = 'disabled'
+              where id = cur_task.id;
+          when cur_task.policy = 'recurring' then
+            update task
+              set status = 'next_run_not_scheduled',
+                  next_run = now() + cur_task.current_interval,
+                  retries_left = coalesce(cur_task_type.num_retries, 0)
+              where id = cur_task.id;
+        end case;
+      end if; -- retries
+  end case;
+  return null;
+end;
+$$;
+
+create trigger update_task_on_task_end
+  after update of status on task_run
+  for each row
+  when (new.status NOT IN ('scheduled', 'started'))
+  execute procedure swh_scheduler_update_task_on_task_end ();
+
+commit;
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index eddb527..8dd20c8 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.15
+Version: 0.0.16
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt
index f342d47..c69088f 100644
--- a/swh.scheduler.egg-info/SOURCES.txt
+++ b/swh.scheduler.egg-info/SOURCES.txt
@@ -1,36 +1,41 @@
 .gitignore
 AUTHORS
 LICENSE
 LICENSE.Celery
 MANIFEST.in
 Makefile
 requirements-swh.txt
 requirements.txt
 setup.py
 version.txt
 bin/swh-worker-control
 debian/changelog
 debian/compat
 debian/control
 debian/copyright
 debian/rules
 debian/source/format
+sql/.gitignore
+sql/Makefile
 sql/swh-scheduler-schema.sql
 sql/swh-scheduler-testdata.sql
 sql/updates/02.sql
 sql/updates/03.sql
+sql/updates/04.sql
+sql/updates/05.sql
 swh.scheduler.egg-info/PKG-INFO
 swh.scheduler.egg-info/SOURCES.txt
 swh.scheduler.egg-info/dependency_links.txt
 swh.scheduler.egg-info/entry_points.txt
 swh.scheduler.egg-info/requires.txt
 swh.scheduler.egg-info/top_level.txt
 swh/scheduler/backend.py
 swh/scheduler/cli.py
 swh/scheduler/task.py
 swh/scheduler/utils.py
 swh/scheduler/celery_backend/__init__.py
 swh/scheduler/celery_backend/config.py
 swh/scheduler/celery_backend/listener.py
 swh/scheduler/celery_backend/runner.py
+swh/scheduler/tests/test_scheduler.py
 swh/scheduler/tests/test_task.py
\ No newline at end of file
diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt
index 83d6ba0..8aac98b 100644
--- a/swh.scheduler.egg-info/requires.txt
+++ b/swh.scheduler.egg-info/requires.txt
@@ -1,6 +1,6 @@
 Click
 arrow
 celery<4
 psycopg2
-swh.core>=0.0.30
+swh.core>=0.0.34
 vcversioner
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index b7f0cb4..df13e58 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,385 +1,413 @@
 # Copyright (C) 2015  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import binascii
 import datetime
 from functools import wraps
 import json
 import tempfile
 
 from arrow import Arrow, utcnow
 import psycopg2
 import psycopg2.extras
 from psycopg2.extensions import AsIs
 
 from swh.core.config import SWHConfig
 
 
 def adapt_arrow(arrow):
     return AsIs("'%s'::timestamptz" % arrow.isoformat())
 
 
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
 psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
 
 
 def autocommit(fn):
     @wraps(fn)
     def wrapped(self, *args, **kwargs):
         autocommit = False
         if 'cursor' not in kwargs or not kwargs['cursor']:
             autocommit = True
             kwargs['cursor'] = self.cursor()
 
         try:
             ret = fn(self, *args, **kwargs)
         except:
             if autocommit:
                 self.rollback()
             raise
 
         if autocommit:
             self.commit()
 
         return ret
 
     return wrapped
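
The (pre-existing) autocommit decorator is what makes the new accessors below
composable: with no explicit cursor each call is its own transaction, while
passing one defers the commit. A sketch, assuming a SchedulerBackend instance
named backend and a hypothetical list task_dicts:

    # Group several backend calls into one transaction by sharing a cursor;
    # nothing is committed until the final commit().
    cursor = backend.cursor()
    tasks = backend.create_tasks(task_dicts, cursor=cursor)
    backend.disable_tasks([t['id'] for t in tasks], cursor=cursor)
    backend.commit()
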
""" CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=swh-scheduler'), } def __init__(self, **override_config): self.config = self.parse_config_file(global_config=False) self.config.update(override_config) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() def copy_to(self, items, tblname, columns, cursor=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, (datetime.datetime, Arrow)): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', - 'min_interval', 'max_interval', 'backoff_factor', + 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', + 'num_retries', 'retry_delay', ] def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. 
 
     @autocommit
     def create_task_type(self, task_type, cursor=None):
         """Create a new task type ready for scheduling.
 
         A task type is a dictionary with the following keys:
             type (str): an identifier for the task type
             description (str): a human-readable description of what the task
                 does
             backend_name (str): the name of the task in the job-scheduling
                 backend
             default_interval (datetime.timedelta): the default interval
                 between two task runs
             min_interval (datetime.timedelta): the minimum interval between
                 two task runs
             max_interval (datetime.timedelta): the maximum interval between
                 two task runs
             backoff_factor (float): the factor by which the interval changes
                 at each run
+            max_queue_length (int): the maximum length of the task queue for
+                this task type
+            num_retries (int): the default number of retries for tasks
+                hitting transient failures
+            retry_delay (datetime.timedelta): the delay between two retries
         """
         query = self._format_query(
             """insert into task_type ({keys}) values ({placeholders})""",
             self.task_type_keys,
         )
         cursor.execute(query, [task_type[key] for key in self.task_type_keys])
 
     @autocommit
     def get_task_type(self, task_type_name, cursor=None):
         """Retrieve the task type with id task_type_name"""
         query = self._format_query(
             "select {keys} from task_type where type=%s",
             self.task_type_keys,
         )
         cursor.execute(query, (task_type_name,))
 
         ret = cursor.fetchone()
 
         return ret
 
-    task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval',
-                 'status']
-    task_create_keys = ['type', 'arguments', 'next_run']
+    @autocommit
+    def get_task_types(self, cursor=None):
+        """Retrieve all the registered task types"""
+        query = self._format_query(
+            "select {keys} from task_type",
+            self.task_type_keys,
+        )
+        cursor.execute(query)
+        ret = cursor.fetchall()
+        return ret
+
+    task_create_keys = [
+        'type', 'arguments', 'next_run', 'policy', 'retries_left',
+    ]
+    task_keys = task_create_keys + ['id', 'current_interval', 'status']
 
     @autocommit
     def create_tasks(self, tasks, cursor=None):
         """Create new tasks.
 
         A task is a dictionary with the following keys:
             type (str): the task type
             arguments (dict): the arguments for the task runner
                 args (list of str): arguments
                 kwargs (dict str -> str): keyword arguments
             next_run (datetime.datetime): the next scheduled run for the task
+            policy ('recurring' or 'oneshot'): whether the task should be
+                rescheduled once it succeeds (optional, defaults to
+                'recurring')
+            retries_left (int): the number of "short delay" retries left in
+                case of transient failure (optional, defaults to the task
+                type's num_retries)
 
         This returns a list of created task ids.
""" cursor.execute('select swh_scheduler_mktemp_task()') self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def disable_tasks(self, task_ids, cursor=None): """Disable the tasks whose ids are listed.""" query = "UPDATE task SET status = 'disabled' WHERE id IN %s" cursor.execute(query, (tuple(task_ids),)) return None @autocommit - def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): + def get_tasks(self, task_ids, cursor=None): + """Retrieve the info of tasks whose ids are listed.""" + query = self._format_query('select {keys} from task where id in %s', + self.task_keys) + cursor.execute(query, (tuple(task_ids),)) + return cursor.fetchall() + + @autocommit + def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, + cursor=None): """Fetch the list of ready tasks Args: timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() - cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)', - (timestamp, num_tasks)) + cursor.execute( + 'select * from swh_scheduler_peek_ready_tasks(%s, %s, %s)', + (task_type, timestamp, num_tasks) + ) return cursor.fetchall() @autocommit - def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None): + def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, + cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() - cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)', - (timestamp, num_tasks)) + cursor.execute( + 'select * from swh_scheduler_grab_ready_tasks(%s, %s, %s)', + (task_type, timestamp, num_tasks) + ) return cursor.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def mass_schedule_task_runs(self, task_runs, cursor=None): """Schedule a bunch of task runs. 
 
     @autocommit
-    def peek_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None):
+    def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
+                         cursor=None):
         """Fetch the list of ready tasks
 
         Args:
+            task_type (str): peek only at tasks of this type
             timestamp (datetime.datetime): peek tasks that need to be executed
                 before that timestamp
             num_tasks (int): only peek at num_tasks tasks
 
         Returns:
             a list of tasks
         """
 
         if timestamp is None:
             timestamp = utcnow()
 
-        cursor.execute('select * from swh_scheduler_peek_ready_tasks(%s, %s)',
-                       (timestamp, num_tasks))
+        cursor.execute(
+            'select * from swh_scheduler_peek_ready_tasks(%s, %s, %s)',
+            (task_type, timestamp, num_tasks)
+        )
 
         return cursor.fetchall()
 
     @autocommit
-    def grab_ready_tasks(self, timestamp=None, num_tasks=None, cursor=None):
+    def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
+                         cursor=None):
         """Fetch the list of ready tasks, and mark them as scheduled
 
         Args:
+            task_type (str): grab only tasks of this type
             timestamp (datetime.datetime): grab tasks that need to be executed
                 before that timestamp
             num_tasks (int): only grab num_tasks tasks
 
         Returns:
             a list of tasks
         """
 
         if timestamp is None:
             timestamp = utcnow()
 
-        cursor.execute('select * from swh_scheduler_grab_ready_tasks(%s, %s)',
-                       (timestamp, num_tasks))
+        cursor.execute(
+            'select * from swh_scheduler_grab_ready_tasks(%s, %s, %s)',
+            (task_type, timestamp, num_tasks)
+        )
 
         return cursor.fetchall()
 
     task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
 
     @autocommit
     def schedule_task_run(self, task_id, backend_id, metadata=None,
                           timestamp=None, cursor=None):
         """Mark a given task as scheduled, adding a task_run entry in the
         database.
 
         Args:
             task_id (int): the identifier for the task being scheduled
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             a fresh task_run entry
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)',
             (task_id, backend_id, metadata, timestamp)
         )
 
         return cursor.fetchone()
 
     @autocommit
     def mass_schedule_task_runs(self, task_runs, cursor=None):
         """Schedule a bunch of task runs.
 
         Args:
             task_runs: a list of dicts with keys:
                 task (int): the identifier for the task being scheduled
                 backend_id (str): the identifier of the job in the backend
                 metadata (dict): metadata to add to the task_run entry
                 scheduled (datetime.datetime): the instant the event occurred
 
         Returns:
             None
         """
         cursor.execute('select swh_scheduler_mktemp_task_run()')
         self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys,
                      cursor)
         cursor.execute('select swh_scheduler_schedule_task_run_from_temp()')
 
     @autocommit
     def start_task_run(self, backend_id, metadata=None, timestamp=None,
                        cursor=None):
         """Mark a given task as started, updating the corresponding task_run
         entry in the database.
 
         Args:
             backend_id (str): the identifier of the job in the backend
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             the updated task_run entry
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_start_task_run(%s, %s, %s)',
             (backend_id, metadata, timestamp)
         )
 
         return cursor.fetchone()
 
     @autocommit
     def end_task_run(self, backend_id, status, metadata=None, timestamp=None,
                      result=None, cursor=None):
         """Mark a given task as ended, updating the corresponding task_run
         entry in the database.
 
         Args:
             backend_id (str): the identifier of the job in the backend
             status ('eventful', 'uneventful', 'failed'): how the task ended
             metadata (dict): metadata to add to the task_run entry
             timestamp (datetime.datetime): the instant the event occurred
 
         Returns:
             the updated task_run entry
         """
 
         if metadata is None:
             metadata = {}
 
         if timestamp is None:
             timestamp = utcnow()
 
         cursor.execute(
             'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)',
             (backend_id, status, metadata, timestamp)
         )
 
         return cursor.fetchone()
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index 67738c3..9d48c74 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,62 +1,84 @@
-# Copyright (C) 2015  The Software Heritage developers
+# Copyright (C) 2015-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import time
+
 import arrow
 from celery import group
 
 from ..backend import SchedulerBackend
 from .config import app as main_app
 
+# Max batch size for tasks
+MAX_NUM_TASKS = 10000
+
 
 def run_ready_tasks(backend, app):
     """Run all tasks that are ready"""
-    backend_names = {}
-
     while True:
+        throttled = False
         cursor = backend.cursor()
-        pending_tasks = backend.grab_ready_tasks(num_tasks=10000,
-                                                 cursor=cursor)
+        task_types = {}
+        pending_tasks = []
+        for task_type in backend.get_task_types(cursor=cursor):
+            task_type_name = task_type['type']
+            task_types[task_type_name] = task_type
+            max_queue_length = task_type['max_queue_length']
+            if max_queue_length:
+                backend_name = task_type['backend_name']
+                queue_name = app.tasks[backend_name].task_queue
+                queue_length = app.get_queue_length(queue_name)
+                num_tasks = min(max_queue_length - queue_length,
+                                MAX_NUM_TASKS)
+            else:
+                num_tasks = MAX_NUM_TASKS
+            if num_tasks > 0:
+                pending_tasks.extend(
+                    backend.grab_ready_tasks(task_type_name,
+                                             num_tasks=num_tasks,
+                                             cursor=cursor))
+            else:
+                # queue already full: back off before the next iteration
+                throttled = True
 
         if not pending_tasks:
             break
 
         celery_tasks = []
         for task in pending_tasks:
-            backend_name = backend_names.get(task['type'])
-            if not backend_name:
-                task_type = backend.get_task_type(task['type'],
-                                                  cursor=cursor)
-                backend_names[task['type']] = task_type['backend_name']
-                backend_name = task_type['backend_name']
-
             args = task['arguments']['args']
             kwargs = task['arguments']['kwargs']
-            celery_task = app.tasks[backend_name].s(*args, **kwargs)
+            celery_task = app.tasks[
+                task_types[task['type']]['backend_name']
+            ].s(*args, **kwargs)
             celery_tasks.append(celery_task)
 
         group_result = group(celery_tasks).delay()
 
         backend_tasks = [{
             'task': task['id'],
             'backend_id': group_result.results[i].id,
             'scheduled': arrow.utcnow(),
         } for i, task in enumerate(pending_tasks)]
 
         backend.mass_schedule_task_runs(backend_tasks, cursor=cursor)
 
         backend.commit()
 
+        if throttled:
+            time.sleep(10)
+
 
 if __name__ == '__main__':
     for module in main_app.conf.CELERY_IMPORTS:
         __import__(module)
 
     main_backend = SchedulerBackend()
     try:
         run_ready_tasks(main_backend, main_app)
     except:
         main_backend.rollback()
         raise
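
The throttling arithmetic, in isolation (figures are illustrative; 42 matches
the 'update-hg' test fixture below):

    # How many tasks to grab for one task type in a single iteration:
    MAX_NUM_TASKS = 10000
    max_queue_length = 42    # task_type['max_queue_length']
    queue_length = 30        # current depth of the backend queue
    num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS)
    assert num_tasks == 12   # grab at most what the queue can absorb
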
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
new file mode 100644
index 0000000..67d7f2a
--- /dev/null
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -0,0 +1,238 @@
+# Copyright (C) 2017  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import copy
+import datetime
+import os
+import random
+import unittest
+
+from arrow import utcnow
+from nose.plugins.attrib import attr
+from nose.tools import istest
+import psycopg2
+
+from swh.core.tests.db_testing import SingleDbTestFixture
+from swh.scheduler.backend import SchedulerBackend
+
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+
+
+@attr('db')
+class Scheduler(SingleDbTestFixture, unittest.TestCase):
+    TEST_DB_NAME = 'softwareheritage-scheduler-test'
+    TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump')
+
+    def setUp(self):
+        super().setUp()
+        self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
+        self.backend = SchedulerBackend(**self.config)
+
+        tt = {
+            'type': 'update-git',
+            'description': 'Update a git repository',
+            'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
+            'default_interval': datetime.timedelta(days=64),
+            'min_interval': datetime.timedelta(hours=12),
+            'max_interval': datetime.timedelta(days=64),
+            'backoff_factor': 2,
+            'max_queue_length': None,
+            'num_retries': 7,
+            'retry_delay': datetime.timedelta(hours=2),
+        }
+        tt2 = tt.copy()
+        tt2['type'] = 'update-hg'
+        tt2['description'] = 'Update a mercurial repository'
+        tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository'
+        tt2['max_queue_length'] = 42
+        tt2['num_retries'] = None
+        tt2['retry_delay'] = None
+
+        self.task_types = {
+            tt['type']: tt,
+            tt2['type']: tt2,
+        }
+
+        self.task1_template = t1_template = {
+            'type': tt['type'],
+            'arguments': {
+                'args': [],
+                'kwargs': {},
+            },
+            'next_run': None,
+        }
+        self.task2_template = t2_template = copy.deepcopy(t1_template)
+        t2_template['type'] = tt2['type']
+        t2_template['policy'] = 'oneshot'
+
+    def tearDown(self):
+        self.backend.db.close()
+        self.empty_tables()
+        super().tearDown()
+
+    def empty_tables(self):
+        self.cursor.execute("""SELECT table_name FROM information_schema.tables
+                               WHERE table_schema = %s""", ('public',))
+
+        tables = set(table for (table,) in self.cursor.fetchall())
+
+        for table in tables:
+            self.cursor.execute('truncate table %s cascade' % table)
+        self.conn.commit()
+
+    @istest
+    def add_task_type(self):
+        tt, tt2 = self.task_types.values()
+        self.backend.create_task_type(tt)
+        self.assertEqual(tt, self.backend.get_task_type(tt['type']))
+        with self.assertRaisesRegex(psycopg2.IntegrityError,
+                                    r'\(type\)=\(%s\)' % tt['type']):
+            self.backend.create_task_type(tt)
+        self.backend.create_task_type(tt2)
+        self.assertEqual(tt, self.backend.get_task_type(tt['type']))
+        self.assertEqual(tt2, self.backend.get_task_type(tt2['type']))
+
+    @istest
+    def get_task_types(self):
+        tt, tt2 = self.task_types.values()
+        self.backend.create_task_type(tt)
+        self.backend.create_task_type(tt2)
+        self.assertCountEqual([tt2, tt], self.backend.get_task_types())
+
+    @staticmethod
+    def _task_from_template(template, next_run, *args, **kwargs):
+        ret = copy.deepcopy(template)
+        ret['next_run'] = next_run
+        if args:
+            ret['arguments']['args'] = list(args)
+        if kwargs:
+            ret['arguments']['kwargs'] = kwargs
+        return ret
+
+    def _tasks_from_template(self, template, max_timestamp, num):
+        return [
+            self._task_from_template(
+                template,
+                max_timestamp - datetime.timedelta(microseconds=i),
+                'argument-%03d' % i,
+                **{'kwarg%03d' % i: 'bogus-kwarg'}
+            )
+            for i in range(num)
+        ]
+
+    def _create_task_types(self):
+        for tt in self.task_types.values():
+            self.backend.create_task_type(tt)
+
+    @istest
+    def create_tasks(self):
+        self._create_task_types()
+        tasks = (
+            self._tasks_from_template(self.task1_template, utcnow(), 100) +
+            self._tasks_from_template(self.task2_template, utcnow(), 100)
+        )
+        ret = self.backend.create_tasks(tasks)
+        ids = set()
+        for task, orig_task in zip(ret, tasks):
+            task = copy.deepcopy(task)
+            task_type = self.task_types[orig_task['type']]
+            self.assertNotIn(task['id'], ids)
+            self.assertEqual(task['status'], 'next_run_not_scheduled')
+            self.assertEqual(task['current_interval'],
+                             task_type['default_interval'])
+            self.assertEqual(task['policy'], orig_task.get('policy',
+                                                           'recurring'))
+            self.assertEqual(task['retries_left'],
+                             task_type['num_retries'] or 0)
+            ids.add(task['id'])
+            del task['id']
+            del task['status']
+            del task['current_interval']
+            del task['retries_left']
+            if 'policy' not in orig_task:
+                del task['policy']
+            self.assertEqual(task, orig_task)
+
+    @istest
+    def peek_ready_tasks(self):
+        self._create_task_types()
+        t = utcnow()
+        task_type = self.task1_template['type']
+        tasks = self._tasks_from_template(self.task1_template, t, 100)
+        random.shuffle(tasks)
+        self.backend.create_tasks(tasks)
+
+        ready_tasks = self.backend.peek_ready_tasks(task_type)
+        self.assertEqual(len(ready_tasks), len(tasks))
+        for i in range(len(ready_tasks) - 1):
+            self.assertLessEqual(ready_tasks[i]['next_run'],
+                                 ready_tasks[i+1]['next_run'])
+
+        # Only get the first few ready tasks
+        limit = random.randrange(5, 5 + len(tasks)//2)
+        ready_tasks_limited = self.backend.peek_ready_tasks(
+            task_type, num_tasks=limit)
+
+        self.assertEqual(len(ready_tasks_limited), limit)
+        self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit])
+
+        # Limit by timestamp
+        max_ts = tasks[limit-1]['next_run']
+        ready_tasks_timestamped = self.backend.peek_ready_tasks(
+            task_type, timestamp=max_ts)
+
+        for ready_task in ready_tasks_timestamped:
+            self.assertLessEqual(ready_task['next_run'], max_ts)
+
+        # Make sure we get proper behavior for the first ready tasks
+        self.assertCountEqual(
+            ready_tasks[:len(ready_tasks_timestamped)],
+            ready_tasks_timestamped,
+        )
+
+        # Limit by both
+        ready_tasks_both = self.backend.peek_ready_tasks(
+            task_type, timestamp=max_ts, num_tasks=limit//3)
+        self.assertLessEqual(len(ready_tasks_both), limit//3)
+        for ready_task in ready_tasks_both:
+            self.assertLessEqual(ready_task['next_run'], max_ts)
+            self.assertIn(ready_task, ready_tasks[:limit//3])
+
+    @istest
+    def grab_ready_tasks(self):
+        self._create_task_types()
+        t = utcnow()
+        task_type = self.task1_template['type']
+        tasks = self._tasks_from_template(self.task1_template, t, 100)
+        random.shuffle(tasks)
+        self.backend.create_tasks(tasks)
+
+        first_ready_tasks = self.backend.peek_ready_tasks(
+            task_type, num_tasks=10)
+        grabbed_tasks = self.backend.grab_ready_tasks(task_type, num_tasks=10)
+
+        for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
+            self.assertEqual(peeked['status'], 'next_run_not_scheduled')
+            del peeked['status']
+            self.assertEqual(grabbed['status'], 'next_run_scheduled')
+            del grabbed['status']
+            self.assertEqual(peeked, grabbed)
+
+    @istest
+    def get_tasks(self):
+        self._create_task_types()
+        t = utcnow()
+        tasks = self._tasks_from_template(self.task1_template, t, 100)
+        tasks = self.backend.create_tasks(tasks)
+        random.shuffle(tasks)
+        while len(tasks) > 1:
+            length = random.randrange(1, len(tasks))
+            cur_tasks = tasks[:length]
+            tasks[:length] = []
+
+            ret = self.backend.get_tasks(task['id'] for task in cur_tasks)
+            self.assertCountEqual(ret, cur_tasks)
diff --git a/version.txt b/version.txt
index 66131fc..45b2792 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.15-0-g0e14eff
\ No newline at end of file
+v0.0.16-0-gee545d4
\ No newline at end of file