Page MenuHomeSoftware Heritage

D1343.id4299.diff
No OneTemporary

D1343.id4299.diff

diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -92,6 +92,82 @@
return True
+    def _content_unique_key(self, hash):
+        """Return the tuple identifying a content across all its hashes.
+
+        Args:
+            hash (tuple or dict): either an already-built key, which is
+                returned unchanged, or a dict holding one value for each
+                name listed in the db's ``content_hash_keys``.
+
+        Returns:
+            tuple: the key; for a dict, its values ordered as in
+            ``content_hash_keys``.
+
+        Raises:
+            KeyError: if a dict input lacks one of the hash names.
+
+        """
+        if isinstance(hash, tuple):
+            # Already a key: no need to reach for the db at all.
+            return hash
+        keys = self.get_db().content_hash_keys
+        return tuple(hash[k] for k in keys)
+
+    def _filter_new_content(self, content):
+        """Split contents by status and keep only those not yet stored.
+
+        Each input dict is updated in place: a missing ``status`` becomes
+        ``'visible'`` and a missing or ``None`` ``length`` becomes ``-1``.
+        Only the ``'visible'`` and ``'absent'`` statuses are considered;
+        contents with any other status are left out of the result.
+
+        Args:
+            content (iterable of dict): contents to filter
+
+        Returns:
+            tuple: ``(content_with_data, content_without_data, summary)``
+            where the first two are the lists of visible, respectively
+            absent, contents reported missing by ``content_missing`` /
+            ``skipped_content_missing``, and ``summary`` is a dict with
+            the ``content:add`` and ``skipped_content:add`` counts. The
+            counts are numbers of distinct missing keys, so they may be
+            lower than the list lengths when the input has duplicates.
+
+        """
+        content_by_status = defaultdict(list)
+        for d in content:
+            d.setdefault('status', 'visible')
+            # An explicit None must be normalized just like a missing
+            # key: content_add later feeds the length to max() and sums
+            # it, which fails on None.
+            if d.get('length') is None:
+                d['length'] = -1
+            content_by_status[d['status']].append(d)
+
+        content_with_data = content_by_status['visible']
+        content_without_data = content_by_status['absent']
+
+        missing_content = set(self.content_missing(content_with_data))
+        missing_skipped = {
+            self._content_unique_key(hashes)
+            for hashes in self.skipped_content_missing(content_without_data)
+        }
+
+        # Visible contents are matched on their sha1 alone, skipped ones
+        # on the full tuple of hashes.
+        content_with_data = [
+            cont for cont in content_with_data
+            if cont['sha1'] in missing_content]
+        content_without_data = [
+            cont for cont in content_without_data
+            if self._content_unique_key(cont) in missing_skipped]
+
+        summary = {
+            'content:add': len(missing_content),
+            'skipped_content:add': len(missing_skipped),
+        }
+
+        return (content_with_data, content_without_data, summary)
+
+    def _content_add_metadata(self, db, cur,
+                              content_with_data, content_without_data):
+        """Insert content and skipped content rows through temp tables.
+
+        Args:
+            db: database wrapper providing ``mktemp``, ``copy_to`` and the
+                ``*_add_from_temp`` helpers
+            cur: cursor passed through to every ``db`` call
+            content_with_data: rows copied to ``tmp_content``; nothing is
+                done for them when empty
+            content_without_data: rows copied to ``tmp_skipped_content``;
+                nothing is done for them when empty
+
+        Raises:
+            HashCollision: on a unique violation (sqlstate 23505) on the
+                ``content`` table, carrying the name of the colliding
+                hash (None when the violated constraint is not one of
+                the three known ones)
+            psycopg2.IntegrityError: any other integrity error, re-raised
+                unchanged
+
+        """
+        if content_with_data:
+            # create temporary table for metadata injection
+            db.mktemp('content', cur)
+
+            db.copy_to(content_with_data, 'tmp_content',
+                       db.content_get_metadata_keys, cur)
+
+            # move metadata in place
+            try:
+                db.content_add_from_temp(cur)
+            except psycopg2.IntegrityError as e:
+                from . import HashCollision
+                if e.diag.sqlstate != '23505' or \
+                        e.diag.table_name != 'content':
+                    # Not a unique violation on content: not ours to
+                    # translate.
+                    raise
+                constraint_to_hash_name = {
+                    'content_pkey': 'sha1',
+                    'content_sha1_git_idx': 'sha1_git',
+                    'content_sha256_idx': 'sha256',
+                }
+                colliding_hash_name = constraint_to_hash_name.get(
+                    e.diag.constraint_name)
+                # Chain explicitly so the database error stays attached
+                # as the cause of the collision.
+                raise HashCollision(colliding_hash_name) from e
+
+        if content_without_data:
+            db.mktemp('skipped_content', cur)
+            db.copy_to(content_without_data, 'tmp_skipped_content',
+                       db.skipped_content_keys, cur)
+
+            # move metadata in place
+            db.skipped_content_add_from_temp(cur)
+
def content_add(self, content):
"""Add content blobs to the storage
@@ -129,13 +205,7 @@
content:add: New contents added
content:bytes:add: Sum of the contents' length data
skipped_content:add: New skipped contents (no data) added
-
"""
- summary = {
- 'content:add': 0,
- 'content:bytes:add': 0,
- 'skipped_content:add': 0,
- }
if self.journal_writer:
for item in content:
@@ -146,50 +216,23 @@
db = self.get_db()
- def _unique_key(hash, keys=db.content_hash_keys):
- """Given a hash (tuple or dict), return a unique key from the
- aggregation of keys.
-
- """
- if isinstance(hash, tuple):
- return hash
- return tuple([hash[k] for k in keys])
-
- content_by_status = defaultdict(list)
- for d in content:
- if 'status' not in d:
- d['status'] = 'visible'
- length = d.get('length')
- if length is None:
- d['length'] = -1
- content_by_status[d['status']].append(d)
-
- content_with_data = content_by_status['visible']
- content_without_data = content_by_status['absent']
-
- missing_content = set(self.content_missing(content_with_data))
- missing_skipped = set(_unique_key(hashes) for hashes
- in self.skipped_content_missing(
- content_without_data))
+ (content_with_data, content_without_data, summary) = \
+ self._filter_new_content(content)
def add_to_objstorage():
"""Add to objstorage the new missing_content
Returns:
Sum of all the content's data length pushed to the
- objstorage. No filtering is done on contents here, so
- we might send over multiple times the same content and
- count as many times the contents' raw length bytes.
+ objstorage. Content present twice is only sent once.
"""
content_bytes_added = 0
data = {}
for cont in content_with_data:
- sha1 = cont['sha1']
- seen = data.get(sha1)
- if sha1 in missing_content and not seen:
- data[sha1] = cont['data']
- content_bytes_added += cont['length']
+ if cont['sha1'] not in data:
+ data[cont['sha1']] = cont['data']
+ content_bytes_added += max(0, cont['length'])
# FIXME: Since we do the filtering anyway now, we might as
# well make the objstorage's add_batch call return what we
@@ -200,49 +243,9 @@
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
- if missing_content:
- # create temporary table for metadata injection
- db.mktemp('content', cur)
-
- content_filtered = (cont for cont in content_with_data
- if cont['sha1'] in missing_content)
-
- db.copy_to(content_filtered, 'tmp_content',
- db.content_get_metadata_keys, cur)
-
- # move metadata in place
- try:
- db.content_add_from_temp(cur)
- except psycopg2.IntegrityError as e:
- from . import HashCollision
- if e.diag.sqlstate == '23505' and \
- e.diag.table_name == 'content':
- constraint_to_hash_name = {
- 'content_pkey': 'sha1',
- 'content_sha1_git_idx': 'sha1_git',
- 'content_sha256_idx': 'sha256',
- }
- colliding_hash_name = constraint_to_hash_name \
- .get(e.diag.constraint_name)
- raise HashCollision(colliding_hash_name)
- else:
- raise
-
- summary['content:add'] = len(missing_content)
-
- if missing_skipped:
- missing_filtered = (
- cont for cont in content_without_data
- if _unique_key(cont) in missing_skipped
- )
-
- db.mktemp('skipped_content', cur)
- db.copy_to(missing_filtered, 'tmp_skipped_content',
- db.skipped_content_keys, cur)
-
- # move metadata in place
- db.skipped_content_add_from_temp(cur)
- summary['skipped_content:add'] = len(missing_skipped)
+
+ self._content_add_metadata(
+ db, cur, content_with_data, content_without_data)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:49 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216643

Event Timeline