diff --git a/PKG-INFO b/PKG-INFO index d6463d2..680a338 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.24 +Version: 0.0.25 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/data/elastic-template.json b/data/elastic-template.json index da3799d..c12d602 100644 --- a/data/elastic-template.json +++ b/data/elastic-template.json @@ -1,53 +1,52 @@ { "order": 0, "index_patterns": ["swh-tasks-*"], "settings": { "index": { "codec": "best_compression", "refresh_interval": "30s" } }, "mappings" : { "task" : { "_source" : { "enabled": true}, "properties": { "task_id": {"type": "double"}, "task_policy": {"type": "text"}, "task_status": {"type": "text"}, "task_run_id": {"type": "double"}, "arguments": { "type": "object", "properties" : { "args": { "type": "nested", "dynamic": false }, "kwargs": { - "type": "object", - "dynamic": false + "type": "text" } } }, "type": {"type": "text"}, "backend_id": {"type": "text"}, "metadata": { "type": "object", "enabled" : false }, "scheduled": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" }, "started": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" }, "ended": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis" } } } }, "aliases": {} } diff --git a/debian/changelog b/debian/changelog index 1db3c68..e3bec86 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,191 +1,192 @@ -swh-scheduler (0.0.24-1~swh1~bpo9+1) stretch-swh; urgency=medium +swh-scheduler (0.0.25-1~swh1) unstable-swh; urgency=medium - * Rebuild for stretch-backports. + * v0.0.25 + * swh.scheduler.cli.archive: Index arguments.kwargs as text - -- Antoine R. Dumont (@ardumont) Fri, 13 Apr 2018 14:55:32 +0200 + -- Antoine R. Dumont (@ardumont) Wed, 18 Apr 2018 12:34:43 +0200 swh-scheduler (0.0.24-1~swh1) unstable-swh; urgency=medium * v0.0.24 * data/template: Do not index the arguments field (it's in _source) * data/README: Add a small readme to explain es install step * swh.scheduler.cli: Add a bulk index flag to separate read from index -- Antoine R. Dumont (@ardumont) Fri, 13 Apr 2018 14:55:32 +0200 swh-scheduler (0.0.23-1~swh1) unstable-swh; urgency=medium * swh.scheduler.cli.archive: Delete only completely indexed tasks * Prior to this commit, it could happen that we removed tasks even * though we did not yet index associated task_run. * Related T986 -- Antoine R. Dumont (@ardumont) Tue, 10 Apr 2018 17:43:07 +0200 swh-scheduler (0.0.22-1~swh1) unstable-swh; urgency=medium * v0.0.22 * Update to a more recent python3-elasticsearch client -- Antoine R. Dumont (@ardumont) Mon, 09 Apr 2018 16:09:16 +0200 swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium * v0.0.21 * Adapt default configuration * Fix typo in configuration variable name -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 15:02:55 +0200 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium * v0.0.20 * swh.scheduler.cli.archive: Open completed oneshot or disabled * recurring tasks archival endpoint * swh.core.serializer: Move to msgpack serialization format * swh.scheduler.cli: Unify pretty print output * sql/data: Add new task type for loading mercurial dump * swh.scheduler.cli: Add sample use case for the scheduling cli * swh.scheduler.cli: Open policy column to the scheduling cli * swh.scheduler.cli: Open the delimiter option as cli argument * Fix issue when updating task-type without any retry delay defined * swh-scheduler/data: Add new oneshot scheduling load-mercurial task * backend: fix default scheduling_db value for consistency * backend: doc: fix return value of create_tasks -- Antoine R. Dumont (@ardumont) Fri, 30 Mar 2018 11:44:18 +0200 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium * v0.0.19 * swh.scheduler.utils: Open utility function to create oneshot task -- Antoine R. Dumont (@ardumont) Wed, 29 Nov 2017 12:51:15 +0100 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.18 * Celery 4 compatibility -- Nicolas Dandrimont Wed, 08 Nov 2017 17:06:22 +0100 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.17 * Update packaging runes -- Nicolas Dandrimont Thu, 12 Oct 2017 18:49:02 +0200 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.16 * add some tests * implement one-shot tasks * implement retry on temporary failure -- Nicolas Dandrimont Mon, 07 Aug 2017 18:44:03 +0200 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.15 * Add some methods to get the length of task queues * worker: Show logs on stdout if loglevel = debug -- Nicolas Dandrimont Mon, 19 Jun 2017 19:44:56 +0200 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler 0.0.14 * Make the return value of tasks available in the listener -- Nicolas Dandrimont Mon, 12 Jun 2017 17:50:32 +0200 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.13 * Use systemd for logging rather than PostgreSQL -- Nicolas Dandrimont Fri, 07 Apr 2017 11:57:50 +0200 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.12 * Only log to database if the configuration is present -- Nicolas Dandrimont Thu, 09 Mar 2017 11:12:45 +0100 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.11 * add utils.get_task -- Nicolas Dandrimont Tue, 14 Feb 2017 19:49:34 +0100 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.10 * Allow disabling tasks -- Nicolas Dandrimont Thu, 20 Oct 2016 17:20:17 +0200 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.9 * Revert management of one shot tasks * Add possibility of launching several worker instances -- Nicolas Dandrimont Fri, 02 Sep 2016 17:09:18 +0200 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium * v0.0.7 * Add oneshot task -- Antoine R. Dumont (@ardumont) Fri, 01 Jul 2016 16:42:45 +0200 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.6 * More reliability and efficiency when scheduling a lot ot tasks -- Nicolas Dandrimont Wed, 24 Feb 2016 18:46:57 +0100 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.5 * Use copy for task mass-scheduling -- Nicolas Dandrimont Wed, 24 Feb 2016 12:13:38 +0100 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium * Release swh-scheduler v0.0.4 * general cleanup of the backend * use arrow instead of dateutil * add new cli program -- Nicolas Dandrimont Tue, 23 Feb 2016 17:46:04 +0100 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler version 0.0.3 * Implement the timestamp arguments to the task_run functions * Make the celery event listener use a reliable queue -- Nicolas Dandrimont Mon, 22 Feb 2016 15:14:28 +0100 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium * Release swh.scheduler v0.0.2 * Multiple schema changes * Initial releases for the celery job runner and the event listener -- Nicolas Dandrimont Fri, 19 Feb 2016 18:50:47 +0100 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * Release swh.scheduler v0.0.1 * Move swh.core.scheduling and swh.core.worker to swh.scheduler -- Nicolas Dandrimont Mon, 15 Feb 2016 11:07:30 +0100 diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index d6463d2..680a338 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.scheduler -Version: 0.0.24 +Version: 0.0.25 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index c8bb989..7b37d73 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,480 +1,482 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime from functools import wraps import json import tempfile from arrow import Arrow, utcnow import psycopg2 import psycopg2.extras from psycopg2.extensions import AsIs from swh.core.config import SWHConfig def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except Exception: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped class SchedulerBackend(SWHConfig): """ Backend for the Software Heritage scheduling database. """ CONFIG_BASE_FILENAME = 'scheduler.ini' DEFAULT_CONFIG = { 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), } def __init__(self, **override_config): self.config = self.parse_config_file(global_config=False) self.config.update(override_config) self.db = None self.reconnect() def reconnect(self): if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['scheduling_db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() def copy_to(self, items, tblname, columns, cursor=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, (datetime.datetime, Arrow)): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) task_type_keys = [ 'type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay', ] def _format_query(self, query, keys): """Format a query with the given keys""" query_keys = ', '.join(keys) placeholders = ', '.join(['%s'] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) def _format_multiquery(self, query, keys, values): """Format a query with placeholders generated for multiple values""" query_keys = ', '.join(keys) placeholders = '), ('.join( [', '.join(['%s'] * len(keys))] * len(values) ) ret_values = sum([[value[key] for key in keys] for value in values], []) return ( query.format(keys=query_keys, placeholders=placeholders), ret_values, ) @autocommit def create_task_type(self, task_type, cursor=None): """Create a new task type ready for scheduling. Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type """ query = self._format_query( """insert into task_type ({keys}) values ({placeholders})""", self.task_type_keys, ) cursor.execute(query, [task_type[key] for key in self.task_type_keys]) @autocommit def get_task_type(self, task_type_name, cursor=None): """Retrieve the task type with id task_type_name""" query = self._format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cursor.execute(query, (task_type_name,)) ret = cursor.fetchone() return ret @autocommit def get_task_types(self, cursor=None): query = self._format_query( "select {keys} from task_type", self.task_type_keys, ) cursor.execute(query) ret = cursor.fetchall() return ret task_create_keys = [ 'type', 'arguments', 'next_run', 'policy', 'retries_left', ] task_keys = task_create_keys + ['id', 'current_interval', 'status'] @autocommit def create_tasks(self, tasks, cursor=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cursor.execute('select swh_scheduler_mktemp_task()') self.copy_to(tasks, 'tmp_task', self.task_create_keys, cursor) query = self._format_query( 'select {keys} from swh_scheduler_create_tasks_from_temp()', self.task_keys, ) cursor.execute(query) return cursor.fetchall() @autocommit def disable_tasks(self, task_ids, cursor=None): """Disable the tasks whose ids are listed.""" query = "UPDATE task SET status = 'disabled' WHERE id IN %s" cursor.execute(query, (tuple(task_ids),)) return None @autocommit def get_tasks(self, task_ids, cursor=None): """Retrieve the info of tasks whose ids are listed.""" query = self._format_query('select {keys} from task where id in %s', self.task_keys) cursor.execute(query, (tuple(task_ids),)) return cursor.fetchall() @autocommit def peek_ready_tasks(self, task_type, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_peek_ready_tasks(%s, %s, %s)', (task_type, timestamp, num_tasks) ) return cursor.fetchall() @autocommit def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None, cursor=None): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_grab_ready_tasks(%s, %s, %s)', (task_type, timestamp, num_tasks) ) return cursor.fetchall() task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata'] @autocommit def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)', (task_id, backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def mass_schedule_task_runs(self, task_runs, cursor=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cursor.execute('select swh_scheduler_mktemp_task_run()') self.copy_to(task_runs, 'tmp_task_run', self.task_run_create_keys, cursor) cursor.execute('select swh_scheduler_schedule_task_run_from_temp()') @autocommit def start_task_run(self, backend_id, metadata=None, timestamp=None, cursor=None): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_start_task_run(%s, %s, %s)', (backend_id, metadata, timestamp) ) return cursor.fetchone() @autocommit def end_task_run(self, backend_id, status, metadata=None, timestamp=None, result=None, cursor=None): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cursor.execute( 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', (backend_id, status, metadata, timestamp) ) return cursor.fetchone() @autocommit def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1, cursor=None): """Returns the list of task/task_run prior to a given date to archive. """ last_task_run_id = None while True: row = None cursor.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit) ) for row in cursor: # nested type index does not accept bare values # transform it as a dict to comply with this row['arguments']['args'] = { i: v for i, v in enumerate(row['arguments']['args']) } + kwargs = row['arguments']['kwargs'] + row['arguments']['kwargs'] = json.dumps(kwargs) yield row if not row: break _id = row.get('task_id') _task_run_id = row.get('task_run_id') if last_id == _id and last_task_run_id == _task_run_id: break last_id = _id last_task_run_id = _task_run_id @autocommit def delete_archived_tasks(self, task_ids, cursor=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id['task_id']) _task_run_ids.add(task_id['task_run_id']) cursor.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids)) diff --git a/version.txt b/version.txt index 19ebef5..2c5c848 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.24-0-gf4587a3 \ No newline at end of file +v0.0.25-0-g8124229 \ No newline at end of file