Changeset View
Changeset View
Standalone View
Standalone View
swh/core/statsd.py
Show First 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | |||||
import itertools | import itertools | ||||
import logging | import logging | ||||
import os | import os | ||||
import socket | import socket | ||||
import threading | import threading | ||||
import warnings | import warnings | ||||
log = logging.getLogger('swh.core.statsd') | log = logging.getLogger("swh.core.statsd") | ||||
class TimedContextManagerDecorator(object): | class TimedContextManagerDecorator(object): | ||||
""" | """ | ||||
A context manager and a decorator which will report the elapsed time in | A context manager and a decorator which will report the elapsed time in | ||||
the context OR in a function call. | the context OR in a function call. | ||||
Attributes: | Attributes: | ||||
elapsed (float): the elapsed time at the point of completion | elapsed (float): the elapsed time at the point of completion | ||||
""" | """ | ||||
def __init__(self, statsd, metric=None, error_metric=None, | |||||
tags=None, sample_rate=1): | def __init__( | ||||
self, statsd, metric=None, error_metric=None, tags=None, sample_rate=1 | |||||
): | |||||
self.statsd = statsd | self.statsd = statsd | ||||
self.metric = metric | self.metric = metric | ||||
self.error_metric = error_metric | self.error_metric = error_metric | ||||
self.tags = tags | self.tags = tags | ||||
self.sample_rate = sample_rate | self.sample_rate = sample_rate | ||||
self.elapsed = None # this is for testing purpose | self.elapsed = None # this is for testing purpose | ||||
def __call__(self, func): | def __call__(self, func): | ||||
""" | """ | ||||
Decorator which returns the elapsed time of the function call. | Decorator which returns the elapsed time of the function call. | ||||
Default to the function name if metric was not provided. | Default to the function name if metric was not provided. | ||||
""" | """ | ||||
if not self.metric: | if not self.metric: | ||||
self.metric = '%s.%s' % (func.__module__, func.__name__) | self.metric = "%s.%s" % (func.__module__, func.__name__) | ||||
# Coroutines | # Coroutines | ||||
if iscoroutinefunction(func): | if iscoroutinefunction(func): | ||||
@wraps(func) | @wraps(func) | ||||
async def wrapped_co(*args, **kwargs): | async def wrapped_co(*args, **kwargs): | ||||
start = monotonic() | start = monotonic() | ||||
try: | try: | ||||
result = await func(*args, **kwargs) | result = await func(*args, **kwargs) | ||||
except: # noqa | except: # noqa | ||||
self._send_error() | self._send_error() | ||||
raise | raise | ||||
self._send(start) | self._send(start) | ||||
return result | return result | ||||
return wrapped_co | return wrapped_co | ||||
# Others | # Others | ||||
@wraps(func) | @wraps(func) | ||||
def wrapped(*args, **kwargs): | def wrapped(*args, **kwargs): | ||||
start = monotonic() | start = monotonic() | ||||
try: | try: | ||||
result = func(*args, **kwargs) | result = func(*args, **kwargs) | ||||
except: # noqa | except: # noqa | ||||
self._send_error() | self._send_error() | ||||
raise | raise | ||||
self._send(start) | self._send(start) | ||||
return result | return result | ||||
return wrapped | return wrapped | ||||
def __enter__(self): | def __enter__(self): | ||||
if not self.metric: | if not self.metric: | ||||
raise TypeError("Cannot used timed without a metric!") | raise TypeError("Cannot used timed without a metric!") | ||||
self._start = monotonic() | self._start = monotonic() | ||||
return self | return self | ||||
def __exit__(self, type, value, traceback): | def __exit__(self, type, value, traceback): | ||||
# Report the elapsed time of the context manager if no error. | # Report the elapsed time of the context manager if no error. | ||||
if type is None: | if type is None: | ||||
self._send(self._start) | self._send(self._start) | ||||
else: | else: | ||||
self._send_error() | self._send_error() | ||||
def _send(self, start): | def _send(self, start): | ||||
elapsed = (monotonic() - start) * 1000 | elapsed = (monotonic() - start) * 1000 | ||||
self.statsd.timing(self.metric, elapsed, | self.statsd.timing( | ||||
tags=self.tags, sample_rate=self.sample_rate) | self.metric, elapsed, tags=self.tags, sample_rate=self.sample_rate | ||||
) | |||||
self.elapsed = elapsed | self.elapsed = elapsed | ||||
def _send_error(self): | def _send_error(self): | ||||
if self.error_metric is None: | if self.error_metric is None: | ||||
self.error_metric = self.metric + '_error_count' | self.error_metric = self.metric + "_error_count" | ||||
self.statsd.increment(self.error_metric, tags=self.tags) | self.statsd.increment(self.error_metric, tags=self.tags) | ||||
def start(self): | def start(self): | ||||
"""Start the timer""" | """Start the timer""" | ||||
self.__enter__() | self.__enter__() | ||||
def stop(self): | def stop(self): | ||||
"""Stop the timer, send the metric value""" | """Stop the timer, send the metric value""" | ||||
Show All 22 Lines | Note: | ||||
STATSD_PORT | STATSD_PORT | ||||
Override the default port of the statsd server | Override the default port of the statsd server | ||||
STATSD_TAGS | STATSD_TAGS | ||||
Tags to attach to every metric reported. Example value: | Tags to attach to every metric reported. Example value: | ||||
"label:value,other_label:other_value" | "label:value,other_label:other_value" | ||||
""" | """ | ||||
def __init__(self, host=None, port=None, max_buffer_size=50, | def __init__( | ||||
namespace=None, constant_tags=None): | self, | ||||
host=None, | |||||
port=None, | |||||
max_buffer_size=50, | |||||
namespace=None, | |||||
constant_tags=None, | |||||
): | |||||
# Connection | # Connection | ||||
if host is None: | if host is None: | ||||
host = os.environ.get('STATSD_HOST') or 'localhost' | host = os.environ.get("STATSD_HOST") or "localhost" | ||||
self.host = host | self.host = host | ||||
if port is None: | if port is None: | ||||
port = os.environ.get('STATSD_PORT') or 8125 | port = os.environ.get("STATSD_PORT") or 8125 | ||||
self.port = int(port) | self.port = int(port) | ||||
# Socket | # Socket | ||||
self._socket = None | self._socket = None | ||||
self.lock = threading.Lock() | self.lock = threading.Lock() | ||||
self.max_buffer_size = max_buffer_size | self.max_buffer_size = max_buffer_size | ||||
self._send = self._send_to_server | self._send = self._send_to_server | ||||
self.encoding = 'utf-8' | self.encoding = "utf-8" | ||||
# Tags | # Tags | ||||
self.constant_tags = {} | self.constant_tags = {} | ||||
tags_envvar = os.environ.get('STATSD_TAGS', '') | tags_envvar = os.environ.get("STATSD_TAGS", "") | ||||
for tag in tags_envvar.split(','): | for tag in tags_envvar.split(","): | ||||
if not tag: | if not tag: | ||||
continue | continue | ||||
if ':' not in tag: | if ":" not in tag: | ||||
warnings.warn( | warnings.warn( | ||||
'STATSD_TAGS needs to be in key:value format, ' | "STATSD_TAGS needs to be in key:value format, " "%s invalid" % tag, | ||||
'%s invalid' % tag, | |||||
UserWarning, | UserWarning, | ||||
) | ) | ||||
continue | continue | ||||
k, v = tag.split(':', 1) | k, v = tag.split(":", 1) | ||||
self.constant_tags[k] = v | self.constant_tags[k] = v | ||||
if constant_tags: | if constant_tags: | ||||
self.constant_tags.update({ | self.constant_tags.update( | ||||
str(k): str(v) | {str(k): str(v) for k, v in constant_tags.items()} | ||||
for k, v in constant_tags.items() | ) | ||||
}) | |||||
# Namespace | # Namespace | ||||
if namespace is not None: | if namespace is not None: | ||||
namespace = str(namespace) | namespace = str(namespace) | ||||
self.namespace = namespace | self.namespace = namespace | ||||
def __enter__(self): | def __enter__(self): | ||||
self.open_buffer(self.max_buffer_size) | self.open_buffer(self.max_buffer_size) | ||||
return self | return self | ||||
def __exit__(self, type, value, traceback): | def __exit__(self, type, value, traceback): | ||||
self.close_buffer() | self.close_buffer() | ||||
def gauge(self, metric, value, tags=None, sample_rate=1): | def gauge(self, metric, value, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Record the value of a gauge, optionally setting a list of tags and a | Record the value of a gauge, optionally setting a list of tags and a | ||||
sample rate. | sample rate. | ||||
>>> statsd.gauge('users.online', 123) | >>> statsd.gauge('users.online', 123) | ||||
>>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"}) | >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"}) | ||||
""" | """ | ||||
return self._report(metric, 'g', value, tags, sample_rate) | return self._report(metric, "g", value, tags, sample_rate) | ||||
def increment(self, metric, value=1, tags=None, sample_rate=1): | def increment(self, metric, value=1, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Increment a counter, optionally setting a value, tags and a sample | Increment a counter, optionally setting a value, tags and a sample | ||||
rate. | rate. | ||||
>>> statsd.increment('page.views') | >>> statsd.increment('page.views') | ||||
>>> statsd.increment('files.transferred', 124) | >>> statsd.increment('files.transferred', 124) | ||||
""" | """ | ||||
self._report(metric, 'c', value, tags, sample_rate) | self._report(metric, "c", value, tags, sample_rate) | ||||
def decrement(self, metric, value=1, tags=None, sample_rate=1): | def decrement(self, metric, value=1, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Decrement a counter, optionally setting a value, tags and a sample | Decrement a counter, optionally setting a value, tags and a sample | ||||
rate. | rate. | ||||
>>> statsd.decrement('files.remaining') | >>> statsd.decrement('files.remaining') | ||||
>>> statsd.decrement('active.connections', 2) | >>> statsd.decrement('active.connections', 2) | ||||
""" | """ | ||||
metric_value = -value if value else value | metric_value = -value if value else value | ||||
self._report(metric, 'c', metric_value, tags, sample_rate) | self._report(metric, "c", metric_value, tags, sample_rate) | ||||
def histogram(self, metric, value, tags=None, sample_rate=1): | def histogram(self, metric, value, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Sample a histogram value, optionally setting tags and a sample rate. | Sample a histogram value, optionally setting tags and a sample rate. | ||||
>>> statsd.histogram('uploaded.file.size', 1445) | >>> statsd.histogram('uploaded.file.size', 1445) | ||||
>>> statsd.histogram('file.count', 26, tags={"filetype": "python"}) | >>> statsd.histogram('file.count', 26, tags={"filetype": "python"}) | ||||
""" | """ | ||||
self._report(metric, 'h', value, tags, sample_rate) | self._report(metric, "h", value, tags, sample_rate) | ||||
def timing(self, metric, value, tags=None, sample_rate=1): | def timing(self, metric, value, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Record a timing, optionally setting tags and a sample rate. | Record a timing, optionally setting tags and a sample rate. | ||||
>>> statsd.timing("query.response.time", 1234) | >>> statsd.timing("query.response.time", 1234) | ||||
""" | """ | ||||
self._report(metric, 'ms', value, tags, sample_rate) | self._report(metric, "ms", value, tags, sample_rate) | ||||
def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): | def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): | ||||
""" | """ | ||||
A decorator or context manager that will measure the distribution of a | A decorator or context manager that will measure the distribution of a | ||||
function's/context's run time. Optionally specify a list of tags or a | function's/context's run time. Optionally specify a list of tags or a | ||||
sample rate. If the metric is not defined as a decorator, the module | sample rate. If the metric is not defined as a decorator, the module | ||||
name and function name will be used. The metric is required as a | name and function name will be used. The metric is required as a | ||||
context manager. | context manager. | ||||
Show All 12 Lines | def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): | ||||
# Is equivalent to ... | # Is equivalent to ... | ||||
start = time.monotonic() | start = time.monotonic() | ||||
try: | try: | ||||
get_user(user_id) | get_user(user_id) | ||||
finally: | finally: | ||||
statsd.timing('user.query.time', time.monotonic() - start) | statsd.timing('user.query.time', time.monotonic() - start) | ||||
""" | """ | ||||
return TimedContextManagerDecorator( | return TimedContextManagerDecorator( | ||||
statsd=self, metric=metric, | statsd=self, | ||||
metric=metric, | |||||
error_metric=error_metric, | error_metric=error_metric, | ||||
tags=tags, sample_rate=sample_rate) | tags=tags, | ||||
sample_rate=sample_rate, | |||||
) | |||||
def set(self, metric, value, tags=None, sample_rate=1): | def set(self, metric, value, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Sample a set value. | Sample a set value. | ||||
>>> statsd.set('visitors.uniques', 999) | >>> statsd.set('visitors.uniques', 999) | ||||
""" | """ | ||||
self._report(metric, 's', value, tags, sample_rate) | self._report(metric, "s", value, tags, sample_rate) | ||||
@property | @property | ||||
def socket(self): | def socket(self): | ||||
""" | """ | ||||
Return a connected socket. | Return a connected socket. | ||||
Note: connect the socket before assigning it to the class instance to | Note: connect the socket before assigning it to the class instance to | ||||
avoid bad thread race conditions. | avoid bad thread race conditions. | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def _report(self, metric, metric_type, value, tags, sample_rate): | ||||
# Create/format the metric packet | # Create/format the metric packet | ||||
payload = "%s%s:%s|%s%s%s" % ( | payload = "%s%s:%s|%s%s%s" % ( | ||||
(self.namespace + ".") if self.namespace else "", | (self.namespace + ".") if self.namespace else "", | ||||
metric, | metric, | ||||
value, | value, | ||||
metric_type, | metric_type, | ||||
("|@" + str(sample_rate)) if sample_rate != 1 else "", | ("|@" + str(sample_rate)) if sample_rate != 1 else "", | ||||
("|#" + ",".join( | ("|#" + ",".join("%s:%s" % (k, v) for (k, v) in sorted(tags.items()))) | ||||
"%s:%s" % (k, v) | if tags | ||||
for (k, v) in sorted(tags.items()) | else "", | ||||
)) if tags else "", | |||||
) | ) | ||||
# Send it | # Send it | ||||
self._send(payload) | self._send(payload) | ||||
def _send_to_server(self, packet): | def _send_to_server(self, packet): | ||||
try: | try: | ||||
# If set, use socket directly | # If set, use socket directly | ||||
self.socket.send(packet.encode('utf-8')) | self.socket.send(packet.encode("utf-8")) | ||||
except socket.timeout: | except socket.timeout: | ||||
return | return | ||||
except socket.error: | except socket.error: | ||||
log.debug( | log.debug( | ||||
"Error submitting statsd packet." | "Error submitting statsd packet." | ||||
" Dropping the packet and closing the socket." | " Dropping the packet and closing the socket." | ||||
) | ) | ||||
self.close_socket() | self.close_socket() | ||||
def _send_to_buffer(self, packet): | def _send_to_buffer(self, packet): | ||||
self.buffer.append(packet) | self.buffer.append(packet) | ||||
if len(self.buffer) >= self.max_buffer_size: | if len(self.buffer) >= self.max_buffer_size: | ||||
self._flush_buffer() | self._flush_buffer() | ||||
def _flush_buffer(self): | def _flush_buffer(self): | ||||
self._send_to_server("\n".join(self.buffer)) | self._send_to_server("\n".join(self.buffer)) | ||||
self.buffer = [] | self.buffer = [] | ||||
def _add_constant_tags(self, tags): | def _add_constant_tags(self, tags): | ||||
return { | return { | ||||
str(k): str(v) | str(k): str(v) | ||||
for k, v in itertools.chain( | for k, v in itertools.chain( | ||||
self.constant_tags.items(), | self.constant_tags.items(), (tags if tags else {}).items() | ||||
(tags if tags else {}).items(), | |||||
) | ) | ||||
} | } | ||||
statsd = Statsd() | statsd = Statsd() |