diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py index be389a57..865b3663 100644 --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -1,265 +1,268 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import warnings from swh.core.api import RPCClient from ..exc import StorageAPIError class RemoteStorage(RPCClient): """Proxy to a remote storage API""" api_exception = StorageAPIError def check_config(self, *, check_write): return self.post('check_config', {'check_write': check_write}) def reset(self): return self.post('reset', {}) def content_add(self, content): return self.post('content/add', {'content': content}) def content_add_metadata(self, content): return self.post('content/add_metadata', {'content': content}) def content_update(self, content, keys=[]): return self.post('content/update', {'content': content, 'keys': keys}) def content_missing(self, content, key_hash='sha1'): return self.post('content/missing', {'content': content, 'key_hash': key_hash}) def content_missing_per_sha1(self, contents): return self.post('content/missing/sha1', {'contents': contents}) def skipped_content_missing(self, contents): return self.post('content/skipped/missing', {'contents': contents}) def content_get(self, content): return self.post('content/data', {'content': content}) def content_get_metadata(self, content): return self.post('content/metadata', {'content': content}) def content_get_range(self, start, end, limit=1000): return self.post('content/range', {'start': start, 'end': end, 'limit': limit}) def content_find(self, content): return self.post('content/present', {'content': content}) def directory_add(self, directories): return self.post('directory/add', {'directories': directories}) def directory_missing(self, directories): return self.post('directory/missing', {'directories': directories}) def directory_ls(self, directory, recursive=False): return self.post('directory/ls', {'directory': directory, 'recursive': recursive}) def revision_get(self, revisions): return self.post('revision', {'revisions': revisions}) def revision_log(self, revisions, limit=None): return self.post('revision/log', {'revisions': revisions, 'limit': limit}) def revision_shortlog(self, revisions, limit=None): return self.post('revision/shortlog', {'revisions': revisions, 'limit': limit}) def revision_add(self, revisions): return self.post('revision/add', {'revisions': revisions}) def revision_missing(self, revisions): return self.post('revision/missing', {'revisions': revisions}) def release_add(self, releases): return self.post('release/add', {'releases': releases}) def release_get(self, releases): return self.post('release', {'releases': releases}) def release_missing(self, releases): return self.post('release/missing', {'releases': releases}) def object_find_by_sha1_git(self, ids): return self.post('object/find_by_sha1_git', {'ids': ids}) def snapshot_add(self, snapshots): return self.post('snapshot/add', {'snapshots': snapshots}) def snapshot_get(self, snapshot_id): return self.post('snapshot', { 'snapshot_id': snapshot_id }) def snapshot_get_by_origin_visit(self, origin, visit): return self.post('snapshot/by_origin_visit', { 'origin': origin, 'visit': visit }) def snapshot_get_latest(self, origin, allowed_statuses=None): return self.post('snapshot/latest', { 'origin': origin, 'allowed_statuses': allowed_statuses }) def snapshot_count_branches(self, snapshot_id): return self.post('snapshot/count_branches', { 'snapshot_id': snapshot_id }) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None): return self.post('snapshot/get_branches', { 'snapshot_id': snapshot_id, 'branches_from': branches_from, 'branches_count': branches_count, 'target_types': target_types }) def origin_get(self, origins=None, *, origin=None): if origin is None: if origins is None: raise TypeError('origin_get expected 1 argument') else: assert origins is None origins = origin warnings.warn("argument 'origin' of origin_get was renamed " "to 'origins' in v0.0.123.", DeprecationWarning) return self.post('origin/get', {'origins': origins}) def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False): return self.post('origin/search', {'url_pattern': url_pattern, 'offset': offset, 'limit': limit, 'regexp': regexp, 'with_visit': with_visit}) def origin_count(self, url_pattern, regexp=False, with_visit=False): return self.post('origin/count', {'url_pattern': url_pattern, 'regexp': regexp, 'with_visit': with_visit}) def origin_get_range(self, origin_from=1, origin_count=100): return self.post('origin/get_range', {'origin_from': origin_from, 'origin_count': origin_count}) def origin_add(self, origins): return self.post('origin/add_multi', {'origins': origins}) def origin_add_one(self, origin): return self.post('origin/add', {'origin': origin}) def origin_visit_add(self, origin, date, type=None): return self.post( 'origin/visit/add', {'origin': origin, 'date': date, 'type': type}) def origin_visit_update(self, origin, visit_id, status=None, metadata=None, snapshot=None): return self.post('origin/visit/update', {'origin': origin, 'visit_id': visit_id, 'status': status, 'metadata': metadata, 'snapshot': snapshot}) def origin_visit_upsert(self, visits): return self.post('origin/visit/upsert', {'visits': visits}) def origin_visit_get(self, origin, last_visit=None, limit=None): return self.post('origin/visit/get', { 'origin': origin, 'last_visit': last_visit, 'limit': limit}) def origin_visit_find_by_date(self, origin, visit_date, limit=None): return self.post('origin/visit/find_by_date', { 'origin': origin, 'visit_date': visit_date}) def origin_visit_get_by(self, origin, visit): return self.post('origin/visit/getby', {'origin': origin, 'visit': visit}) def origin_visit_get_latest(self, origin, allowed_statuses=None, require_snapshot=False): return self.post( 'origin/visit/get_latest', {'origin': origin, 'allowed_statuses': allowed_statuses, 'require_snapshot': require_snapshot}) def fetch_history_start(self, origin_id): return self.post('fetch_history/start', {'origin_id': origin_id}) def fetch_history_end(self, fetch_history_id, data): return self.post('fetch_history/end', {'fetch_history_id': fetch_history_id, 'data': data}) def fetch_history_get(self, fetch_history_id): return self.get('fetch_history', {'id': fetch_history_id}) def stat_counters(self): return self.get('stat/counters') + def refresh_stat_counters(self): + return self.get('stat/refresh') + def directory_entry_get_by_path(self, directory, paths): return self.post('directory/path', dict(directory=directory, paths=paths)) def tool_add(self, tools): return self.post('tool/add', {'tools': tools}) def tool_get(self, tool): return self.post('tool/data', {'tool': tool}) def origin_metadata_add(self, origin_id, ts, provider, tool, metadata): return self.post('origin/metadata/add', {'origin_id': origin_id, 'ts': ts, 'provider': provider, 'tool': tool, 'metadata': metadata}) def origin_metadata_get_by(self, origin_id, provider_type=None): return self.post('origin/metadata/get', { 'origin_id': origin_id, 'provider_type': provider_type }) def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata): return self.post('provider/add', {'provider_name': provider_name, 'provider_type': provider_type, 'provider_url': provider_url, 'metadata': metadata}) def metadata_provider_get(self, provider_id): return self.post('provider/get', {'provider_id': provider_id}) def metadata_provider_get_by(self, provider): return self.post('provider/getby', {'provider': provider}) def diff_directories(self, from_dir, to_dir, track_renaming=False): return self.post('algos/diff_directories', {'from_dir': from_dir, 'to_dir': to_dir, 'track_renaming': track_renaming}) def diff_revisions(self, from_rev, to_rev, track_renaming=False): return self.post('algos/diff_revisions', {'from_rev': from_rev, 'to_rev': to_rev, 'track_renaming': track_renaming}) def diff_revision(self, revision, track_renaming=False): return self.post('algos/diff_revision', {'revision': revision, 'track_renaming': track_renaming}) diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py index 9c52c4bb..797c5452 100644 --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -1,613 +1,619 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from flask import request from functools import wraps from swh.core import config from swh.storage import get_storage as get_swhstorage from swh.core.api import (RPCServerApp, decode_request, error_handler, encode_data_server as encode_data) from swh.core.statsd import statsd app = RPCServerApp(__name__) storage = None OPERATIONS_METRIC = 'swh_storage_operations_total' OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total" DURATION_METRIC = "swh_storage_request_duration_seconds" def timed(f): """Time that function! """ @wraps(f) def d(*a, **kw): with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): return f(*a, **kw) return d def encode(f): @wraps(f) def d(*a, **kw): r = f(*a, **kw) return encode_data(r) return d def send_metric(metric, count, method_name): """Send statsd metric with count for method `method_name` If count is 0, the metric is discarded. If the metric is not parseable, the metric is discarded with a log message. Args: metric (str): Metric's name (e.g content:add, content:add:bytes) count (int): Associated value for the metric method_name (str): Method's name Returns: Bool to explicit if metric has been set or not """ if count == 0: return False metric_type = metric.split(':') _length = len(metric_type) if _length == 2: object_type, operation = metric_type metric_name = OPERATIONS_METRIC elif _length == 3: object_type, operation, unit = metric_type metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) else: logging.warning('Skipping unknown metric {%s: %s}' % ( metric, count)) return False statsd.increment( metric_name, count, tags={ 'endpoint': method_name, 'object_type': object_type, 'operation': operation, }) return True def process_metrics(f): """Increment object counters for the decorated function. """ @wraps(f) def d(*a, **kw): r = f(*a, **kw) for metric, count in r.items(): send_metric(metric=metric, count=count, method_name=f.__name__) return r return d @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def get_storage(): global storage if not storage: storage = get_swhstorage(**app.config['storage']) return storage @app.route('/') @timed def index(): return ''' Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

''' @app.route('/check_config', methods=['POST']) @timed def check_config(): return encode_data(get_storage().check_config(**decode_request(request))) @app.route('/reset', methods=['POST']) @timed def reset(): return encode_data(get_storage().reset(**decode_request(request))) @app.route('/content/missing', methods=['POST']) @timed def content_missing(): return encode_data(get_storage().content_missing( **decode_request(request))) @app.route('/content/missing/sha1', methods=['POST']) @timed def content_missing_per_sha1(): return encode_data(get_storage().content_missing_per_sha1( **decode_request(request))) @app.route('/content/skipped/missing', methods=['POST']) @timed def skipped_content_missing(): return encode_data(get_storage().skipped_content_missing( **decode_request(request))) @app.route('/content/present', methods=['POST']) @timed def content_find(): return encode_data(get_storage().content_find(**decode_request(request))) @app.route('/content/add', methods=['POST']) @timed @encode @process_metrics def content_add(): return get_storage().content_add(**decode_request(request)) @app.route('/content/add_metadata', methods=['POST']) @timed @encode @process_metrics def content_add_metadata(): return get_storage().content_add_metadata(**decode_request(request)) @app.route('/content/update', methods=['POST']) @timed def content_update(): return encode_data(get_storage().content_update(**decode_request(request))) @app.route('/content/data', methods=['POST']) @timed def content_get(): return encode_data(get_storage().content_get(**decode_request(request))) @app.route('/content/metadata', methods=['POST']) @timed def content_get_metadata(): return encode_data(get_storage().content_get_metadata( **decode_request(request))) @app.route('/content/range', methods=['POST']) @timed def content_get_range(): return encode_data(get_storage().content_get_range( **decode_request(request))) @app.route('/directory/missing', methods=['POST']) @timed def directory_missing(): return encode_data(get_storage().directory_missing( **decode_request(request))) @app.route('/directory/add', methods=['POST']) @timed @encode @process_metrics def directory_add(): return get_storage().directory_add(**decode_request(request)) @app.route('/directory/path', methods=['POST']) @timed def directory_entry_get_by_path(): return encode_data(get_storage().directory_entry_get_by_path( **decode_request(request))) @app.route('/directory/ls', methods=['POST']) @timed def directory_ls(): return encode_data(get_storage().directory_ls( **decode_request(request))) @app.route('/revision/add', methods=['POST']) @timed @encode @process_metrics def revision_add(): return get_storage().revision_add(**decode_request(request)) @app.route('/revision', methods=['POST']) @timed def revision_get(): return encode_data(get_storage().revision_get(**decode_request(request))) @app.route('/revision/log', methods=['POST']) @timed def revision_log(): return encode_data(get_storage().revision_log(**decode_request(request))) @app.route('/revision/shortlog', methods=['POST']) @timed def revision_shortlog(): return encode_data(get_storage().revision_shortlog( **decode_request(request))) @app.route('/revision/missing', methods=['POST']) @timed def revision_missing(): return encode_data(get_storage().revision_missing( **decode_request(request))) @app.route('/release/add', methods=['POST']) @timed @encode @process_metrics def release_add(): return get_storage().release_add(**decode_request(request)) @app.route('/release', methods=['POST']) @timed def release_get(): return encode_data(get_storage().release_get(**decode_request(request))) @app.route('/release/missing', methods=['POST']) @timed def release_missing(): return encode_data(get_storage().release_missing( **decode_request(request))) @app.route('/object/find_by_sha1_git', methods=['POST']) @timed def object_find_by_sha1_git(): return encode_data(get_storage().object_find_by_sha1_git( **decode_request(request))) @app.route('/snapshot/add', methods=['POST']) @timed @encode @process_metrics def snapshot_add(): req_data = decode_request(request) return get_storage().snapshot_add(**req_data) @app.route('/snapshot', methods=['POST']) @timed def snapshot_get(): return encode_data(get_storage().snapshot_get(**decode_request(request))) @app.route('/snapshot/by_origin_visit', methods=['POST']) @timed def snapshot_get_by_origin_visit(): return encode_data(get_storage().snapshot_get_by_origin_visit( **decode_request(request))) @app.route('/snapshot/latest', methods=['POST']) @timed def snapshot_get_latest(): return encode_data(get_storage().snapshot_get_latest( **decode_request(request))) @app.route('/snapshot/count_branches', methods=['POST']) @timed def snapshot_count_branches(): return encode_data(get_storage().snapshot_count_branches( **decode_request(request))) @app.route('/snapshot/get_branches', methods=['POST']) @timed def snapshot_get_branches(): return encode_data(get_storage().snapshot_get_branches( **decode_request(request))) @app.route('/origin/get', methods=['POST']) @timed def origin_get(): return encode_data(get_storage().origin_get(**decode_request(request))) @app.route('/origin/get_range', methods=['POST']) @timed def origin_get_range(): return encode_data(get_storage().origin_get_range( **decode_request(request))) @app.route('/origin/search', methods=['POST']) @timed def origin_search(): return encode_data(get_storage().origin_search(**decode_request(request))) @app.route('/origin/count', methods=['POST']) @timed def origin_count(): return encode_data(get_storage().origin_count(**decode_request(request))) @app.route('/origin/add_multi', methods=['POST']) @timed @encode def origin_add(): origins = get_storage().origin_add(**decode_request(request)) send_metric('origin:add', count=len(origins), method_name='origin_add') return origins @app.route('/origin/add', methods=['POST']) @timed @encode def origin_add_one(): origin = get_storage().origin_add_one(**decode_request(request)) send_metric('origin:add', count=1, method_name='origin_add_one') return origin @app.route('/origin/visit/get', methods=['POST']) @timed def origin_visit_get(): return encode_data(get_storage().origin_visit_get( **decode_request(request))) @app.route('/origin/visit/find_by_date', methods=['POST']) @timed def origin_visit_find_by_date(): return encode_data(get_storage().origin_visit_find_by_date( **decode_request(request))) @app.route('/origin/visit/getby', methods=['POST']) @timed def origin_visit_get_by(): return encode_data( get_storage().origin_visit_get_by(**decode_request(request))) @app.route('/origin/visit/get_latest', methods=['POST']) @timed def origin_visit_get_latest(): return encode_data( get_storage().origin_visit_get_latest(**decode_request(request))) @app.route('/origin/visit/add', methods=['POST']) @timed @encode def origin_visit_add(): origin_visit = get_storage().origin_visit_add( **decode_request(request)) send_metric('origin_visit:add', count=1, method_name='origin_visit') return origin_visit @app.route('/origin/visit/update', methods=['POST']) @timed def origin_visit_update(): return encode_data(get_storage().origin_visit_update( **decode_request(request))) @app.route('/origin/visit/upsert', methods=['POST']) @timed def origin_visit_upsert(): return encode_data(get_storage().origin_visit_upsert( **decode_request(request))) @app.route('/fetch_history', methods=['GET']) @timed def fetch_history_get(): return encode_data(get_storage().fetch_history_get(request.args['id'])) @app.route('/fetch_history/start', methods=['POST']) @timed def fetch_history_start(): return encode_data( get_storage().fetch_history_start(**decode_request(request))) @app.route('/fetch_history/end', methods=['POST']) @timed def fetch_history_end(): return encode_data( get_storage().fetch_history_end(**decode_request(request))) @app.route('/tool/data', methods=['POST']) @timed def tool_get(): return encode_data(get_storage().tool_get( **decode_request(request))) @app.route('/tool/add', methods=['POST']) @timed @encode def tool_add(): tools = get_storage().tool_add(**decode_request(request)) send_metric('tool:add', count=len(tools), method_name='tool_add') return tools @app.route('/origin/metadata/add', methods=['POST']) @timed @encode def origin_metadata_add(): origin_metadata = get_storage().origin_metadata_add( **decode_request(request)) send_metric( 'origin_metadata:add', count=1, method_name='origin_metadata_add') return origin_metadata @app.route('/origin/metadata/get', methods=['POST']) @timed def origin_metadata_get_by(): return encode_data(get_storage().origin_metadata_get_by(**decode_request( request))) @app.route('/provider/add', methods=['POST']) @timed @encode def metadata_provider_add(): metadata_provider = get_storage().metadata_provider_add(**decode_request( request)) send_metric( 'metadata_provider:add', count=1, method_name='metadata_provider') return metadata_provider @app.route('/provider/get', methods=['POST']) @timed def metadata_provider_get(): return encode_data(get_storage().metadata_provider_get(**decode_request( request))) @app.route('/provider/getby', methods=['POST']) @timed def metadata_provider_get_by(): return encode_data(get_storage().metadata_provider_get_by(**decode_request( request))) @app.route('/stat/counters', methods=['GET']) @timed def stat_counters(): return encode_data(get_storage().stat_counters()) +@app.route('/stat/refresh', methods=['GET']) +@timed +def refresh_stat_counters(): + return encode_data(get_storage().refresh_stat_counters()) + + @app.route('/algos/diff_directories', methods=['POST']) @timed def diff_directories(): return encode_data(get_storage().diff_directories( **decode_request(request))) @app.route('/algos/diff_revisions', methods=['POST']) @timed def diff_revisions(): return encode_data(get_storage().diff_revisions(**decode_request(request))) @app.route('/algos/diff_revision', methods=['POST']) @timed def diff_revision(): return encode_data(get_storage().diff_revision(**decode_request(request))) api_cfg = None def load_and_check_config(config_file, type='local'): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError('Configuration file must be defined') if not os.path.exists(config_file): raise FileNotFoundError('Configuration file %s does not exist' % ( config_file, )) cfg = config.read(config_file) if 'storage' not in cfg: raise KeyError("Missing '%storage' configuration") if type == 'local': vcfg = cfg['storage'] cls = vcfg.get('cls') if cls != 'local': raise ValueError( "The storage backend can only be started with a 'local' " "configuration") args = vcfg['args'] for key in ('db', 'objstorage'): if not args.get(key): raise ValueError( "Invalid configuration; missing '%s' config entry" % key) return cfg def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_file = os.environ.get('SWH_CONFIG_FILENAME') api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == '__main__': print('Deprecated. Use swh-storage')