diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -92,6 +92,82 @@
         return True
 
+    def _content_unique_key(self, hash):
+        """Given a hash (tuple or dict), return a unique key from the
+        aggregation of keys.
+
+        """
+        keys = self.get_db().content_hash_keys
+        if isinstance(hash, tuple):
+            return hash
+        return tuple([hash[k] for k in keys])
+
+    def _filter_new_content(self, content):
+        content_by_status = defaultdict(list)
+        for d in content:
+            if 'status' not in d:
+                d['status'] = 'visible'
+            if 'length' not in d:
+                d['length'] = -1
+            content_by_status[d['status']].append(d)
+
+        content_with_data = content_by_status['visible']
+        content_without_data = content_by_status['absent']
+
+        missing_content = set(self.content_missing(content_with_data))
+        missing_skipped = set(self._content_unique_key(hashes) for hashes
+                              in self.skipped_content_missing(
+                                  content_without_data))
+
+        content_with_data = [
+            cont for cont in content_with_data
+            if cont['sha1'] in missing_content]
+        content_without_data = [
+            cont for cont in content_without_data
+            if self._content_unique_key(cont) in missing_skipped]
+
+        summary = {
+            'content:add': len(missing_content),
+            'skipped_content:add': len(missing_skipped),
+        }
+
+        return (content_with_data, content_without_data, summary)
+
+    def _content_add_metadata(self, db, cur,
+                              content_with_data, content_without_data):
+        if content_with_data:
+            # create temporary table for metadata injection
+            db.mktemp('content', cur)
+
+            db.copy_to(content_with_data, 'tmp_content',
+                       db.content_get_metadata_keys, cur)
+
+            # move metadata in place
+            try:
+                db.content_add_from_temp(cur)
+            except psycopg2.IntegrityError as e:
+                from . import HashCollision
+                if e.diag.sqlstate == '23505' and \
+                        e.diag.table_name == 'content':
+                    constraint_to_hash_name = {
+                        'content_pkey': 'sha1',
+                        'content_sha1_git_idx': 'sha1_git',
+                        'content_sha256_idx': 'sha256',
+                    }
+                    colliding_hash_name = constraint_to_hash_name \
+                        .get(e.diag.constraint_name)
+                    raise HashCollision(colliding_hash_name)
+                else:
+                    raise
+
+        if content_without_data:
+            db.mktemp('skipped_content', cur)
+            db.copy_to(content_without_data, 'tmp_skipped_content',
+                       db.skipped_content_keys, cur)
+
+            # move metadata in place
+            db.skipped_content_add_from_temp(cur)
+
     def content_add(self, content):
         """Add content blobs to the storage
 
@@ -129,13 +205,7 @@
             content:add: New contents added
             content:bytes:add: Sum of the contents' length data
             skipped_content:add: New skipped contents (no data) added
-        """
-        summary = {
-            'content:add': 0,
-            'content:bytes:add': 0,
-            'skipped_content:add': 0,
-        }
+
+        """
         if self.journal_writer:
             for item in content:
@@ -146,50 +216,23 @@
 
         db = self.get_db()
 
-        def _unique_key(hash, keys=db.content_hash_keys):
-            """Given a hash (tuple or dict), return a unique key from the
-            aggregation of keys.
-
-            """
-            if isinstance(hash, tuple):
-                return hash
-            return tuple([hash[k] for k in keys])
-
-        content_by_status = defaultdict(list)
-        for d in content:
-            if 'status' not in d:
-                d['status'] = 'visible'
-            length = d.get('length')
-            if length is None:
-                d['length'] = -1
-            content_by_status[d['status']].append(d)
-
-        content_with_data = content_by_status['visible']
-        content_without_data = content_by_status['absent']
-
-        missing_content = set(self.content_missing(content_with_data))
-        missing_skipped = set(_unique_key(hashes) for hashes
-                              in self.skipped_content_missing(
-                                  content_without_data))
+        (content_with_data, content_without_data, summary) = \
+            self._filter_new_content(content)
 
         def add_to_objstorage():
            """Add to objstorage the new missing_content
 
             Returns:
                 Sum of all the content's data length pushed to the
-                objstorage. No filtering is done on contents here, so
-                we might send over multiple times the same content and
-                count as many times the contents' raw length bytes.
+                objstorage. Content present twice is only sent once.
 
             """
             content_bytes_added = 0
             data = {}
             for cont in content_with_data:
-                sha1 = cont['sha1']
-                seen = data.get(sha1)
-                if sha1 in missing_content and not seen:
-                    data[sha1] = cont['data']
-                    content_bytes_added += cont['length']
+                if cont['sha1'] not in data:
+                    data[cont['sha1']] = cont['data']
+                    content_bytes_added += max(0, cont['length'])
 
             # FIXME: Since we do the filtering anyway now, we might as
             # well make the objstorage's add_batch call return what we
@@ -200,49 +243,9 @@
         with db.transaction() as cur:
             with ThreadPoolExecutor(max_workers=1) as executor:
                 added_to_objstorage = executor.submit(add_to_objstorage)
-                if missing_content:
-                    # create temporary table for metadata injection
-                    db.mktemp('content', cur)
-
-                    content_filtered = (cont for cont in content_with_data
-                                        if cont['sha1'] in missing_content)
-
-                    db.copy_to(content_filtered, 'tmp_content',
-                               db.content_get_metadata_keys, cur)
-
-                    # move metadata in place
-                    try:
-                        db.content_add_from_temp(cur)
-                    except psycopg2.IntegrityError as e:
-                        from . import HashCollision
-                        if e.diag.sqlstate == '23505' and \
-                                e.diag.table_name == 'content':
-                            constraint_to_hash_name = {
-                                'content_pkey': 'sha1',
-                                'content_sha1_git_idx': 'sha1_git',
-                                'content_sha256_idx': 'sha256',
-                            }
-                            colliding_hash_name = constraint_to_hash_name \
-                                .get(e.diag.constraint_name)
-                            raise HashCollision(colliding_hash_name)
-                        else:
-                            raise
-
-                    summary['content:add'] = len(missing_content)
-
-                if missing_skipped:
-                    missing_filtered = (
-                        cont for cont in content_without_data
-                        if _unique_key(cont) in missing_skipped
-                    )
-
-                    db.mktemp('skipped_content', cur)
-                    db.copy_to(missing_filtered, 'tmp_skipped_content',
-                               db.skipped_content_keys, cur)
-
-                    # move metadata in place
-                    db.skipped_content_add_from_temp(cur)
-                    summary['skipped_content:add'] = len(missing_skipped)
+
+                self._content_add_metadata(
+                    db, cur, content_with_data, content_without_data)
 
                 # Wait for objstorage addition before returning from the
                 # transaction, bubbling up any exception