
Fix scheduler listener on buster's celery version (4.1.0-4)
ClosedPublic

Authored by ardumont on Apr 27 2018, 3:36 PM.

Details

Summary

Without this change, the listener fails because an internal method it relies on is no longer defined (problem 1) and because event messages can now arrive as a list (problem 2):

  • Problem 1.
Traceback (most recent call last):
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 178, in <module>
    main()
  File "/usr/lib/python3/dist-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/usr/lib/python3/dist-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/lib/python3/dist-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 174, in main
    event_monitor(main_app, backend=scheduler)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 149, in event_monitor
    node_id='listener-%s' % socket.gethostname(),
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 30, in __init__
    queue_arguments=self._get_queue_arguments())
AttributeError: 'ReliableEventReceiver' object has no attribute '_get_queue_arguments'
  • Problem 2.
Traceback (most recent call last):
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 207, in <module>
    main()
  File "/usr/lib/python3/dist-packages/click/core.py", line 759, in __call__
    return self.main(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/click/core.py", line 714, in main
    rv = self.invoke(ctx)
  File "/usr/lib/python3/dist-packages/click/core.py", line 951, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/lib/python3/dist-packages/click/core.py", line 552, in invoke
    return callback(*args, **kwargs)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 203, in main
    event_monitor(main_app, backend=scheduler)
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 173, in event_monitor
    recv.capture(limit=None, timeout=None, wakeup=True)
  File "/usr/local/lib/python3.6/dist-packages/celery/events/receiver.py", line 92, in capture
    return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
  File "/usr/local/lib/python3.6/dist-packages/kombu/mixins.py", line 192, in consume
    conn.drain_events(timeout=safety_interval)
  File "/usr/local/lib/python3.6/dist-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/lib/python3.6/dist-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.6/dist-packages/amqp/connection.py", line 497, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.6/dist-packages/amqp/method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/usr/local/lib/python3.6/dist-packages/amqp/connection.py", line 501, in on_inbound_method
    method_sig, payload, content,
  File "/usr/local/lib/python3.6/dist-packages/amqp/abstract_channel.py", line 128, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python3.6/dist-packages/amqp/channel.py", line 1597, in _on_basic_deliver
    fun(msg)
  File "/usr/local/lib/python3.6/dist-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python3.6/dist-packages/kombu/messaging.py", line 590, in receive
    [callback(body, message) for callback in callbacks]
  File "/usr/local/lib/python3.6/dist-packages/kombu/messaging.py", line 590, in <listcomp>
    [callback(body, message) for callback in callbacks]
  File "/home/tony/work/inria/repo/swh/swh-environment/swh-scheduler/swh/scheduler/celery_backend/listener.py", line 47, in _receive
    type, body = self.event_from_message(body)
  File "/usr/local/lib/python3.6/dist-packages/celery/events/receiver.py", line 103, in event_from_message
    type = body['type']
TypeError: list indices must be integers or slices, not str

Celery version in use:

$ dpkg -l python3-celery
Desired=Unknown/Install/Remove/Purge/Hold
| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
||/ Name                                                           Version                              Architecture                         Description
+++-==============================================================-====================================-====================================-=================================================================================================================================
ii  python3-celery                                                 4.1.0-4                              all                                  async task/job queue based on message passing (Python3 version)

Diff Detail

Repository
rDSCH Scheduling utilities
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

zack added inline comments.
swh/scheduler/celery_backend/listener.py
43–44

are these print (and the following ones) meant to be here?
if we really want to log we should rather use logging module facilities, or am I missing something

swh/scheduler/celery_backend/listener.py
43–44

are these print (and the following ones) meant to be here?

No, they are only there for debugging purposes.

This is work in progress that I proposed to olasd because the buster version was ... busted ;)
I realize I should have mentioned WIP in the diff title.

To clarify, I don't know where we stand for that to be integrated (without print ;) or not.

zack retitled this revision from Fix scheduler listener on buster's celery version (4.1.0-4) to [WIP] Fix scheduler listener on buster's celery version (4.1.0-4). Sep 19 2018, 8:00 PM
vlorentz added inline comments.
swh/scheduler/celery_backend/listener.py
28
ardumont marked an inline comment as done.
  • Fix scheduler listener on buster's celery version (4.1.0-4)
  • celery_backend.listener: Access directly existing queue_arguments

+ Rebase to latest master

  • celery_backend.listener: Use debugging logging statements
ardumont marked 2 inline comments as done.
ardumont added inline comments.
swh/scheduler/celery_backend/listener.py
28

Switching from my initial change to what you propose seems to work:

queue_arguments = self.queue.queue_arguments
28

That's the workaround for stack trace 1 in the description.

48

That's the workaround for stack trace 2 in the description.

ardumont marked an inline comment as done.
ardumont added inline comments.
swh/scheduler/celery_backend/listener.py
43–44

we should rather use logging module facilities, or am I missing something

To clarify, I don't know where we stand for that to be integrated (without print ;) or not.

Changed to logging statements anyway, thanks.
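
For illustration, here is a minimal sketch of what replacing the debug prints with the standard `logging` module could look like. The logger name and the `log_event` helper are hypothetical and are not taken from the actual diff.

```python
import logging

# Hypothetical logger name; the real module would more likely use __name__.
logger = logging.getLogger('listener_sketch')


def log_event(event_type, body):
    """Emit a debug-level record instead of calling print()."""
    # Lazy %-style arguments: the message is only formatted when
    # the DEBUG level is actually enabled for this logger.
    logger.debug('received event %s: %r', event_type, body)
```

With this approach the debugging output stays in the code but is silent unless the logger is configured at DEBUG level.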

olasd requested changes to this revision. Oct 10 2018, 3:10 PM

Thanks for looking into this!

After tracking down the two upstream changes that made the code break, the changes need to be adapted.

swh/scheduler/celery_backend/listener.py
24

That's due to https://github.com/celery/celery/commit/ff1406474436b173029e2fc024d5c110c2aa338c#diff-4317e106cdad8e6eecc47eb0ccdbeb19

Considering these settings are never used in our case, I think we can just drop the queue_arguments argument completely.

45–48

Celery can now batch events on the worker side before sending them; see http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#event-batching

This means we really need to process all events from the list, using a for loop.

(that's what celery itself does)
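
A minimal sketch of the idea, assuming the message body is either a single event dict (older behaviour) or a list of event dicts (batched). The helper names `iter_events` and `event_types` are hypothetical and only illustrate the loop; they are not the listener's actual code.

```python
from typing import Any, Dict, Iterator, List, Union

Event = Dict[str, Any]


def iter_events(body: Union[Event, List[Event]]) -> Iterator[Event]:
    """Yield every event in a message body, batched or not."""
    if isinstance(body, list):
        # Batched delivery: the body is a list of events.
        yield from body
    else:
        # Unbatched delivery: the body is a single event.
        yield body


def event_types(body: Union[Event, List[Event]]) -> List[str]:
    """Collect the 'type' field of each event, using a plain for loop."""
    types = []
    for event in iter_events(body):
        types.append(event['type'])
    return types
```

Normalizing the body first means the per-event handling is written once, and indexing a list with `'type'` (the TypeError in stack trace 2) can no longer happen.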

This revision now requires changes to proceed. Oct 10 2018, 3:10 PM

As per inline comments:

  • listener: Drop unused queue_arguments settings
  • listener: Deal with messages received as list

Thanks for looking into this!
After tracking down the two upstream changes that made the code break, the changes need to be adapted.

Awesome. Thanks.
Adapted accordingly.

Cheers,

ardumont retitled this revision from [WIP] Fix scheduler listener on buster's celery version (4.1.0-4) to Fix scheduler listener on buster's celery version (4.1.0-4). Oct 10 2018, 5:39 PM
This revision is now accepted and ready to land. Oct 11 2018, 2:52 PM