F9749506
diff --git a/PKG-INFO b/PKG-INFO
index 955abca..67cd4dc 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,65 +1,65 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.0.42
+Version: 0.0.43
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
-Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Description: swh-scheduler
=============
Job scheduler for the Software Heritage project.
Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
listing a forge, loading new stuff from a Git repository) and one-off
activities (e.g., loading a specific version of a source package).
# Tests
## Running tests manually
### Test data
To be able to run (unit) tests, you need to have the
[[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]]
in the parent directory. If you have set your environment following the
[[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]]
document everything should be set up just fine.
Otherwise:
```
~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata
```
### Required services
Unit tests that require a running celery broker use an in-memory broker/result
backend by default, but you can choose to use a real broker by setting the
`CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables.
For example:
```
$ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests
.....................................
----------------------------------------------------------------------
Ran 37 tests in 15.578s
OK
```
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index 955abca..67cd4dc 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,65 +1,65 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.0.42
+Version: 0.0.43
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
-Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Description: swh-scheduler
=============
Job scheduler for the Software Heritage project.
Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
listing a forge, loading new stuff from a Git repository) and one-off
activities (e.g., loading a specific version of a source package).
# Tests
## Running tests manually
### Test data
To be able to run (unit) tests, you need to have the
[[https://forge.softwareheritage.org/source/swh-storage-testdata.git|swh-storage-testdata]]
in the parent directory. If you have set your environment following the
[[ https://docs.softwareheritage.org/devel/getting-started.html#getting-started|Getting started]]
document everything should be set up just fine.
Otherwise:
```
~/.../swh-scheduler$ git clone https://forge.softwareheritage.org/source/swh-storage-testdata.git ../swh-storage-testdata
```
### Required services
Unit tests that require a running celery broker use an in-memory broker/result
backend by default, but you can choose to use a real broker by setting the
`CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` environment variables.
For example:
```
$ CELERY_BROKER_URL=amqp://localhost pifpaf run postgresql nosetests
.....................................
----------------------------------------------------------------------
Ran 37 tests in 15.578s
OK
```
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
index dca4a7f..98e9107 100644
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -1,208 +1,207 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import datetime
 import logging
 import time
-import socket
 import sys
 import click
 from arrow import utcnow
 from kombu import Queue
 import celery
 from celery.events import EventReceiver
 class ReliableEventReceiver(EventReceiver):
     def __init__(self, channel, handlers=None, routing_key='#',
                  node_id=None, app=None, queue_prefix='celeryev',
                  accept=None):
         super(ReliableEventReceiver, self).__init__(
             channel, handlers, routing_key, node_id, app, queue_prefix, accept)
         self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
                            exchange=self.exchange,
                            routing_key=self.routing_key,
                            auto_delete=False,
                            durable=True)
     def get_consumers(self, consumer, channel):
         return [consumer(queues=[self.queue],
                          callbacks=[self._receive], no_ack=False,
                          accept=self.accept)]
     def _receive(self, bodies, message):
         logging.debug('## event-receiver: bodies: %s' % bodies)
         logging.debug('## event-receiver: message: %s' % message)
         if not isinstance(bodies, list):  # celery<4 returned body as element
             bodies = [bodies]
         for body in bodies:
             type, body = self.event_from_message(body)
             self.process(type, body, message)
     def process(self, type, event, message):
         """Process the received event by dispatching it to the appropriate
         handler."""
         handler = self.handlers.get(type) or self.handlers.get('*')
         logging.debug('## event-receiver: type: %s' % type)
         logging.debug('## event-receiver: event: %s' % event)
         logging.debug('## event-receiver: handler: %s' % handler)
         handler and handler(event, message)
 ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0)
 ACTION_QUEUE_MAX_LENGTH = 1000
 def event_monitor(app, backend):
     logger = logging.getLogger('swh.scheduler.listener')
     actions = {
         'last_send': utcnow() - 2*ACTION_SEND_DELAY,
         'queue': [],
     }
     def try_perform_actions(actions=actions):
         logger.debug('Try perform pending actions')
         if actions['queue'] and (
                 len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH or
                 utcnow() - actions['last_send'] > ACTION_SEND_DELAY):
             perform_actions(actions)
     def perform_actions(actions, backend=backend):
         logger.info('Perform %s pending actions' % len(actions['queue']))
         action_map = {
             'start_task_run': backend.start_task_run,
             'end_task_run': backend.end_task_run,
         }
         messages = []
         db = backend.get_db()
         cursor = db.cursor(None)
         for action in actions['queue']:
             messages.append(action['message'])
             function = action_map[action['action']]
             args = action.get('args', ())
             kwargs = action.get('kwargs', {})
             kwargs['cur'] = cursor
             function(*args, **kwargs)
         db.conn.commit()
         for message in messages:
             if not message.acknowledged:
                 message.ack()
             else:
                 logger.info('message already acknowledged: %s', message)
         actions['queue'] = []
         actions['last_send'] = utcnow()
     def queue_action(action, actions=actions):
         actions['queue'].append(action)
         try_perform_actions()
     def catchall_event(event, message):
         logger.info('event: %s, message:%s', event, message)
         if not message.acknowledged:
             message.ack()
         else:
             logger.info('message already acknowledged: %s', message)
         try_perform_actions()
     def task_started(event, message):
         logger.debug('task_started: event: %s' % event)
         logger.debug('task_started: message: %s' % message)
         queue_action({
             'action': 'start_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'metadata': {
                     'worker': event['hostname'],
                 },
             },
             'message': message,
         })
     def task_succeeded(event, message):
         logger.debug('task_succeeded: event: %s' % event)
         logger.debug('task_succeeded: message: %s' % message)
         result = event['result']
         logger.debug('task_succeeded: result: %s' % result)
         try:
             status = result.get('status')
             if status == 'success':
                 status = 'eventful' if result.get('eventful') else 'uneventful'
         except Exception:
             status = 'eventful' if result else 'uneventful'
         queue_action({
             'action': 'end_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'status': status,
                 'result': result,
             },
             'message': message,
         })
     def task_failed(event, message):
         logger.debug('task_failed: event: %s' % event)
         logger.debug('task_failed: message: %s' % message)
         queue_action({
             'action': 'end_task_run',
             'args': [event['uuid']],
             'kwargs': {
                 'timestamp': utcnow(),
                 'status': 'failed',
             },
             'message': message,
         })
     recv = ReliableEventReceiver(
         celery.current_app.connection(),
         app=celery.current_app,
         handlers={
             'task-started': task_started,
             'task-result': task_succeeded,
             'task-failed': task_failed,
             '*': catchall_event,
         },
-        node_id='listener-%s' % socket.gethostname(),
+        node_id='listener',
     )
     errors = 0
     while True:
         try:
             recv.capture(limit=None, timeout=None, wakeup=True)
             errors = 0
         except KeyboardInterrupt:
             logger.exception('Keyboard interrupt, exiting')
             break
         except Exception:
             logger.exception('Unexpected exception')
             if errors < 5:
                 time.sleep(errors)
                 errors += 1
             else:
                 logger.error('Too many consecutive errors, exiting')
                 sys.exit(1)
 @click.command()
 @click.pass_context
 def main(ctx):
     click.echo("Deprecated! Use 'swh-scheduler listener' instead.",
                err=True)
     ctx.exit(1)
 if __name__ == '__main__':
     main()
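Review note on the `node_id` change in listener.py: `ReliableEventReceiver` builds its durable queue name by joining `queue_prefix` and `node_id` with a dot, so replacing the hostname-based `node_id` with a constant also makes the queue name constant. The sketch below only mirrors that string expression; `event_queue_name` is a hypothetical helper, not part of swh-scheduler, and the motivation (presumably letting a listener restarted on another host reuse the same durable queue) is an assumption, since the diff does not state it.

```python
import socket

# Default value of queue_prefix in ReliableEventReceiver.__init__ above.
QUEUE_PREFIX = 'celeryev'


def event_queue_name(node_id, queue_prefix=QUEUE_PREFIX):
    """Hypothetical helper mirroring '.'.join([queue_prefix, node_id])."""
    return '.'.join([queue_prefix, node_id])


# Before the change: the queue name depends on the host running the listener.
old_name = event_queue_name('listener-%s' % socket.gethostname())
# After the change: the queue name is the same on every host.
new_name = event_queue_name('listener')

print(old_name)
print(new_name)
```

Because the queue is declared with `durable=True` and `auto_delete=False`, a hostname-dependent name would leave one persistent queue per host that ever ran the listener.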
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index d4bdafc..9e4dc5f 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,115 +1,121 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 import arrow
 import logging
+from kombu.utils.uuid import uuid
 from swh.scheduler import get_scheduler, compute_nb_tasks_from
 logger = logging.getLogger(__name__)
 # Max batch size for tasks
 MAX_NUM_TASKS = 10000
 def run_ready_tasks(backend, app):
     """Run tasks that are ready
     Args:
         backend (Scheduler): backend to read tasks to schedule
         app (App): Celery application to send tasks to
     Returns:
         A list of dictionaries::
           {
             'task': the scheduler's task id,
             'backend_id': Celery's task id,
             'scheduled': arrow.utcnow()
           }
         The result can be used to block-wait for the tasks' results::
           backend_tasks = run_ready_tasks(self.scheduler, app)
           for task in backend_tasks:
               AsyncResult(id=task['backend_id']).get()
     """
     all_backend_tasks = []
     while True:
         task_types = {}
         pending_tasks = []
         for task_type in backend.get_task_types():
             task_type_name = task_type['type']
             task_types[task_type_name] = task_type
             max_queue_length = task_type['max_queue_length']
             backend_name = task_type['backend_name']
             if max_queue_length:
                 try:
                     queue_length = app.get_queue_length(backend_name)
                 except ValueError:
                     queue_length = None
                 if queue_length is None:
                     # Running without RabbitMQ (probably a test env).
                     num_tasks = MAX_NUM_TASKS
                 else:
                     num_tasks = min(max_queue_length - queue_length,
                                     MAX_NUM_TASKS)
             else:
                 num_tasks = MAX_NUM_TASKS
             if num_tasks > 0:
                 num_tasks, num_tasks_priority = compute_nb_tasks_from(
                     num_tasks)
                 grabbed_tasks = backend.grab_ready_tasks(
                     task_type_name,
                     num_tasks=num_tasks,
                     num_tasks_priority=num_tasks_priority)
                 if grabbed_tasks:
                     pending_tasks.extend(grabbed_tasks)
                     logger.info('Grabbed %s tasks %s',
                                 len(grabbed_tasks), task_type_name)
         if not pending_tasks:
             return all_backend_tasks
         backend_tasks = []
+        celery_tasks = []
         for task in pending_tasks:
             args = task['arguments']['args']
             kwargs = task['arguments']['kwargs']
             backend_name = task_types[task['type']]['backend_name']
-            celery_result = app.send_task(
-                backend_name, args=args, kwargs=kwargs,
-            )
+            backend_id = uuid()
+            celery_tasks.append((backend_name, backend_id, args, kwargs))
             data = {
                 'task': task['id'],
-                'backend_id': celery_result.id,
+                'backend_id': backend_id,
                 'scheduled': arrow.utcnow(),
             }
             backend_tasks.append(data)
         logger.debug('Sent %s celery tasks', len(backend_tasks))
         backend.mass_schedule_task_runs(backend_tasks)
+        for backend_name, backend_id, args, kwargs in celery_tasks:
+            app.send_task(
+                backend_name, task_id=backend_id, args=args, kwargs=kwargs,
+            )
+
         all_backend_tasks.extend(backend_tasks)
 def main():
     from .config import app as main_app
     for module in main_app.conf.CELERY_IMPORTS:
         __import__(module)
     main_backend = get_scheduler('local')
     try:
         run_ready_tasks(main_backend, main_app)
     except Exception:
         main_backend.rollback()
         raise
 if __name__ == '__main__':
     main()
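Review note on the runner.py change: the task id is now generated up front, the task runs are recorded with `mass_schedule_task_runs`, and only then are the messages sent with `task_id=backend_id`. A plausible reading is that this keeps a worker event from arriving for a run the scheduler has not recorded yet, although the diff itself does not say so. The sketch below illustrates the ordering only; `FakeBackend`, `FakeApp` and `schedule` are hypothetical stand-ins, and the standard-library `uuid.uuid4()` replaces `kombu.utils.uuid.uuid` to keep the example self-contained.

```python
import uuid


class FakeBackend:
    """Hypothetical stand-in for the scheduler backend."""

    def __init__(self):
        self.recorded = []

    def mass_schedule_task_runs(self, task_runs):
        self.recorded.extend(task_runs)


class FakeApp:
    """Hypothetical stand-in for the Celery app; checks the ordering."""

    def __init__(self, backend):
        self.backend = backend
        self.sent = []

    def send_task(self, name, task_id=None, args=None, kwargs=None):
        # Every id being sent must already be known to the backend.
        known = {run['backend_id'] for run in self.backend.recorded}
        if task_id not in known:
            raise RuntimeError('task sent before being recorded')
        self.sent.append((name, task_id))


def schedule(pending_tasks, backend, app):
    """Record task runs first, then send them with the pre-generated ids."""
    task_runs, to_send = [], []
    for task in pending_tasks:
        backend_id = str(uuid.uuid4())  # stand-in for kombu's uuid()
        to_send.append((task['name'], backend_id, task['args'], task['kwargs']))
        task_runs.append({'task': task['id'], 'backend_id': backend_id})
    backend.mass_schedule_task_runs(task_runs)
    for name, backend_id, args, kwargs in to_send:
        app.send_task(name, task_id=backend_id, args=args, kwargs=kwargs)
    return task_runs


backend = FakeBackend()
app = FakeApp(backend)
runs = schedule(
    [{'id': 1, 'name': 'demo.task', 'args': [], 'kwargs': {}},
     {'id': 2, 'name': 'demo.task', 'args': [], 'kwargs': {}}],
    backend, app)
print(len(runs), len(app.sent))
```

One side effect worth noting: the `'Sent %s celery tasks'` debug message in the diff is now logged before any task has actually been sent.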
diff --git a/swh/scheduler/sql/40-swh-data.sql b/swh/scheduler/sql/40-swh-data.sql
index a0322c6..e44ff5c 100644
--- a/swh/scheduler/sql/40-swh-data.sql
+++ b/swh/scheduler/sql/40-swh-data.sql
@@ -1,286 +1,312 @@
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'swh-loader-mount-dump-and-load-svn-repository',
'Loading svn repositories from svn dump',
'swh.loader.svn.tasks.MountAndLoadSvnRepository',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-update-svn',
'Create dump of a remote svn repository, mount it and load it',
'swh.loader.svn.tasks.DumpMountAndLoadSvnRepository',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
num_retries,
max_queue_length)
values (
'swh-deposit-archive-loading',
'Loading deposit archive into swh through swh-loader-tar',
'swh.deposit.loader.tasks.LoadDepositArchiveTsk',
'1 day', '1 day', '1 day', 1, 3, 1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
num_retries, max_queue_length)
values (
'swh-deposit-archive-checks',
'Pre-checking deposit step before loading into swh archive',
'swh.deposit.loader.tasks.ChecksDepositTsk',
'1 day', '1 day', '1 day', 1, 3, 1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'swh-vault-cooking',
'Cook a Vault bundle',
'swh.vault.cooking_tasks.SWHCookingTask',
'1 day', '1 day', '1 day', 1,
10000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-update-hg',
'Loading mercurial repository swh-loader-mercurial',
'swh.loader.mercurial.tasks.LoadMercurial',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-load-archive-hg',
'Loading archive mercurial repository swh-loader-mercurial',
'swh.loader.mercurial.tasks.LoadArchiveMercurial',
'1 day', '1 day', '1 day', 1,
1000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-update-git',
'Update an origin of type git',
'swh.loader.git.tasks.UpdateGitRepository',
'64 days',
'12:00:00',
'64 days', 2, 5000);
+insert into task_type(
+ type,
+ description,
+ backend_name,
+ default_interval, min_interval, max_interval, backoff_factor)
+values (
+ 'swh-lister-bitbucket-incremental',
+ 'Incrementally list BitBucket',
+ 'swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
+ '1 day',
+ '1 day',
+ '1 day', 1);
+
+insert into task_type(
+ type,
+ description,
+ backend_name,
+ default_interval, min_interval, max_interval, backoff_factor)
+values (
+ 'swh-lister-bitbucket-full',
+ 'Full update of Bitbucket repos list',
+ 'swh.lister.bitbucket.tasks.FullBitBucketRelister',
+ '90 days',
+ '90 days',
+ '90 days', 1);
+
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-github-incremental',
'Incrementally list GitHub',
'swh.lister.github.tasks.IncrementalGitHubLister',
'1 day',
'1 day',
'1 day', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-github-full',
'Full update of GitHub repos list',
'swh.lister.github.tasks.FullGitHubRelister',
'90 days',
'90 days',
'90 days', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-debian',
'List a Debian distribution',
'swh.lister.debian.tasks.DebianListerTask',
'1 day',
'1 day',
'1 day', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-gitlab-incremental',
'Incrementally list a Gitlab instance',
'swh.lister.gitlab.tasks.IncrementalGitLabLister',
'1 day',
'1 day',
'1 day', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-gitlab-full',
'Full update of a Gitlab instance''s repos list',
'swh.lister.gitlab.tasks.FullGitLabRelister',
'90 days',
'90 days',
'90 days', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor)
values (
'swh-lister-pypi',
'Full pypi lister',
'swh.lister.pypi.tasks.PyPIListerTask',
'1 days',
'1 days',
'1 days', 1);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'origin-update-pypi',
'Load Pypi origin',
'swh.loader.pypi.tasks.LoadPyPI',
'64 days', '12:00:00', '64 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_mimetype',
'Mimetype indexer task',
'swh.indexer.tasks.ContentMimetype',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_range_mimetype',
'Mimetype Range indexer task',
'swh.indexer.tasks.ContentRangeMimetype',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_fossology_license',
'Fossology license indexer task',
'swh.indexer.tasks.ContentFossologyLicense',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_range_fossology_license',
'Fossology license range indexer task',
'swh.indexer.tasks.ContentRangeFossologyLicense',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_origin_head',
'Origin Head indexer task',
'swh.indexer.tasks.OriginHead',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_revision_metadata',
'Revision Metadata indexer task',
'swh.indexer.tasks.RevisionMetadata',
'1 day', '12:00:00', '1 days', 2,
5000);
insert into task_type(
type,
description,
backend_name,
default_interval, min_interval, max_interval, backoff_factor,
max_queue_length)
values (
'indexer_origin_metadata',
'Origin Metadata indexer task',
'swh.indexer.tasks.OriginMetadata',
'1 day', '12:00:00', '1 days', 2,
20000);
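Review note on the new BitBucket task types: unlike the loader and indexer entries, they are inserted without a `max_queue_length`. In `run_ready_tasks` a falsy `max_queue_length` skips the queue-length check and falls back to `MAX_NUM_TASKS`. The sketch below restates that branch as a standalone function; `tasks_to_grab` is a hypothetical name, and treating the omitted column as `None` is an assumption about the table default, which this diff does not show. The subsequent split performed by `compute_nb_tasks_from` is not modelled.

```python
MAX_NUM_TASKS = 10000  # same constant as in runner.py


def tasks_to_grab(max_queue_length, queue_length):
    """Hypothetical restatement of the num_tasks computation in runner.py.

    queue_length is None when the broker cannot report it (for instance
    when running without RabbitMQ).
    """
    if max_queue_length:
        if queue_length is None:
            return MAX_NUM_TASKS
        return min(max_queue_length - queue_length, MAX_NUM_TASKS)
    return MAX_NUM_TASKS


# origin-update-git style entry: capped at 5000 queued messages.
print(tasks_to_grab(5000, 1200))
# Lister style entry with no max_queue_length (assumed to be None).
print(tasks_to_grab(None, 1200))
```

A result of zero or less means the runner grabs nothing for that task type in the current iteration, because of the `if num_tasks > 0` guard.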
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
index ca65742..d2c9cf0 100644
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,96 +1,97 @@
 # Copyright (C) 2015-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 from celery import current_app
 import celery.app.task
 from celery.utils.log import get_task_logger
 from swh.core.statsd import Statsd
 class SWHTask(celery.app.task.Task):
     """a schedulable task (abstract class)
     Current implementation is based on Celery. See
     http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
     how to use tasks once instantiated
     """
     _statsd = None
     _log = None
     @property
     def statsd(self):
         if self._statsd:
             return self._statsd
         worker_name = current_app.conf.get('worker_name')
         if worker_name:
             self._statsd = Statsd(constant_tags={
                 'task': self.name,
                 'worker': worker_name,
             })
             return self._statsd
         else:
             return Statsd(constant_tags={
                 'task': self.name,
                 'worker': 'unknown worker',
             })
     def __call__(self, *args, **kwargs):
         self.statsd.increment('swh_task_called_count')
         with self.statsd.timed('swh_task_duration_seconds'):
             return super().__call__(*args, **kwargs)
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         self.statsd.increment('swh_task_failure_count')
-        self.send_event('task-result-exception')
     def on_success(self, retval, task_id, args, kwargs):
         self.statsd.increment('swh_task_success_count')
+        # this is a swh specific event. Used to attach the retval to the
+        # task_run
         self.send_event('task-result', result=retval)
     @property
     def log(self):
         if self._log is None:
             self._log = get_task_logger(self.name)
         return self._log
     def run(self, *args, **kwargs):
         self.log.debug('%s: args=%s, kwargs=%s', self.name, args, kwargs)
         ret = super().run(*args, **kwargs)
         self.log.debug('%s: OK => %s', self.name, ret)
         return ret
 class Task(SWHTask):
     """a schedulable task (abstract class)
     DEPRECATED! Please use SWHTask as base for decorated functions instead.
     Sub-classes must implement the run_task() method.
     Current implementation is based on Celery. See
     http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
     how to use tasks once instantiated
     """
     abstract = True
     def run(self, *args, **kwargs):
         """This method is called by the celery worker when a task is received.
         Should not be overridden as we need our special events to be sent for
         the recurrent scheduler. Override run_task instead."""
         return self.run_task(*args, **kwargs)
     def run_task(self, *args, **kwargs):
         """Perform the task.
         Must return a json-serializable value as it is passed back to the task
         scheduler using a celery event.
         """
         raise NotImplementedError('tasks must implement the run_task() method')
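Review note on the `task-result` event: the value passed as `result=retval` in `on_success` is what the listener's `task_succeeded` handler turns into a task run status. The sketch below lifts that logic into a standalone function so the mapping can be checked in isolation; `status_from_result` is a hypothetical name, and the body follows the `try`/`except` in listener.py.

```python
def status_from_result(result):
    """Hypothetical extraction of the status logic in task_succeeded()."""
    try:
        # Dict-like results carry an explicit status.
        status = result.get('status')
        if status == 'success':
            status = 'eventful' if result.get('eventful') else 'uneventful'
    except Exception:
        # Results without .get() (booleans, None, ...) use truthiness.
        status = 'eventful' if result else 'uneventful'
    return status


for sample in ({'status': 'success', 'eventful': True},
               {'status': 'success'},
               {'status': 'failed'},
               True,
               None):
    print(repr(sample), '->', status_from_result(sample))
```

Under this logic a dict without a `'status'` key yields `None`, since `result.get('status')` does not raise and the fallback branch is never reached.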
diff --git a/version.txt b/version.txt
index d860e55..ed1bf66 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.42-0-g3488c26
\ No newline at end of file
+v0.0.43-0-gf0a8c43
\ No newline at end of file
Attached To
rDSCH Scheduling utilities