D1343.id4299.diff
Attached to D1343: Refactor content_add's code into smaller functions.
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -92,6 +92,82 @@
         return True

+    def _content_unique_key(self, hash):
+        """Given a hash (tuple or dict), return a unique key from the
+        aggregation of keys.
+
+        """
+        keys = self.get_db().content_hash_keys
+        if isinstance(hash, tuple):
+            return hash
+        return tuple([hash[k] for k in keys])
+
+    def _filter_new_content(self, content):
+        content_by_status = defaultdict(list)
+        for d in content:
+            if 'status' not in d:
+                d['status'] = 'visible'
+            if 'length' not in d:
+                d['length'] = -1
+            content_by_status[d['status']].append(d)
+
+        content_with_data = content_by_status['visible']
+        content_without_data = content_by_status['absent']
+
+        missing_content = set(self.content_missing(content_with_data))
+        missing_skipped = set(self._content_unique_key(hashes) for hashes
+                              in self.skipped_content_missing(
+                                  content_without_data))
+
+        content_with_data = [
+            cont for cont in content_with_data
+            if cont['sha1'] in missing_content]
+        content_without_data = [
+            cont for cont in content_without_data
+            if self._content_unique_key(cont) in missing_skipped]
+
+        summary = {
+            'content:add': len(missing_content),
+            'skipped_content:add': len(missing_skipped),
+        }
+
+        return (content_with_data, content_without_data, summary)
+
+    def _content_add_metadata(self, db, cur,
+                              content_with_data, content_without_data):
+        if content_with_data:
+            # create temporary table for metadata injection
+            db.mktemp('content', cur)
+
+            db.copy_to(content_with_data, 'tmp_content',
+                       db.content_get_metadata_keys, cur)
+
+            # move metadata in place
+            try:
+                db.content_add_from_temp(cur)
+            except psycopg2.IntegrityError as e:
+                from . import HashCollision
+                if e.diag.sqlstate == '23505' and \
+                        e.diag.table_name == 'content':
+                    constraint_to_hash_name = {
+                        'content_pkey': 'sha1',
+                        'content_sha1_git_idx': 'sha1_git',
+                        'content_sha256_idx': 'sha256',
+                    }
+                    colliding_hash_name = constraint_to_hash_name \
+                        .get(e.diag.constraint_name)
+                    raise HashCollision(colliding_hash_name)
+                else:
+                    raise
+
+        if content_without_data:
+            db.mktemp('skipped_content', cur)
+            db.copy_to(content_without_data, 'tmp_skipped_content',
+                       db.skipped_content_keys, cur)
+
+            # move metadata in place
+            db.skipped_content_add_from_temp(cur)
+
     def content_add(self, content):
         """Add content blobs to the storage
@@ -129,13 +205,7 @@
                 content:add: New contents added
                 content:bytes:add: Sum of the contents' length data
                 skipped_content:add: New skipped contents (no data) added
-
         """
-        summary = {
-            'content:add': 0,
-            'content:bytes:add': 0,
-            'skipped_content:add': 0,
-        }
         if self.journal_writer:
             for item in content:
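This hunk drops the eagerly initialized summary dict: the content:add and skipped_content:add counts now travel back from _filter_new_content (see the call in the next hunk), and only content:bytes:add is still computed inside content_add itself. The helper's return value has this shape (names as in the diff):

    # (content_with_data, content_without_data, summary)
    #   content_with_data    -- new 'visible' contents, already filtered
    #   content_without_data -- new 'absent' (skipped) contents
    #   summary              -- {'content:add': ..., 'skipped_content:add': ...}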
@@ -146,50 +216,23 @@
         db = self.get_db()

-        def _unique_key(hash, keys=db.content_hash_keys):
-            """Given a hash (tuple or dict), return a unique key from the
-            aggregation of keys.
-
-            """
-            if isinstance(hash, tuple):
-                return hash
-            return tuple([hash[k] for k in keys])
-
-        content_by_status = defaultdict(list)
-        for d in content:
-            if 'status' not in d:
-                d['status'] = 'visible'
-            length = d.get('length')
-            if length is None:
-                d['length'] = -1
-            content_by_status[d['status']].append(d)
-
-        content_with_data = content_by_status['visible']
-        content_without_data = content_by_status['absent']
-
-        missing_content = set(self.content_missing(content_with_data))
-        missing_skipped = set(_unique_key(hashes) for hashes
-                              in self.skipped_content_missing(
-                                  content_without_data))
+        (content_with_data, content_without_data, summary) = \
+            self._filter_new_content(content)

         def add_to_objstorage():
             """Add to objstorage the new missing_content

             Returns:
                 Sum of all the content's data length pushed to the
-                objstorage. No filtering is done on contents here, so
-                we might send over multiple times the same content and
-                count as many times the contents' raw length bytes.
+                objstorage. Content present twice is only sent once.

             """
             content_bytes_added = 0
             data = {}
             for cont in content_with_data:
-                sha1 = cont['sha1']
-                seen = data.get(sha1)
-                if sha1 in missing_content and not seen:
-                    data[sha1] = cont['data']
-                    content_bytes_added += cont['length']
+                if cont['sha1'] not in data:
+                    data[cont['sha1']] = cont['data']
+                    content_bytes_added += max(0, cont['length'])

             # FIXME: Since we do the filtering anyway now, we might as
             # well make the objstorage's add_batch call return what we
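Two details in this hunk are worth noting. First, add_to_objstorage no longer consults missing_content: content_with_data has already been narrowed to new contents by _filter_new_content, so deduplicating within the batch by sha1 is all that remains. Second, lengths default to -1 when unknown, and _filter_new_content only fills that default in when the key is absent (unlike the removed code, which also replaced a present-but-None length), so the new max(0, cont['length']) keeps the sentinel from being subtracted out of the byte count. A standalone sketch of the new accumulation (hypothetical helper name):

    def batch_data_and_bytes(content_with_data):
        # Deduplicate within the batch by sha1 and total the bytes
        # actually pushed; -1 ("length unknown") counts as zero.
        data, total = {}, 0
        for cont in content_with_data:
            if cont['sha1'] not in data:
                data[cont['sha1']] = cont['data']
                total += max(0, cont['length'])
        return data, total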
@@ -200,49 +243,9 @@
         with db.transaction() as cur:
             with ThreadPoolExecutor(max_workers=1) as executor:
                 added_to_objstorage = executor.submit(add_to_objstorage)
-                if missing_content:
-                    # create temporary table for metadata injection
-                    db.mktemp('content', cur)
-
-                    content_filtered = (cont for cont in content_with_data
-                                        if cont['sha1'] in missing_content)
-
-                    db.copy_to(content_filtered, 'tmp_content',
-                               db.content_get_metadata_keys, cur)
-
-                    # move metadata in place
-                    try:
-                        db.content_add_from_temp(cur)
-                    except psycopg2.IntegrityError as e:
-                        from . import HashCollision
-                        if e.diag.sqlstate == '23505' and \
-                                e.diag.table_name == 'content':
-                            constraint_to_hash_name = {
-                                'content_pkey': 'sha1',
-                                'content_sha1_git_idx': 'sha1_git',
-                                'content_sha256_idx': 'sha256',
-                            }
-                            colliding_hash_name = constraint_to_hash_name \
-                                .get(e.diag.constraint_name)
-                            raise HashCollision(colliding_hash_name)
-                        else:
-                            raise
-
-                    summary['content:add'] = len(missing_content)
-
-                if missing_skipped:
-                    missing_filtered = (
-                        cont for cont in content_without_data
-                        if _unique_key(cont) in missing_skipped
-                    )
-
-                    db.mktemp('skipped_content', cur)
-                    db.copy_to(missing_filtered, 'tmp_skipped_content',
-                               db.skipped_content_keys, cur)
-
-                    # move metadata in place
-                    db.skipped_content_add_from_temp(cur)
-                    summary['skipped_content:add'] = len(missing_skipped)
+
+                self._content_add_metadata(
+                    db, cur, content_with_data, content_without_data)

                 # Wait for objstorage addition before returning from the
                 # transaction, bubbling up any exception
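Taken together, the hunks reduce content_add to orchestration: filter, upload, insert metadata. Roughly, the resulting shape is as follows (a sketch, not a verbatim excerpt; the tail of the method, which folds the executor's result into summary['content:bytes:add'], lies outside the diff context):

    from concurrent.futures import ThreadPoolExecutor

    def content_add(self, content):
        ...
        (content_with_data, content_without_data, summary) = \
            self._filter_new_content(content)
        db = self.get_db()
        with db.transaction() as cur:
            with ThreadPoolExecutor(max_workers=1) as executor:
                # Upload to the objstorage while the metadata lands in
                # PostgreSQL inside the same transaction (the
                # add_to_objstorage closure is defined earlier, elided).
                added_to_objstorage = executor.submit(add_to_objstorage)
                self._content_add_metadata(
                    db, cur, content_with_data, content_without_data)
                # Wait for the upload before the transaction commits,
                # bubbling up any exception.
                summary['content:bytes:add'] = added_to_objstorage.result()
        return summary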