Changeset View
Changeset View
Standalone View
Standalone View
swh/core/statsd.py
Show First 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | |||||
class TimedContextManagerDecorator(object): | class TimedContextManagerDecorator(object): | ||||
""" | """ | ||||
A context manager and a decorator which will report the elapsed time in | A context manager and a decorator which will report the elapsed time in | ||||
the context OR in a function call. | the context OR in a function call. | ||||
Attributes: | Attributes: | ||||
elapsed (float): the elapsed time at the point of completion | elapsed (float): the elapsed time at the point of completion | ||||
""" | """ | ||||
def __init__(self, statsd, metric=None, error_metric=None, | |||||
tags=None, sample_rate=1): | def __init__( | ||||
self, statsd, metric=None, error_metric=None, tags=None, sample_rate=1 | |||||
): | |||||
self.statsd = statsd | self.statsd = statsd | ||||
self.metric = metric | self.metric = metric | ||||
self.error_metric = error_metric | self.error_metric = error_metric | ||||
self.tags = tags | self.tags = tags | ||||
self.sample_rate = sample_rate | self.sample_rate = sample_rate | ||||
self.elapsed = None # this is for testing purpose | self.elapsed = None # this is for testing purpose | ||||
def __call__(self, func): | def __call__(self, func): | ||||
""" | """ | ||||
Decorator which returns the elapsed time of the function call. | Decorator which returns the elapsed time of the function call. | ||||
Default to the function name if metric was not provided. | Default to the function name if metric was not provided. | ||||
""" | """ | ||||
if not self.metric: | if not self.metric: | ||||
self.metric = '%s.%s' % (func.__module__, func.__name__) | self.metric = '%s.%s' % (func.__module__, func.__name__) | ||||
# Coroutines | # Coroutines | ||||
if iscoroutinefunction(func): | if iscoroutinefunction(func): | ||||
@wraps(func) | @wraps(func) | ||||
async def wrapped_co(*args, **kwargs): | async def wrapped_co(*args, **kwargs): | ||||
start = monotonic() | start = monotonic() | ||||
try: | try: | ||||
result = await func(*args, **kwargs) | result = await func(*args, **kwargs) | ||||
except: # noqa | except: # noqa | ||||
self._send_error() | self._send_error() | ||||
raise | raise | ||||
self._send(start) | self._send(start) | ||||
return result | return result | ||||
return wrapped_co | return wrapped_co | ||||
# Others | # Others | ||||
@wraps(func) | @wraps(func) | ||||
def wrapped(*args, **kwargs): | def wrapped(*args, **kwargs): | ||||
start = monotonic() | start = monotonic() | ||||
try: | try: | ||||
result = func(*args, **kwargs) | result = func(*args, **kwargs) | ||||
except: # noqa | except: # noqa | ||||
self._send_error() | self._send_error() | ||||
raise | raise | ||||
self._send(start) | self._send(start) | ||||
return result | return result | ||||
return wrapped | return wrapped | ||||
def __enter__(self): | def __enter__(self): | ||||
if not self.metric: | if not self.metric: | ||||
raise TypeError("Cannot used timed without a metric!") | raise TypeError("Cannot used timed without a metric!") | ||||
self._start = monotonic() | self._start = monotonic() | ||||
return self | return self | ||||
def __exit__(self, type, value, traceback): | def __exit__(self, type, value, traceback): | ||||
# Report the elapsed time of the context manager if no error. | # Report the elapsed time of the context manager if no error. | ||||
if type is None: | if type is None: | ||||
self._send(self._start) | self._send(self._start) | ||||
else: | else: | ||||
self._send_error() | self._send_error() | ||||
def _send(self, start): | def _send(self, start): | ||||
elapsed = (monotonic() - start) * 1000 | elapsed = (monotonic() - start) * 1000 | ||||
self.statsd.timing(self.metric, elapsed, | self.statsd.timing( | ||||
tags=self.tags, sample_rate=self.sample_rate) | self.metric, elapsed, tags=self.tags, sample_rate=self.sample_rate | ||||
) | |||||
self.elapsed = elapsed | self.elapsed = elapsed | ||||
def _send_error(self): | def _send_error(self): | ||||
if self.error_metric is None: | if self.error_metric is None: | ||||
self.error_metric = self.metric + '_error_count' | self.error_metric = self.metric + '_error_count' | ||||
self.statsd.increment(self.error_metric, tags=self.tags) | self.statsd.increment(self.error_metric, tags=self.tags) | ||||
def start(self): | def start(self): | ||||
Show All 27 Lines | Note: | ||||
STATSD_PORT | STATSD_PORT | ||||
Override the default port of the statsd server | Override the default port of the statsd server | ||||
STATSD_TAGS | STATSD_TAGS | ||||
Tags to attach to every metric reported. Example value: | Tags to attach to every metric reported. Example value: | ||||
"label:value,other_label:other_value" | "label:value,other_label:other_value" | ||||
""" | """ | ||||
def __init__(self, host=None, port=None, max_buffer_size=50, | def __init__( | ||||
namespace=None, constant_tags=None): | self, | ||||
host=None, | |||||
port=None, | |||||
max_buffer_size=50, | |||||
namespace=None, | |||||
constant_tags=None, | |||||
): | |||||
# Connection | # Connection | ||||
if host is None: | if host is None: | ||||
host = os.environ.get('STATSD_HOST') or 'localhost' | host = os.environ.get('STATSD_HOST') or 'localhost' | ||||
self.host = host | self.host = host | ||||
if port is None: | if port is None: | ||||
port = os.environ.get('STATSD_PORT') or 8125 | port = os.environ.get('STATSD_PORT') or 8125 | ||||
self.port = int(port) | self.port = int(port) | ||||
# Socket | # Socket | ||||
self._socket = None | self._socket = None | ||||
self.lock = threading.Lock() | self.lock = threading.Lock() | ||||
self.max_buffer_size = max_buffer_size | self.max_buffer_size = max_buffer_size | ||||
self._send = self._send_to_server | self._send = self._send_to_server | ||||
self.encoding = 'utf-8' | self.encoding = 'utf-8' | ||||
# Tags | # Tags | ||||
self.constant_tags = {} | self.constant_tags = {} | ||||
tags_envvar = os.environ.get('STATSD_TAGS', '') | tags_envvar = os.environ.get('STATSD_TAGS', '') | ||||
for tag in tags_envvar.split(','): | for tag in tags_envvar.split(','): | ||||
if not tag: | if not tag: | ||||
continue | continue | ||||
if ':' not in tag: | if ':' not in tag: | ||||
warnings.warn( | warnings.warn( | ||||
'STATSD_TAGS needs to be in key:value format, ' | 'STATSD_TAGS needs to be in key:value format, %s invalid' % tag, | ||||
'%s invalid' % tag, | |||||
UserWarning, | UserWarning, | ||||
) | ) | ||||
continue | continue | ||||
k, v = tag.split(':', 1) | k, v = tag.split(':', 1) | ||||
self.constant_tags[k] = v | self.constant_tags[k] = v | ||||
if constant_tags: | if constant_tags: | ||||
self.constant_tags.update({ | self.constant_tags.update( | ||||
str(k): str(v) | {str(k): str(v) for k, v in constant_tags.items()} | ||||
for k, v in constant_tags.items() | ) | ||||
}) | |||||
# Namespace | # Namespace | ||||
if namespace is not None: | if namespace is not None: | ||||
namespace = str(namespace) | namespace = str(namespace) | ||||
self.namespace = namespace | self.namespace = namespace | ||||
def __enter__(self): | def __enter__(self): | ||||
self.open_buffer(self.max_buffer_size) | self.open_buffer(self.max_buffer_size) | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1): | ||||
# Is equivalent to ... | # Is equivalent to ... | ||||
start = time.monotonic() | start = time.monotonic() | ||||
try: | try: | ||||
get_user(user_id) | get_user(user_id) | ||||
finally: | finally: | ||||
statsd.timing('user.query.time', time.monotonic() - start) | statsd.timing('user.query.time', time.monotonic() - start) | ||||
""" | """ | ||||
return TimedContextManagerDecorator( | return TimedContextManagerDecorator( | ||||
statsd=self, metric=metric, | statsd=self, | ||||
metric=metric, | |||||
error_metric=error_metric, | error_metric=error_metric, | ||||
tags=tags, sample_rate=sample_rate) | tags=tags, | ||||
sample_rate=sample_rate, | |||||
) | |||||
def set(self, metric, value, tags=None, sample_rate=1): | def set(self, metric, value, tags=None, sample_rate=1): | ||||
""" | """ | ||||
Sample a set value. | Sample a set value. | ||||
>>> statsd.set('visitors.uniques', 999) | >>> statsd.set('visitors.uniques', 999) | ||||
""" | """ | ||||
self._report(metric, 's', value, tags, sample_rate) | self._report(metric, 's', value, tags, sample_rate) | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | def _report(self, metric, metric_type, value, tags, sample_rate): | ||||
# Create/format the metric packet | # Create/format the metric packet | ||||
payload = "%s%s:%s|%s%s%s" % ( | payload = "%s%s:%s|%s%s%s" % ( | ||||
(self.namespace + ".") if self.namespace else "", | (self.namespace + ".") if self.namespace else "", | ||||
metric, | metric, | ||||
value, | value, | ||||
metric_type, | metric_type, | ||||
("|@" + str(sample_rate)) if sample_rate != 1 else "", | ("|@" + str(sample_rate)) if sample_rate != 1 else "", | ||||
("|#" + ",".join( | ("|#" + ",".join("%s:%s" % (k, v) for (k, v) in sorted(tags.items()))) | ||||
"%s:%s" % (k, v) | if tags | ||||
for (k, v) in sorted(tags.items()) | else "", | ||||
)) if tags else "", | |||||
) | ) | ||||
# Send it | # Send it | ||||
self._send(payload) | self._send(payload) | ||||
def _send_to_server(self, packet): | def _send_to_server(self, packet): | ||||
try: | try: | ||||
# If set, use socket directly | # If set, use socket directly | ||||
self.socket.send(packet.encode('utf-8')) | self.socket.send(packet.encode('utf-8')) | ||||
Show All 14 Lines | class Statsd(object): | ||||
def _flush_buffer(self): | def _flush_buffer(self): | ||||
self._send_to_server("\n".join(self.buffer)) | self._send_to_server("\n".join(self.buffer)) | ||||
self.buffer = [] | self.buffer = [] | ||||
def _add_constant_tags(self, tags): | def _add_constant_tags(self, tags): | ||||
return { | return { | ||||
str(k): str(v) | str(k): str(v) | ||||
for k, v in itertools.chain( | for k, v in itertools.chain( | ||||
self.constant_tags.items(), | self.constant_tags.items(), (tags if tags else {}).items() | ||||
(tags if tags else {}).items(), | |||||
) | ) | ||||
} | } | ||||
statsd = Statsd() | statsd = Statsd() |