diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -34,12 +34,54 @@ return d +def encode(f): + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + return encode_data(r) + + return d + + +def increment(f): + """Increment object counters for the decorated function. + + """ + @wraps(f) + def d(*a, **kw): + r = f(*a, **kw) + for key, value in r.items(): + metric_type = key.split(':') + _length = len(metric_type) + if _length == 2: + object_type, operation = metric_type + metric_name = 'swh_storage_%s_%s' % ( + object_type, operation) + elif _length == 3: + object_type, operation, unit = metric_type + metric_name = 'swh_storage_%s_%s_%s' % ( + object_type, operation, unit) + else: + logging.warn('Unknown metric {%s: %s}, skipping' % ( + key, value)) + continue + + statsd.increment( + metric_name, value, tags={ + 'endpoint': f.__name__, + 'object_type': object_type, + 'operation': operation, + }) + return r + + return d + + @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) -@timed def get_storage(): global storage if not storage: @@ -90,9 +132,11 @@ @app.route('/content/add', methods=['POST']) +@encode @timed +@increment def content_add(): - return encode_data(get_storage().content_add(**decode_request(request))) + return get_storage().content_add(**decode_request(request)) @app.route('/content/update', methods=['POST']) @@ -129,9 +173,11 @@ @app.route('/directory/add', methods=['POST']) +@encode @timed +@increment def directory_add(): - return encode_data(get_storage().directory_add(**decode_request(request))) + return get_storage().directory_add(**decode_request(request)) @app.route('/directory/path', methods=['POST']) @@ -149,9 +195,11 @@ @app.route('/revision/add', methods=['POST']) +@encode @timed +@increment def revision_add(): - return encode_data(get_storage().revision_add(**decode_request(request))) + return 
get_storage().revision_add(**decode_request(request)) @app.route('/revision', methods=['POST']) @@ -181,9 +229,11 @@ @app.route('/release/add', methods=['POST']) +@encode @timed +@increment def release_add(): - return encode_data(get_storage().release_add(**decode_request(request))) + return get_storage().release_add(**decode_request(request)) @app.route('/release', methods=['POST']) @@ -207,9 +257,11 @@ @app.route('/snapshot/add', methods=['POST']) +@encode @timed +@increment def snapshot_add(): - return encode_data(get_storage().snapshot_add(**decode_request(request))) + return get_storage().snapshot_add(**decode_request(request)) @app.route('/snapshot', methods=['POST']) diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -77,6 +77,16 @@ - origin (int): if status = absent, the origin we saw the content in + Raises: + HashCollision in case of collision + + Returns: + Summary dict with the following key and associated values: + + content:add: New contents added + content:bytes:add: Sum of the contents' length data + skipped_content:add: New skipped contents (no data) added + """ if self.journal_writer: for content in contents: @@ -84,6 +94,11 @@ content = content.copy() del content['data'] self.journal_writer.write_addition('content', content) + + count_contents = 0 + count_content_added = 0 + count_content_bytes_added = 0 + for content in contents: key = self._content_key(content) if key in self._contents: @@ -98,11 +113,20 @@ ('content', content['sha1'])) self._contents[key] = copy.deepcopy(content) self._contents[key]['ctime'] = now() - bisect.insort(self._sorted_sha1s, content['sha1']) + bisect.insort(self._sorted_sha1s, content['sha1']) + count_contents += 1 if self._contents[key]['status'] == 'visible': + count_content_added += 1 content_data = self._contents[key].pop('data') + count_content_bytes_added += len(content_data) self.objstorage.add(content_data, content['sha1']) + 
return { + 'content:add': count_content_added, + 'content:bytes:add': count_content_bytes_added, + 'skipped_content:add': count_contents - count_content_added, + } + def content_get(self, ids): """Retrieve in bulk contents and their data. @@ -294,16 +318,25 @@ - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions + Returns: + Summary dict of keys with associated count as values: + + directory:add: Number of directories actually added + """ if self.journal_writer: self.journal_writer.write_additions('directory', directories) + count = 0 for directory in directories: if directory['id'] not in self._directories: + count += 1 self._directories[directory['id']] = copy.deepcopy(directory) self._objects[directory['id']].append( ('directory', directory['id'])) + return {'directory:add': count} + def directory_missing(self, directory_ids): """List directories missing from storage @@ -427,10 +460,17 @@ this revision date dictionaries have the form defined in :mod:`swh.model`. + + Returns: + Summary dict of keys with associated count as values + + revision:add: New objects actually stored in db + """ if self.journal_writer: self.journal_writer.write_additions('revision', revisions) + count = 0 for revision in revisions: if revision['id'] not in self._revisions: self._revisions[revision['id']] = rev = copy.deepcopy(revision) @@ -441,6 +481,9 @@ rev.get('committer_date')) self._objects[revision['id']].append( ('revision', revision['id'])) + count += 1 + + return {'revision:add': count} def revision_missing(self, revision_ids): """List revisions missing from storage @@ -518,17 +561,28 @@ keys: name, fullname, email the date dictionary has the form defined in :mod:`swh.model`. 
+ + Returns: + Summary dict of keys with associated count as values + + release:add: New objects contents actually stored in db + """ if self.journal_writer: self.journal_writer.write_additions('release', releases) + count = 0 for rel in releases: - rel = copy.deepcopy(rel) - rel['date'] = normalize_timestamp(rel['date']) - self._person_add(rel['author']) - self._objects[rel['id']].append( - ('release', rel['id'])) - self._releases[rel['id']] = rel + if rel['id'] not in self._releases: + rel = copy.deepcopy(rel) + rel['date'] = normalize_timestamp(rel['date']) + self._person_add(rel['author']) + self._objects[rel['id']].append( + ('release', rel['id'])) + self._releases[rel['id']] = rel + count += 1 + + return {'release:add': count} def release_missing(self, releases): """List releases missing from storage @@ -578,6 +632,12 @@ Raises: ValueError: if the origin's or visit's identifier does not exist. + + Returns: + Summary dict of keys with associated count as values + + snapshot:add: Count of object actually stored in db + """ if legacy_arg1: assert legacy_arg2 @@ -586,11 +646,13 @@ else: origin = visit = None + count = 0 snapshot_id = snapshot['id'] if self.journal_writer: self.journal_writer.write_addition( 'snapshot', snapshot) if snapshot_id not in self._snapshots: + count += 1 self._snapshots[snapshot_id] = { 'id': snapshot_id, 'branches': copy.deepcopy(snapshot['branches']), @@ -601,6 +663,8 @@ if origin: self.origin_visit_update(origin, visit, snapshot=snapshot_id) + return {'snapshot:add': count} + def snapshot_get(self, snapshot_id): """Get the content, possibly partial, of a snapshot with the given id diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -114,7 +114,29 @@ - origin (int): if status = absent, the origin we saw the content in + Raises: + + In case of errors, nothing is stored in the db (in the + objstorage, it could though). 
The following exceptions can + occur: + + - HashCollision in case of collision + - Any other exceptions raised by the db + + Returns: + Summary dict with the following key and associated values: + + content:add: New contents added + content:bytes:add: Sum of the contents' length data + skipped_content:add: New skipped contents (no data) added + """ + summary = { + 'content:add': 0, + 'content:bytes:add': 0, + 'skipped_content:add': 0, + } + if self.journal_writer: for item in content: if 'data' in item: @@ -137,7 +159,8 @@ for d in content: if 'status' not in d: d['status'] = 'visible' - if 'length' not in d: + length = d.get('length') + if length is None: d['length'] = -1 content_by_status[d['status']].append(d) @@ -150,12 +173,29 @@ content_without_data)) def add_to_objstorage(): - data = { - cont['sha1']: cont['data'] - for cont in content_with_data - if cont['sha1'] in missing_content - } + """Add to objstorage the new missing_content + + Returns: + Sum of all the content's data length pushed to the + objstorage. Contents are filtered here (missing ones, + deduplicated by sha1), so a content sent several times in + the same batch is pushed and counted only once. + + """ + content_bytes_added = 0 + data = {} + for cont in content_with_data: + sha1 = cont['sha1'] + seen = data.get(sha1) + if sha1 in missing_content and not seen: + data[sha1] = cont['data'] + content_bytes_added += cont['length'] + + # FIXME: Since we do the filtering anyway now, we might as + # well make the objstorage's add_batch call return what we + # want here (real bytes added)... that'd simplify this... 
self.objstorage.add_batch(data) + return content_bytes_added with db.transaction() as cur: with ThreadPoolExecutor(max_workers=1) as executor: @@ -188,6 +228,8 @@ else: raise + summary['content:add'] = len(missing_content) + if missing_skipped: missing_filtered = ( cont for cont in content_without_data @@ -200,10 +242,14 @@ # move metadata in place db.skipped_content_add_from_temp(cur) + summary['skipped_content:add'] = len(missing_skipped) # Wait for objstorage addition before returning from the # transaction, bubbling up any exception - added_to_objstorage.result() + content_bytes_added = added_to_objstorage.result() + + summary['content:bytes:add'] = content_bytes_added + return summary @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): @@ -449,7 +495,14 @@ - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions + + Returns: + Summary dict of keys with associated count as values: + + directory:add: Number of directories actually added + """ + summary = {'directory:add': 0} if self.journal_writer: self.journal_writer.write_additions('directory', directories) @@ -470,7 +523,7 @@ dirs_missing = set(self.directory_missing(dirs)) if not dirs_missing: - return + return summary db = self.get_db() with db.transaction() as cur: @@ -498,6 +551,9 @@ # Do the final copy db.directory_add_from_temp(cur) + summary['directory:add'] = len(dirs_missing) + + return summary @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): @@ -583,7 +639,15 @@ this revision date dictionaries have the form defined in :mod:`swh.model`. 
+ + Returns: + Summary dict of keys with associated count as values + + revision:add: New objects actually stored in db + """ + summary = {'revision:add': 0} + if self.journal_writer: self.journal_writer.write_additions('revision', revisions) @@ -593,7 +657,7 @@ set(revision['id'] for revision in revisions))) if not revisions_missing: - return + return summary with db.transaction() as cur: db.mktemp_revision(cur) @@ -614,6 +678,8 @@ db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) + return {'revision:add': len(revisions_missing)} + @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage @@ -707,7 +773,15 @@ keys: name, fullname, email the date dictionary has the form defined in :mod:`swh.model`. + + Returns: + Summary dict of keys with associated count as values + + release:add: New objects contents actually stored in db + """ + summary = {'release:add': 0} + if self.journal_writer: self.journal_writer.write_additions('release', releases) @@ -717,7 +791,7 @@ releases_missing = set(self.release_missing(release_ids)) if not releases_missing: - return + return summary with db.transaction() as cur: db.mktemp_release(cur) @@ -732,6 +806,8 @@ db.release_add_from_temp(cur) + return {'release:add': len(releases_missing)} + @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage @@ -793,6 +869,13 @@ Raises: ValueError: if the origin or visit id does not exist. 
+ + Returns: + + Summary dict of keys with associated count as values + + snapshot:add: Count of object actually stored in db + """ if origin: if not visit: @@ -811,7 +894,9 @@ # Called by new code that uses the new api/client.py origin_id = visit_id = None + count = 0 if not db.snapshot_exists(snapshot['id'], cur): + count += 1 db.mktemp_snapshot_branch(cur) db.copy_to( ( @@ -837,6 +922,8 @@ origin_id, visit_id, snapshot=snapshot['id'], db=db, cur=cur) + return {'snapshot:add': count} + @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -552,7 +552,13 @@ def test_content_add(self): cont = self.cont - self.storage.content_add([cont]) + actual_result = self.storage.content_add([cont]) + self.assertEqual(actual_result, { + 'content:add': 1, + 'content:bytes:add': cont['length'], + 'skipped_content:add': 0 + }) + self.assertEqual(list(self.storage.content_get([cont['sha1']])), [{'sha1': cont['sha1'], 'data': cont['data']}]) @@ -561,10 +567,38 @@ self.assertEqual(list(self.journal_writer.objects), [('content', expected_cont)]) + def test_content_add_same_input(self): + cont = self.cont + + actual_result = self.storage.content_add([cont, cont]) + self.assertEqual(actual_result, { + 'content:add': 1, + 'content:bytes:add': cont['length'], + 'skipped_content:add': 0 + }) + + def test_content_add_different_input(self): + cont = self.cont + cont2 = self.cont2 + + actual_result 
= self.storage.content_add([cont, cont2]) + self.assertEqual(actual_result, { + 'content:add': 2, + 'content:bytes:add': cont['length'] + cont2['length'], + 'skipped_content:add': 0 + }) + def test_content_add_db(self): cont = self.cont - self.storage.content_add([cont]) + actual_result = self.storage.content_add([cont]) + + self.assertEqual(actual_result, { + 'content:add': 1, + 'content:bytes:add': cont['length'], + 'skipped_content:add': 0 + }) + if hasattr(self.storage, 'objstorage'): self.assertIn(cont['sha1'], self.storage.objstorage) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' @@ -601,7 +635,13 @@ cont2 = self.skipped_cont2.copy() cont2['blake2s256'] = None - self.storage.content_add([cont, cont, cont2]) + actual_result = self.storage.content_add([cont, cont, cont2]) + + self.assertEqual(actual_result, { + 'content:add': 0, + 'content:bytes:add': 0, + 'skipped_content:add': 2, + }) self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' 'length, status, reason ' @@ -722,7 +762,9 @@ init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) - self.storage.directory_add([self.dir]) + actual_result = self.storage.directory_add([self.dir]) + self.assertEqual(actual_result, {'directory:add': 1}) + self.assertEqual(list(self.journal_writer.objects), [('directory', self.dir)]) @@ -737,7 +779,10 @@ init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) - self.storage.directory_add([self.dir, self.dir2, self.dir3]) + actual_result = self.storage.directory_add( + [self.dir, self.dir2, self.dir3]) + self.assertEqual(actual_result, {'directory:add': 3}) + self.assertEqual(list(self.journal_writer.objects), [('directory', self.dir), ('directory', self.dir2), @@ -765,7 +810,8 @@ init_missing = list(self.storage.directory_missing([self.dir3['id']])) self.assertEqual([self.dir3['id']], init_missing) - 
self.storage.directory_add([self.dir3]) + actual_result = self.storage.directory_add([self.dir3]) + self.assertEqual(actual_result, {'directory:add': 1}) expected_entries = [ { @@ -825,7 +871,8 @@ init_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([self.revision['id']], list(init_missing)) - self.storage.revision_add([self.revision]) + actual_result = self.storage.revision_add([self.revision]) + self.assertEqual(actual_result, {'revision:add': 1}) end_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([], list(end_missing)) @@ -833,6 +880,10 @@ self.assertEqual(list(self.journal_writer.objects), [('revision', self.revision)]) + # already there so nothing added + actual_result = self.storage.revision_add([self.revision]) + self.assertEqual(actual_result, {'revision:add': 0}) + def test_revision_log(self): # given # self.revision4 -is-child-of-> self.revision3 @@ -945,7 +996,8 @@ self.assertEqual([self.release['id'], self.release2['id']], list(init_missing)) - self.storage.release_add([self.release, self.release2]) + actual_result = self.storage.release_add([self.release, self.release2]) + self.assertEqual(actual_result, {'release:add': 2}) end_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) @@ -955,6 +1007,10 @@ [('release', self.release), ('release', self.release2)]) + # already present so nothing added + actual_result = self.storage.release_add([self.release, self.release2]) + self.assertEqual(actual_result, {'release:add': 0}) + def test_release_get(self): # given self.storage.release_add([self.release, self.release2]) @@ -1432,7 +1488,9 @@ self.date_visit1) visit_id = origin_visit1['visit'] - self.storage.snapshot_add(self.empty_snapshot) + actual_result = self.storage.snapshot_add(self.empty_snapshot) + self.assertEqual(actual_result, {'snapshot:add': 1}) + self.storage.origin_visit_update( origin_id, visit_id, snapshot=self.empty_snapshot['id']) @@ -1510,7 +1568,9 
@@ self.date_visit1) visit_id = origin_visit1['visit'] - self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) + actual_result = self.storage.snapshot_add( + origin_id, visit_id, self.complete_snapshot) + self.assertEqual(actual_result, {'snapshot:add': 1}) by_id = self.storage.snapshot_get(self.complete_snapshot['id']) self.assertEqual(by_id, self.complete_snapshot) @@ -1518,13 +1578,20 @@ by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.complete_snapshot) + # already injected so no new snapshot added + actual_result = self.storage.snapshot_add( + origin_id, visit_id, self.complete_snapshot) + self.assertEqual(actual_result, {'snapshot:add': 0}) + def test_snapshot_add_count_branches(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] - self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) + actual_result = self.storage.snapshot_add( + origin_id, visit_id, self.complete_snapshot) + self.assertEqual(actual_result, {'snapshot:add': 1}) snp_id = self.complete_snapshot['id'] snp_size = self.storage.snapshot_count_branches(snp_id)