Page MenuHomeSoftware Heritage

D1321.id4235.diff
No OneTemporary

D1321.id4235.diff

diff --git a/sql/upgrades/131.sql b/sql/upgrades/131.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/131.sql
@@ -0,0 +1,10 @@
+-- SWH DB schema upgrade
+-- from_version: 130
+-- to_version: 131
+-- description: Use sha1 instead of bigint as FK from origin_visit to snapshot (part 1: add new column)
+
+insert into dbversion(version, release, description)
+ values(131, now(), 'Work In Progress');
+
+alter table origin_visit add column snapshot sha1_git;
+comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -34,12 +34,34 @@
return d
+def encode(f):
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ return encode_data(r)
+
+ return d
+
+
+def increment(f):
+ """Increment object counters for the decorated function.
+
+ """
+ @wraps(f)
+ def d(*a, **kw):
+ r = f(*a, **kw)
+ statsd.increment('swh_storage_request_object_count',
+ r, tags={'endpoint': f.__name__})
+ return r
+
+ return d
+
+
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
-@timed
def get_storage():
global storage
if not storage:
@@ -90,9 +112,11 @@
@app.route('/content/add', methods=['POST'])
+@encode
@timed
+@increment
def content_add():
- return encode_data(get_storage().content_add(**decode_request(request)))
+ return get_storage().content_add(**decode_request(request))
@app.route('/content/update', methods=['POST'])
@@ -129,9 +153,11 @@
@app.route('/directory/add', methods=['POST'])
+@encode
@timed
+@increment
def directory_add():
- return encode_data(get_storage().directory_add(**decode_request(request)))
+ return get_storage().directory_add(**decode_request(request))
@app.route('/directory/path', methods=['POST'])
@@ -149,9 +175,11 @@
@app.route('/revision/add', methods=['POST'])
+@encode
@timed
+@increment
def revision_add():
- return encode_data(get_storage().revision_add(**decode_request(request)))
+ return get_storage().revision_add(**decode_request(request))
@app.route('/revision', methods=['POST'])
@@ -181,9 +209,11 @@
@app.route('/release/add', methods=['POST'])
+@encode
@timed
+@increment
def release_add():
- return encode_data(get_storage().release_add(**decode_request(request)))
+ return get_storage().release_add(**decode_request(request))
@app.route('/release', methods=['POST'])
@@ -207,9 +237,11 @@
@app.route('/snapshot/add', methods=['POST'])
+@encode
@timed
+@increment
def snapshot_add():
- return encode_data(get_storage().snapshot_add(**decode_request(request)))
+ return get_storage().snapshot_add(**decode_request(request))
@app.route('/snapshot', methods=['POST'])
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -311,6 +311,11 @@
update_cols.append('metadata=%s')
values.append(jsonize(updates.pop('metadata')))
if 'snapshot' in updates:
+ # New 'snapshot' column
+ update_cols.append('snapshot=%s')
+ values.append(updates['snapshot'])
+
+ # Old 'snapshot_id' column
update_cols.append('snapshot_id=snapshot.object_id')
from_ = 'FROM snapshot'
where.append('snapshot.id=%s')
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -77,6 +77,18 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+ HashCollision in case of collision
+
+ Returns:
+ Summary dict of keys 'content_added',
+ 'skipped_content_added', 'content_bytes_added' with
+ associated count as values
+
+ content_added: New contents added
+ content_bytes_added: Sum of the contents' length data
+ skipped_content_added: New skipped contents (no data) added
+
"""
if self.journal_writer:
for content in contents:
@@ -84,6 +96,11 @@
content = content.copy()
del content['data']
self.journal_writer.write_addition('content', content)
+
+ count_contents = 0
+ count_content_added = 0
+ count_content_bytes_added = 0
+
for content in contents:
key = self._content_key(content)
if key in self._contents:
@@ -99,10 +116,19 @@
self._contents[key] = copy.deepcopy(content)
self._contents[key]['ctime'] = now()
bisect.insort(self._sorted_sha1s, content['sha1'])
+ count_contents += 1
if self._contents[key]['status'] == 'visible':
+ count_content_added += 1
content_data = self._contents[key].pop('data')
+ count_content_bytes_added += len(content_data)
self.objstorage.add(content_data, content['sha1'])
+ return {
+ 'content_added': count_content_added,
+ 'content_bytes_added': count_content_bytes_added,
+ 'skipped_content_added': count_contents - count_content_added,
+ }
+
def content_get(self, ids):
"""Retrieve in bulk contents and their data.
@@ -294,16 +320,26 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+ Returns:
+ Summary dict of keys 'directory_added' with associated
+ count as values:
+
+ directory_added: Number of directories actually added
+
"""
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
+ count = 0
for directory in directories:
if directory['id'] not in self._directories:
+ count += 1
self._directories[directory['id']] = copy.deepcopy(directory)
self._objects[directory['id']].append(
('directory', directory['id']))
+ return {'directory_added': count}
+
def directory_missing(self, directory_ids):
"""List directories missing from storage
@@ -427,10 +463,18 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'revision_added' with associated
+ count as values
+
+ revision_added: New objects actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
+ count = 0
for revision in revisions:
if revision['id'] not in self._revisions:
self._revisions[revision['id']] = rev = copy.deepcopy(revision)
@@ -441,6 +485,9 @@
rev.get('committer_date'))
self._objects[revision['id']].append(
('revision', revision['id']))
+ count += 1
+
+ return {'revision_added': count}
def revision_missing(self, revision_ids):
"""List revisions missing from storage
@@ -518,17 +565,29 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'release_added' with associated count
+ as values
+
+ release_added: New object contents actually stored in db
+
"""
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
+ count = 0
for rel in releases:
- rel = copy.deepcopy(rel)
- rel['date'] = normalize_timestamp(rel['date'])
- self._person_add(rel['author'])
- self._objects[rel['id']].append(
- ('release', rel['id']))
- self._releases[rel['id']] = rel
+ if rel['id'] not in self._releases:
+ rel = copy.deepcopy(rel)
+ rel['date'] = normalize_timestamp(rel['date'])
+ self._person_add(rel['author'])
+ self._objects[rel['id']].append(
+ ('release', rel['id']))
+ self._releases[rel['id']] = rel
+ count += 1
+
+ return {'release_added': count}
def release_missing(self, releases):
"""List releases missing from storage
@@ -578,6 +637,13 @@
Raises:
ValueError: if the origin's or visit's identifier does not exist.
+
+ Returns:
+ Summary dict of keys 'snapshot_added' with associated
+ count as values
+
+ snapshot_added: Count of objects actually stored in db
+
"""
if legacy_arg1:
assert legacy_arg2
@@ -586,11 +652,13 @@
else:
origin = visit = None
+ count = 0
snapshot_id = snapshot['id']
if self.journal_writer:
self.journal_writer.write_addition(
'snapshot', snapshot)
if snapshot_id not in self._snapshots:
+ count += 1
self._snapshots[snapshot_id] = {
'id': snapshot_id,
'branches': copy.deepcopy(snapshot['branches']),
@@ -601,6 +669,8 @@
if origin:
self.origin_visit_update(origin, visit, snapshot=snapshot_id)
+ return {'snapshot_added': count}
+
def snapshot_get(self, snapshot_id):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/journal_writer.py b/swh/storage/journal_writer.py
--- a/swh/storage/journal_writer.py
+++ b/swh/storage/journal_writer.py
@@ -27,7 +27,8 @@
if cls == 'inmemory':
JournalWriter = InMemoryJournalWriter
elif cls == 'kafka':
- import swh.journal.direct_writer.DirectKafkaWriter as JournalWriter
+ from swh.journal.direct_writer import DirectKafkaWriter \
+ as JournalWriter
else:
raise ValueError('Unknown storage class `%s`' % cls)
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -12,7 +12,7 @@
-- latest schema version
insert into dbversion(version, release, description)
- values(130, now(), 'Work In Progress');
+ values(131, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
@@ -211,7 +211,8 @@
date timestamptz not null,
status origin_visit_status not null,
metadata jsonb,
- snapshot_id bigint
+ snapshot_id bigint,
+ snapshot sha1_git
);
comment on column origin_visit.origin is 'Visited origin';
@@ -220,6 +221,7 @@
comment on column origin_visit.status is 'Visit result';
comment on column origin_visit.metadata is 'Origin metadata at visit time';
comment on column origin_visit.snapshot_id is 'Origin snapshot at visit time';
+comment on column origin_visit.snapshot is 'Origin snapshot at visit time';
-- A snapshot represents the entire state of a software origin as crawled by
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -114,7 +114,31 @@
- origin (int): if status = absent, the origin we saw the
content in
+ Raises:
+
+ In case of errors, nothing is stored in the db (in the
+ objstorage, it could though). The following exceptions can
+ occur:
+
+ - HashCollision in case of collision
+ - Any other exceptions raised by the db
+
+ Returns:
+ Summary dict of keys 'content_added',
+ 'skipped_content_added', 'content_bytes_added' with
+ associated count as values
+
+ content_added: New contents added
+ content_bytes_added: Sum of the contents' length data
+ skipped_content_added: New skipped contents (no data) added
+
"""
+ summary = {
+ 'content_added': 0,
+ 'skipped_content_added': 0,
+ 'content_bytes_added': 0,
+ }
+
if self.journal_writer:
for item in content:
if 'data' in item:
@@ -137,7 +161,8 @@
for d in content:
if 'status' not in d:
d['status'] = 'visible'
- if 'length' not in d:
+ length = d.get('length')
+ if length is None:
d['length'] = -1
content_by_status[d['status']].append(d)
@@ -150,12 +175,29 @@
content_without_data))
def add_to_objstorage():
- data = {
- cont['sha1']: cont['data']
- for cont in content_with_data
- if cont['sha1'] in missing_content
- }
+ """Add to objstorage the new missing_content
+
+ Returns:
+ Sum of all the content's data length pushed to the
+ objstorage. No filtering is done on contents here, so
+ we might send over multiple times the same content and
+ count as many times the contents' raw length bytes.
+
+ """
+ content_bytes_added = 0
+ data = {}
+ for cont in content_with_data:
+ sha1 = cont['sha1']
+ seen = data.get(sha1)
+ if sha1 in missing_content and not seen:
+ data[sha1] = cont['data']
+ content_bytes_added += cont['length']
+
+ # FIXME: Since we do the filtering anyway now, we might as
+ # well make the objstorage's add_batch call return what we
+ # want here (real bytes added)... that'd simplify this...
self.objstorage.add_batch(data)
+ return content_bytes_added
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
@@ -188,6 +230,8 @@
else:
raise
+ summary['content_added'] = len(missing_content)
+
if missing_skipped:
missing_filtered = (
cont for cont in content_without_data
@@ -200,10 +244,14 @@
# move metadata in place
db.skipped_content_add_from_temp(cur)
+ summary['skipped_content_added'] = len(missing_skipped)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
- added_to_objstorage.result()
+ content_bytes_added = added_to_objstorage.result()
+
+ summary['content_bytes_added'] = content_bytes_added
+ return summary
@db_transaction()
def content_update(self, content, keys=[], db=None, cur=None):
@@ -449,7 +497,15 @@
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
+
+ Returns:
+ Summary dict of keys 'directory_added' with associated
+ count as values:
+
+ directory_added: Number of directories actually added
+
"""
+ summary = {'directory_added': 0}
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
@@ -470,7 +526,7 @@
dirs_missing = set(self.directory_missing(dirs))
if not dirs_missing:
- return
+ return summary
db = self.get_db()
with db.transaction() as cur:
@@ -498,6 +554,9 @@
# Do the final copy
db.directory_add_from_temp(cur)
+ summary['directory_added'] = len(dirs_missing)
+
+ return summary
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
@@ -583,7 +642,16 @@
this revision
date dictionaries have the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'revision_added' with associated
+ count as values
+
+ revision_added: New objects actually stored in db
+
"""
+ summary = {'revision_added': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
@@ -593,7 +661,7 @@
set(revision['id'] for revision in revisions)))
if not revisions_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_revision(cur)
@@ -614,6 +682,8 @@
db.copy_to(parents_filtered, 'revision_history',
['id', 'parent_id', 'parent_rank'], cur)
+ return {'revision_added': len(revisions_missing)}
+
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
"""List revisions missing from storage
@@ -707,7 +777,16 @@
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
+
+ Returns:
+ Summary dict of keys 'release_added' with associated count
+ as values
+
+ release_added: New object contents actually stored in db
+
"""
+ summary = {'release_added': 0}
+
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
@@ -717,7 +796,7 @@
releases_missing = set(self.release_missing(release_ids))
if not releases_missing:
- return
+ return summary
with db.transaction() as cur:
db.mktemp_release(cur)
@@ -732,6 +811,8 @@
db.release_add_from_temp(cur)
+ return {'release_added': len(releases_missing)}
+
@db_transaction_generator()
def release_missing(self, releases, db=None, cur=None):
"""List releases missing from storage
@@ -793,6 +874,13 @@
Raises:
ValueError: if the origin or visit id does not exist.
+
+ Returns:
+ Summary dict of keys 'snapshot_added' with associated
+ count as values
+
+ snapshot_added: Count of objects actually stored in db
+
"""
if origin:
if not visit:
@@ -811,7 +899,9 @@
# Called by new code that uses the new api/client.py
origin_id = visit_id = None
+ count = 0
if not db.snapshot_exists(snapshot['id'], cur):
+ count += 1
db.mktemp_snapshot_branch(cur)
db.copy_to(
(
@@ -837,6 +927,8 @@
origin_id, visit_id, snapshot=snapshot['id'],
db=db, cur=cur)
+ return {'snapshot_added': count}
+
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
"""Get the content, possibly partial, of a snapshot with the given id
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -552,7 +552,13 @@
def test_content_add(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
self.assertEqual(list(self.storage.content_get([cont['sha1']])),
[{'sha1': cont['sha1'], 'data': cont['data']}])
@@ -561,10 +567,38 @@
self.assertEqual(list(self.journal_writer.objects),
[('content', expected_cont)])
+ def test_content_add_same_input(self):
+ cont = self.cont
+
+ actual_result = self.storage.content_add([cont, cont])
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
+ def test_content_add_different_input(self):
+ cont = self.cont
+ cont2 = self.cont2
+
+ actual_result = self.storage.content_add([cont, cont2])
+ self.assertEqual(actual_result, {
+ 'content_added': 2,
+ 'content_bytes_added': cont['length'] + cont2['length'],
+ 'skipped_content_added': 0
+ })
+
def test_content_add_db(self):
cont = self.cont
- self.storage.content_add([cont])
+ actual_result = self.storage.content_add([cont])
+
+ self.assertEqual(actual_result, {
+ 'content_added': 1,
+ 'content_bytes_added': cont['length'],
+ 'skipped_content_added': 0
+ })
+
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
@@ -601,7 +635,13 @@
cont2 = self.skipped_cont2.copy()
cont2['blake2s256'] = None
- self.storage.content_add([cont, cont, cont2])
+ actual_result = self.storage.content_add([cont, cont, cont2])
+
+ self.assertEqual(actual_result, {
+ 'content_added': 0,
+ 'content_bytes_added': 0,
+ 'skipped_content_added': 2,
+ })
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
'length, status, reason '
@@ -722,7 +762,9 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir])
+ actual_result = self.storage.directory_add([self.dir])
+ self.assertEqual(actual_result, {'directory_added': 1})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir)])
@@ -737,7 +779,10 @@
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
- self.storage.directory_add([self.dir, self.dir2, self.dir3])
+ actual_result = self.storage.directory_add(
+ [self.dir, self.dir2, self.dir3])
+ self.assertEqual(actual_result, {'directory_added': 3})
+
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir),
('directory', self.dir2),
@@ -765,7 +810,8 @@
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
- self.storage.directory_add([self.dir3])
+ actual_result = self.storage.directory_add([self.dir3])
+ self.assertEqual(actual_result, {'directory_added': 1})
expected_entries = [
{
@@ -825,7 +871,8 @@
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
- self.storage.revision_add([self.revision])
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision_added': 1})
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
@@ -833,6 +880,10 @@
self.assertEqual(list(self.journal_writer.objects),
[('revision', self.revision)])
+ # already there so nothing added
+ actual_result = self.storage.revision_add([self.revision])
+ self.assertEqual(actual_result, {'revision_added': 0})
+
def test_revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
@@ -945,7 +996,8 @@
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
- self.storage.release_add([self.release, self.release2])
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release_added': 2})
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
@@ -955,6 +1007,10 @@
[('release', self.release),
('release', self.release2)])
+ # already present so nothing added
+ actual_result = self.storage.release_add([self.release, self.release2])
+ self.assertEqual(actual_result, {'release_added': 0})
+
def test_release_get(self):
# given
self.storage.release_add([self.release, self.release2])
@@ -1410,7 +1466,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(self.empty_snapshot)
+ actual_result = self.storage.snapshot_add(self.empty_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
+
self.storage.origin_visit_update(
origin_id, visit_id, snapshot=self.empty_snapshot['id'])
@@ -1488,7 +1546,9 @@
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
by_id = self.storage.snapshot_get(self.complete_snapshot['id'])
self.assertEqual(by_id, self.complete_snapshot)
@@ -1496,13 +1556,20 @@
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.complete_snapshot)
+ # already injected so no new snapshot added
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 0})
+
def test_snapshot_add_count_branches(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
+ actual_result = self.storage.snapshot_add(
+ origin_id, visit_id, self.complete_snapshot)
+ self.assertEqual(actual_result, {'snapshot_added': 1})
snp_id = self.complete_snapshot['id']
snp_size = self.storage.snapshot_count_branches(snp_id)

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 6:27 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3233960

Event Timeline