Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import binascii | import binascii | ||||
import datetime | import datetime | ||||
from functools import wraps | from functools import wraps | ||||
import json | import json | ||||
import tempfile | import tempfile | ||||
import logging | |||||
from arrow import Arrow, utcnow | from arrow import Arrow, utcnow | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from psycopg2.extensions import AsIs | from psycopg2.extensions import AsIs | ||||
from swh.core.config import SWHConfig | from swh.core.config import SWHConfig | ||||
logger = logging.getLogger(__name__) | |||||
def adapt_arrow(arrow):
    """Render a timestamp object as a SQL ``timestamptz`` literal.

    Args:
        arrow: object whose ``isoformat()`` result is embedded in the
            literal (this adapter is registered for ``Arrow`` values)

    Returns:
        an ``AsIs`` wrapper around ``'<isoformat>'::timestamptz``

    """
    iso_timestamp = arrow.isoformat()
    sql_literal = "'%s'::timestamptz" % iso_timestamp
    return AsIs(sql_literal)
# Register the psycopg2 adapters used for query parameters in this module:
# Arrow values go through adapt_arrow, dict values through
# psycopg2.extras.Json.
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
▲ Show 20 Lines • Show All 152 Lines • ▼ Show 20 Lines | class SchedulerBackend(SWHConfig, DbBackend): | ||||
def __init__(self, **override_config):
    """Load the configuration and connect to the scheduling database.

    The configuration returned by ``parse_config_file`` is updated with
    ``override_config``, so keyword arguments take precedence over the
    parsed values.

    Args:
        **override_config: configuration entries overriding the parsed
            configuration. The resulting configuration must provide a
            ``scheduling_db`` entry, which is stored as the connection
            DSN before ``reconnect()`` is called.

    """
    super().__init__()
    self.config = self.parse_config_file(global_config=False)
    self.config.update(override_config)
    self.db = None
    self.db_conn_dsn = self.config['scheduling_db']
    self.reconnect()
    # Hand the config to the logger as an argument instead of formatting
    # it eagerly, so the message is only rendered when DEBUG is enabled.
    # NOTE(review): the config holds the 'scheduling_db' value, which may
    # embed credentials -- confirm it is acceptable to log it.
    logger.debug('SchedulerBackend config=%s', self.config)
# Key names describing a task type entry.
task_type_keys = [
    'type',
    'description',
    'backend_name',
    'default_interval',
    'min_interval',
    'max_interval',
    'backoff_factor',
    'max_queue_length',
    'num_retries',
    'retry_delay',
]
@autocommit | @autocommit | ||||
▲ Show 20 Lines • Show All 132 Lines • ▼ Show 20 Lines | def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, | ||||
if timestamp is None: | if timestamp is None: | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cursor.execute( | cursor.execute( | ||||
'''select * from swh_scheduler_peek_ready_tasks( | '''select * from swh_scheduler_peek_ready_tasks( | ||||
%s, %s, %s :: bigint, %s :: bigint)''', | %s, %s, %s :: bigint, %s :: bigint)''', | ||||
(task_type, timestamp, num_tasks, num_tasks_priority) | (task_type, timestamp, num_tasks, num_tasks_priority) | ||||
) | ) | ||||
logger.debug('PEEK %s => %s' % (task_type, cursor.rowcount)) | |||||
return cursor.fetchall() | return cursor.fetchall() | ||||
@autocommit
def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
                     num_tasks_priority=None, cursor=None):
    """Fetch the list of ready tasks, and mark them as scheduled

    The work is delegated to the ``swh_scheduler_grab_ready_tasks`` SQL
    function; the number of returned rows is logged at DEBUG level.

    Args:
        task_type (str): filtering task per their type
        timestamp (datetime.datetime): grab tasks that need to be executed
            before that timestamp (defaults to ``utcnow()`` when None)
        num_tasks (int): only grab num_tasks tasks (with no priority)
        num_tasks_priority (int): only grab oneshot num_tasks tasks (with
            priorities)
        cursor: database cursor used to run the query (presumably
            supplied by the ``autocommit`` decorator -- TODO confirm)

    Returns:
        a list of tasks

    """
    if timestamp is None:
        timestamp = utcnow()
    cursor.execute(
        '''select * from swh_scheduler_grab_ready_tasks(
            %s, %s, %s :: bigint, %s :: bigint)''',
        (task_type, timestamp, num_tasks, num_tasks_priority)
    )
    # Lazy logging arguments: the message is only formatted when DEBUG
    # is enabled for this logger.
    logger.debug('GRAB %s => %s', task_type, cursor.rowcount)
    return cursor.fetchall()
# Key names used when creating a task_run entry.
task_run_create_keys = [
    'task',
    'backend_id',
    'scheduled',
    'metadata',
]
@autocommit | @autocommit | ||||
def schedule_task_run(self, task_id, backend_id, metadata=None, | def schedule_task_run(self, task_id, backend_id, metadata=None, | ||||
timestamp=None, cursor=None): | timestamp=None, cursor=None): | ||||
"""Mark a given task as scheduled, adding a task_run entry in the database. | """Mark a given task as scheduled, adding a task_run entry in the database. | ||||
▲ Show 20 Lines • Show All 151 Lines • Show Last 20 Lines |