Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/listener.py
Show All 10 Lines | |||||
import click | import click | ||||
from arrow import utcnow | from arrow import utcnow | ||||
from kombu import Queue | from kombu import Queue | ||||
import celery | import celery | ||||
from celery.events import EventReceiver | from celery.events import EventReceiver | ||||
from swh.core.statsd import statsd | |||||
class ReliableEventReceiver(EventReceiver): | class ReliableEventReceiver(EventReceiver): | ||||
def __init__(self, channel, handlers=None, routing_key='#', | def __init__(self, channel, handlers=None, routing_key='#', | ||||
node_id=None, app=None, queue_prefix='celeryev', | node_id=None, app=None, queue_prefix='celeryev', | ||||
accept=None): | accept=None): | ||||
super(ReliableEventReceiver, self).__init__( | super(ReliableEventReceiver, self).__init__( | ||||
channel, handlers, routing_key, node_id, app, queue_prefix, accept) | channel, handlers, routing_key, node_id, app, queue_prefix, accept) | ||||
Show All 14 Lines | def _receive(self, bodies, message): | ||||
for body in bodies: | for body in bodies: | ||||
type, body = self.event_from_message(body) | type, body = self.event_from_message(body) | ||||
self.process(type, body, message) | self.process(type, body, message) | ||||
def process(self, type, event, message): | def process(self, type, event, message): | ||||
"""Process the received event by dispatching it to the appropriate | """Process the received event by dispatching it to the appropriate | ||||
handler.""" | handler.""" | ||||
handler = self.handlers.get(type) or self.handlers.get('*') | handler = self.handlers.get(type) or self.handlers.get('*') | ||||
handler and handler(event, message) | if handler: | ||||
handler(event, message) | |||||
statsd.increment('swh_scheduler_listener_handled_event_total', | |||||
ardumont: That could be the occasion to also have a counter for unhandled types (i have no idea how noisy… | |||||
Done Inline Actionsnot sure we want to engage the fight with celery on this. Who knows how many event type may exists here? douardda: not sure we want to engage the fight with celery on this. Who knows how many event type may… | |||||
tags={'event_type': type}) | |||||
ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) | ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0) | ||||
ACTION_QUEUE_MAX_LENGTH = 1000 | ACTION_QUEUE_MAX_LENGTH = 1000 | ||||
def event_monitor(app, backend): | def event_monitor(app, backend): | ||||
logger = logging.getLogger('swh.scheduler.listener') | logger = logging.getLogger('swh.scheduler.listener') | ||||
▲ Show 20 Lines • Show All 148 Lines • Show Last 20 Lines |
That could be the occasion to also have a counter for unhandled types (i have no idea how noisy that would be).
swh_listener_unhandled_event_total or something?