diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -19,6 +19,9 @@
 from swh.core.config import load_named_config
 from swh.core.logger import JournalHandler
 
+from kombu.serialization import register
+from swh.core.serializers import swh_json_dumps, swh_json_loads
+
 DEFAULT_CONFIG_NAME = 'worker'
 CONFIG_NAME_ENVVAR = 'SWH_WORKER_INSTANCE'
 
@@ -143,6 +146,12 @@
 for queue in CONFIG['task_queues']:
     CELERY_QUEUES.append(Queue(queue, Exchange(queue), routing_key=queue))
+
+register('swhjson', swh_json_dumps, swh_json_loads,
+         content_type='application/json',
+         content_encoding='utf-8')
+
+
 
 # Instantiate the Celery app
 app = CustomCelery()
 app.conf.update(
@@ -156,13 +165,15 @@
     # Time (in seconds, or a timedelta object) for when after stored task
     # tombstones will be deleted. None means to never expire results.
     CELERY_TASK_RESULT_EXPIRES=None,
+    CELERY_TASK_SERIALIZER='swhjson',
+    # CELERY_RESULT_SERIALIZER='swhjson',
     # Late ack means the task messages will be acknowledged after the task has
     # been executed, not just before, which is the default behavior.
     CELERY_ACKS_LATE=True,
     # A string identifying the default serialization method to use.
     # Can be pickle (default), json, yaml, msgpack or any custom serialization
     # methods that have been registered with kombu.serialization.registry
-    CELERY_ACCEPT_CONTENT=['msgpack', 'json', 'pickle'],
+    CELERY_ACCEPT_CONTENT=['msgpack', 'swhjson', 'pickle'],
     # If True the task will report its status as “started”
     # when the task is executed by a worker.
     CELERY_TRACK_STARTED=True,