Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7342976
D1321.id4235.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
26 KB
Subscribers
None
D1321.id4235.diff
View Options
diff --git a/sql/upgrades/131.sql b/sql/upgrades/131.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/131.sql
@@ -0,0 +1,10 @@
+-- SWH DB schema upgrade
+-- from_version: 130
+-- to_version: 131
+-- description: Use sha1 instead of bigint as FK from origin_visit to snapshot (part 1: add new column)
+
+insert into dbversion(version, release, description)
+ values(131, now(), 'Work In Progress');
+
+alter table origin_visit add column snapshot sha1_git;
+comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -34,12 +34,34 @@
return d
+def encode(f):
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ return encode_data(r)
+
+ return d
+
+
+def increment(f):
+ """Increment object counters for the decorated function.
+
+ """
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ statsd.increment('swh_storage_request_object_count',
+ r, tags={'endpoint': f.__name__})
+ return r
+
+ return d
+
+
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
-@timed
def get_storage():
global storage
if not storage:
@@ -90,9 +112,11 @@
@app.route('/content/add', methods=['POST'])
+@encode
@timed
+@increment
def content_add():
- return encode_data(get_storage().content_add(**decode_request(request)))
+ return get_storage().content_add(**decode_request(request))
@app.route('/content/update', methods=['POST'])
@@ -129,9 +153,11 @@
@app.route('/directory/add', methods=['POST'])
+@encode
@timed
+@increment
def directory_add():
- return encode_data(get_storage().directory_add(**decode_request(request)))
+ return get_storage().directory_add(**decode_request(request))
@app.route('/directory/path', methods=['POST'])
@@ -149,9 +175,11 @@
@app.route('/revision/add', methods=['POST'])
+@encode
@timed
+@increment
def revision_add():
- return encode_data(get_storage().revision_add(**decode_request(request)))
+ return get_storage().revision_add(**decode_request(request))
@app.route('/revision', methods=['POST'])
@@ -181,9 +209,11 @@
@app.route('/release/add', methods=['POST'])
+@encode
@timed
+@increment
def release_add():
- return encode_data(get_storage().release_add(**decode_request(request)))
+ return get_storage().release_add(**decode_request(request))
@app.route('/release', methods=['POST'])
@@ -207,9 +237,11 @@
@app.route('/snapshot/add', methods=['POST'])
+@encode
@timed
+@increment
def snapshot_add():
- return encode_data(get_storage().snapshot_add(**decode_request(request)))
+ return get_storage().snapshot_add(**decode_request(request))
@app.route('/snapshot', methods=['POST'])
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -311,6 +311,11 @@
update_cols.append('metadata=%s')
values.append(jsonize(updates.pop('metadata')))
if 'snapshot' in updates:
+ # New 'snapshot' column
+ update_cols.append('snapshot=%s')
+ values.append(updates['snapshot'])
+
+ # Old 'snapshot_id' column
update_cols.append('snapshot_id=snapshot.object_id')
from_ = 'FROM snapshot'
where.append('snapshot.id=%s')
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -77,6 +77,18 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+ HashCollision in case of collision
+
+ Returns:
+ Summary dict of keys 'content_added',
+ 'skipped_content_added', 'content_bytes_added' with
+ associated count as values
+
+ content_added: New contents added
+ content_bytes_added: Sum of the contents' data lengths
+ skipped_content_added: New skipped contents (no data) added
+
"""
if self.journal_writer:
for content in contents:
@@ -84,6 +96,11 @@
content = content.copy()
del content['data']
self.journal_writer.write_addition('content', content)
+
+ count_contents = 0
+ count_content_added = 0
+ count_content_bytes_added = 0
+
for content in contents:
key = self._content_key(content)
if key in self._contents:
@@ -99,10 +116,19 @@
self._contents[key] = copy.deepcopy(content)
self._contents[key]['ctime'] = now()
bisect.insort(self._sorted_sha1s, content['sha1'])
+ count_contents += 1
if self._contents[key]['status'] == 'visible':
+ count_content_added += 1
content_data = self._contents[key].pop('data')
+ count_content_bytes_added += len(content_data)
self.objstorage.add(content_data, content['sha1'])
+ return {
+ 'content_added': count_content_added,
+ 'content_bytes_added': count_content_bytes_added,
+ 'skipped_content_added': count_contents - count_content_added,
+ }
+
def content_get(self, ids):
"""Retrieve in bulk contents and their data.
@@ -294,16 +320,26 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+ Returns:
+ Summary dict of keys 'directory_added' with associated
+ count as values:
+
+ directory_added: Number of directories actually added
+
"""
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
+ count = 0
for directory in directories:
if directory['id'] not in self._directories:
+ count += 1
self._directories[directory['id']] = copy.deepcopy(directory)
self._objects[directory['id']].append(
('directory', directory['id']))
+ return {'directory_added': count}
+
def directory_missing(self, directory_ids):
"""List directories missing from storage
@@ -427,10 +463,18 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'revision_added' with associated
+ count as values
+
+ revision_added: New objects actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
+ count = 0
for revision in revisions:
if revision['id'] not in self._revisions:
self._revisions[revision['id']] = rev = copy.deepcopy(revision)
@@ -441,6 +485,9 @@
rev.get('committer_date'))
self._objects[revision['id']].append(
('revision', revision['id']))
+ count += 1
+
+ return {'revision_added': count}
def revision_missing(self, revision_ids):
"""List revisions missing from storage
@@ -518,17 +565,29 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'release_added' with associated count
+ as values
+
+ release_added: New objects actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
+ count = 0
for rel in releases:
- rel = copy.deepcopy(rel)
- rel['date'] = normalize_timestamp(rel['date'])
- self._person_add(rel['author'])
- self._objects[rel['id']].append(
- ('release', rel['id']))
- self._releases[rel['id']] = rel
+ if rel['id'] not in self._releases:
+ rel = copy.deepcopy(rel)
+ rel['date'] = normalize_timestamp(rel['date'])
+ self._person_add(rel['author'])
+ self._objects[rel['id']].append(
+ ('release', rel['id']))
+ self._releases[rel['id']] = rel
+ count += 1
+
+ return {'release_added': count}
def release_missing(self, releases):
"""List releases missing from storage
@@ -578,6 +637,13 @@
Raises:
ValueError: if the origin's or visit's identifier does not exist.
+
+ Returns:
+ Summary dict of keys 'snapshot_added' with associated
+ count as values
+
+ snapshot_added: Count of objects actually stored in db
+
"""
if legacy_arg1:
assert legacy_arg2
@@ -586,11 +652,13 @@
else:
origin = visit = None
+ count = 0
snapshot_id = snapshot['id']
if self.journal_writer:
self.journal_writer.write_addition(
'snapshot', snapshot)
if snapshot_id not in self._snapshots:
+ count += 1
self._snapshots[snapshot_id] = {
'id': snapshot_id,
'branches': copy.deepcopy(snapshot['branches']),
@@ -601,6 +669,8 @@
if origin:
self.origin_visit_update(origin, visit, snapshot=snapshot_id)
+ return {'snapshot_added': count}
+
def snapshot_get(self, snapshot_id):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/journal_writer.py b/swh/storage/journal_writer.py
--- a/swh/storage/journal_writer.py
+++ b/swh/storage/journal_writer.py
@@ -27,7 +27,8 @@
if cls == 'inmemory':
JournalWriter = InMemoryJournalWriter
elif cls == 'kafka':
- import swh.journal.direct_writer.DirectKafkaWriter as JournalWriter
+ from swh.journal.direct_writer import DirectKafkaWriter \
+ as JournalWriter
else:
raise ValueError('Unknown storage class `%s`' % cls)
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -12,7 +12,7 @@
-- latest schema version
insert into dbversion(version, release, description)
- values(130, now(), 'Work In Progress');
+ values(131, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
@@ -211,7 +211,8 @@
date timestamptz not null,
status origin_visit_status not null,
metadata jsonb,
- snapshot_id bigint
+ snapshot_id bigint,
+ snapshot sha1_git
);
comment on column origin_visit.origin is 'Visited origin';
@@ -220,6 +221,7 @@
comment on column origin_visit.status is 'Visit result';
comment on column origin_visit.metadata is 'Origin metadata at visit time';
comment on column origin_visit.snapshot_id is 'Origin snapshot at visit time';
+comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
-- A snapshot represents the entire state of a software origin as crawled by
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -114,7 +114,31 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+
+ In case of errors, nothing is stored in the db (in the
+ objstorage, it could though). The following exceptions can
+ occur:
+
+ - HashCollision in case of collision
+ - Any other exceptions raised by the db
+
+ Returns:
+ Summary dict of keys 'content_added',
+ 'skipped_content_added', 'content_bytes_added' with
+ associated count as values
+
+ content_added: New contents added
+ content_bytes_added: Sum of the contents' data lengths
+ skipped_content_added: New skipped contents (no data) added
+
"""
+ summary = {
+ 'content_added': 0,
+ 'skipped_content_added': 0,
+ 'content_bytes_added': 0,
+ }
+
if self.journal_writer:
for item in content:
if 'data' in item:
@@ -137,7 +161,8 @@
for d in content:
if 'status' not in d:
d['status'] = 'visible'
- if 'length' not in d:
+ length = d.get('length')
+ if length is None:
d['length'] = -1
content_by_status[d['status']].append(d)
@@ -150,12 +175,29 @@
content_without_data))
def add_to_objstorage():
- data = {
- cont['sha1']: cont['data']
- for cont in content_with_data
- if cont['sha1'] in missing_content
- }
+ """Add to objstorage the new missing_content
+
+ Returns:
+ Sum of all the content's data length pushed to the
+ objstorage. No filtering is done on contents here, so
+ we might send the same content over multiple times and
+ count its raw length bytes as many times.
+
+ """
+ content_bytes_added = 0
+ data = {}
+ for cont in content_with_data:
+ sha1 = cont['sha1']
+ seen = data.get(sha1)
+ if sha1 in missing_content and not seen:
+ data[sha1] = cont['data']
+ content_bytes_added += cont['length']
+
+ # FIXME: Since we do the filtering anyway now, we might as
+ # well make the objstorage's add_batch call return what we
+ # want here (real bytes added)... that'd simplify this...
self.objstorage.add_batch(data)
+ return content_bytes_added
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
@@ -188,6 +230,8 @@
else:
raise
+ summary['content_added'] = len(missing_content)
+
if missing_skipped:
missing_filtered = (
cont for cont in content_without_data
@@ -200,10 +244,14 @@
# move metadata in place
db.skipped_content_add_from_temp(cur)
+ summary['skipped_content_added'] = len(missing_skipped)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
- added_to_objstorage.result()
+ content_bytes_added = added_to_objstorage.result()
+
+ summary['content_bytes_added'] = content_bytes_added
+ return summary
@db_transaction()
def content_update(self, content, keys=[], db=None, cur=None):
@@ -449,7 +497,15 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+
+ Returns:
+ Summary dict of keys 'directory_added' with associated
+ count as values:
+
+ directory_added: Number of directories actually added
+
"""
+ summary = {'directory_added': 0}
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
@@ -470,7 +526,7 @@
dirs_missing = set(self.directory_missing(dirs))
if not dirs_missing:
- return
+ return summary
db = self.get_db()
with db.transaction() as cur:
@@ -498,6 +554,9 @@
# Do the final copy
db.directory_add_from_temp(cur)
+ summary['directory_added'] = len(dirs_missing)
+
+ return summary
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
@@ -583,7 +642,16 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'revision_added' with associated
+ count as values
+
+ revision_added: New objects actually stored in db
+
"""
+ summary = {'revision_added': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
@@ -593,7 +661,7 @@
set(revision['id'] for revision in revisions)))
if not revisions_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_revision(cur)
@@ -614,6 +682,8 @@
db.copy_to(parents_filtered, 'revision_history',
['id', 'parent_id', 'parent_rank'], cur)
+ return {'revision_added': len(revisions_missing)}
+
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
"""List revisions missing from storage
@@ -707,7 +777,16 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'release_added' with associated count
+ as values
+
+ release_added: New objects actually stored in db
+
"""
+ summary = {'release_added': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
@@ -717,7 +796,7 @@
releases_missing = set(self.release_missing(release_ids))
if not releases_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_release(cur)
@@ -732,6 +811,8 @@
db.release_add_from_temp(cur)
+ return {'release_added': len(releases_missing)}
+
@db_transaction_generator()
def release_missing(self, releases, db=None, cur=None):
"""List releases missing from storage
@@ -793,6 +874,13 @@
Raises:
ValueError: if the origin or visit id does not exist.
+
+ Returns:
+ Summary dict of keys 'snapshot_added' with associated
+ count as values
+
+ snapshot_added: Count of objects actually stored in db
+
"""
if origin:
if not visit:
@@ -811,7 +899,9 @@
# Called by new code that uses the new api/client.py
origin_id = visit_id = None
+ count = 0
if not db.snapshot_exists(snapshot['id'], cur):
+ count += 1
db.mktemp_snapshot_branch(cur)
db.copy_to(
(
@@ -837,6 +927,8 @@
origin_id, visit_id, snapshot=snapshot['id'],
db=db, cur=cur)
+ return {'snapshot_added': count}
+
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -552,7 +552,13 @@
def test_content_add(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
self.assertEqual(list(self.storage.content_get([cont['sha1']])),
[{'sha1': cont['sha1'], 'data': cont['data']}])
@@ -561,10 +567,38 @@
self.assertEqual(list(self.journal_writer.objects),
[('content', expected_cont)])
+ def test_content_add_same_input(self):
+ cont = self.cont
+
+ actual_result = self.storage.content_add([cont, cont])
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
+ def test_content_add_different_input(self):
+ cont = self.cont
+ cont2 = self.cont2
+
+ actual_result = self.storage.content_add([cont, cont2])
+ self.assertEqual(actual_result, {
+ 'content_added': 2,
+ 'content_bytes_added': cont['length'] + cont2['length'],
+ 'skipped_content_added': 0
+ })
+
def test_content_add_db(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -601,7 +635,13 @@
cont2 = self.skipped_cont2.copy()
cont2['blake2s256'] = None
- self.storage.content_add([cont, cont, cont2])
+ actual_result = self.storage.content_add([cont, cont, cont2])
+
+ self.assertEqual(actual_result, {
+ 'content_added': 0,
+ 'content_bytes_added': 0,
+ 'skipped_content_added': 2,
+ })
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
'length, status, reason '
@@ -722,7 +762,9 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir])
+ actual_result = self.storage.directory_add([self.dir])
+ self.assertEqual(actual_result, {'directory_added': 1})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir)])
@@ -737,7 +779,10 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir, self.dir2, self.dir3])
+ actual_result = self.storage.directory_add(
+ [self.dir, self.dir2, self.dir3])
+ self.assertEqual(actual_result, {'directory_added': 3})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir),
('directory', self.dir2),
@@ -765,7 +810,8 @@
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
- self.storage.directory_add([self.dir3])
+ actual_result = self.storage.directory_add([self.dir3])
+ self.assertEqual(actual_result, {'directory_added': 1})
expected_entries = [
{
@@ -825,7 +871,8 @@
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
- self.storage.revision_add([self.revision])
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision_added': 1})
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
@@ -833,6 +880,10 @@
self.assertEqual(list(self.journal_writer.objects),
[('revision', self.revision)])
+ # already there so nothing added
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision_added': 0})
+
def test_revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
@@ -945,7 +996,8 @@
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
- self.storage.release_add([self.release, self.release2])
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release_added': 2})
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
@@ -955,6 +1007,10 @@
[('release', self.release),
('release', self.release2)])
+ # already present so nothing added
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release_added': 0})
+
def test_release_get(self):
# given
self.storage.release_add([self.release, self.release2])
@@ -1410,7 +1466,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(self.empty_snapshot)
+ actual_result = self.storage.snapshot_add(self.empty_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
+
self.storage.origin_visit_update(
origin_id, visit_id, snapshot=self.empty_snapshot['id'])
@@ -1488,7 +1546,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
by_id = self.storage.snapshot_get(self.complete_snapshot['id'])
self.assertEqual(by_id, self.complete_snapshot)
@@ -1496,13 +1556,20 @@
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.complete_snapshot)
+ # already injected so no new snapshot added
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 0})
+
def test_snapshot_add_count_branches(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
snp_id = self.complete_snapshot['id']
snp_size = self.storage.snapshot_count_branches(snp_id)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 6:27 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3233960
Attached To
D1321: swh-storage: Install counter metrics
Event Timeline
Log In to Comment