D1321.id4254.diff
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -34,12 +34,54 @@
return d
+def encode(f):
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ return encode_data(r)
+
+ return d
+
+
+def increment(f):
+ """Increment object counters for the decorated function.
+
+ """
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ for key, value in r.items():
+ metric_type = key.split(':')
+ _length = len(metric_type)
+ if _length == 2:
+ object_type, operation = metric_type
+ metric_name = 'swh_storage_%s_%s' % (
+ object_type, operation)
+ elif _length == 3:
+ object_type, operation, unit = metric_type
+ metric_name = 'swh_storage_%s_%s_%s' % (
+ object_type, operation, unit)
+ else:
+ logging.warn('Unknown metric {%s: %s}, skipping' % (
+ key, value))
+ continue
+
+ statsd.increment(
+ metric_name, value, tags={
+ 'endpoint': f.__name__,
+ 'object_type': object_type,
+ 'operation': operation,
+ })
+ return r
+
+ return d
+
+
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
-@timed
def get_storage():
global storage
if not storage:
@@ -90,9 +132,11 @@
@app.route('/content/add', methods=['POST'])
+@encode
@timed
+@increment
def content_add():
- return encode_data(get_storage().content_add(**decode_request(request)))
+ return get_storage().content_add(**decode_request(request))
@app.route('/content/update', methods=['POST'])
@@ -129,9 +173,11 @@
@app.route('/directory/add', methods=['POST'])
+@encode
@timed
+@increment
def directory_add():
- return encode_data(get_storage().directory_add(**decode_request(request)))
+ return get_storage().directory_add(**decode_request(request))
@app.route('/directory/path', methods=['POST'])
@@ -149,9 +195,11 @@
@app.route('/revision/add', methods=['POST'])
+@encode
@timed
+@increment
def revision_add():
- return encode_data(get_storage().revision_add(**decode_request(request)))
+ return get_storage().revision_add(**decode_request(request))
@app.route('/revision', methods=['POST'])
@@ -181,9 +229,11 @@
@app.route('/release/add', methods=['POST'])
+@encode
@timed
+@increment
def release_add():
- return encode_data(get_storage().release_add(**decode_request(request)))
+ return get_storage().release_add(**decode_request(request))
@app.route('/release', methods=['POST'])
@@ -207,9 +257,11 @@
@app.route('/snapshot/add', methods=['POST'])
+@encode
@timed
+@increment
def snapshot_add():
- return encode_data(get_storage().snapshot_add(**decode_request(request)))
+ return get_storage().snapshot_add(**decode_request(request))
@app.route('/snapshot', methods=['POST'])
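
To make the metric naming concrete, here is a small self-contained sketch of what the new increment decorator ends up sending to statsd for one content_add call. The FakeStatsd recorder and the hard-coded summary values are illustrative stand-ins, not part of this patch; the key-splitting and tag construction mirror the decorator above.

    # Stand-in recorder mimicking the statsd client's increment() call used above.
    class FakeStatsd:
        def __init__(self):
            self.calls = []

        def increment(self, metric, value, tags=None):
            self.calls.append((metric, value, tags))

    statsd = FakeStatsd()
    summary = {'content:add': 2, 'content:bytes:add': 10, 'skipped_content:add': 0}

    for key, value in summary.items():
        parts = key.split(':')
        if len(parts) == 2:
            object_type, operation = parts
            metric_name = 'swh_storage_%s_%s' % (object_type, operation)
        else:  # len(parts) == 3, e.g. 'content:bytes:add'
            object_type, operation, unit = parts
            metric_name = 'swh_storage_%s_%s_%s' % (object_type, operation, unit)
        statsd.increment(metric_name, value, tags={
            'endpoint': 'content_add',
            'object_type': object_type,
            'operation': operation,
        })

    # statsd.calls now holds, among others:
    #   ('swh_storage_content_add', 2, {...})
    #   ('swh_storage_content_bytes_add', 10, {...})

Note that for a three-part key such as 'content:bytes:add', the decorator treats the middle segment as the operation and the last one as the unit, which still yields the intended swh_storage_content_bytes_add metric name.
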
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -77,6 +77,16 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+ HashCollision in case of collision
+
+ Returns:
+ Summary dict with the following keys and associated values:
+
+ content:add: New contents added
+ content:bytes:add: Sum of the added contents' data lengths
+ skipped_content:add: New skipped contents (no data) added
+
"""
if self.journal_writer:
for content in contents:
@@ -84,6 +94,11 @@
content = content.copy()
del content['data']
self.journal_writer.write_addition('content', content)
+
+ count_contents = 0
+ count_content_added = 0
+ count_content_bytes_added = 0
+
for content in contents:
key = self._content_key(content)
if key in self._contents:
@@ -98,11 +113,20 @@
('content', content['sha1']))
self._contents[key] = copy.deepcopy(content)
self._contents[key]['ctime'] = now()
- bisect.insort(self._sorted_sha1s, content['sha1'])
+ bisect.insort(self._sorted_sha1s, content['sha1'])
+ count_contents += 1
if self._contents[key]['status'] == 'visible':
+ count_content_added += 1
content_data = self._contents[key].pop('data')
+ count_content_bytes_added += len(content_data)
self.objstorage.add(content_data, content['sha1'])
+ return {
+ 'content:add': count_content_added,
+ 'content:bytes:add': count_content_bytes_added,
+ 'skipped_content:add': count_contents - count_content_added,
+ }
+
def content_get(self, ids):
"""Retrieve in bulk contents and their data.
@@ -294,16 +318,25 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+ Returns:
+ Summary dict of keys with associated count as values:
+
+ directory:add: Number of directories actually added
+
"""
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
+ count = 0
for directory in directories:
if directory['id'] not in self._directories:
+ count += 1
self._directories[directory['id']] = copy.deepcopy(directory)
self._objects[directory['id']].append(
('directory', directory['id']))
+ return {'directory:add': count}
+
def directory_missing(self, directory_ids):
"""List directories missing from storage
@@ -427,10 +460,17 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys with associated count as values
+
+ revision:add: New objects actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
+ count = 0
for revision in revisions:
if revision['id'] not in self._revisions:
self._revisions[revision['id']] = rev = copy.deepcopy(revision)
@@ -441,6 +481,9 @@
rev.get('committer_date'))
self._objects[revision['id']].append(
('revision', revision['id']))
+ count += 1
+
+ return {'revision:add': count}
def revision_missing(self, revision_ids):
"""List revisions missing from storage
@@ -518,17 +561,28 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys with associated count as values
+
+ release:add: New objects actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
+ count = 0
for rel in releases:
- rel = copy.deepcopy(rel)
- rel['date'] = normalize_timestamp(rel['date'])
- self._person_add(rel['author'])
- self._objects[rel['id']].append(
- ('release', rel['id']))
- self._releases[rel['id']] = rel
+ if rel['id'] not in self._releases:
+ rel = copy.deepcopy(rel)
+ rel['date'] = normalize_timestamp(rel['date'])
+ self._person_add(rel['author'])
+ self._objects[rel['id']].append(
+ ('release', rel['id']))
+ self._releases[rel['id']] = rel
+ count += 1
+
+ return {'release:add': count}
def release_missing(self, releases):
"""List releases missing from storage
@@ -578,6 +632,12 @@
Raises:
ValueError: if the origin's or visit's identifier does not exist.
+
+ Returns:
+ Summary dict of keys with associated count as values
+
+ snapshot:add: Count of objects actually stored in db
+
"""
if legacy_arg1:
assert legacy_arg2
@@ -586,11 +646,13 @@
else:
origin = visit = None
+ count = 0
snapshot_id = snapshot['id']
if self.journal_writer:
self.journal_writer.write_addition(
'snapshot', snapshot)
if snapshot_id not in self._snapshots:
+ count += 1
self._snapshots[snapshot_id] = {
'id': snapshot_id,
'branches': copy.deepcopy(snapshot['branches']),
@@ -601,6 +663,8 @@
if origin:
self.origin_visit_update(origin, visit, snapshot=snapshot_id)
+ return {'snapshot:add': count}
+
def snapshot_get(self, snapshot_id):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -114,7 +114,29 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+
+ In case of errors, nothing is stored in the db (data may
+ still have been written to the objstorage, though). The
+ following exceptions can occur:
+
+ - HashCollision in case of collision
+ - Any other exception raised by the db
+
+ Returns:
+ Summary dict with the following keys and associated values:
+
+ content:add: New contents added
+ content:bytes:add: Sum of the added contents' data lengths
+ skipped_content:add: New skipped contents (no data) added
+
"""
+ summary = {
+ 'content:add': 0,
+ 'content:bytes:add': 0,
+ 'skipped_content:add': 0,
+ }
+
if self.journal_writer:
for item in content:
if 'data' in item:
@@ -137,7 +159,8 @@
for d in content:
if 'status' not in d:
d['status'] = 'visible'
- if 'length' not in d:
+ length = d.get('length')
+ if length is None:
d['length'] = -1
content_by_status[d['status']].append(d)
@@ -150,12 +173,29 @@
content_without_data))
def add_to_objstorage():
- data = {
- cont['sha1']: cont['data']
- for cont in content_with_data
- if cont['sha1'] in missing_content
- }
+ """Add to objstorage the new missing_content
+
+ Returns:
+ Sum of the data lengths of the contents pushed to the
+ objstorage. Contents are deduplicated by sha1 within the
+ batch, so each missing content is sent and counted once.
+
+ """
+ content_bytes_added = 0
+ data = {}
+ for cont in content_with_data:
+ sha1 = cont['sha1']
+ seen = data.get(sha1)
+ if sha1 in missing_content and not seen:
+ data[sha1] = cont['data']
+ content_bytes_added += cont['length']
+
+ # FIXME: Since we do the filtering anyway now, we might as
+ # well make the objstorage's add_batch call return what we
+ # want here (real bytes added)... that'd simplify this...
self.objstorage.add_batch(data)
+ return content_bytes_added
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
@@ -188,6 +228,8 @@
else:
raise
+ summary['content:add'] = len(missing_content)
+
if missing_skipped:
missing_filtered = (
cont for cont in content_without_data
@@ -200,10 +242,14 @@
# move metadata in place
db.skipped_content_add_from_temp(cur)
+ summary['skipped_content:add'] = len(missing_skipped)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
- added_to_objstorage.result()
+ content_bytes_added = added_to_objstorage.result()
+
+ summary['content:bytes:add'] = content_bytes_added
+ return summary
@db_transaction()
def content_update(self, content, keys=[], db=None, cur=None):
@@ -449,7 +495,14 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+
+ Returns:
+ Summary dict of keys with associated count as values:
+
+ directory:add: Number of directories actually added
+
"""
+ summary = {'directory:add': 0}
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
@@ -470,7 +523,7 @@
dirs_missing = set(self.directory_missing(dirs))
if not dirs_missing:
- return
+ return summary
db = self.get_db()
with db.transaction() as cur:
@@ -498,6 +551,9 @@
# Do the final copy
db.directory_add_from_temp(cur)
+ summary['directory:add'] = len(dirs_missing)
+
+ return summary
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
@@ -583,7 +639,15 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys with associated count as values
+
+ revision:add: New objects actually stored in db
+
"""
+ summary = {'revision:add': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
@@ -593,7 +657,7 @@
set(revision['id'] for revision in revisions)))
if not revisions_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_revision(cur)
@@ -614,6 +678,8 @@
db.copy_to(parents_filtered, 'revision_history',
['id', 'parent_id', 'parent_rank'], cur)
+ return {'revision:add': len(revisions_missing)}
+
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
"""List revisions missing from storage
@@ -707,7 +773,15 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys with associated count as values
+
+ release:add: New objects actually stored in db
+
"""
+ summary = {'release:add': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
@@ -717,7 +791,7 @@
releases_missing = set(self.release_missing(release_ids))
if not releases_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_release(cur)
@@ -732,6 +806,8 @@
db.release_add_from_temp(cur)
+ return {'release:add': len(releases_missing)}
+
@db_transaction_generator()
def release_missing(self, releases, db=None, cur=None):
"""List releases missing from storage
@@ -793,6 +869,13 @@
Raises:
ValueError: if the origin or visit id does not exist.
+
+ Returns:
+
+ Summary dict of keys with associated count as values
+
+ snapshot:add: Count of objects actually stored in db
+
"""
if origin:
if not visit:
@@ -811,7 +894,9 @@
# Called by new code that uses the new api/client.py
origin_id = visit_id = None
+ count = 0
if not db.snapshot_exists(snapshot['id'], cur):
+ count += 1
db.mktemp_snapshot_branch(cur)
db.copy_to(
(
@@ -837,6 +922,8 @@
origin_id, visit_id, snapshot=snapshot['id'],
db=db, cur=cur)
+ return {'snapshot:add': count}
+
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -552,7 +552,13 @@
def test_content_add(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+ self.assertEqual(actual_result, {
+ 'content:add': 1,
+ 'content:bytes:add': cont['length'],
+ 'skipped_content:add': 0
+ })
+
self.assertEqual(list(self.storage.content_get([cont['sha1']])),
[{'sha1': cont['sha1'], 'data': cont['data']}])
@@ -561,10 +567,38 @@
self.assertEqual(list(self.journal_writer.objects),
[('content', expected_cont)])
+ def test_content_add_same_input(self):
+ cont = self.cont
+
+ actual_result = self.storage.content_add([cont, cont])
+ self.assertEqual(actual_result, {
+ 'content:add': 1,
+ 'content:bytes:add': cont['length'],
+ 'skipped_content:add': 0
+ })
+
+ def test_content_add_different_input(self):
+ cont = self.cont
+ cont2 = self.cont2
+
+ actual_result = self.storage.content_add([cont, cont2])
+ self.assertEqual(actual_result, {
+ 'content:add': 2,
+ 'content:bytes:add': cont['length'] + cont2['length'],
+ 'skipped_content:add': 0
+ })
+
def test_content_add_db(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+
+ self.assertEqual(actual_result, {
+ 'content:add': 1,
+ 'content:bytes:add': cont['length'],
+ 'skipped_content:add': 0
+ })
+
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -601,7 +635,13 @@
cont2 = self.skipped_cont2.copy()
cont2['blake2s256'] = None
- self.storage.content_add([cont, cont, cont2])
+ actual_result = self.storage.content_add([cont, cont, cont2])
+
+ self.assertEqual(actual_result, {
+ 'content:add': 0,
+ 'content:bytes:add': 0,
+ 'skipped_content:add': 2,
+ })
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
'length, status, reason '
@@ -722,7 +762,9 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir])
+ actual_result = self.storage.directory_add([self.dir])
+ self.assertEqual(actual_result, {'directory:add': 1})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir)])
@@ -737,7 +779,10 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir, self.dir2, self.dir3])
+ actual_result = self.storage.directory_add(
+ [self.dir, self.dir2, self.dir3])
+ self.assertEqual(actual_result, {'directory:add': 3})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir),
('directory', self.dir2),
@@ -765,7 +810,8 @@
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
- self.storage.directory_add([self.dir3])
+ actual_result = self.storage.directory_add([self.dir3])
+ self.assertEqual(actual_result, {'directory:add': 1})
expected_entries = [
{
@@ -825,7 +871,8 @@
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
- self.storage.revision_add([self.revision])
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision:add': 1})
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
@@ -833,6 +880,10 @@
self.assertEqual(list(self.journal_writer.objects),
[('revision', self.revision)])
+ # already there so nothing added
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision:add': 0})
+
def test_revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
@@ -945,7 +996,8 @@
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
- self.storage.release_add([self.release, self.release2])
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release:add': 2})
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
@@ -955,6 +1007,10 @@
[('release', self.release),
('release', self.release2)])
+ # already present so nothing added
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release:add': 0})
+
def test_release_get(self):
# given
self.storage.release_add([self.release, self.release2])
@@ -1432,7 +1488,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(self.empty_snapshot)
+ actual_result = self.storage.snapshot_add(self.empty_snapshot)
+ self.assertEqual(actual_result, {'snapshot:add': 1})
+
self.storage.origin_visit_update(
origin_id, visit_id, snapshot=self.empty_snapshot['id'])
@@ -1510,7 +1568,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot:add': 1})
by_id = self.storage.snapshot_get(self.complete_snapshot['id'])
self.assertEqual(by_id, self.complete_snapshot)
@@ -1518,13 +1578,20 @@
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.complete_snapshot)
+ # already injected so no new snapshot added
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot:add': 0})
+
def test_snapshot_add_count_branches(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot:add': 1})
snp_id = self.complete_snapshot['id']
snp_size = self.storage.snapshot_count_branches(snp_id)
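
Since every *_add endpoint now reports its work as a summary dict, a caller such as a loader can aggregate results across object types without extra queries. A minimal sketch, with made-up summary values:

    from collections import Counter

    totals = Counter()
    for summary in [
        {'content:add': 3, 'content:bytes:add': 12, 'skipped_content:add': 0},
        {'directory:add': 2},
        {'revision:add': 1},
    ]:
        totals.update(summary)

    # totals now maps each summary key to its sum across all calls,
    # e.g. totals['content:add'] == 3 and totals['directory:add'] == 2.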
