Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/config.py
Show First 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | |||||
class CustomCelery(Celery): | class CustomCelery(Celery): | ||||
def get_queue_stats(self, queue_name): | def get_queue_stats(self, queue_name): | ||||
"""Get the statistics regarding a queue on the broker. | """Get the statistics regarding a queue on the broker. | ||||
Arguments: | Arguments: | ||||
queue_name: name of the queue to check | queue_name: name of the queue to check | ||||
Returns a dictionary raw from the RabbitMQ management API. | Returns a dictionary raw from the RabbitMQ management API; | ||||
or `None` if the current configuration does not use RabbitMQ. | |||||
Interesting keys: | Interesting keys: | ||||
- consumers (number of consumers for the queue) | - consumers (number of consumers for the queue) | ||||
- messages (number of messages in queue) | - messages (number of messages in queue) | ||||
- messages_unacknowledged (number of messages currently being | - messages_unacknowledged (number of messages currently being | ||||
processed) | processed) | ||||
Documentation: https://www.rabbitmq.com/management.html#http-api | Documentation: https://www.rabbitmq.com/management.html#http-api | ||||
""" | """ | ||||
conn_info = self.connection().info() | conn_info = self.connection().info() | ||||
if conn_info['transport'] == 'memory': | |||||
# We're running in a test environment, without RabbitMQ. | |||||
return None | |||||
url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( | url = 'http://{hostname}:{port}/api/queues/{vhost}/{queue}'.format( | ||||
hostname=conn_info['hostname'], | hostname=conn_info['hostname'], | ||||
port=conn_info['port'] + 10000, | port=conn_info['port'] + 10000, | ||||
vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), | vhost=urllib.parse.quote(conn_info['virtual_host'], safe=''), | ||||
queue=urllib.parse.quote(queue_name, safe=''), | queue=urllib.parse.quote(queue_name, safe=''), | ||||
) | ) | ||||
credentials = (conn_info['userid'], conn_info['password']) | credentials = (conn_info['userid'], conn_info['password']) | ||||
r = requests.get(url, auth=credentials) | r = requests.get(url, auth=credentials) | ||||
if r.status_code != 200: | if r.status_code != 200: | ||||
raise ValueError('Got error %s when reading queue stats: %s' % ( | raise ValueError('Got error %s when reading queue stats: %s' % ( | ||||
r.status_code, r.json())) | r.status_code, r.json())) | ||||
return r.json() | return r.json() | ||||
def get_queue_length(self, queue_name): | def get_queue_length(self, queue_name): | ||||
"""Shortcut to get a queue's length""" | """Shortcut to get a queue's length""" | ||||
return self.get_queue_stats(queue_name)['messages'] | stats = self.get_queue_stats(queue_name) | ||||
if stats: | |||||
return stats['messages'] | |||||
INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | INSTANCE_NAME = os.environ.get(CONFIG_NAME_ENVVAR) | ||||
if INSTANCE_NAME: | if INSTANCE_NAME: | ||||
CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | CONFIG_NAME = CONFIG_NAME_TEMPLATE % INSTANCE_NAME | ||||
else: | else: | ||||
CONFIG_NAME = DEFAULT_CONFIG_NAME | CONFIG_NAME = DEFAULT_CONFIG_NAME | ||||
▲ Show 20 Lines • Show All 67 Lines • Show Last 20 Lines |