diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -3,263 +3,22 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import warnings - from swh.core.api import RPCClient from ..exc import StorageAPIError +from ..storage import Storage class RemoteStorage(RPCClient): """Proxy to a remote storage API""" api_exception = StorageAPIError - - def check_config(self, *, check_write): - return self.post('check_config', {'check_write': check_write}) + backend_class = Storage def reset(self): return self.post('reset', {}) - def content_add(self, content): - return self.post('content/add', {'content': content}) - - def content_add_metadata(self, content): - return self.post('content/add_metadata', {'content': content}) - - def content_update(self, content, keys=[]): - return self.post('content/update', {'content': content, - 'keys': keys}) - - def content_missing(self, content, key_hash='sha1'): - return self.post('content/missing', {'content': content, - 'key_hash': key_hash}) - - def content_missing_per_sha1(self, contents): - return self.post('content/missing/sha1', {'contents': contents}) - - def skipped_content_missing(self, contents): - return self.post('content/skipped/missing', {'contents': contents}) - - def content_get(self, content): - return self.post('content/data', {'content': content}) - - def content_get_metadata(self, content): - return self.post('content/metadata', {'content': content}) - - def content_get_range(self, start, end, limit=1000): - return self.post('content/range', {'start': start, - 'end': end, - 'limit': limit}) - - def content_find(self, content): - return self.post('content/present', {'content': content}) - - def directory_add(self, directories): - return self.post('directory/add', {'directories': directories}) - - def directory_missing(self, directories): - return 
self.post('directory/missing', {'directories': directories}) - - def directory_ls(self, directory, recursive=False): - return self.post('directory/ls', {'directory': directory, - 'recursive': recursive}) - - def revision_get(self, revisions): - return self.post('revision', {'revisions': revisions}) - - def revision_log(self, revisions, limit=None): - return self.post('revision/log', {'revisions': revisions, - 'limit': limit}) - - def revision_shortlog(self, revisions, limit=None): - return self.post('revision/shortlog', {'revisions': revisions, - 'limit': limit}) - - def revision_add(self, revisions): - return self.post('revision/add', {'revisions': revisions}) - - def revision_missing(self, revisions): - return self.post('revision/missing', {'revisions': revisions}) - - def release_add(self, releases): - return self.post('release/add', {'releases': releases}) - - def release_get(self, releases): - return self.post('release', {'releases': releases}) - - def release_missing(self, releases): - return self.post('release/missing', {'releases': releases}) - - def object_find_by_sha1_git(self, ids): - return self.post('object/find_by_sha1_git', {'ids': ids}) - - def snapshot_add(self, snapshots): - return self.post('snapshot/add', {'snapshots': snapshots}) - - def snapshot_get(self, snapshot_id): - return self.post('snapshot', { - 'snapshot_id': snapshot_id - }) - - def snapshot_get_by_origin_visit(self, origin, visit): - return self.post('snapshot/by_origin_visit', { - 'origin': origin, - 'visit': visit - }) - - def snapshot_get_latest(self, origin, allowed_statuses=None): - return self.post('snapshot/latest', { - 'origin': origin, - 'allowed_statuses': allowed_statuses - }) - - def snapshot_count_branches(self, snapshot_id): - return self.post('snapshot/count_branches', { - 'snapshot_id': snapshot_id - }) - - def snapshot_get_branches(self, snapshot_id, branches_from=b'', - branches_count=1000, target_types=None): - return self.post('snapshot/get_branches', { - 
'snapshot_id': snapshot_id, - 'branches_from': branches_from, - 'branches_count': branches_count, - 'target_types': target_types - }) - - def origin_get(self, origins=None, *, origin=None): - if origin is None: - if origins is None: - raise TypeError('origin_get expected 1 argument') - else: - assert origins is None - origins = origin - warnings.warn("argument 'origin' of origin_get was renamed " - "to 'origins' in v0.0.123.", - DeprecationWarning) - return self.post('origin/get', {'origins': origins}) - - def origin_get_by_sha1(self, sha1s): - return self.post('origin/get_sha1', {'sha1s': sha1s}) - - def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, - with_visit=False): - return self.post('origin/search', {'url_pattern': url_pattern, - 'offset': offset, - 'limit': limit, - 'regexp': regexp, - 'with_visit': with_visit}) - - def origin_count(self, url_pattern, regexp=False, with_visit=False): - return self.post('origin/count', {'url_pattern': url_pattern, - 'regexp': regexp, - 'with_visit': with_visit}) - - def origin_get_range(self, origin_from=1, origin_count=100): - return self.post('origin/get_range', {'origin_from': origin_from, - 'origin_count': origin_count}) - - def origin_add(self, origins): - return self.post('origin/add_multi', {'origins': origins}) - - def origin_add_one(self, origin): - return self.post('origin/add', {'origin': origin}) - - def origin_visit_add(self, origin, date, type): - return self.post( - 'origin/visit/add', - {'origin': origin, 'date': date, 'type': type}) - - def origin_visit_update(self, origin, visit_id, status=None, - metadata=None, snapshot=None): - return self.post('origin/visit/update', {'origin': origin, - 'visit_id': visit_id, - 'status': status, - 'metadata': metadata, - 'snapshot': snapshot}) - - def origin_visit_upsert(self, visits): - return self.post('origin/visit/upsert', {'visits': visits}) - - def origin_visit_get(self, origin, last_visit=None, limit=None): - return 
self.post('origin/visit/get', { - 'origin': origin, 'last_visit': last_visit, 'limit': limit}) - - def origin_visit_get_random(self, type): - return self.post('origin/visit/get_random', { - 'type': type, - }) - - def origin_visit_find_by_date(self, origin, visit_date, limit=None): - return self.post('origin/visit/find_by_date', { - 'origin': origin, 'visit_date': visit_date}) - - def origin_visit_get_by(self, origin, visit): - return self.post('origin/visit/getby', {'origin': origin, - 'visit': visit}) - - def origin_visit_get_latest(self, origin, allowed_statuses=None, - require_snapshot=False): - return self.post( - 'origin/visit/get_latest', - {'origin': origin, 'allowed_statuses': allowed_statuses, - 'require_snapshot': require_snapshot}) - def stat_counters(self): return self.get('stat/counters') def refresh_stat_counters(self): return self.get('stat/refresh') - - def directory_entry_get_by_path(self, directory, paths): - return self.post('directory/path', dict(directory=directory, - paths=paths)) - - def tool_add(self, tools): - return self.post('tool/add', {'tools': tools}) - - def tool_get(self, tool): - return self.post('tool/data', {'tool': tool}) - - def origin_metadata_add(self, origin_url, ts, provider, tool, metadata): - return self.post('origin/metadata/add', {'origin_url': origin_url, - 'ts': ts, - 'provider': provider, - 'tool': tool, - 'metadata': metadata}) - - def origin_metadata_get_by(self, origin_url, provider_type=None): - return self.post('origin/metadata/get', { - 'origin_url': origin_url, - 'provider_type': provider_type - }) - - def metadata_provider_add(self, provider_name, provider_type, provider_url, - metadata): - return self.post('provider/add', {'provider_name': provider_name, - 'provider_type': provider_type, - 'provider_url': provider_url, - 'metadata': metadata}) - - def metadata_provider_get(self, provider_id): - return self.post('provider/get', {'provider_id': provider_id}) - - def metadata_provider_get_by(self, provider): - 
return self.post('provider/getby', {'provider': provider}) - - def diff_directories(self, from_dir, to_dir, track_renaming=False): - return self.post('algos/diff_directories', - {'from_dir': from_dir, - 'to_dir': to_dir, - 'track_renaming': track_renaming}) - - def diff_revisions(self, from_rev, to_rev, track_renaming=False): - return self.post('algos/diff_revisions', - {'from_rev': from_rev, - 'to_rev': to_rev, - 'track_renaming': track_renaming}) - - def diff_revision(self, revision, track_renaming=False): - return self.post('algos/diff_revision', - {'revision': revision, - 'track_renaming': track_renaming}) diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -6,99 +6,28 @@ import os import logging -from flask import request -from functools import wraps - from swh.core import config from swh.storage import get_storage as get_swhstorage -from swh.core.api import (RPCServerApp, decode_request, +from swh.core.api import (RPCServerApp, error_handler, encode_data_server as encode_data) -from swh.core.statsd import statsd - - -app = RPCServerApp(__name__) -storage = None - - -OPERATIONS_METRIC = 'swh_storage_operations_total' -OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total" -DURATION_METRIC = "swh_storage_request_duration_seconds" - - -def timed(f): - """Time that function! - - """ - @wraps(f) - def d(*a, **kw): - with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): - return f(*a, **kw) - - return d - - -def encode(f): - @wraps(f) - def d(*a, **kw): - r = f(*a, **kw) - return encode_data(r) - return d +from ..storage import Storage +from ..metrics import timed -def send_metric(metric, count, method_name): - """Send statsd metric with count for method `method_name` - - If count is 0, the metric is discarded. If the metric is not - parseable, the metric is discarded with a log message. 
- - Args: - metric (str): Metric's name (e.g content:add, content:add:bytes) - count (int): Associated value for the metric - method_name (str): Method's name - - Returns: - Bool to explicit if metric has been set or not - """ - if count == 0: - return False - - metric_type = metric.split(':') - _length = len(metric_type) - if _length == 2: - object_type, operation = metric_type - metric_name = OPERATIONS_METRIC - elif _length == 3: - object_type, operation, unit = metric_type - metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) - else: - logging.warning('Skipping unknown metric {%s: %s}' % ( - metric, count)) - return False - - statsd.increment( - metric_name, count, tags={ - 'endpoint': method_name, - 'object_type': object_type, - 'operation': operation, - }) - return True - - -def process_metrics(f): - """Increment object counters for the decorated function. +def get_storage(): + global storage + if not storage: + storage = get_swhstorage(**app.config['storage']) - """ - @wraps(f) - def d(*a, **kw): - r = f(*a, **kw) - for metric, count in r.items(): - send_metric(metric=metric, count=count, method_name=f.__name__) + return storage - return r - return d +app = RPCServerApp(__name__, + backend_class=Storage, + backend_factory=get_storage) +storage = None @app.errorhandler(Exception) @@ -106,14 +35,6 @@ return error_handler(exception, encode_data) -def get_storage(): - global storage - if not storage: - storage = get_swhstorage(**app.config['storage']) - - return storage - - @app.route('/') @timed def index(): @@ -130,389 +51,6 @@ ''' -@app.route('/check_config', methods=['POST']) -@timed -def check_config(): - return encode_data(get_storage().check_config(**decode_request(request))) - - -@app.route('/reset', methods=['POST']) -@timed -def reset(): - return encode_data(get_storage().reset(**decode_request(request))) - - -@app.route('/content/missing', methods=['POST']) -@timed -def content_missing(): - return encode_data(get_storage().content_missing( - 
**decode_request(request))) - - -@app.route('/content/missing/sha1', methods=['POST']) -@timed -def content_missing_per_sha1(): - return encode_data(get_storage().content_missing_per_sha1( - **decode_request(request))) - - -@app.route('/content/skipped/missing', methods=['POST']) -@timed -def skipped_content_missing(): - return encode_data(get_storage().skipped_content_missing( - **decode_request(request))) - - -@app.route('/content/present', methods=['POST']) -@timed -def content_find(): - return encode_data(get_storage().content_find(**decode_request(request))) - - -@app.route('/content/add', methods=['POST']) -@timed -@encode -@process_metrics -def content_add(): - return get_storage().content_add(**decode_request(request)) - - -@app.route('/content/add_metadata', methods=['POST']) -@timed -@encode -@process_metrics -def content_add_metadata(): - return get_storage().content_add_metadata(**decode_request(request)) - - -@app.route('/content/update', methods=['POST']) -@timed -def content_update(): - return encode_data(get_storage().content_update(**decode_request(request))) - - -@app.route('/content/data', methods=['POST']) -@timed -def content_get(): - return encode_data(get_storage().content_get(**decode_request(request))) - - -@app.route('/content/metadata', methods=['POST']) -@timed -def content_get_metadata(): - return encode_data(get_storage().content_get_metadata( - **decode_request(request))) - - -@app.route('/content/range', methods=['POST']) -@timed -def content_get_range(): - return encode_data(get_storage().content_get_range( - **decode_request(request))) - - -@app.route('/directory/missing', methods=['POST']) -@timed -def directory_missing(): - return encode_data(get_storage().directory_missing( - **decode_request(request))) - - -@app.route('/directory/add', methods=['POST']) -@timed -@encode -@process_metrics -def directory_add(): - return get_storage().directory_add(**decode_request(request)) - - -@app.route('/directory/path', methods=['POST']) 
-@timed -def directory_entry_get_by_path(): - return encode_data(get_storage().directory_entry_get_by_path( - **decode_request(request))) - - -@app.route('/directory/ls', methods=['POST']) -@timed -def directory_ls(): - return encode_data(get_storage().directory_ls( - **decode_request(request))) - - -@app.route('/revision/add', methods=['POST']) -@timed -@encode -@process_metrics -def revision_add(): - return get_storage().revision_add(**decode_request(request)) - - -@app.route('/revision', methods=['POST']) -@timed -def revision_get(): - return encode_data(get_storage().revision_get(**decode_request(request))) - - -@app.route('/revision/log', methods=['POST']) -@timed -def revision_log(): - return encode_data(get_storage().revision_log(**decode_request(request))) - - -@app.route('/revision/shortlog', methods=['POST']) -@timed -def revision_shortlog(): - return encode_data(get_storage().revision_shortlog( - **decode_request(request))) - - -@app.route('/revision/missing', methods=['POST']) -@timed -def revision_missing(): - return encode_data(get_storage().revision_missing( - **decode_request(request))) - - -@app.route('/release/add', methods=['POST']) -@timed -@encode -@process_metrics -def release_add(): - return get_storage().release_add(**decode_request(request)) - - -@app.route('/release', methods=['POST']) -@timed -def release_get(): - return encode_data(get_storage().release_get(**decode_request(request))) - - -@app.route('/release/missing', methods=['POST']) -@timed -def release_missing(): - return encode_data(get_storage().release_missing( - **decode_request(request))) - - -@app.route('/object/find_by_sha1_git', methods=['POST']) -@timed -def object_find_by_sha1_git(): - return encode_data(get_storage().object_find_by_sha1_git( - **decode_request(request))) - - -@app.route('/snapshot/add', methods=['POST']) -@timed -@encode -@process_metrics -def snapshot_add(): - req_data = decode_request(request) - return get_storage().snapshot_add(**req_data) - - 
-@app.route('/snapshot', methods=['POST']) -@timed -def snapshot_get(): - return encode_data(get_storage().snapshot_get(**decode_request(request))) - - -@app.route('/snapshot/by_origin_visit', methods=['POST']) -@timed -def snapshot_get_by_origin_visit(): - return encode_data(get_storage().snapshot_get_by_origin_visit( - **decode_request(request))) - - -@app.route('/snapshot/latest', methods=['POST']) -@timed -def snapshot_get_latest(): - return encode_data(get_storage().snapshot_get_latest( - **decode_request(request))) - - -@app.route('/snapshot/count_branches', methods=['POST']) -@timed -def snapshot_count_branches(): - return encode_data(get_storage().snapshot_count_branches( - **decode_request(request))) - - -@app.route('/snapshot/get_branches', methods=['POST']) -@timed -def snapshot_get_branches(): - return encode_data(get_storage().snapshot_get_branches( - **decode_request(request))) - - -@app.route('/origin/get', methods=['POST']) -@timed -def origin_get(): - return encode_data(get_storage().origin_get(**decode_request(request))) - - -@app.route('/origin/get_sha1', methods=['POST']) -@timed -def origin_get_by_sha1(): - return encode_data(get_storage().origin_get_by_sha1( - **decode_request(request))) - - -@app.route('/origin/get_range', methods=['POST']) -@timed -def origin_get_range(): - return encode_data(get_storage().origin_get_range( - **decode_request(request))) - - -@app.route('/origin/search', methods=['POST']) -@timed -def origin_search(): - return encode_data(get_storage().origin_search(**decode_request(request))) - - -@app.route('/origin/count', methods=['POST']) -@timed -def origin_count(): - return encode_data(get_storage().origin_count(**decode_request(request))) - - -@app.route('/origin/add_multi', methods=['POST']) -@timed -@encode -def origin_add(): - origins = get_storage().origin_add(**decode_request(request)) - send_metric('origin:add', count=len(origins), method_name='origin_add') - return origins - - -@app.route('/origin/add', 
methods=['POST']) -@timed -@encode -def origin_add_one(): - origin = get_storage().origin_add_one(**decode_request(request)) - send_metric('origin:add', count=1, method_name='origin_add_one') - return origin - - -@app.route('/origin/visit/get', methods=['POST']) -@timed -def origin_visit_get(): - return encode_data(get_storage().origin_visit_get( - **decode_request(request))) - - -@app.route('/origin/visit/get_random', methods=['POST']) -@timed -def origin_visit_get_random(): - return encode_data(get_storage().origin_visit_get_random( - **decode_request(request))) - - -@app.route('/origin/visit/find_by_date', methods=['POST']) -@timed -def origin_visit_find_by_date(): - return encode_data(get_storage().origin_visit_find_by_date( - **decode_request(request))) - - -@app.route('/origin/visit/getby', methods=['POST']) -@timed -def origin_visit_get_by(): - return encode_data( - get_storage().origin_visit_get_by(**decode_request(request))) - - -@app.route('/origin/visit/get_latest', methods=['POST']) -@timed -def origin_visit_get_latest(): - return encode_data( - get_storage().origin_visit_get_latest(**decode_request(request))) - - -@app.route('/origin/visit/add', methods=['POST']) -@timed -@encode -def origin_visit_add(): - origin_visit = get_storage().origin_visit_add( - **decode_request(request)) - send_metric('origin_visit:add', count=1, method_name='origin_visit') - return origin_visit - - -@app.route('/origin/visit/update', methods=['POST']) -@timed -def origin_visit_update(): - return encode_data(get_storage().origin_visit_update( - **decode_request(request))) - - -@app.route('/origin/visit/upsert', methods=['POST']) -@timed -def origin_visit_upsert(): - return encode_data(get_storage().origin_visit_upsert( - **decode_request(request))) - - -@app.route('/tool/data', methods=['POST']) -@timed -def tool_get(): - return encode_data(get_storage().tool_get( - **decode_request(request))) - - -@app.route('/tool/add', methods=['POST']) -@timed -@encode -def tool_add(): - 
tools = get_storage().tool_add(**decode_request(request)) - send_metric('tool:add', count=len(tools), method_name='tool_add') - return tools - - -@app.route('/origin/metadata/add', methods=['POST']) -@timed -@encode -def origin_metadata_add(): - origin_metadata = get_storage().origin_metadata_add( - **decode_request(request)) - send_metric( - 'origin_metadata:add', count=1, method_name='origin_metadata_add') - return origin_metadata - - -@app.route('/origin/metadata/get', methods=['POST']) -@timed -def origin_metadata_get_by(): - return encode_data(get_storage().origin_metadata_get_by(**decode_request( - request))) - - -@app.route('/provider/add', methods=['POST']) -@timed -@encode -def metadata_provider_add(): - metadata_provider = get_storage().metadata_provider_add(**decode_request( - request)) - send_metric( - 'metadata_provider:add', count=1, method_name='metadata_provider') - return metadata_provider - - -@app.route('/provider/get', methods=['POST']) -@timed -def metadata_provider_get(): - return encode_data(get_storage().metadata_provider_get(**decode_request( - request))) - - -@app.route('/provider/getby', methods=['POST']) -@timed -def metadata_provider_get_by(): - return encode_data(get_storage().metadata_provider_get_by(**decode_request( - request))) - - @app.route('/stat/counters', methods=['GET']) @timed def stat_counters(): @@ -525,25 +63,6 @@ return encode_data(get_storage().refresh_stat_counters()) -@app.route('/algos/diff_directories', methods=['POST']) -@timed -def diff_directories(): - return encode_data(get_storage().diff_directories( - **decode_request(request))) - - -@app.route('/algos/diff_revisions', methods=['POST']) -@timed -def diff_revisions(): - return encode_data(get_storage().diff_revisions(**decode_request(request))) - - -@app.route('/algos/diff_revision', methods=['POST']) -@timed -def diff_revision(): - return encode_data(get_storage().diff_revision(**decode_request(request))) - - api_cfg = None diff --git a/swh/storage/db.py 
b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -3,11 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import random import select from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize from swh.core.db.db_utils import execute_values_generator +from swh.model.model import SHA1_SIZE class Db(BaseDb): @@ -199,6 +201,10 @@ if ret: return ret[0] + def snapshot_get_random(self, cur=None): + return self._get_random_row_from_table( + 'snapshot', ['id'], 'id', cur) + content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status'] @@ -237,6 +243,10 @@ content = cur.fetchall() return content + def content_get_random(self, cur=None): + return self._get_random_row_from_table( + 'content', ['sha1_git'], 'sha1_git', cur) + def directory_missing_from_list(self, directories, cur=None): cur = self._cursor(cur) yield from execute_values_generator( @@ -280,6 +290,10 @@ return None return data + def directory_get_random(self, cur=None): + return self._get_random_row_from_table( + 'directory', ['id'], 'id', cur) + def revision_missing_from_list(self, revisions, cur=None): cur = self._cursor(cur) @@ -492,6 +506,30 @@ return None return r + def origin_visit_get_random(self, type, cur=None): + """Randomly select one origin visit that was full and in the last 3 + months + + """ + cur = self._cursor(cur) + columns = ','.join(self.origin_visit_select_cols) + query = f"""with visits as ( + select * + from origin_visit + where origin_visit.status='full' and + origin_visit.type=%s and + origin_visit.date > now() - '3 months'::interval + ) + select {columns} + from visits as origin_visit + inner join origin + on origin_visit.origin=origin.id + where random() < 0.1 + limit 1 + """ + cur.execute(query, (type, )) + return cur.fetchone() + @staticmethod def mangle_query_key(key, main_table): if key == 'id': @@ -553,6 +591,10 @@ 
cur.execute(query, (root_revisions, limit)) yield from cur + def revision_get_random(self, cur=None): + return self._get_random_row_from_table( + 'revision', ['id'], 'id', cur) + def release_missing_from_list(self, releases, cur=None): cur = self._cursor(cur) yield from execute_values_generator( @@ -648,30 +690,6 @@ yield from execute_values_generator( cur, query, ((sha1,) for sha1 in sha1s)) - def origin_visit_get_random(self, type, cur=None): - """Randomly select one origin whose last visit was full in the last 3 - months - - """ - cur = self._cursor(cur) - columns = ','.join(self.origin_visit_select_cols) - query = f"""with visits as ( - select * - from origin_visit - where origin_visit.status='full' and - origin_visit.type=%s and - origin_visit.date > now() - '3 months'::interval - ) - select {columns} - from visits as origin_visit - inner join origin - on origin_visit.origin=origin.id - where random() < 0.1 - limit 1 - """ - cur.execute(query, (type, )) - return cur.fetchone() - def origin_id_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) @@ -805,6 +823,10 @@ """ % query_keys, ((id,) for id in releases)) + def release_get_random(self, cur=None): + return self._get_random_row_from_table( + 'release', ['id'], 'id', cur) + def origin_metadata_add(self, origin, ts, provider, tool, metadata, cur=None): """ Add an origin_metadata for the origin at ts with provider, tool and @@ -915,3 +937,19 @@ (provider_name, provider_url)) return cur.fetchone() + + def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): + random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) + cur = self._cursor(cur) + query = ''' + (SELECT {cols} FROM {table} WHERE {id_col} >= %s + ORDER BY {id_col} LIMIT 1) + UNION + (SELECT {cols} FROM {table} WHERE {id_col} < %s + ORDER BY {id_col} DESC LIMIT 1) + LIMIT 1 + '''.format(cols=', '.join(cols), table=table_name, id_col=id_col) + 
cur.execute(query, (random_sha1, random_sha1)) + row = cur.fetchone() + if row: + return row[0] diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -420,6 +420,14 @@ yield content break + def content_get_random(self): + """Finds a random content id. + + Returns: + a sha1_git + """ + return random.choice(list(self._content_indexes['sha1_git'])) + def directory_add(self, directories): """Add directories to the storage @@ -535,6 +543,14 @@ """ return self._directory_entry_get_by_path(directory, paths, b'') + def directory_get_random(self): + """Finds a random directory id. + + Returns: + a sha1_git + """ + return random.choice(list(self._directories)) + def _directory_entry_get_by_path(self, directory, paths, prefix): if not paths: return @@ -681,6 +697,14 @@ yield from ((rev['id'], rev['parents']) for rev in self.revision_log(revisions, limit)) + def revision_get_random(self): + """Finds a random revision id. + + Returns: + a sha1_git + """ + return random.choice(list(self._revisions)) + def release_add(self, releases): """Add releases to the storage @@ -756,6 +780,14 @@ else: yield None + def release_get_random(self): + """Finds a random release id. + + Returns: + a sha1_git + """ + return random.choice(list(self._releases)) + def snapshot_add(self, snapshots): """Add a snapshot to the storage @@ -983,6 +1015,14 @@ 'next_branch': next_branch, } + def snapshot_get_random(self): + """Finds a random snapshot id. + + Returns: + a sha1_git + """ + return random.choice(list(self._snapshots)) + def object_find_by_sha1_git(self, ids, db=None, cur=None): """Return the objects found with the given ids. 
@@ -1092,36 +1132,6 @@ for sha1 in sha1s ] - def _select_random_origin_by_type(self, type: str) -> str: - """Select randomly an origin visit """ - while True: - url = random.choice(list(self._origin_visits.keys())) - random_origin_visits = self._origin_visits[url] - if random_origin_visits[0].type == type: - return url - - def origin_visit_get_random(self, type: str) -> Mapping[str, Any]: - """Randomly select one origin with whose visit was successful - in the last 3 months. - - Returns: - origin dict selected randomly on the dataset - - """ - random_visit: Dict[str, Any] = {} - if not self._origin_visits: # empty dataset - return random_visit - url = self._select_random_origin_by_type(type) - random_origin_visits = copy.deepcopy(self._origin_visits[url]) - random_origin_visits.reverse() - back_in_the_day = now() - timedelta(weeks=12) # 3 months back - # This should be enough for tests - for visit in random_origin_visits: - if visit.date > back_in_the_day and visit.status == 'full': - random_visit = visit.to_dict() - break - return random_visit - def origin_get_range(self, origin_from=1, origin_count=100): """Retrieve ``origin_count`` origins whose ids are greater or equal than ``origin_from``. @@ -1507,6 +1517,37 @@ visits, key=lambda v: (v.date, v.visit), default=None) return self._convert_visit(visit) + def _select_random_origin_visit_by_type(self, type: str) -> str: + """Select randomly an origin visit """ + while True: + url = random.choice(list(self._origin_visits.keys())) + random_origin_visits = self._origin_visits[url] + if random_origin_visits[0].type == type: + return url + + def origin_visit_get_random(self, type: str) -> Mapping[str, Any]: + """Randomly select one successful origin visit with + made in the last 3 months. + + Returns: + dict representing an origin visit, in the same format as + `origin_visit_get`. 
+ + """ + random_visit: Dict[str, Any] = {} + if not self._origin_visits: # empty dataset + return random_visit + url = self._select_random_origin_visit_by_type(type) + random_origin_visits = copy.deepcopy(self._origin_visits[url]) + random_origin_visits.reverse() + back_in_the_day = now() - timedelta(weeks=12) # 3 months back + # This should be enough for tests + for visit in random_origin_visits: + if visit.date > back_in_the_day and visit.status == 'full': + random_visit = visit.to_dict() + break + return random_visit + def stat_counters(self): """compute statistics about the number of tuples in various tables diff --git a/swh/storage/metrics.py b/swh/storage/metrics.py new file mode 100644 --- /dev/null +++ b/swh/storage/metrics.py @@ -0,0 +1,79 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import wraps +import logging + +from swh.core.statsd import statsd + +OPERATIONS_METRIC = 'swh_storage_operations_total' +OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total" +DURATION_METRIC = "swh_storage_request_duration_seconds" + + +def timed(f): + """Time that function! + + """ + @wraps(f) + def d(*a, **kw): + with statsd.timed(DURATION_METRIC, tags={'endpoint': f.__name__}): + return f(*a, **kw) + + return d + + +def send_metric(metric, count, method_name): + """Send statsd metric with count for method `method_name` + + If count is 0, the metric is discarded. If the metric is not + parseable, the metric is discarded with a log message. 
+ + Args: + metric (str): Metric's name (e.g content:add, content:add:bytes) + count (int): Associated value for the metric + method_name (str): Method's name + + Returns: + Bool to explicit if metric has been set or not + """ + if count == 0: + return False + + metric_type = metric.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = OPERATIONS_METRIC + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit) + else: + logging.warning('Skipping unknown metric {%s: %s}' % ( + metric, count)) + return False + + statsd.increment( + metric_name, count, tags={ + 'endpoint': method_name, + 'object_type': object_type, + 'operation': operation, + }) + return True + + +def process_metrics(f): + """Increment object counters for the decorated function. + + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for metric, count in r.items(): + send_metric(metric=metric, count=count, method_name=f.__name__) + + return r + + return d diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -17,12 +17,7 @@ import psycopg2 import psycopg2.pool -from . import converters -from .common import db_transaction_generator, db_transaction -from .db import Db -from .exc import StorageDBError -from .algos import diff - +from swh.core.api import remote_api_endpoint from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError @@ -32,6 +27,13 @@ get_journal_writer = None # type: ignore # mypy limitation, see https://github.com/python/mypy/issues/1153 +from . 
import converters +from .common import db_transaction_generator, db_transaction +from .db import Db +from .exc import StorageDBError +from .algos import diff +from .metrics import timed, send_metric, process_metrics + # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 @@ -95,8 +97,10 @@ if db: self.put_db(db) + @remote_api_endpoint('check_config') + @timed @db_transaction() - def check_config(self, *, check_write, db, cur): + def check_config(self, *, check_write, db=None, cur=None): """Check that the storage is configured and ready to go.""" if not self.objstorage.check_config(check_write=check_write): @@ -150,7 +154,7 @@ if d['length'] < -1: raise ValueError('Content length must be positive or -1.') - def _filter_new_content(self, content, db, cur): + def _filter_new_content(self, content, db=None, cur=None): """Sort contents into buckets 'with data' and 'without data', and filter out those already in the database.""" content_by_status = defaultdict(list) @@ -226,8 +230,11 @@ # move metadata in place db.skipped_content_add_from_temp(cur) + @remote_api_endpoint('content/add') + @timed + @process_metrics @db_transaction() - def content_add(self, content, db, cur): + def content_add(self, content, db=None, cur=None): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to @@ -320,6 +327,8 @@ summary['content:add:bytes'] = content_bytes_added return summary + @remote_api_endpoint('content/update') + @timed @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): """Update content blobs to the storage. 
Does nothing for unknown @@ -354,8 +363,11 @@ db.content_update_from_temp(keys_to_update=keys, cur=cur) + @remote_api_endpoint('content/add_metadata') + @timed + @process_metrics @db_transaction() - def content_add_metadata(self, content, db, cur): + def content_add_metadata(self, content, db=None, cur=None): """Add content metadata to the storage (like `content_add`, but without inserting to the objstorage). @@ -399,6 +411,8 @@ return summary + @remote_api_endpoint('content/data') + @timed def content_get(self, content): """Retrieve in bulk contents and their data. @@ -436,6 +450,8 @@ yield {'sha1': obj_id, 'data': data} + @remote_api_endpoint('content/range') + @timed @db_transaction() def content_get_range(self, start, end, limit=1000, db=None, cur=None): """Retrieve contents within range [start, end] bound by limit. @@ -475,6 +491,8 @@ 'next': next_content, } + @remote_api_endpoint('content/metadata') + @timed @db_transaction_generator(statement_timeout=500) def content_get_metadata(self, content, db=None, cur=None): """Retrieve content metadata in bulk @@ -488,6 +506,8 @@ for metadata in db.content_get_metadata_from_sha1s(content, cur): yield dict(zip(db.content_get_metadata_keys, metadata)) + @remote_api_endpoint('content/missing') + @timed @db_transaction_generator() def content_missing(self, content, key_hash='sha1', db=None, cur=None): """List content missing from storage @@ -523,6 +543,8 @@ for obj in db.content_missing_from_list(content, cur): yield obj[key_hash_idx] + @remote_api_endpoint('content/missing/sha1') + @timed @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): """List content missing from storage based only on sha1. 
@@ -540,6 +562,8 @@ for obj in db.content_missing_per_sha1(contents, cur): yield obj[0] + @remote_api_endpoint('content/skipped/missing') + @timed @db_transaction_generator() def skipped_content_missing(self, contents, db=None, cur=None): """List skipped_content missing from storage @@ -555,6 +579,8 @@ for content in db.skipped_content_missing(contents, cur): yield dict(zip(db.content_hash_keys, content)) + @remote_api_endpoint('content/present') + @timed @db_transaction() def content_find(self, content, db=None, cur=None): """Find a content hash in db. @@ -585,8 +611,22 @@ return [dict(zip(db.content_find_cols, content)) for content in contents] + @remote_api_endpoint('content/get_random') + @timed + @db_transaction() + def content_get_random(self, db=None, cur=None): + """Finds a random content id. + + Returns: + a sha1_git + """ + return db.content_get_random(cur) + + @remote_api_endpoint('directory/add') + @timed + @process_metrics @db_transaction() - def directory_add(self, directories, db, cur): + def directory_add(self, directories, db=None, cur=None): """Add directories to the storage Args: @@ -670,6 +710,8 @@ return summary + @remote_api_endpoint('directory/missing') + @timed @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): """List directories missing from storage @@ -684,6 +726,8 @@ for obj in db.directory_missing_from_list(directories, cur): yield obj[0] + @remote_api_endpoint('directory/ls') + @timed @db_transaction_generator(statement_timeout=20000) def directory_ls(self, directory, recursive=False, db=None, cur=None): """Get entries for one directory. @@ -707,6 +751,8 @@ for line in res_gen: yield dict(zip(db.directory_ls_cols, line)) + @remote_api_endpoint('directory/path') + @timed @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): """Get the directory entry (either file or dir) from directory with path. 
@@ -724,8 +770,22 @@ if res: return dict(zip(db.directory_ls_cols, res)) + @remote_api_endpoint('directory/get_random') + @timed + @db_transaction() + def directory_get_random(self, db=None, cur=None): + """Finds a random directory id. + + Returns: + a sha1_git + """ + return db.directory_get_random(cur) + + @remote_api_endpoint('revision/add') + @timed + @process_metrics @db_transaction() - def revision_add(self, revisions, db, cur): + def revision_add(self, revisions, db=None, cur=None): """Add revisions to the storage Args: @@ -796,6 +856,8 @@ return {'revision:add': len(revisions_missing)} + @remote_api_endpoint('revision/missing') + @timed @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage @@ -813,6 +875,8 @@ for obj in db.revision_missing_from_list(revisions, cur): yield obj[0] + @remote_api_endpoint('revision') + @timed @db_transaction_generator(statement_timeout=1000) def revision_get(self, revisions, db=None, cur=None): """Get all revisions from storage @@ -834,6 +898,8 @@ continue yield data + @remote_api_endpoint('revision/log') + @timed @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): """Fetch revision entry from the given root revisions. @@ -855,6 +921,8 @@ continue yield data + @remote_api_endpoint('revision/shortlog') + @timed @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): """Fetch the shortlog for the given revisions @@ -870,8 +938,22 @@ yield from db.revision_shortlog(revisions, limit, cur) + @remote_api_endpoint('revision/get_random') + @timed + @db_transaction() + def revision_get_random(self, db=None, cur=None): + """Finds a random revision id. 
+ + Returns: + a sha1_git + """ + return db.revision_get_random(cur) + + @remote_api_endpoint('release/add') + @timed + @process_metrics @db_transaction() - def release_add(self, releases, db, cur): + def release_add(self, releases, db=None, cur=None): """Add releases to the storage Args: @@ -927,6 +1009,8 @@ return {'release:add': len(releases_missing)} + @remote_api_endpoint('release/missing') + @timed @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage @@ -944,6 +1028,8 @@ for obj in db.release_missing_from_list(releases, cur): yield obj[0] + @remote_api_endpoint('release') + @timed @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): """Given a list of sha1, return the releases's information @@ -962,6 +1048,20 @@ ) yield data if data['target_type'] else None + @remote_api_endpoint('release/get_random') + @timed + @db_transaction() + def release_get_random(self, db=None, cur=None): + """Finds a random release id. + + Returns: + a sha1_git + """ + return db.release_get_random(cur) + + @remote_api_endpoint('snapshot/add') + @timed + @process_metrics @db_transaction() def snapshot_add(self, snapshots, db=None, cur=None): """Add snapshots to the storage. 
@@ -1025,6 +1125,8 @@ return {'snapshot:add': count} + @remote_api_endpoint('snapshot') + @timed @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id @@ -1051,6 +1153,8 @@ return self.snapshot_get_branches(snapshot_id, db=db, cur=cur) + @remote_api_endpoint('snapshot/by_origin_visit') + @timed @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): """Get the content, possibly partial, of a snapshot for the given origin visit @@ -1084,6 +1188,8 @@ return None + @remote_api_endpoint('snapshot/latest') + @timed @db_transaction(statement_timeout=4000) def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): @@ -1131,6 +1237,8 @@ 'last origin visit references an unknown snapshot') return snapshot + @remote_api_endpoint('snapshot/count_branches') + @timed @db_transaction(statement_timeout=2000) def snapshot_count_branches(self, snapshot_id, db=None, cur=None): """Count the number of branches in the snapshot with the given id @@ -1145,6 +1253,8 @@ return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) + @remote_api_endpoint('snapshot/get_branches') + @timed @db_transaction(statement_timeout=2000) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None, @@ -1210,6 +1320,19 @@ return None + @remote_api_endpoint('snapshot/get_random') + @timed + @db_transaction() + def snapshot_get_random(self, db=None, cur=None): + """Finds a random snapshot id. 
+ + Returns: + a sha1_git + """ + return db.snapshot_get_random(cur) + + @remote_api_endpoint('origin/visit/add') + @timed @db_transaction() def origin_visit_add(self, origin, date, type, db=None, cur=None): @@ -1243,11 +1366,14 @@ 'visit': visit_id, 'status': 'ongoing', 'metadata': None, 'snapshot': None}) + send_metric('origin_visit:add', count=1, method_name='origin_visit') return { 'origin': origin_url, 'visit': visit_id, } + @remote_api_endpoint('origin/visit/update') + @timed @db_transaction() def origin_visit_update(self, origin, visit_id, status=None, metadata=None, snapshot=None, @@ -1291,6 +1417,8 @@ db.origin_visit_update(origin_url, visit_id, updates, cur) + @remote_api_endpoint('origin/visit/upsert') + @timed @db_transaction() def origin_visit_upsert(self, visits, db=None, cur=None): """Add a origin_visits with a specific id and with all its data. @@ -1324,6 +1452,8 @@ # TODO: upsert them all in a single query db.origin_visit_upsert(**visit, cur=cur) + @remote_api_endpoint('origin/visit/get') + @timed @db_transaction_generator(statement_timeout=500) def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, cur=None): @@ -1345,6 +1475,8 @@ data = dict(zip(db.origin_visit_get_cols, line)) yield data + @remote_api_endpoint('origin/visit/find_by_date') + @timed @db_transaction(statement_timeout=500) def origin_visit_find_by_date(self, origin, visit_date, db=None, cur=None): """Retrieves the origin visit whose date is closest to the provided @@ -1363,6 +1495,8 @@ if line: return dict(zip(db.origin_visit_get_cols, line)) + @remote_api_endpoint('origin/visit/getby') + @timed @db_transaction(statement_timeout=500) def origin_visit_get_by(self, origin, visit, db=None, cur=None): """Retrieve origin visit's information. 
@@ -1381,6 +1515,8 @@
 
         return dict(zip(db.origin_visit_get_cols, ori_visit))
 
+    @remote_api_endpoint('origin/visit/get_latest')
+    @timed
     @db_transaction(statement_timeout=4000)
     def origin_visit_get_latest(
             self, origin, allowed_statuses=None, require_snapshot=False,
@@ -1415,6 +1551,27 @@
         if origin_visit:
             return dict(zip(db.origin_visit_get_cols, origin_visit))
 
+    @remote_api_endpoint('origin/visit/get_random')
+    @timed
+    @db_transaction()
+    def origin_visit_get_random(
+            self, type: str, db=None, cur=None) -> Mapping[str, Any]:
+        """Randomly select one successful origin visit made in the
+        last 3 months.
+
+        Returns:
+            dict representing an origin visit, in the same format as
+            :py:meth:`origin_visit_get`.
+
+        """
+        data: Dict[str, Any] = {}
+        result = db.origin_visit_get_random(type, cur)
+        if result:
+            data = dict(zip(db.origin_visit_get_cols, result))
+        return data
+
+    @remote_api_endpoint('object/find_by_sha1_git')
+    @timed
     @db_transaction(statement_timeout=2000)
     def object_find_by_sha1_git(self, ids, db=None, cur=None):
         """Return the objects found with the given ids.
@@ -1441,6 +1598,8 @@
 
         return ret
 
+    @remote_api_endpoint('origin/get')
+    @timed
     @db_transaction(statement_timeout=500)
     def origin_get(self, origins, db=None, cur=None):
         """Return origins, either all identified by their ids or all
@@ -1489,6 +1648,8 @@
         else:
             return [None if res['url'] is None else res for res in results]
 
+    @remote_api_endpoint('origin/get_sha1')
+    @timed
     @db_transaction_generator(statement_timeout=500)
     def origin_get_by_sha1(self, sha1s, db=None, cur=None):
         """Return origins, identified by the sha1 of their URLs.
@@ -1508,21 +1669,8 @@ else: yield None - @db_transaction() - def origin_visit_get_random( - self, type, db=None, cur=None) -> Mapping[str, Any]: - """Randomly select one origin from the archive - - Returns: - origin dict selected randomly on the dataset if found - - """ - data: Dict[str, Any] = {} - result = db.origin_visit_get_random(type, cur) - if result: - data = dict(zip(db.origin_visit_get_cols, result)) - return data - + @remote_api_endpoint('origin/get_range') + @timed @db_transaction_generator() def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None): @@ -1542,6 +1690,8 @@ for origin in db.origin_get_range(origin_from, origin_count, cur): yield dict(zip(db.origin_get_range_cols, origin)) + @remote_api_endpoint('origin/search') + @timed @db_transaction_generator() def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, db=None, cur=None): @@ -1566,6 +1716,8 @@ regexp, with_visit, cur): yield dict(zip(db.origin_cols, origin)) + @remote_api_endpoint('origin/count') + @timed @db_transaction() def origin_count(self, url_pattern, regexp=False, with_visit=False, db=None, cur=None): @@ -1585,6 +1737,8 @@ """ return db.origin_count(url_pattern, regexp, with_visit, cur) + @remote_api_endpoint('origin/add_multi') + @timed @db_transaction() def origin_add(self, origins, db=None, cur=None): """Add origins to the storage @@ -1604,8 +1758,11 @@ for origin in origins: self.origin_add_one(origin, db=db, cur=cur) + send_metric('origin:add', count=len(origins), method_name='origin_add') return origins + @remote_api_endpoint('origin/add') + @timed @db_transaction() def origin_add_one(self, origin, db=None, cur=None): """Add origin to the storage @@ -1630,7 +1787,9 @@ if self.journal_writer: self.journal_writer.write_addition('origin', origin) - return db.origin_add(origin['url'], cur) + origins = db.origin_add(origin['url'], cur) + send_metric('origin:add', count=len(origins), method_name='origin_add') + return origins 
@db_transaction(statement_timeout=500) def stat_counters(self, db=None, cur=None): @@ -1664,6 +1823,8 @@ for key in keys: cur.execute('select * from swh_update_counter(%s)', (key,)) + @remote_api_endpoint('origin/metadata/add') + @timed @db_transaction() def origin_metadata_add(self, origin_url, ts, provider, tool, metadata, db=None, cur=None): @@ -1682,7 +1843,11 @@ db.origin_metadata_add(origin_url, ts, provider, tool, metadata, cur) + send_metric( + 'origin_metadata:add', count=1, method_name='origin_metadata_add') + @remote_api_endpoint('origin/metadata/get') + @timed @db_transaction_generator(statement_timeout=500) def origin_metadata_get_by(self, origin_url, provider_type=None, db=None, cur=None): @@ -1708,6 +1873,8 @@ for line in db.origin_metadata_get_by(origin_url, provider_type, cur): yield dict(zip(db.origin_metadata_get_cols, line)) + @remote_api_endpoint('tool/add') + @timed @db_transaction() def tool_add(self, tools, db=None, cur=None): """Add new tools to the storage. @@ -1733,8 +1900,12 @@ cur) tools = db.tool_add_from_temp(cur) - return [dict(zip(db.tool_cols, line)) for line in tools] + results = [dict(zip(db.tool_cols, line)) for line in tools] + send_metric('tool:add', count=len(results), method_name='tool_add') + return results + @remote_api_endpoint('tool/data') + @timed @db_transaction(statement_timeout=500) def tool_get(self, tool, db=None, cur=None): """Retrieve tool information. 
@@ -1759,6 +1930,8 @@ return None return dict(zip(db.tool_cols, idx)) + @remote_api_endpoint('provider/add') + @timed @db_transaction() def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata, db=None, cur=None): @@ -1773,9 +1946,14 @@ Returns: int: an identifier of the provider """ - return db.metadata_provider_add(provider_name, provider_type, - provider_url, metadata, cur) - + result = db.metadata_provider_add(provider_name, provider_type, + provider_url, metadata, cur) + send_metric( + 'metadata_provider:add', count=1, method_name='metadata_provider') + return result + + @remote_api_endpoint('provider/get') + @timed @db_transaction() def metadata_provider_get(self, provider_id, db=None, cur=None): """Get a metadata provider @@ -1792,6 +1970,8 @@ return None return dict(zip(db.metadata_provider_cols, result)) + @remote_api_endpoint('provider/getby') + @timed @db_transaction() def metadata_provider_get_by(self, provider, db=None, cur=None): """Get a metadata provider @@ -1811,6 +1991,8 @@ return None return dict(zip(db.metadata_provider_cols, result)) + @remote_api_endpoint('algos/diff_directories') + @timed def diff_directories(self, from_dir, to_dir, track_renaming=False): """Compute the list of file changes introduced between two arbitrary directories (insertion / deletion / modification / renaming of files). @@ -1827,6 +2009,8 @@ """ return diff.diff_directories(self, from_dir, to_dir, track_renaming) + @remote_api_endpoint('algos/diff_revisions') + @timed def diff_revisions(self, from_rev, to_rev, track_renaming=False): """Compute the list of file changes introduced between two arbitrary revisions (insertion / deletion / modification / renaming of files). 
@@ -1843,6 +2027,8 @@ """ return diff.diff_revisions(self, from_rev, to_rev, track_renaming) + @remote_api_endpoint('algos/diff_revision') + @timed def diff_revision(self, revision, track_renaming=False): """Compute the list of file changes introduced by a specific revision (insertion / deletion / modification / renaming of files) by comparing diff --git a/swh/storage/tests/test_metrics.py b/swh/storage/tests/test_metrics.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_metrics.py @@ -0,0 +1,51 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import patch + +from swh.storage.metrics import ( + send_metric, OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC +) + + +def test_send_metric_unknown_unit(): + r = send_metric('content', count=10, method_name='content_add') + assert r is False + r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') + assert r is False + + +def test_send_metric_no_value(): + r = send_metric('content:add', count=0, method_name='content_add') + assert r is False + + +@patch('swh.storage.metrics.statsd.increment') +def test_send_metric_no_unit(mock_statsd): + r = send_metric('content:add', count=10, method_name='content_add') + + mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ + 'endpoint': 'content_add', + 'object_type': 'content', + 'operation': 'add', + }) + + assert r + + +@patch('swh.storage.metrics.statsd.increment') +def test_send_metric_unit(mock_statsd): + unit_ = 'bytes' + r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') + + expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) + mock_statsd.assert_called_with( + expected_metric, 100, tags={ + 'endpoint': 'c_add', + 'object_type': 'c', + 'operation': 'add', + }) + + assert r diff --git 
a/swh/storage/tests/test_server.py b/swh/storage/tests/test_server.py --- a/swh/storage/tests/test_server.py +++ b/swh/storage/tests/test_server.py @@ -7,12 +7,7 @@ import pytest import yaml -from unittest.mock import patch - -from swh.storage.api.server import ( - load_and_check_config, send_metric, - OPERATIONS_METRIC, OPERATIONS_UNIT_METRIC -) +from swh.storage.api.server import load_and_check_config def prepare_config_file(tmpdir, content, name='config.yml'): @@ -132,44 +127,3 @@ cfg = load_and_check_config(config_path, type='any') assert cfg == config - - -def test_send_metric_unknown_unit(): - r = send_metric('content', count=10, method_name='content_add') - assert r is False - r = send_metric('sthg:add:bytes:extra', count=10, method_name='sthg_add') - assert r is False - - -def test_send_metric_no_value(): - r = send_metric('content:add', count=0, method_name='content_add') - assert r is False - - -@patch('swh.storage.api.server.statsd.increment') -def test_send_metric_no_unit(mock_statsd): - r = send_metric('content:add', count=10, method_name='content_add') - - mock_statsd.assert_called_with(OPERATIONS_METRIC, 10, tags={ - 'endpoint': 'content_add', - 'object_type': 'content', - 'operation': 'add', - }) - - assert r - - -@patch('swh.storage.api.server.statsd.increment') -def test_send_metric_unit(mock_statsd): - unit_ = 'bytes' - r = send_metric('c:add:%s' % unit_, count=100, method_name='c_add') - - expected_metric = OPERATIONS_UNIT_METRIC.format(unit=unit_) - mock_statsd.assert_called_with( - expected_metric, 100, tags={ - 'endpoint': 'c_add', - 'object_type': 'c', - 'operation': 'add', - }) - - assert r diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -389,6 +389,13 @@ assert list(gen) == [missing_cont] + def test_content_get_random(self, swh_storage): + swh_storage.content_add([data.cont, data.cont2, data.cont3]) + + assert 
swh_storage.content_get_random() in { + data.cont['sha1_git'], data.cont2['sha1_git'], + data.cont3['sha1_git']} + def test_directory_add(self, swh_storage): init_missing = list(swh_storage.directory_missing([data.dir['id']])) assert [data.dir['id']] == init_missing @@ -579,6 +586,12 @@ [entry['name']]) assert actual_entry is None + def test_directory_get_random(self, swh_storage): + swh_storage.directory_add([data.dir, data.dir2, data.dir3]) + + assert swh_storage.directory_get_random() in \ + {data.dir['id'], data.dir2['id'], data.dir3['id']} + def test_revision_add(self, swh_storage): init_missing = swh_storage.revision_missing([data.revision['id']]) assert list(init_missing) == [data.revision['id']] @@ -758,6 +771,13 @@ assert len(get) == 1 assert get[0]['parents'] == [] # no parents on this one + def test_revision_get_random(self, swh_storage): + swh_storage.revision_add( + [data.revision, data.revision2, data.revision3]) + + assert swh_storage.revision_get_random() in \ + {data.revision['id'], data.revision2['id'], data.revision3['id']} + def test_release_add(self, swh_storage): init_missing = swh_storage.release_missing([data.release['id'], data.release2['id']]) @@ -868,6 +888,12 @@ assert unknown_releases[0] is None + def test_release_get_random(self, swh_storage): + swh_storage.release_add([data.release, data.release2, data.release3]) + + assert swh_storage.release_get_random() in \ + {data.release['id'], data.release2['id'], data.release3['id']} + def test_origin_add_one(self, swh_storage): origin0 = swh_storage.origin_get(data.origin) assert origin0 is None @@ -2362,6 +2388,14 @@ assert{**data.snapshot, 'next_branch': None} \ == swh_storage.snapshot_get_latest(origin_url) + def test_snapshot_get_random(self, swh_storage): + swh_storage.snapshot_add( + [data.snapshot, data.empty_snapshot, data.complete_snapshot]) + + assert swh_storage.snapshot_get_random() in { + data.snapshot['id'], data.empty_snapshot['id'], + data.complete_snapshot['id']} + def 
test_stat_counters(self, swh_storage): expected_keys = ['content', 'directory', 'origin', 'revision']