
diff --git a/sql/swh-scheduler-schema.sql b/sql/swh-scheduler-schema.sql
index 024326e..a297e98 100644
--- a/sql/swh-scheduler-schema.sql
+++ b/sql/swh-scheduler-schema.sql
@@ -1,260 +1,262 @@
create table dbversion
(
version int primary key,
release timestamptz not null,
description text not null
);
comment on table dbversion is 'Schema update tracking';
insert into dbversion (version, release, description)
values (4, now(), 'Work In Progress');
create table task_type (
type text primary key,
description text not null,
backend_name text not null,
default_interval interval not null,
min_interval interval not null,
max_interval interval not null,
- backoff_factor float not null
+ backoff_factor float not null,
+ max_queue_length bigint
);
comment on table task_type is 'Types of schedulable tasks';
comment on column task_type.type is 'Short identifier for the task type';
comment on column task_type.description is 'Human-readable task description';
comment on column task_type.backend_name is 'Name of the task in the job-running backend';
comment on column task_type.default_interval is 'Default interval for newly scheduled tasks';
comment on column task_type.min_interval is 'Minimum interval between two runs of a task';
comment on column task_type.max_interval is 'Maximum interval between two runs of a task';
comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs';
+comment on column task_type.max_queue_length is 'Maximum length of the queue for tasks of this type';
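-- Illustrative only, not part of this diff: a task type using the new
-- max_queue_length column; a NULL value means no queue limit. Values mirror
-- the test fixtures below, except the queue length, which is a hypothetical
-- figure:
--
--   insert into task_type (type, description, backend_name, default_interval,
--                          min_interval, max_interval, backoff_factor,
--                          max_queue_length)
--   values ('update-git', 'Update a git repository',
--           'swh.loader.git.tasks.UpdateGitRepository',
--           '64 days', '12 hours', '64 days', 2, 100000);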
create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'disabled');
comment on type task_status is 'Status of a given task';
create table task (
id bigserial primary key,
type text not null references task_type(type),
arguments jsonb not null,
next_run timestamptz not null,
current_interval interval not null,
status task_status not null
);
comment on table task is 'Schedule of recurring tasks';
comment on column task.arguments is 'Arguments passed to the underlying job scheduler. '
'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column task.next_run is 'The next run of this task should happen on or after this time';
comment on column task.current_interval is 'The interval between two runs of this task, '
'taking into account the backoff factor';
create index on task(type);
create index on task(next_run);
create index task_args on task using btree ((arguments -> 'args'));
create index task_kwargs on task using gin ((arguments -> 'kwargs'));
create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'lost');
comment on type task_run_status is 'Status of a given task run';
create table task_run (
id bigserial primary key,
task bigint not null references task(id),
backend_id text,
scheduled timestamptz,
started timestamptz,
ended timestamptz,
metadata jsonb,
status task_run_status not null default 'scheduled'
);
comment on table task_run is 'History of task runs sent to the job-running backend';
comment on column task_run.backend_id is 'id of the task run in the job-running backend';
comment on column task_run.metadata is 'Useful metadata for the given task run. '
'For instance, the worker that took on the job, '
'or the logs for the run.';
create index on task_run(task);
create index on task_run(backend_id);
create or replace function swh_scheduler_mktemp_task ()
returns void
language sql
as $$
create temporary table tmp_task (
like task excluding indexes
) on commit drop;
alter table tmp_task
drop column id,
drop column current_interval,
drop column status;
$$;
comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
create or replace function swh_scheduler_create_tasks_from_temp ()
returns setof task
language plpgsql
as $$
begin
return query
insert into task (type, arguments, next_run, status, current_interval)
select type, arguments, next_run, 'next_run_not_scheduled',
(select default_interval from task_type tt where tt.type = tmp_task.type)
from tmp_task
returning task.*;
end;
$$;
comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
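-- Usage sketch, not part of this diff: the bulk-creation flow pairs these two
-- functions around a COPY into the temporary table (column values are
-- illustrative):
--
--   select swh_scheduler_mktemp_task();
--   copy tmp_task (type, arguments, next_run) from stdin csv;
--   select * from swh_scheduler_create_tasks_from_temp();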
create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL)
returns setof task
language sql
stable
as $$
select * from task
where next_run <= ts
and type = task_type
and status = 'next_run_not_scheduled'
order by next_run
limit num_tasks;
$$;
create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL)
returns setof task
language sql
as $$
update task
set status='next_run_scheduled'
from (
select id from task
where next_run <= ts
and type = task_type
and status='next_run_not_scheduled'
order by next_run
limit num_tasks
for update skip locked
) next_tasks
where task.id = next_tasks.id
returning task.*;
$$;
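-- Usage sketch, not part of this diff: peek is read-only (hence 'stable'),
-- while grab atomically flips the selected rows to 'next_run_scheduled';
-- 'for update skip locked' lets concurrent schedulers take disjoint batches.
-- 'update-git' is the task type used in the test suite:
--
--   select * from swh_scheduler_peek_ready_tasks('update-git', now(), 10);
--   select * from swh_scheduler_grab_ready_tasks('update-git', now(), 10);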
create or replace function swh_scheduler_schedule_task_run (task_id bigint,
backend_id text,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
insert into task_run (task, backend_id, metadata, scheduled, status)
values (task_id, backend_id, metadata, ts, 'scheduled')
returning *;
$$;
create or replace function swh_scheduler_mktemp_task_run ()
returns void
language sql
as $$
create temporary table tmp_task_run (
like task_run excluding indexes
) on commit drop;
alter table tmp_task_run
drop column id,
drop column status;
$$;
comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
create or replace function swh_scheduler_schedule_task_run_from_temp ()
returns void
language plpgsql
as $$
begin
insert into task_run (task, backend_id, metadata, scheduled, status)
select task, backend_id, metadata, scheduled, 'scheduled'
from tmp_task_run;
return;
end;
$$;
create or replace function swh_scheduler_start_task_run (backend_id text,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
update task_run
set started = ts,
status = 'started',
metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
where task_run.backend_id = swh_scheduler_start_task_run.backend_id
returning *;
$$;
create or replace function swh_scheduler_end_task_run (backend_id text,
status task_run_status,
metadata jsonb default '{}'::jsonb,
ts timestamptz default now())
returns task_run
language sql
as $$
update task_run
set ended = ts,
status = swh_scheduler_end_task_run.status,
metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
where task_run.backend_id = swh_scheduler_end_task_run.backend_id
returning *;
$$;
create or replace function swh_scheduler_compute_new_task_interval (task_type text,
current_interval interval,
end_status task_run_status)
returns interval
language plpgsql
stable
as $$
declare
task_type_row task_type%rowtype;
adjustment_factor float;
begin
select *
from task_type
where type = swh_scheduler_compute_new_task_interval.task_type
into task_type_row;
case end_status
when 'eventful' then
adjustment_factor := 1/task_type_row.backoff_factor;
when 'uneventful' then
adjustment_factor := task_type_row.backoff_factor;
else
-- failed or lost task: no backoff.
adjustment_factor := 1;
end case;
return greatest(task_type_row.min_interval,
least(task_type_row.max_interval,
adjustment_factor * current_interval));
end;
$$;
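-- Worked example, not part of this diff, using the test-fixture settings
-- (backoff_factor = 2, min_interval = '12 hours', max_interval = '64 days'):
-- from a 4-day interval, an 'eventful' run shrinks it to 2 days, an
-- 'uneventful' run grows it to 8 days, and 'failed'/'lost' leave it at
-- 4 days, always clamped to [min_interval, max_interval]:
--
--   select swh_scheduler_compute_new_task_interval('update-git',
--                                                  '4 days', 'eventful');
--   -- => 2 days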
create or replace function swh_scheduler_update_task_interval ()
returns trigger
language plpgsql
as $$
begin
update task
set status = 'next_run_not_scheduled',
current_interval = swh_scheduler_compute_new_task_interval(type, current_interval, new.status),
next_run = now () + swh_scheduler_compute_new_task_interval(type, current_interval, new.status)
where id = new.task;
return null;
end;
$$;
create trigger update_interval_on_task_end
after update of status on task_run
for each row
when (new.status IN ('eventful', 'uneventful', 'failed', 'lost'))
execute procedure swh_scheduler_update_task_interval ();
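-- End-to-end sketch, not part of this diff ('job-42' is a hypothetical
-- backend identifier): ending a run updates task_run.status, which fires this
-- trigger and reschedules the parent task with the recomputed interval:
--
--   select * from swh_scheduler_end_task_run('job-42', 'eventful');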
diff --git a/sql/updates/04.sql b/sql/updates/04.sql
index c28629f..a051388 100644
--- a/sql/updates/04.sql
+++ b/sql/updates/04.sql
@@ -1,49 +1,53 @@
-- SWH Scheduler Schema upgrade
-- from_version: 03
-- to_version: 04
-- description: Add a maximum queue length to the task types in the scheduler
begin;
insert into dbversion (version, release, description)
values (4, now(), 'Work In Progress');
+alter table task_type add column max_queue_length bigint;
+comment on column task_type.max_queue_length is 'Maximum length of the queue for tasks of this type';
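-- Illustrative only, not part of this migration: existing task types keep a
-- NULL max_queue_length (no limit); a per-type limit such as the one used in
-- the test suite could be set with:
--   update task_type set max_queue_length = 42 where type = 'update-hg';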
+
+
drop function swh_scheduler_peek_ready_tasks (timestamptz, bigint);
drop function swh_scheduler_grab_ready_tasks (timestamptz, bigint);
create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL)
returns setof task
language sql
stable
as $$
select * from task
where next_run <= ts
and type = task_type
and status = 'next_run_not_scheduled'
order by next_run
limit num_tasks;
$$;
create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
num_tasks bigint default NULL)
returns setof task
language sql
as $$
update task
set status='next_run_scheduled'
from (
select id from task
where next_run <= ts
and type = task_type
and status='next_run_not_scheduled'
order by next_run
limit num_tasks
for update skip locked
) next_tasks
where task.id = next_tasks.id
returning task.*;
$$;
commit;
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index 29b234a..641c3a0 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,409 +1,411 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import binascii
import datetime
from functools import wraps
import json
import tempfile
from arrow import Arrow, utcnow
import psycopg2
import psycopg2.extras
from psycopg2.extensions import AsIs
from swh.core.config import SWHConfig
def adapt_arrow(arrow):
return AsIs("'%s'::timestamptz" % arrow.isoformat())
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
def autocommit(fn):
@wraps(fn)
def wrapped(self, *args, **kwargs):
autocommit = False
if 'cursor' not in kwargs or not kwargs['cursor']:
autocommit = True
kwargs['cursor'] = self.cursor()
try:
ret = fn(self, *args, **kwargs)
except:
if autocommit:
self.rollback()
raise
if autocommit:
self.commit()
return ret
return wrapped
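# Usage sketch, not part of this diff: methods decorated with @autocommit run
# in their own transaction unless the caller passes an explicit cursor, in
# which case commit/rollback are left to the caller:
#
#   backend.create_task_type(task_type)               # autocommits
#   cur = backend.cursor()
#   backend.create_task_type(task_type, cursor=cur)   # caller owns the txn
#   backend.commit()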
class SchedulerBackend(SWHConfig):
"""
Backend for the Software Heritage scheduling database.
"""
CONFIG_BASE_FILENAME = 'scheduler.ini'
DEFAULT_CONFIG = {
'scheduling_db': ('str', 'dbname=swh-scheduler'),
}
def __init__(self, **override_config):
self.config = self.parse_config_file(global_config=False)
self.config.update(override_config)
self.db = None
self.reconnect()
def reconnect(self):
if not self.db or self.db.closed:
self.db = psycopg2.connect(
dsn=self.config['scheduling_db'],
cursor_factory=psycopg2.extras.RealDictCursor,
)
def cursor(self):
"""Return a fresh cursor on the database, with auto-reconnection in case of
failure"""
cur = None
# Get a fresh cursor and reconnect at most three times
tries = 0
while True:
tries += 1
try:
cur = self.db.cursor()
cur.execute('select 1')
break
except psycopg2.OperationalError:
if tries < 3:
self.reconnect()
else:
raise
return cur
def commit(self):
"""Commit a transaction"""
self.db.commit()
def rollback(self):
"""Rollback a transaction"""
self.db.rollback()
def copy_to(self, items, tblname, columns, cursor=None, item_cb=None):
def escape(data):
if data is None:
return ''
if isinstance(data, bytes):
return '\\x%s' % binascii.hexlify(data).decode('ascii')
elif isinstance(data, str):
return '"%s"' % data.replace('"', '""')
elif isinstance(data, (datetime.datetime, Arrow)):
# We escape twice to make sure the string generated by
# isoformat gets escaped
return escape(data.isoformat())
elif isinstance(data, dict):
return escape(json.dumps(data))
elif isinstance(data, list):
return escape("{%s}" % ','.join(escape(d) for d in data))
elif isinstance(data, psycopg2.extras.Range):
# We escape twice here too, so that we make sure
# everything gets passed to copy properly
return escape(
'%s%s,%s%s' % (
'[' if data.lower_inc else '(',
'-infinity' if data.lower_inf else escape(data.lower),
'infinity' if data.upper_inf else escape(data.upper),
']' if data.upper_inc else ')',
)
)
else:
# We don't escape here to make sure we pass literals properly
return str(data)
with tempfile.TemporaryFile('w+') as f:
for d in items:
if item_cb is not None:
item_cb(d)
line = [escape(d.get(k)) for k in columns]
f.write(','.join(line))
f.write('\n')
f.seek(0)
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
tblname, ', '.join(columns)), f)
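# Usage sketch, not part of this diff: copy_to serializes each item dict to a
# CSV line and bulk-loads it via COPY ... FROM STDIN, e.g. into the temporary
# table created by swh_scheduler_mktemp_task():
#
#   self.copy_to(tasks, 'tmp_task', ['type', 'arguments', 'next_run'], cursor)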
task_type_keys = [
'type', 'description', 'backend_name', 'default_interval',
- 'min_interval', 'max_interval', 'backoff_factor',
+ 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length',
]
def _format_query(self, query, keys):
"""Format a query with the given keys"""
query_keys = ', '.join(keys)
placeholders = ', '.join(['%s'] * len(keys))
return query.format(keys=query_keys, placeholders=placeholders)
def _format_multiquery(self, query, keys, values):
"""Format a query with placeholders generated for multiple values"""
query_keys = ', '.join(keys)
placeholders = '), ('.join(
[', '.join(['%s'] * len(keys))] * len(values)
)
ret_values = sum([[value[key] for key in keys]
for value in values], [])
return (
query.format(keys=query_keys, placeholders=placeholders),
ret_values,
)
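# Illustration, not part of this diff: for keys ['type', 'arguments'] and two
# value dicts v1, v2, _format_multiquery('insert into t ({keys}) values
# ({placeholders})', keys, [v1, v2]) returns
# ('insert into t (type, arguments) values (%s, %s), (%s, %s)',
#  [v1['type'], v1['arguments'], v2['type'], v2['arguments']]).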
@autocommit
def create_task_type(self, task_type, cursor=None):
"""Create a new task type ready for scheduling.
A task type is a dictionary with the following keys:
type (str): an identifier for the task type
description (str): a human-readable description of what the task
does
backend_name (str): the name of the task in the job-scheduling
backend
default_interval (datetime.timedelta): the default interval
between two task runs
min_interval (datetime.timedelta): the minimum interval between
two task runs
max_interval (datetime.timedelta): the maximum interval between
two task runs
backoff_factor (float): the factor by which the interval changes
at each run
+ max_queue_length (int): the maximum length of the task queue for
+ this task type
"""
query = self._format_query(
"""insert into task_type ({keys}) values ({placeholders})""",
self.task_type_keys,
)
cursor.execute(query, [task_type[key] for key in self.task_type_keys])
@autocommit
def get_task_type(self, task_type_name, cursor=None):
"""Retrieve the task type with id task_type_name"""
query = self._format_query(
"select {keys} from task_type where type=%s",
self.task_type_keys,
)
cursor.execute(query, (task_type_name,))
ret = cursor.fetchone()
return ret
@autocommit
def get_task_types(self, cursor=None):
query = self._format_query(
"select {keys} from task_type",
self.task_type_keys,
)
cursor.execute(query)
ret = cursor.fetchall()
return ret
task_keys = ['id', 'type', 'arguments', 'next_run', 'current_interval',
'status']
task_create_keys = ['type', 'arguments', 'next_run']
@autocommit
def create_tasks(self, tasks, cursor=None):
"""Create new tasks.
A task is a dictionary with the following keys:
type (str): the task type
arguments (dict): the arguments for the task runner
args (list of str): arguments
kwargs (dict str -> str): keyword arguments
next_run (datetime.datetime): the next scheduled run for the task
This returns a list of created task ids.
"""
cursor.execute('select swh_scheduler_mktemp_task()')
self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor)
query = self._format_query(
'select {keys} from swh_scheduler_create_tasks_from_temp()',
self.task_keys,
)
cursor.execute(query)
return cursor.fetchall()
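# Usage sketch, not part of this diff, mirroring the shape used in the test
# suite (the repository URL is a made-up example):
#
#   tasks = backend.create_tasks([{
#       'type': 'update-git',
#       'arguments': {'args': ['https://example.org/repo.git'], 'kwargs': {}},
#       'next_run': utcnow(),
#   }])
#   # each returned dict additionally carries 'id', 'status' and
#   # 'current_interval'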
@autocommit
def disable_tasks(self, task_ids, cursor=None):
"""Disable the tasks whose ids are listed."""
query = "UPDATE task SET status = 'disabled' WHERE id IN %s"
cursor.execute(query, (tuple(task_ids),))
return None
@autocommit
def get_tasks(self, task_ids, cursor=None):
"""Retrieve the info of tasks whose ids are listed."""
query = self._format_query('select {keys} from task where id in %s',
self.task_keys)
cursor.execute(query, (tuple(task_ids),))
return cursor.fetchall()
@autocommit
def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
cursor=None):
"""Fetch the list of ready tasks
Args:
timestamp (datetime.datetime): peek tasks that need to be executed
before that timestamp
num_tasks (int): only peek at num_tasks tasks
Returns:
a list of tasks
"""
if timestamp is None:
timestamp = utcnow()
cursor.execute(
'select * from swh_scheduler_peek_ready_tasks(%s, %s, %s)',
(task_type, timestamp, num_tasks)
)
return cursor.fetchall()
@autocommit
def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
cursor=None):
"""Fetch the list of ready tasks, and mark them as scheduled
Args:
timestamp (datetime.datetime): grab tasks that need to be executed
before that timestamp
num_tasks (int): only grab num_tasks tasks
Returns:
a list of tasks
"""
if timestamp is None:
timestamp = utcnow()
cursor.execute(
'select * from swh_scheduler_grab_ready_tasks(%s, %s, %s)',
(task_type, timestamp, num_tasks)
)
return cursor.fetchall()
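# Usage sketch, not part of this diff: a scheduler loop would typically grab a
# bounded batch and hand each task to the job-running backend, then record the
# run with schedule_task_run():
#
#   for task in backend.grab_ready_tasks('update-git', num_tasks=10):
#       ...  # submit to the backend, then backend.schedule_task_run(...)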
task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
@autocommit
def schedule_task_run(self, task_id, backend_id, metadata=None,
timestamp=None, cursor=None):
"""Mark a given task as scheduled, adding a task_run entry in the database.
Args:
task_id (int): the identifier for the task being scheduled
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
a fresh task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cursor.execute(
'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)',
(task_id, backend_id, metadata, timestamp)
)
return cursor.fetchone()
@autocommit
def mass_schedule_task_runs(self, task_runs, cursor=None):
"""Schedule a bunch of task runs.
Args:
task_runs: a list of dicts with keys:
task (int): the identifier for the task being scheduled
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
scheduled (datetime.datetime): the instant the event occurred
Returns:
None
"""
cursor.execute('select swh_scheduler_mktemp_task_run()')
self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys,
cursor)
cursor.execute('select swh_scheduler_schedule_task_run_from_temp()')
@autocommit
def start_task_run(self, backend_id, metadata=None, timestamp=None,
cursor=None):
"""Mark a given task as started, updating the corresponding task_run
entry in the database.
Args:
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
the updated task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cursor.execute(
'select * from swh_scheduler_start_task_run(%s, %s, %s)',
(backend_id, metadata, timestamp)
)
return cursor.fetchone()
@autocommit
def end_task_run(self, backend_id, status, metadata=None, timestamp=None,
result=None, cursor=None):
"""Mark a given task as ended, updating the corresponding task_run
entry in the database.
Args:
backend_id (str): the identifier of the job in the backend
status ('eventful', 'uneventful', 'failed', 'lost'): how the task ended
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
the updated task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cursor.execute(
'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)',
(backend_id, status, metadata, timestamp)
)
return cursor.fetchone()
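# Lifecycle sketch, not part of this diff ('job-42' is a hypothetical backend
# job identifier):
#
#   run = backend.schedule_task_run(task['id'], 'job-42')
#   backend.start_task_run('job-42', metadata={'worker': 'worker01'})
#   backend.end_task_run('job-42', status='eventful')
#   # the update_interval_on_task_end trigger then reschedules the task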
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index f2c1056..34d25a6 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,215 +1,217 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import os
import random
import unittest
from arrow import utcnow
from nose.tools import istest
import psycopg2
from swh.core.tests.db_testing import SingleDbTestFixture
from swh.scheduler.backend import SchedulerBackend
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
class Scheduler(SingleDbTestFixture, unittest.TestCase):
TEST_DB_NAME = 'softwareheritage-scheduler-test'
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump')
def setUp(self):
super().setUp()
self.config = {'scheduling_db': 'dbname=' + self.TEST_DB_NAME}
self.backend = SchedulerBackend(**self.config)
self.task_type = tt = {
'type': 'update-git',
'description': 'Update a git repository',
'backend_name': 'swh.loader.git.tasks.UpdateGitRepository',
'default_interval': datetime.timedelta(days=64),
'min_interval': datetime.timedelta(hours=12),
'max_interval': datetime.timedelta(days=64),
'backoff_factor': 2,
+ 'max_queue_length': None,
}
self.task_type2 = tt2 = tt.copy()
tt2['type'] = 'update-hg'
tt2['description'] = 'Update a mercurial repository'
tt2['backend_name'] = 'swh.loader.mercurial.tasks.UpdateHgRepository'
+ tt2['max_queue_length'] = 42
self.task1_template = t1_template = {
'type': tt['type'],
'arguments': {
'args': [],
'kwargs': {},
},
'next_run': None,
}
self.task2_template = t2_template = copy.deepcopy(t1_template)
t2_template['type'] = tt2['type']
def tearDown(self):
self.backend.db.close()
self.empty_tables()
super().tearDown()
def empty_tables(self):
self.cursor.execute("""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public',))
tables = set(table for (table,) in self.cursor.fetchall())
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.conn.commit()
@istest
def add_task_type(self):
tt = self.task_type
tt2 = self.task_type2
self.backend.create_task_type(tt)
self.assertEqual(tt, self.backend.get_task_type(tt['type']))
with self.assertRaisesRegex(psycopg2.IntegrityError,
r'\(type\)=\(%s\)' % tt['type']):
self.backend.create_task_type(tt)
self.backend.create_task_type(tt2)
self.assertEqual(tt, self.backend.get_task_type(tt['type']))
self.assertEqual(tt2, self.backend.get_task_type(tt2['type']))
@istest
def get_task_types(self):
tt = self.task_type
tt2 = self.task_type2
self.backend.create_task_type(tt)
self.backend.create_task_type(tt2)
self.assertCountEqual([tt2, tt], self.backend.get_task_types())
@staticmethod
def _task_from_template(template, next_run, *args, **kwargs):
ret = copy.deepcopy(template)
ret['next_run'] = next_run
if args:
ret['arguments']['args'] = list(args)
if kwargs:
ret['arguments']['kwargs'] = kwargs
return ret
def _tasks_from_template(self, template, max_timestamp, num):
return [
self._task_from_template(
template,
max_timestamp - datetime.timedelta(microseconds=i),
'argument-%03d' % i,
**{'kwarg%03d' % i: 'bogus-kwarg'}
)
for i in range(num)
]
def _create_task_types(self):
self.backend.create_task_type(self.task_type)
self.backend.create_task_type(self.task_type2)
@istest
def create_tasks(self):
self._create_task_types()
tasks = self._tasks_from_template(self.task1_template, utcnow(), 100)
ret = self.backend.create_tasks(tasks)
ids = set()
for task, orig_task in zip(ret, tasks):
task = copy.deepcopy(task)
self.assertNotIn(task['id'], ids)
self.assertEqual(task['status'], 'next_run_not_scheduled')
self.assertEqual(task['current_interval'],
self.task_type['default_interval'])
ids.add(task['id'])
del task['id']
del task['status']
del task['current_interval']
self.assertEqual(task, orig_task)
@istest
def peek_ready_tasks(self):
self._create_task_types()
t = utcnow()
task_type = self.task1_template['type']
tasks = self._tasks_from_template(self.task1_template, t, 100)
random.shuffle(tasks)
self.backend.create_tasks(tasks)
ready_tasks = self.backend.peek_ready_tasks(task_type)
self.assertEqual(len(ready_tasks), len(tasks))
for i in range(len(ready_tasks) - 1):
self.assertLessEqual(ready_tasks[i]['next_run'],
ready_tasks[i+1]['next_run'])
# Only get the first few ready tasks
limit = random.randrange(5, 5 + len(tasks)//2)
ready_tasks_limited = self.backend.peek_ready_tasks(
task_type, num_tasks=limit)
self.assertEqual(len(ready_tasks_limited), limit)
self.assertCountEqual(ready_tasks_limited, ready_tasks[:limit])
# Limit by timestamp
max_ts = tasks[limit-1]['next_run']
ready_tasks_timestamped = self.backend.peek_ready_tasks(
task_type, timestamp=max_ts)
for ready_task in ready_tasks_timestamped:
self.assertLessEqual(ready_task['next_run'], max_ts)
# Make sure we get proper behavior for the first ready tasks
self.assertCountEqual(
ready_tasks[:len(ready_tasks_timestamped)],
ready_tasks_timestamped,
)
# Limit by both
ready_tasks_both = self.backend.peek_ready_tasks(
task_type, timestamp=max_ts, num_tasks=limit//3)
self.assertLessEqual(len(ready_tasks_both), limit//3)
for ready_task in ready_tasks_both:
self.assertLessEqual(ready_task['next_run'], max_ts)
self.assertIn(ready_task, ready_tasks[:limit//3])
@istest
def grab_ready_tasks(self):
self._create_task_types()
t = utcnow()
task_type = self.task1_template['type']
tasks = self._tasks_from_template(self.task1_template, t, 100)
random.shuffle(tasks)
self.backend.create_tasks(tasks)
first_ready_tasks = self.backend.peek_ready_tasks(
task_type, num_tasks=10)
grabbed_tasks = self.backend.grab_ready_tasks(task_type, num_tasks=10)
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
self.assertEqual(peeked['status'], 'next_run_not_scheduled')
del peeked['status']
self.assertEqual(grabbed['status'], 'next_run_scheduled')
del grabbed['status']
self.assertEqual(peeked, grabbed)
@istest
def get_tasks(self):
self._create_task_types()
t = utcnow()
tasks = self._tasks_from_template(self.task1_template, t, 100)
tasks = self.backend.create_tasks(tasks)
random.shuffle(tasks)
while len(tasks) > 1:
length = random.randrange(1, len(tasks))
cur_tasks = tasks[:length]
tasks[:length] = []
ret = self.backend.get_tasks(task['id'] for task in cur_tasks)
self.assertCountEqual(ret, cur_tasks)
