Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import binascii | import binascii | ||||
import datetime | import datetime | ||||
from functools import wraps | |||||
import json | import json | ||||
import tempfile | import tempfile | ||||
import logging | import logging | ||||
from arrow import Arrow, utcnow | from arrow import Arrow, utcnow | ||||
import psycopg2 | import psycopg2.pool | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from psycopg2.extensions import AsIs | from psycopg2.extensions import AsIs | ||||
from swh.core.config import SWHConfig | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction, db_transaction_generator | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def adapt_arrow(arrow):
    """Adapt an Arrow object into a timestamptz literal for psycopg2."""
    timestamp = arrow.isoformat()
    return AsIs("'%s'::timestamptz" % timestamp)


# Teach psycopg2 how to serialize dicts (as JSON) and Arrow timestamps
# when they appear as query parameters.
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
def autocommit(fn): | class DbBackend(BaseDb): | ||||
@wraps(fn) | """Base class intended to be used for scheduling db backend classes | ||||
def wrapped(self, *args, **kwargs): | |||||
autocommit = False | |||||
if 'cursor' not in kwargs or not kwargs['cursor']: | |||||
autocommit = True | |||||
kwargs['cursor'] = self.cursor() | |||||
try: | |||||
ret = fn(self, *args, **kwargs) | |||||
except Exception: | |||||
if autocommit: | |||||
self.rollback() | |||||
raise | |||||
if autocommit: | |||||
self.commit() | |||||
return ret | |||||
return wrapped | |||||
class DbBackend: | |||||
"""Mixin intended to be used within scheduling db backend classes | |||||
cf. swh.scheduler.backend.SchedulerBackend, and | cf. swh.scheduler.backend.SchedulerBackend, and | ||||
swh.scheduler.updater.backend.SchedulerUpdaterBackend | swh.scheduler.updater.backend.SchedulerUpdaterBackend | ||||
""" | """ | ||||
def reconnect(self): | cursor = BaseDb._cursor | ||||
if not self.db or self.db.closed: | |||||
self.db = psycopg2.connect( | |||||
dsn=self.db_conn_dsn, | |||||
cursor_factory=psycopg2.extras.RealDictCursor, | |||||
) | |||||
def cursor(self): | |||||
"""Return a fresh cursor on the database, with auto-reconnection in | |||||
case of failure | |||||
""" | |||||
cur = None | |||||
# Get a fresh cursor and reconnect at most three times | |||||
tries = 0 | |||||
while True: | |||||
tries += 1 | |||||
try: | |||||
cur = self.db.cursor() | |||||
cur.execute('select 1') | |||||
break | |||||
except psycopg2.OperationalError: | |||||
if tries < 3: | |||||
self.reconnect() | |||||
else: | |||||
raise | |||||
return cur | |||||
def commit(self): | |||||
"""Commit a transaction""" | |||||
self.db.commit() | |||||
def rollback(self): | |||||
"""Rollback a transaction""" | |||||
self.db.rollback() | |||||
def close_connection(self): | |||||
"""Close db connection""" | |||||
if self.db and not self.db.closed: | |||||
self.db.close() | |||||
def _format_query(self, query, keys): | def _format_query(self, query, keys): | ||||
"""Format a query with the given keys""" | """Format a query with the given keys""" | ||||
query_keys = ', '.join(keys) | query_keys = ', '.join(keys) | ||||
placeholders = ', '.join(['%s'] * len(keys)) | placeholders = ', '.join(['%s'] * len(keys)) | ||||
return query.format(keys=query_keys, placeholders=placeholders) | return query.format(keys=query_keys, placeholders=placeholders) | ||||
def _format_multiquery(self, query, keys, values): | |||||
"""Format a query with placeholders generated for multiple values""" | |||||
query_keys = ', '.join(keys) | |||||
placeholders = '), ('.join( | |||||
[', '.join(['%s'] * len(keys))] * len(values) | |||||
) | |||||
ret_values = sum([[value[key] for key in keys] | |||||
for value in values], []) | |||||
return ( | |||||
query.format(keys=query_keys, placeholders=placeholders), | |||||
ret_values, | |||||
) | |||||
def copy_to(self, items, tblname, columns, default_columns={}, | def copy_to(self, items, tblname, columns, default_columns={}, | ||||
cursor=None, item_cb=None): | cursor=None, item_cb=None): | ||||
def escape(data): | def escape(data): | ||||
if data is None: | if data is None: | ||||
return '' | return '' | ||||
if isinstance(data, bytes): | if isinstance(data, bytes): | ||||
return '\\x%s' % binascii.hexlify(data).decode('ascii') | return '\\x%s' % binascii.hexlify(data).decode('ascii') | ||||
elif isinstance(data, str): | elif isinstance(data, str): | ||||
Show All 33 Lines | def copy_to(self, items, tblname, columns, default_columns={}, | ||||
line.append(v) | line.append(v) | ||||
f.write(','.join(line)) | f.write(','.join(line)) | ||||
f.write('\n') | f.write('\n') | ||||
f.seek(0) | f.seek(0) | ||||
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( | ||||
tblname, ', '.join(columns)), f) | tblname, ', '.join(columns)), f) | ||||
class SchedulerBackend(SWHConfig, DbBackend): | class SchedulerBackend: | ||||
"""Backend for the Software Heritage scheduling database. | """Backend for the Software Heritage scheduling database. | ||||
""" | """ | ||||
CONFIG_BASE_FILENAME = 'scheduler' | |||||
DEFAULT_CONFIG = { | |||||
'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), | |||||
} | |||||
def __init__(self, **override_config): | def __init__(self, db, min_pool_conns=1, max_pool_conns=10): | ||||
super().__init__() | """ | ||||
self.config = self.parse_config_file(global_config=False) | Args: | ||||
self.config.update(override_config) | db_conn: either a libpq connection string, or a psycopg2 connection | ||||
self.db = None | |||||
self.db_conn_dsn = self.config['scheduling_db'] | """ | ||||
self.reconnect() | if isinstance(db, psycopg2.extensions.connection): | ||||
logger.debug('SchedulerBackend config=%s' % self.config) | self._pool = None | ||||
self._db = DbBackend(db) | |||||
else: | |||||
self._pool = psycopg2.pool.ThreadedConnectionPool( | |||||
min_pool_conns, max_pool_conns, db, | |||||
cursor_factory=psycopg2.extras.RealDictCursor, | |||||
) | |||||
self._db = None | |||||
def get_db(self): | |||||
if self._db: | |||||
return self._db | |||||
return DbBackend.from_pool(self._pool) | |||||
task_type_keys = [ | task_type_keys = [ | ||||
'type', 'description', 'backend_name', 'default_interval', | 'type', 'description', 'backend_name', 'default_interval', | ||||
'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', | 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', | ||||
'num_retries', 'retry_delay', | 'num_retries', 'retry_delay', | ||||
] | ] | ||||
@autocommit | @db_transaction() | ||||
def create_task_type(self, task_type, cursor=None): | def create_task_type(self, task_type, db=None, cur=None): | ||||
"""Create a new task type ready for scheduling. | """Create a new task type ready for scheduling. | ||||
Args: | Args: | ||||
task_type (dict): a dictionary with the following keys: | task_type (dict): a dictionary with the following keys: | ||||
- type (str): an identifier for the task type | - type (str): an identifier for the task type | ||||
- description (str): a human-readable description of what the | - description (str): a human-readable description of what the | ||||
task does | task does | ||||
- backend_name (str): the name of the task in the | - backend_name (str): the name of the task in the | ||||
job-scheduling backend | job-scheduling backend | ||||
- default_interval (datetime.timedelta): the default interval | - default_interval (datetime.timedelta): the default interval | ||||
between two task runs | between two task runs | ||||
- min_interval (datetime.timedelta): the minimum interval | - min_interval (datetime.timedelta): the minimum interval | ||||
between two task runs | between two task runs | ||||
- max_interval (datetime.timedelta): the maximum interval | - max_interval (datetime.timedelta): the maximum interval | ||||
between two task runs | between two task runs | ||||
- backoff_factor (float): the factor by which the interval | - backoff_factor (float): the factor by which the interval | ||||
changes at each run | changes at each run | ||||
- max_queue_length (int): the maximum length of the task queue | - max_queue_length (int): the maximum length of the task queue | ||||
for this task type | for this task type | ||||
""" | """ | ||||
keys = [key for key in self.task_type_keys if key in task_type] | keys = [key for key in self.task_type_keys if key in task_type] | ||||
query = self._format_query( | query = db._format_query( | ||||
"""insert into task_type ({keys}) values ({placeholders})""", | """insert into task_type ({keys}) values ({placeholders})""", | ||||
keys) | keys) | ||||
cursor.execute(query, [task_type[key] for key in keys]) | cur.execute(query, [task_type[key] for key in keys]) | ||||
@autocommit | @db_transaction() | ||||
def get_task_type(self, task_type_name, cursor=None): | def get_task_type(self, task_type_name, db=None, cur=None): | ||||
"""Retrieve the task type with id task_type_name""" | """Retrieve the task type with id task_type_name""" | ||||
query = self._format_query( | query = db._format_query( | ||||
"select {keys} from task_type where type=%s", | "select {keys} from task_type where type=%s", | ||||
self.task_type_keys, | self.task_type_keys, | ||||
) | ) | ||||
cursor.execute(query, (task_type_name,)) | cur.execute(query, (task_type_name,)) | ||||
return cur.fetchone() | |||||
ret = cursor.fetchone() | |||||
return ret | @db_transaction() | ||||
def get_task_types(self, db=None, cur=None): | |||||
@autocommit | """Retrieve all registered task types""" | ||||
def get_task_types(self, cursor=None): | query = db._format_query( | ||||
query = self._format_query( | |||||
"select {keys} from task_type", | "select {keys} from task_type", | ||||
self.task_type_keys, | self.task_type_keys, | ||||
) | ) | ||||
cursor.execute(query) | cur.execute(query) | ||||
ret = cursor.fetchall() | return cur.fetchall() | ||||
return ret | |||||
task_create_keys = [ | task_create_keys = [ | ||||
'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', | 'type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', | ||||
'priority' | 'priority' | ||||
] | ] | ||||
task_keys = task_create_keys + ['id', 'current_interval', 'status'] | task_keys = task_create_keys + ['id', 'current_interval', 'status'] | ||||
@autocommit | @db_transaction() | ||||
def create_tasks(self, tasks, policy='recurring', cursor=None): | def create_tasks(self, tasks, policy='recurring', db=None, cur=None): | ||||
"""Create new tasks. | """Create new tasks. | ||||
Args: | Args: | ||||
tasks (list): each task is a dictionary with the following keys: | tasks (list): each task is a dictionary with the following keys: | ||||
- type (str): the task type | - type (str): the task type | ||||
- arguments (dict): the arguments for the task runner, keys: | - arguments (dict): the arguments for the task runner, keys: | ||||
- args (list of str): arguments | - args (list of str): arguments | ||||
- kwargs (dict str -> str): keyword arguments | - kwargs (dict str -> str): keyword arguments | ||||
- next_run (datetime.datetime): the next scheduled run for the | - next_run (datetime.datetime): the next scheduled run for the | ||||
task | task | ||||
Returns: | Returns: | ||||
a list of created tasks. | a list of created tasks. | ||||
""" | """ | ||||
cursor.execute('select swh_scheduler_mktemp_task()') | cur.execute('select swh_scheduler_mktemp_task()') | ||||
self.copy_to(tasks, 'tmp_task', self.task_create_keys, | db.copy_to(tasks, 'tmp_task', self.task_create_keys, | ||||
default_columns={ | default_columns={ | ||||
'policy': policy, | 'policy': policy, | ||||
'status': 'next_run_not_scheduled' | 'status': 'next_run_not_scheduled' | ||||
}, | }, | ||||
cursor=cursor) | cursor=cur) | ||||
query = self._format_query( | query = db._format_query( | ||||
'select {keys} from swh_scheduler_create_tasks_from_temp()', | 'select {keys} from swh_scheduler_create_tasks_from_temp()', | ||||
self.task_keys, | self.task_keys, | ||||
) | ) | ||||
cursor.execute(query) | cur.execute(query) | ||||
return cursor.fetchall() | return cur.fetchall() | ||||
@autocommit | @db_transaction() | ||||
def set_status_tasks(self, task_ids, | def set_status_tasks(self, task_ids, status='disabled', next_run=None, | ||||
status='disabled', next_run=None, cursor=None): | db=None, cur=None): | ||||
"""Set the tasks' status whose ids are listed. | """Set the tasks' status whose ids are listed. | ||||
If given, also set the next_run date. | If given, also set the next_run date. | ||||
""" | """ | ||||
if not task_ids: | if not task_ids: | ||||
return | return | ||||
query = ["UPDATE task SET status = %s"] | query = ["UPDATE task SET status = %s"] | ||||
args = [status] | args = [status] | ||||
if next_run: | if next_run: | ||||
query.append(', next_run = %s') | query.append(', next_run = %s') | ||||
args.append(next_run) | args.append(next_run) | ||||
query.append(" WHERE id IN %s") | query.append(" WHERE id IN %s") | ||||
args.append(tuple(task_ids)) | args.append(tuple(task_ids)) | ||||
cursor.execute(''.join(query), args) | cur.execute(''.join(query), args) | ||||
@autocommit | @db_transaction() | ||||
def disable_tasks(self, task_ids, cursor=None): | def disable_tasks(self, task_ids, db=None, cur=None): | ||||
"""Disable the tasks whose ids are listed.""" | """Disable the tasks whose ids are listed.""" | ||||
return self.set_status_tasks(task_ids) | return self.set_status_tasks(task_ids, db=db, cur=cur) | ||||
@autocommit | @db_transaction() | ||||
def search_tasks(self, task_id=None, task_type=None, status=None, | def search_tasks(self, task_id=None, task_type=None, status=None, | ||||
priority=None, policy=None, before=None, after=None, | priority=None, policy=None, before=None, after=None, | ||||
limit=None, cursor=None): | limit=None, db=None, cur=None): | ||||
"""Search tasks from selected criterions""" | """Search tasks from selected criterions""" | ||||
where = [] | where = [] | ||||
args = [] | args = [] | ||||
if task_id: | if task_id: | ||||
if isinstance(task_id, (str, int)): | if isinstance(task_id, (str, int)): | ||||
where.append('id = %s') | where.append('id = %s') | ||||
else: | else: | ||||
Show All 30 Lines | def search_tasks(self, task_id=None, task_type=None, status=None, | ||||
if after: | if after: | ||||
where.append('next_run >= %s') | where.append('next_run >= %s') | ||||
args.append(after) | args.append(after) | ||||
query = 'select * from task where ' + ' and '.join(where) | query = 'select * from task where ' + ' and '.join(where) | ||||
if limit: | if limit: | ||||
query += ' limit %s :: bigint' | query += ' limit %s :: bigint' | ||||
args.append(limit) | args.append(limit) | ||||
cursor.execute(query, args) | cur.execute(query, args) | ||||
return cursor.fetchall() | return cur.fetchall() | ||||
@autocommit | @db_transaction() | ||||
def get_tasks(self, task_ids, cursor=None): | def get_tasks(self, task_ids, db=None, cur=None): | ||||
"""Retrieve the info of tasks whose ids are listed.""" | """Retrieve the info of tasks whose ids are listed.""" | ||||
query = self._format_query('select {keys} from task where id in %s', | query = db._format_query('select {keys} from task where id in %s', | ||||
self.task_keys) | self.task_keys) | ||||
cursor.execute(query, (tuple(task_ids),)) | cur.execute(query, (tuple(task_ids),)) | ||||
return cursor.fetchall() | return cur.fetchall() | ||||
@autocommit | @db_transaction() | ||||
def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, | def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, | ||||
num_tasks_priority=None, | num_tasks_priority=None, | ||||
cursor=None): | db=None, cur=None): | ||||
"""Fetch the list of ready tasks | """Fetch the list of ready tasks | ||||
Args: | Args: | ||||
task_type (str): filtering task per their type | task_type (str): filtering task per their type | ||||
timestamp (datetime.datetime): peek tasks that need to be executed | timestamp (datetime.datetime): peek tasks that need to be executed | ||||
before that timestamp | before that timestamp | ||||
num_tasks (int): only peek at num_tasks tasks (with no priority) | num_tasks (int): only peek at num_tasks tasks (with no priority) | ||||
num_tasks_priority (int): only peek at num_tasks_priority | num_tasks_priority (int): only peek at num_tasks_priority | ||||
tasks (with priority) | tasks (with priority) | ||||
Returns: | Returns: | ||||
a list of tasks | a list of tasks | ||||
""" | """ | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cur.execute( | ||||
'''select * from swh_scheduler_peek_ready_tasks( | '''select * from swh_scheduler_peek_ready_tasks( | ||||
%s, %s, %s :: bigint, %s :: bigint)''', | %s, %s, %s :: bigint, %s :: bigint)''', | ||||
(task_type, timestamp, num_tasks, num_tasks_priority) | (task_type, timestamp, num_tasks, num_tasks_priority) | ||||
) | ) | ||||
logger.debug('PEEK %s => %s' % (task_type, cursor.rowcount)) | logger.debug('PEEK %s => %s' % (task_type, cur.rowcount)) | ||||
return cursor.fetchall() | return cur.fetchall() | ||||
@autocommit | @db_transaction() | ||||
def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, | def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, | ||||
num_tasks_priority=None, cursor=None): | num_tasks_priority=None, db=None, cur=None): | ||||
"""Fetch the list of ready tasks, and mark them as scheduled | """Fetch the list of ready tasks, and mark them as scheduled | ||||
Args: | Args: | ||||
task_type (str): filtering task per their type | task_type (str): filtering task per their type | ||||
timestamp (datetime.datetime): grab tasks that need to be executed | timestamp (datetime.datetime): grab tasks that need to be executed | ||||
before that timestamp | before that timestamp | ||||
num_tasks (int): only grab num_tasks tasks (with no priority) | num_tasks (int): only grab num_tasks tasks (with no priority) | ||||
num_tasks_priority (int): only grab oneshot num_tasks tasks (with | num_tasks_priority (int): only grab oneshot num_tasks tasks (with | ||||
priorities) | priorities) | ||||
Returns: | Returns: | ||||
a list of tasks | a list of tasks | ||||
""" | """ | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cur.execute( | ||||
'''select * from swh_scheduler_grab_ready_tasks( | '''select * from swh_scheduler_grab_ready_tasks( | ||||
%s, %s, %s :: bigint, %s :: bigint)''', | %s, %s, %s :: bigint, %s :: bigint)''', | ||||
(task_type, timestamp, num_tasks, num_tasks_priority) | (task_type, timestamp, num_tasks, num_tasks_priority) | ||||
) | ) | ||||
logger.debug('GRAB %s => %s' % (task_type, cursor.rowcount)) | logger.debug('GRAB %s => %s' % (task_type, cur.rowcount)) | ||||
return cursor.fetchall() | return cur.fetchall() | ||||
task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] | task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] | ||||
@autocommit | @db_transaction() | ||||
def schedule_task_run(self, task_id, backend_id, metadata=None, | def schedule_task_run(self, task_id, backend_id, metadata=None, | ||||
timestamp=None, cursor=None): | timestamp=None, db=None, cur=None): | ||||
"""Mark a given task as scheduled, adding a task_run entry in the database. | """Mark a given task as scheduled, adding a task_run entry in the database. | ||||
Args: | Args: | ||||
task_id (int): the identifier for the task being scheduled | task_id (int): the identifier for the task being scheduled | ||||
backend_id (str): the identifier of the job in the backend | backend_id (str): the identifier of the job in the backend | ||||
metadata (dict): metadata to add to the task_run entry | metadata (dict): metadata to add to the task_run entry | ||||
timestamp (datetime.datetime): the instant the event occurred | timestamp (datetime.datetime): the instant the event occurred | ||||
Returns: | Returns: | ||||
a fresh task_run entry | a fresh task_run entry | ||||
""" | """ | ||||
if metadata is None: | if metadata is None: | ||||
metadata = {} | metadata = {} | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cur.execute( | ||||
'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', | 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', | ||||
(task_id, backend_id, metadata, timestamp) | (task_id, backend_id, metadata, timestamp) | ||||
) | ) | ||||
return cursor.fetchone() | return cur.fetchone() | ||||
@autocommit | @db_transaction() | ||||
def mass_schedule_task_runs(self, task_runs, cursor=None): | def mass_schedule_task_runs(self, task_runs, db=None, cur=None): | ||||
"""Schedule a bunch of task runs. | """Schedule a bunch of task runs. | ||||
Args: | Args: | ||||
task_runs (list): a list of dicts with keys: | task_runs (list): a list of dicts with keys: | ||||
- task (int): the identifier for the task being scheduled | - task (int): the identifier for the task being scheduled | ||||
- backend_id (str): the identifier of the job in the backend | - backend_id (str): the identifier of the job in the backend | ||||
- metadata (dict): metadata to add to the task_run entry | - metadata (dict): metadata to add to the task_run entry | ||||
- scheduled (datetime.datetime): the instant the event occurred | - scheduled (datetime.datetime): the instant the event occurred | ||||
Returns: | Returns: | ||||
None | None | ||||
""" | """ | ||||
cursor.execute('select swh_scheduler_mktemp_task_run()') | cur.execute('select swh_scheduler_mktemp_task_run()') | ||||
self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, | db.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, | ||||
cursor=cursor) | cursor=cur) | ||||
cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') | cur.execute('select swh_scheduler_schedule_task_run_from_temp()') | ||||
@autocommit | @db_transaction() | ||||
def start_task_run(self, backend_id, metadata=None, timestamp=None, | def start_task_run(self, backend_id, metadata=None, timestamp=None, | ||||
cursor=None): | db=None, cur=None): | ||||
"""Mark a given task as started, updating the corresponding task_run | """Mark a given task as started, updating the corresponding task_run | ||||
entry in the database. | entry in the database. | ||||
Args: | Args: | ||||
backend_id (str): the identifier of the job in the backend | backend_id (str): the identifier of the job in the backend | ||||
metadata (dict): metadata to add to the task_run entry | metadata (dict): metadata to add to the task_run entry | ||||
timestamp (datetime.datetime): the instant the event occurred | timestamp (datetime.datetime): the instant the event occurred | ||||
Returns: | Returns: | ||||
the updated task_run entry | the updated task_run entry | ||||
""" | """ | ||||
if metadata is None: | if metadata is None: | ||||
metadata = {} | metadata = {} | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cur.execute( | ||||
'select * from swh_scheduler_start_task_run(%s, %s, %s)', | 'select * from swh_scheduler_start_task_run(%s, %s, %s)', | ||||
(backend_id, metadata, timestamp) | (backend_id, metadata, timestamp) | ||||
) | ) | ||||
return cursor.fetchone() | return cur.fetchone() | ||||
@autocommit | @db_transaction() | ||||
def end_task_run(self, backend_id, status, metadata=None, timestamp=None, | def end_task_run(self, backend_id, status, metadata=None, timestamp=None, | ||||
result=None, cursor=None): | result=None, db=None, cur=None): | ||||
"""Mark a given task as ended, updating the corresponding task_run entry in the | """Mark a given task as ended, updating the corresponding task_run entry in the | ||||
database. | database. | ||||
Args: | Args: | ||||
backend_id (str): the identifier of the job in the backend | backend_id (str): the identifier of the job in the backend | ||||
status (str): how the task ended; one of: 'eventful', 'uneventful', | status (str): how the task ended; one of: 'eventful', 'uneventful', | ||||
'failed' | 'failed' | ||||
metadata (dict): metadata to add to the task_run entry | metadata (dict): metadata to add to the task_run entry | ||||
timestamp (datetime.datetime): the instant the event occurred | timestamp (datetime.datetime): the instant the event occurred | ||||
Returns: | Returns: | ||||
the updated task_run entry | the updated task_run entry | ||||
""" | """ | ||||
if metadata is None: | if metadata is None: | ||||
metadata = {} | metadata = {} | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cur.execute( | ||||
'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', | 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', | ||||
(backend_id, status, metadata, timestamp) | (backend_id, status, metadata, timestamp) | ||||
) | ) | ||||
return cur.fetchone() | |||||
    @db_transaction_generator()
    def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1,
                               db=None, cur=None):
        """Returns the list of task/task_run prior to a given date to archive.

        Args:
            after_ts: lower bound passed to swh_scheduler_task_to_archive
            before_ts: upper bound passed to swh_scheduler_task_to_archive
            limit (int): page size for each database round-trip
            last_id (int): task id to resume pagination from (keyset cursor)

        Yields:
            task/task_run rows, with 'arguments' transformed for indexing
            (args as an index->value dict, kwargs JSON-serialized)
        """
        last_task_run_id = None
        # Keyset pagination: fetch `limit` rows at a time, resuming from the
        # last (task_id, task_run_id) pair seen in the previous page.
        while True:
            row = None
            cur.execute(
                "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
                (after_ts, before_ts, last_id, limit)
            )
            for row in cur:
                # nested type index does not accept bare values
                # transform it as a dict to comply with this
                row['arguments']['args'] = {
                    i: v for i, v in enumerate(row['arguments']['args'])
                }
                kwargs = row['arguments']['kwargs']
                row['arguments']['kwargs'] = json.dumps(kwargs)
                yield row

            # `row` still holds the last yielded row; None means the page
            # was empty, so there is nothing left to archive.
            if not row:
                break
            _id = row.get('task_id')
            _task_run_id = row.get('task_run_id')
            # Stop if the cursor did not advance, to avoid looping forever
            # on the same page.
            if last_id == _id and last_task_run_id == _task_run_id:
                break
            last_id = _id
            last_task_run_id = _task_run_id
@autocommit | @db_transaction() | ||||
def delete_archived_tasks(self, task_ids, cursor=None): | def delete_archived_tasks(self, task_ids, db=None, cur=None): | ||||
"""Delete archived tasks as much as possible. Only the task_ids whose | """Delete archived tasks as much as possible. Only the task_ids whose | ||||
complete associated task_run have been cleaned up will be. | complete associated task_run have been cleaned up will be. | ||||
""" | """ | ||||
_task_ids = _task_run_ids = [] | _task_ids = _task_run_ids = [] | ||||
for task_id in task_ids: | for task_id in task_ids: | ||||
_task_ids.append(task_id['task_id']) | _task_ids.append(task_id['task_id']) | ||||
_task_run_ids.append(task_id['task_run_id']) | _task_run_ids.append(task_id['task_run_id']) | ||||
cursor.execute( | cur.execute( | ||||
"select * from swh_scheduler_delete_archived_tasks(%s, %s)", | "select * from swh_scheduler_delete_archived_tasks(%s, %s)", | ||||
(_task_ids, _task_run_ids)) | (_task_ids, _task_run_ids)) |