diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -4,38 +4,31 @@ # See top-level LICENSE file for more information import os -import logging - -from flask import request from functools import wraps +import logging from swh.core import config from swh.storage import get_storage as get_swhstorage -from swh.core.api import (RPCServerApp, decode_request, +from swh.core.api import (RPCServerApp, error_handler, encode_data_server as encode_data) -from swh.core.statsd import statsd +from ..storage import Storage +from ..metrics import timed -app = RPCServerApp(__name__) -storage = None - - -OPERATIONS_METRIC = 'swh_storage_operations_total' -OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total" -DURATION_METRIC = "swh_storage_request_duration_seconds" +def get_storage(): + global storage + if not storage: + storage = get_swhstorage(**app.config['storage']) -def timed(f): - """Time that function! + return storage - """ - @wraps(f) - def d(*a, **kw): - with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): - return f(*a, **kw) - return d +app = RPCServerApp(__name__, + backend_class=Storage, + backend_factory=get_storage) +storage = None def encode(f): @@ -47,73 +40,11 @@ return d -def send_metric(metric, count, method_name): - """Send statsd metric with count for method `method_name` - - If count is 0, the metric is discarded. If the metric is not - parseable, the metric is discarded with a log message. 
- - Args: - metric (str): Metric's name (e.g content:add, content:add:bytes) - count (int): Associated value for the metric - method_name (str): Method's name - - Returns: - Bool to explicit if metric has been set or not - """ - if count == 0: - return False - - metric_type = metric.split(':') - _length = len(metric_type) - if _length == 2: - object_type, operation = metric_type - metric_name = OPERATIONS_METRIC - elif _length == 3: - object_type, operation, unit = metric_type - metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) - else: - logging.warning('Skipping unknown metric {%s: %s}' % ( - metric, count)) - return False - - statsd.increment( - metric_name, count, tags={ - 'endpoint': method_name, - 'object_type': object_type, - 'operation': operation, - }) - return True - - -def process_metrics(f): - """Increment object counters for the decorated function. - - """ - @wraps(f) - def d(*a, **kw): - r = f(*a, **kw) - for metric, count in r.items(): - send_metric(metric=metric, count=count, method_name=f.__name__) - - return r - - return d - - @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) -def get_storage(): - global storage - if not storage: - storage = get_swhstorage(**app.config['storage']) - - return storage - - @app.route('/') @timed def index(): @@ -130,389 +61,6 @@ ''' -@app.route('/check_config', methods=['POST']) -@timed -def check_config(): - return encode_data(get_storage().check_config(**decode_request(request))) - - -@app.route('/reset', methods=['POST']) -@timed -def reset(): - return encode_data(get_storage().reset(**decode_request(request))) - - -@app.route('/content/missing', methods=['POST']) -@timed -def content_missing(): - return encode_data(get_storage().content_missing( - **decode_request(request))) - - -@app.route('/content/missing/sha1', methods=['POST']) -@timed -def content_missing_per_sha1(): - return encode_data(get_storage().content_missing_per_sha1( - 
**decode_request(request))) - - -@app.route('/content/skipped/missing', methods=['POST']) -@timed -def skipped_content_missing(): - return encode_data(get_storage().skipped_content_missing( - **decode_request(request))) - - -@app.route('/content/present', methods=['POST']) -@timed -def content_find(): - return encode_data(get_storage().content_find(**decode_request(request))) - - -@app.route('/content/add', methods=['POST']) -@timed -@encode -@process_metrics -def content_add(): - return get_storage().content_add(**decode_request(request)) - - -@app.route('/content/add_metadata', methods=['POST']) -@timed -@encode -@process_metrics -def content_add_metadata(): - return get_storage().content_add_metadata(**decode_request(request)) - - -@app.route('/content/update', methods=['POST']) -@timed -def content_update(): - return encode_data(get_storage().content_update(**decode_request(request))) - - -@app.route('/content/data', methods=['POST']) -@timed -def content_get(): - return encode_data(get_storage().content_get(**decode_request(request))) - - -@app.route('/content/metadata', methods=['POST']) -@timed -def content_get_metadata(): - return encode_data(get_storage().content_get_metadata( - **decode_request(request))) - - -@app.route('/content/range', methods=['POST']) -@timed -def content_get_range(): - return encode_data(get_storage().content_get_range( - **decode_request(request))) - - -@app.route('/directory/missing', methods=['POST']) -@timed -def directory_missing(): - return encode_data(get_storage().directory_missing( - **decode_request(request))) - - -@app.route('/directory/add', methods=['POST']) -@timed -@encode -@process_metrics -def directory_add(): - return get_storage().directory_add(**decode_request(request)) - - -@app.route('/directory/path', methods=['POST']) -@timed -def directory_entry_get_by_path(): - return encode_data(get_storage().directory_entry_get_by_path( - **decode_request(request))) - - -@app.route('/directory/ls', methods=['POST']) 
-@timed -def directory_ls(): - return encode_data(get_storage().directory_ls( - **decode_request(request))) - - -@app.route('/revision/add', methods=['POST']) -@timed -@encode -@process_metrics -def revision_add(): - return get_storage().revision_add(**decode_request(request)) - - -@app.route('/revision', methods=['POST']) -@timed -def revision_get(): - return encode_data(get_storage().revision_get(**decode_request(request))) - - -@app.route('/revision/log', methods=['POST']) -@timed -def revision_log(): - return encode_data(get_storage().revision_log(**decode_request(request))) - - -@app.route('/revision/shortlog', methods=['POST']) -@timed -def revision_shortlog(): - return encode_data(get_storage().revision_shortlog( - **decode_request(request))) - - -@app.route('/revision/missing', methods=['POST']) -@timed -def revision_missing(): - return encode_data(get_storage().revision_missing( - **decode_request(request))) - - -@app.route('/release/add', methods=['POST']) -@timed -@encode -@process_metrics -def release_add(): - return get_storage().release_add(**decode_request(request)) - - -@app.route('/release', methods=['POST']) -@timed -def release_get(): - return encode_data(get_storage().release_get(**decode_request(request))) - - -@app.route('/release/missing', methods=['POST']) -@timed -def release_missing(): - return encode_data(get_storage().release_missing( - **decode_request(request))) - - -@app.route('/object/find_by_sha1_git', methods=['POST']) -@timed -def object_find_by_sha1_git(): - return encode_data(get_storage().object_find_by_sha1_git( - **decode_request(request))) - - -@app.route('/snapshot/add', methods=['POST']) -@timed -@encode -@process_metrics -def snapshot_add(): - req_data = decode_request(request) - return get_storage().snapshot_add(**req_data) - - -@app.route('/snapshot', methods=['POST']) -@timed -def snapshot_get(): - return encode_data(get_storage().snapshot_get(**decode_request(request))) - - -@app.route('/snapshot/by_origin_visit', 
methods=['POST']) -@timed -def snapshot_get_by_origin_visit(): - return encode_data(get_storage().snapshot_get_by_origin_visit( - **decode_request(request))) - - -@app.route('/snapshot/latest', methods=['POST']) -@timed -def snapshot_get_latest(): - return encode_data(get_storage().snapshot_get_latest( - **decode_request(request))) - - -@app.route('/snapshot/count_branches', methods=['POST']) -@timed -def snapshot_count_branches(): - return encode_data(get_storage().snapshot_count_branches( - **decode_request(request))) - - -@app.route('/snapshot/get_branches', methods=['POST']) -@timed -def snapshot_get_branches(): - return encode_data(get_storage().snapshot_get_branches( - **decode_request(request))) - - -@app.route('/origin/get', methods=['POST']) -@timed -def origin_get(): - return encode_data(get_storage().origin_get(**decode_request(request))) - - -@app.route('/origin/get_sha1', methods=['POST']) -@timed -def origin_get_by_sha1(): - return encode_data(get_storage().origin_get_by_sha1( - **decode_request(request))) - - -@app.route('/origin/get_range', methods=['POST']) -@timed -def origin_get_range(): - return encode_data(get_storage().origin_get_range( - **decode_request(request))) - - -@app.route('/origin/search', methods=['POST']) -@timed -def origin_search(): - return encode_data(get_storage().origin_search(**decode_request(request))) - - -@app.route('/origin/count', methods=['POST']) -@timed -def origin_count(): - return encode_data(get_storage().origin_count(**decode_request(request))) - - -@app.route('/origin/add_multi', methods=['POST']) -@timed -@encode -def origin_add(): - origins = get_storage().origin_add(**decode_request(request)) - send_metric('origin:add', count=len(origins), method_name='origin_add') - return origins - - -@app.route('/origin/add', methods=['POST']) -@timed -@encode -def origin_add_one(): - origin = get_storage().origin_add_one(**decode_request(request)) - send_metric('origin:add', count=1, method_name='origin_add_one') - return 
origin - - -@app.route('/origin/visit/get', methods=['POST']) -@timed -def origin_visit_get(): - return encode_data(get_storage().origin_visit_get( - **decode_request(request))) - - -@app.route('/origin/visit/get_random', methods=['POST']) -@timed -def origin_visit_get_random(): - return encode_data(get_storage().origin_visit_get_random( - **decode_request(request))) - - -@app.route('/origin/visit/find_by_date', methods=['POST']) -@timed -def origin_visit_find_by_date(): - return encode_data(get_storage().origin_visit_find_by_date( - **decode_request(request))) - - -@app.route('/origin/visit/getby', methods=['POST']) -@timed -def origin_visit_get_by(): - return encode_data( - get_storage().origin_visit_get_by(**decode_request(request))) - - -@app.route('/origin/visit/get_latest', methods=['POST']) -@timed -def origin_visit_get_latest(): - return encode_data( - get_storage().origin_visit_get_latest(**decode_request(request))) - - -@app.route('/origin/visit/add', methods=['POST']) -@timed -@encode -def origin_visit_add(): - origin_visit = get_storage().origin_visit_add( - **decode_request(request)) - send_metric('origin_visit:add', count=1, method_name='origin_visit') - return origin_visit - - -@app.route('/origin/visit/update', methods=['POST']) -@timed -def origin_visit_update(): - return encode_data(get_storage().origin_visit_update( - **decode_request(request))) - - -@app.route('/origin/visit/upsert', methods=['POST']) -@timed -def origin_visit_upsert(): - return encode_data(get_storage().origin_visit_upsert( - **decode_request(request))) - - -@app.route('/tool/data', methods=['POST']) -@timed -def tool_get(): - return encode_data(get_storage().tool_get( - **decode_request(request))) - - -@app.route('/tool/add', methods=['POST']) -@timed -@encode -def tool_add(): - tools = get_storage().tool_add(**decode_request(request)) - send_metric('tool:add', count=len(tools), method_name='tool_add') - return tools - - -@app.route('/origin/metadata/add', methods=['POST']) 
-@timed -@encode -def origin_metadata_add(): - origin_metadata = get_storage().origin_metadata_add( - **decode_request(request)) - send_metric( - 'origin_metadata:add', count=1, method_name='origin_metadata_add') - return origin_metadata - - -@app.route('/origin/metadata/get', methods=['POST']) -@timed -def origin_metadata_get_by(): - return encode_data(get_storage().origin_metadata_get_by(**decode_request( - request))) - - -@app.route('/provider/add', methods=['POST']) -@timed -@encode -def metadata_provider_add(): - metadata_provider = get_storage().metadata_provider_add(**decode_request( - request)) - send_metric( - 'metadata_provider:add', count=1, method_name='metadata_provider') - return metadata_provider - - -@app.route('/provider/get', methods=['POST']) -@timed -def metadata_provider_get(): - return encode_data(get_storage().metadata_provider_get(**decode_request( - request))) - - -@app.route('/provider/getby', methods=['POST']) -@timed -def metadata_provider_get_by(): - return encode_data(get_storage().metadata_provider_get_by(**decode_request( - request))) - - @app.route('/stat/counters', methods=['GET']) @timed def stat_counters(): @@ -525,25 +73,6 @@ return encode_data(get_storage().refresh_stat_counters()) -@app.route('/algos/diff_directories', methods=['POST']) -@timed -def diff_directories(): - return encode_data(get_storage().diff_directories( - **decode_request(request))) - - -@app.route('/algos/diff_revisions', methods=['POST']) -@timed -def diff_revisions(): - return encode_data(get_storage().diff_revisions(**decode_request(request))) - - -@app.route('/algos/diff_revision', methods=['POST']) -@timed -def diff_revision(): - return encode_data(get_storage().diff_revision(**decode_request(request))) - - api_cfg = None diff --git a/swh/storage/metrics.py b/swh/storage/metrics.py new file mode 100644 --- /dev/null +++ b/swh/storage/metrics.py @@ -0,0 +1,79 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the 
top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps +import logging + +from swh.core.statsd import statsd + +OPERATIONS_METRIC = 'swh_storage_operations_total' +OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total" +DURATION_METRIC = "swh_storage_request_duration_seconds" + + +def timed(f): + """Time that function! + + """ + @wraps(f) + def d(*a, **kw): + with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): + return f(*a, **kw) + + return d + + +def send_metric(metric, count, method_name): + """Send statsd metric with count for method `method_name` + + If count is 0, the metric is discarded. If the metric is not + parseable, the metric is discarded with a log message. + + Args: + metric (str): Metric's name (e.g content:add, content:add:bytes) + count (int): Associated value for the metric + method_name (str): Method's name + + Returns: + bool: True if the metric was sent, False if it was discarded + """ + if count == 0: + return False + + metric_type = metric.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = OPERATIONS_METRIC + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) + else: + logging.warning('Skipping unknown metric {%s: %s}', + metric, count) + return False + + statsd.increment( + metric_name, count, tags={ + 'endpoint': method_name, + 'object_type': object_type, + 'operation': operation, + }) + return True + + +def process_metrics(f): + """Increment object counters for the decorated function. 
+ + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for metric, count in r.items(): + send_metric(metric=metric, count=count, method_name=f.__name__) + + return r + + return d diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -32,6 +32,7 @@ from .db import Db from .exc import StorageDBError from .algos import diff +from .metrics import timed, send_metric, process_metrics # Max block size of contents to return @@ -97,6 +98,7 @@ self.put_db(db) @remote_api_endpoint('check_config') + @timed @db_transaction() def check_config(self, *, check_write, db=None, cur=None): """Check that the storage is configured and ready to go.""" @@ -229,6 +231,8 @@ db.skipped_content_add_from_temp(cur) @remote_api_endpoint('content/add') + @timed + @process_metrics @db_transaction() def content_add(self, content, db=None, cur=None): """Add content blobs to the storage @@ -324,6 +328,7 @@ return summary @remote_api_endpoint('content/update') + @timed @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): """Update content blobs to the storage. Does nothing for unknown @@ -359,6 +364,8 @@ cur=cur) @remote_api_endpoint('content/add_metadata') + @timed + @process_metrics @db_transaction() def content_add_metadata(self, content, db=None, cur=None): """Add content metadata to the storage (like `content_add`, but @@ -405,6 +412,7 @@ return summary @remote_api_endpoint('content/data') + @timed def content_get(self, content): """Retrieve in bulk contents and their data. @@ -443,6 +451,7 @@ yield {'sha1': obj_id, 'data': data} @remote_api_endpoint('content/range') + @timed @db_transaction() def content_get_range(self, start, end, limit=1000, db=None, cur=None): """Retrieve contents within range [start, end] bound by limit. 
@@ -483,6 +492,7 @@ } @remote_api_endpoint('content/metadata') + @timed @db_transaction_generator(statement_timeout=500) def content_get_metadata(self, content, db=None, cur=None): """Retrieve content metadata in bulk @@ -497,6 +507,7 @@ yield dict(zip(db.content_get_metadata_keys, metadata)) @remote_api_endpoint('content/missing') + @timed @db_transaction_generator() def content_missing(self, content, key_hash='sha1', db=None, cur=None): """List content missing from storage @@ -533,6 +544,7 @@ yield obj[key_hash_idx] @remote_api_endpoint('content/missing/sha1') + @timed @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): """List content missing from storage based only on sha1. @@ -551,6 +563,7 @@ yield obj[0] @remote_api_endpoint('content/skipped/missing') + @timed @db_transaction_generator() def skipped_content_missing(self, contents, db=None, cur=None): """List skipped_content missing from storage @@ -567,6 +580,7 @@ yield dict(zip(db.content_hash_keys, content)) @remote_api_endpoint('content/present') + @timed @db_transaction() def content_find(self, content, db=None, cur=None): """Find a content hash in db. @@ -598,6 +612,8 @@ for content in contents] @remote_api_endpoint('directory/add') + @timed + @process_metrics @db_transaction() def directory_add(self, directories, db=None, cur=None): """Add directories to the storage @@ -684,6 +700,7 @@ return summary @remote_api_endpoint('directory/missing') + @timed @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): """List directories missing from storage @@ -699,6 +716,7 @@ yield obj[0] @remote_api_endpoint('directory/ls') + @timed @db_transaction_generator(statement_timeout=20000) def directory_ls(self, directory, recursive=False, db=None, cur=None): """Get entries for one directory. 
@@ -723,6 +741,7 @@ yield dict(zip(db.directory_ls_cols, line)) @remote_api_endpoint('directory/path') + @timed @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): """Get the directory entry (either file or dir) from directory with path. @@ -741,6 +760,8 @@ return dict(zip(db.directory_ls_cols, res)) @remote_api_endpoint('revision/add') + @timed + @process_metrics @db_transaction() def revision_add(self, revisions, db=None, cur=None): """Add revisions to the storage @@ -814,6 +835,7 @@ return {'revision:add': len(revisions_missing)} @remote_api_endpoint('revision/missing') + @timed @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage @@ -832,6 +854,7 @@ yield obj[0] @remote_api_endpoint('revision') + @timed @db_transaction_generator(statement_timeout=1000) def revision_get(self, revisions, db=None, cur=None): """Get all revisions from storage @@ -854,6 +877,7 @@ yield data @remote_api_endpoint('revision/log') + @timed @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): """Fetch revision entry from the given root revisions. 
@@ -876,6 +900,7 @@ yield data @remote_api_endpoint('revision/shortlog') + @timed @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): """Fetch the shortlog for the given revisions @@ -892,6 +917,8 @@ yield from db.revision_shortlog(revisions, limit, cur) @remote_api_endpoint('release/add') + @timed + @process_metrics @db_transaction() def release_add(self, releases, db=None, cur=None): """Add releases to the storage @@ -950,6 +977,7 @@ return {'release:add': len(releases_missing)} @remote_api_endpoint('release/missing') + @timed @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage @@ -968,6 +996,7 @@ yield obj[0] @remote_api_endpoint('release') + @timed @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): """Given a list of sha1, return the releases's information @@ -987,6 +1016,8 @@ yield data if data['target_type'] else None @remote_api_endpoint('snapshot/add') + @timed + @process_metrics @db_transaction() def snapshot_add(self, snapshots, db=None, cur=None): """Add snapshots to the storage. 
@@ -1051,6 +1082,7 @@ return {'snapshot:add': count} @remote_api_endpoint('snapshot') + @timed @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id @@ -1078,6 +1110,7 @@ return self.snapshot_get_branches(snapshot_id, db=db, cur=cur) @remote_api_endpoint('snapshot/by_origin_visit') + @timed @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): """Get the content, possibly partial, of a snapshot for the given origin visit @@ -1112,6 +1145,7 @@ return None @remote_api_endpoint('snapshot/latest') + @timed @db_transaction(statement_timeout=4000) def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): @@ -1160,6 +1194,7 @@ return snapshot @remote_api_endpoint('snapshot/count_branches') + @timed @db_transaction(statement_timeout=2000) def snapshot_count_branches(self, snapshot_id, db=None, cur=None): """Count the number of branches in the snapshot with the given id @@ -1175,6 +1210,7 @@ db.snapshot_count_branches(snapshot_id, cur)]) @remote_api_endpoint('snapshot/get_branches') + @timed @db_transaction(statement_timeout=2000) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None, @@ -1241,6 +1277,7 @@ return None @remote_api_endpoint('origin/visit/add') + @timed @db_transaction() def origin_visit_add(self, origin, date, type, db=None, cur=None): @@ -1274,12 +1311,14 @@ 'visit': visit_id, 'status': 'ongoing', 'metadata': None, 'snapshot': None}) + send_metric('origin_visit:add', count=1, method_name='origin_visit') return { 'origin': origin_url, 'visit': visit_id, } @remote_api_endpoint('origin/visit/update') + @timed @db_transaction() def origin_visit_update(self, origin, visit_id, status=None, metadata=None, snapshot=None, @@ -1324,6 +1363,7 @@ db.origin_visit_update(origin_url, visit_id, updates, cur) 
@remote_api_endpoint('origin/visit/upsert') + @timed @db_transaction() def origin_visit_upsert(self, visits, db=None, cur=None): """Add a origin_visits with a specific id and with all its data. @@ -1358,6 +1398,7 @@ db.origin_visit_upsert(**visit, cur=cur) @remote_api_endpoint('origin/visit/get') + @timed @db_transaction_generator(statement_timeout=500) def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, cur=None): @@ -1380,6 +1421,7 @@ yield data @remote_api_endpoint('origin/visit/find_by_date') + @timed @db_transaction(statement_timeout=500) def origin_visit_find_by_date(self, origin, visit_date, db=None, cur=None): """Retrieves the origin visit whose date is closest to the provided @@ -1399,6 +1441,7 @@ return dict(zip(db.origin_visit_get_cols, line)) @remote_api_endpoint('origin/visit/getby') + @timed @db_transaction(statement_timeout=500) def origin_visit_get_by(self, origin, visit, db=None, cur=None): """Retrieve origin visit's information. @@ -1418,6 +1461,7 @@ return dict(zip(db.origin_visit_get_cols, ori_visit)) @remote_api_endpoint('origin/visit/get_latest') + @timed @db_transaction(statement_timeout=4000) def origin_visit_get_latest( self, origin, allowed_statuses=None, require_snapshot=False, @@ -1453,6 +1497,7 @@ return dict(zip(db.origin_visit_get_cols, origin_visit)) @remote_api_endpoint('object/find_by_sha1_git') + @timed @db_transaction(statement_timeout=2000) def object_find_by_sha1_git(self, ids, db=None, cur=None): """Return the objects found with the given ids. 
@@ -1480,6 +1525,7 @@ return ret @remote_api_endpoint('origin/get') + @timed @db_transaction(statement_timeout=500) def origin_get(self, origins, db=None, cur=None): """Return origins, either all identified by their ids or all @@ -1529,6 +1575,7 @@ return [None if res['url'] is None else res for res in results] @remote_api_endpoint('origin/get_sha1') + @timed @db_transaction_generator(statement_timeout=500) def origin_get_by_sha1(self, sha1s, db=None, cur=None): """Return origins, identified by the sha1 of their URLs. @@ -1549,6 +1596,7 @@ yield None @remote_api_endpoint('origin/visit/get_random') + @timed @db_transaction() def origin_visit_get_random( self, type, db=None, cur=None) -> Mapping[str, Any]: @@ -1565,6 +1613,7 @@ return data @remote_api_endpoint('origin/get_range') + @timed @db_transaction_generator() def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None): @@ -1585,6 +1634,7 @@ yield dict(zip(db.origin_get_range_cols, origin)) @remote_api_endpoint('origin/search') + @timed @db_transaction_generator() def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, db=None, cur=None): @@ -1610,6 +1660,7 @@ yield dict(zip(db.origin_cols, origin)) @remote_api_endpoint('origin/count') + @timed @db_transaction() def origin_count(self, url_pattern, regexp=False, with_visit=False, db=None, cur=None): @@ -1630,6 +1681,7 @@ return db.origin_count(url_pattern, regexp, with_visit, cur) @remote_api_endpoint('origin/add_multi') + @timed @db_transaction() def origin_add(self, origins, db=None, cur=None): """Add origins to the storage @@ -1649,9 +1701,11 @@ for origin in origins: self.origin_add_one(origin, db=db, cur=cur) + send_metric('origin:add', count=len(origins), method_name='origin_add') return origins @remote_api_endpoint('origin/add') + @timed @db_transaction() def origin_add_one(self, origin, db=None, cur=None): """Add origin to the storage @@ -1676,7 +1730,9 @@ if self.journal_writer: 
self.journal_writer.write_addition('origin', origin) - return db.origin_add(origin['url'], cur) + result = db.origin_add(origin['url'], cur) + send_metric('origin:add', count=1, method_name='origin_add_one') + return result @db_transaction(statement_timeout=500) def stat_counters(self, db=None, cur=None): @@ -1711,6 +1767,7 @@ cur.execute('select * from swh_update_counter(%s)', (key,)) @remote_api_endpoint('origin/metadata/add') + @timed @db_transaction() def origin_metadata_add(self, origin_url, ts, provider, tool, metadata, db=None, cur=None): @@ -1729,8 +1786,11 @@ db.origin_metadata_add(origin_url, ts, provider, tool, metadata, cur) + send_metric( + 'origin_metadata:add', count=1, method_name='origin_metadata_add') @remote_api_endpoint('origin/metadata/get') + @timed @db_transaction_generator(statement_timeout=500) def origin_metadata_get_by(self, origin_url, provider_type=None, db=None, cur=None): @@ -1757,6 +1817,7 @@ yield dict(zip(db.origin_metadata_get_cols, line)) @remote_api_endpoint('tool/add') + @timed @db_transaction() def tool_add(self, tools, db=None, cur=None): """Add new tools to the storage. @@ -1782,9 +1843,12 @@ cur) tools = db.tool_add_from_temp(cur) - return [dict(zip(db.tool_cols, line)) for line in tools] + results = [dict(zip(db.tool_cols, line)) for line in tools] + send_metric('tool:add', count=len(results), method_name='tool_add') + return results @remote_api_endpoint('tool/data') + @timed @db_transaction(statement_timeout=500) def tool_get(self, tool, db=None, cur=None): """Retrieve tool information. 
@@ -1810,6 +1874,7 @@ return dict(zip(db.tool_cols, idx)) @remote_api_endpoint('provider/add') + @timed @db_transaction() def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata, db=None, cur=None): @@ -1824,10 +1889,14 @@ Returns: int: an identifier of the provider """ - return db.metadata_provider_add(provider_name, provider_type, - provider_url, metadata, cur) + result = db.metadata_provider_add(provider_name, provider_type, + provider_url, metadata, cur) + send_metric( + 'metadata_provider:add', count=1, method_name='metadata_provider') + return result @remote_api_endpoint('provider/get') + @timed @db_transaction() def metadata_provider_get(self, provider_id, db=None, cur=None): """Get a metadata provider @@ -1845,6 +1914,7 @@ return dict(zip(db.metadata_provider_cols, result)) @remote_api_endpoint('provider/getby') + @timed @db_transaction() def metadata_provider_get_by(self, provider, db=None, cur=None): """Get a metadata provider @@ -1865,6 +1935,7 @@ return dict(zip(db.metadata_provider_cols, result)) @remote_api_endpoint('algos/diff_directories') + @timed def diff_directories(self, from_dir, to_dir, track_renaming=False): """Compute the list of file changes introduced between two arbitrary directories (insertion / deletion / modification / renaming of files). @@ -1882,6 +1953,7 @@ return diff.diff_directories(self, from_dir, to_dir, track_renaming) @remote_api_endpoint('algos/diff_revisions') + @timed def diff_revisions(self, from_rev, to_rev, track_renaming=False): """Compute the list of file changes introduced between two arbitrary revisions (insertion / deletion / modification / renaming of files). 
@@ -1899,6 +1971,7 @@ return diff.diff_revisions(self, from_rev, to_rev, track_renaming) @remote_api_endpoint('algos/diff_revision') + @timed def diff_revision(self, revision, track_renaming=False): """Compute the list of file changes introduced by a specific revision (insertion / deletion / modification / renaming of files) by comparing diff --git a/swh/storage/tests/test_metrics.py b/swh/storage/tests/test_metrics.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_metrics.py @@ -0,0 +1,51 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import patch + +from swh.storage.metrics import ( + send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC +) + + +def test_send_metric_unknown_unit(): + r = send_metric('content', count=10, method_name='content_add') + assert r is False + r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') + assert r is False + + +def test_send_metric_no_value(): + r = send_metric('content:add', count=0, method_name='content_add') + assert r is False + + +@patch('swh.storage.metrics.statsd.increment') +def test_send_metric_no_unit(mock_statsd): + r = send_metric('content:add', count=10, method_name='content_add') + + mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ + 'endpoint': 'content_add', + 'object_type': 'content', + 'operation': 'add', + }) + + assert r + + +@patch('swh.storage.metrics.statsd.increment') +def test_send_metric_unit(mock_statsd): + unit_ = 'bytes' + r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') + + expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) + mock_statsd.assert_called_with( + expected_metric, 100, tags={ + 'endpoint': 'c_add', + 'object_type': 'c', + 'operation': 'add', + }) + + assert r diff --git 
a/swh/storage/tests/test_server.py b/swh/storage/tests/test_server.py --- a/swh/storage/tests/test_server.py +++ b/swh/storage/tests/test_server.py @@ -7,12 +7,7 @@ import pytest import yaml -from unittest.mock import patch - -from swh.storage.api.server import ( - load_and_check_config, send_metric, - OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC -) +from swh.storage.api.server import load_and_check_config def prepare_config_file(tmpdir, content, name='config.yml'): @@ -132,44 +127,3 @@ cfg = load_and_check_config(config_path, type='any') assert cfg == config - - -def test_send_metric_unknown_unit(): - r = send_metric('content', count=10, method_name='content_add') - assert r is False - r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') - assert r is False - - -def test_send_metric_no_value(): - r = send_metric('content:add', count=0, method_name='content_add') - assert r is False - - -@patch('swh.storage.api.server.statsd.increment') -def test_send_metric_no_unit(mock_statsd): - r = send_metric('content:add', count=10, method_name='content_add') - - mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ - 'endpoint': 'content_add', - 'object_type': 'content', - 'operation': 'add', - }) - - assert r - - -@patch('swh.storage.api.server.statsd.increment') -def test_send_metric_unit(mock_statsd): - unit_ = 'bytes' - r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') - - expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) - mock_statsd.assert_called_with( - expected_metric, 100, tags={ - 'endpoint': 'c_add', - 'object_type': 'c', - 'operation': 'add', - }) - - assert r