diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -20,6 +20,9 @@
     def content_add(self, content):
         return self.post('content/add', {'content': content})
 
+    def content_add_metadata(self, content):
+        return self.post('content/add_metadata', {'content': content})
+
     def content_update(self, content, keys=[]):
         return self.post('content/update', {'content': content,
                                             'keys': keys})
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -163,6 +163,14 @@
     return get_storage().content_add(**decode_request(request))
 
 
+@app.route('/content/add_metadata', methods=['POST'])
+@timed
+@encode
+@process_metrics
+def content_add_metadata():
+    return get_storage().content_add_metadata(**decode_request(request))
+
+
 @app.route('/content/update', methods=['POST'])
 @timed
 def content_update():
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -59,35 +59,7 @@
         """Check that the storage is configured and ready to go."""
         return True
 
-    def content_add(self, contents):
-        """Add content blobs to the storage
-
-        Args:
-            content (iterable): iterable of dictionaries representing
-                individual pieces of content to add. Each dictionary has the
-                following keys:
-
-                - data (bytes): the actual content
-                - length (int): content length (default: -1)
-                - one key for each checksum algorithm in
-                  :data:`swh.model.hashutil.DEFAULT_ALGORITHMS`, mapped to the
-                  corresponding checksum
-                - status (str): one of visible, hidden, absent
-                - reason (str): if status = absent, the reason why
-                - origin (int): if status = absent, the origin we saw the
-                  content in
-
-        Raises:
-            HashCollision in case of collision
-
-        Returns:
-            Summary dict with the following key and associated values:
-
-                content:add: New contents added
-                content_bytes:add: Sum of the contents' length data
-                skipped_content:add: New skipped contents (no data) added
-
-        """
+    def _content_add(self, contents, with_data):
         if self.journal_writer:
             for content in contents:
                 if 'data' in content:
@@ -117,16 +89,82 @@
                 count_contents += 1
                 if self._contents[key]['status'] == 'visible':
                     count_content_added += 1
-                    content_data = self._contents[key].pop('data')
-                    count_content_bytes_added += len(content_data)
-                    self.objstorage.add(content_data, content['sha1'])
+                    if with_data:
+                        content_data = self._contents[key].pop('data')
+                        count_content_bytes_added += len(content_data)
+                        self.objstorage.add(content_data, content['sha1'])
 
-        return {
+        summary = {
             'content:add': count_content_added,
-            'content:bytes:add': count_content_bytes_added,
             'skipped_content:add': count_contents - count_content_added,
         }
 
+        if with_data:
+            summary['content:bytes:add'] = count_content_bytes_added
+
+        return summary
+
+    def content_add(self, contents):
+        """Add content blobs to the storage
+
+        Args:
+            content (iterable): iterable of dictionaries representing
+                individual pieces of content to add. Each dictionary has the
+                following keys:
+
+                - data (bytes): the actual content
+                - length (int): content length (default: -1)
+                - one key for each checksum algorithm in
+                  :data:`swh.model.hashutil.DEFAULT_ALGORITHMS`, mapped to the
+                  corresponding checksum
+                - status (str): one of visible, hidden, absent
+                - reason (str): if status = absent, the reason why
+                - origin (int): if status = absent, the origin we saw the
+                  content in
+
+        Raises:
+            HashCollision in case of collision
+
+        Returns:
+            Summary dict with the following key and associated values:
+
+                content:add: New contents added
+                content_bytes:add: Sum of the contents' length data
+                skipped_content:add: New skipped contents (no data) added
+
+        """
+        return self._content_add(contents, with_data=True)
+
+    def content_add_metadata(self, contents):
+        """Add content metadata to the storage (like `content_add`, but
+        without inserting to the objstorage).
+
+        Args:
+            content (iterable): iterable of dictionaries representing
+                individual pieces of content to add. Each dictionary has the
+                following keys:
+
+                - length (int): content length (default: -1)
+                - one key for each checksum algorithm in
+                  :data:`swh.model.hashutil.DEFAULT_ALGORITHMS`, mapped to the
+                  corresponding checksum
+                - status (str): one of visible, hidden, absent
+                - reason (str): if status = absent, the reason why
+                - origin (int): if status = absent, the origin we saw the
+                  content in
+
+        Raises:
+            HashCollision in case of collision
+
+        Returns:
+            Summary dict with the following key and associated values:
+
+                content:add: New contents added
+                skipped_content:add: New skipped contents (no data) added
+
+        """
+        return self._content_add(contents, with_data=False)
+
     def content_get(self, ids):
         """Retrieve in bulk contents and their data.
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -93,6 +93,82 @@
         return True
 
+    def _content_unique_key(self, hash):
+        """Given a hash (tuple or dict), return a unique key from the
+        aggregation of keys.
+
+        """
+        keys = self.get_db().content_hash_keys
+        if isinstance(hash, tuple):
+            return hash
+        return tuple([hash[k] for k in keys])
+
+    def _filter_new_content(self, content):
+        content_by_status = defaultdict(list)
+        for d in content:
+            if 'status' not in d:
+                d['status'] = 'visible'
+            if 'length' not in d:
+                d['length'] = -1
+            content_by_status[d['status']].append(d)
+
+        content_with_data = content_by_status['visible']
+        content_without_data = content_by_status['absent']
+
+        missing_content = set(self.content_missing(content_with_data))
+        missing_skipped = set(self._content_unique_key(hashes) for hashes
+                              in self.skipped_content_missing(
+                                  content_without_data))
+
+        content_with_data = [
+            cont for cont in content_with_data
+            if cont['sha1'] in missing_content]
+        content_without_data = [
+            cont for cont in content_without_data
+            if self._content_unique_key(cont) in missing_skipped]
+
+        summary = {
+            'content:add': len(missing_content),
+            'skipped_content:add': len(missing_skipped),
+        }
+
+        return (content_with_data, content_without_data, summary)
+
+    def _content_add_metadata(self, db, cur,
+                              content_with_data, content_without_data):
+        if content_with_data:
+            # create temporary table for metadata injection
+            db.mktemp('content', cur)
+
+            db.copy_to(content_with_data, 'tmp_content',
+                       db.content_get_metadata_keys, cur)
+
+            # move metadata in place
+            try:
+                db.content_add_from_temp(cur)
+            except psycopg2.IntegrityError as e:
+                from . import HashCollision
+                if e.diag.sqlstate == '23505' and \
+                        e.diag.table_name == 'content':
+                    constraint_to_hash_name = {
+                        'content_pkey': 'sha1',
+                        'content_sha1_git_idx': 'sha1_git',
+                        'content_sha256_idx': 'sha256',
+                    }
+                    colliding_hash_name = constraint_to_hash_name \
+                        .get(e.diag.constraint_name)
+                    raise HashCollision(colliding_hash_name) from None
+                else:
+                    raise
+
+        if content_without_data:
+            db.mktemp('skipped_content', cur)
+            db.copy_to(content_without_data, 'tmp_skipped_content',
+                       db.skipped_content_keys, cur)
+
+            # move metadata in place
+            db.skipped_content_add_from_temp(cur)
+
     def content_add(self, content):
         """Add content blobs to the storage
 
@@ -130,13 +206,7 @@
                 content:add: New contents added
                 content:bytes:add: Sum of the contents' length data
                 skipped_content:add: New skipped contents (no data) added
-
         """
-        summary = {
-            'content:add': 0,
-            'content:bytes:add': 0,
-            'skipped_content:add': 0,
-        }
 
         if self.journal_writer:
             for item in content:
@@ -147,50 +217,23 @@
 
         db = self.get_db()
 
-        def _unique_key(hash, keys=db.content_hash_keys):
-            """Given a hash (tuple or dict), return a unique key from the
-            aggregation of keys.
-
-            """
-            if isinstance(hash, tuple):
-                return hash
-            return tuple([hash[k] for k in keys])
-
-        content_by_status = defaultdict(list)
-        for d in content:
-            if 'status' not in d:
-                d['status'] = 'visible'
-            length = d.get('length')
-            if length is None:
-                d['length'] = -1
-            content_by_status[d['status']].append(d)
-
-        content_with_data = content_by_status['visible']
-        content_without_data = content_by_status['absent']
-
-        missing_content = set(self.content_missing(content_with_data))
-        missing_skipped = set(_unique_key(hashes) for hashes
-                              in self.skipped_content_missing(
-                                  content_without_data))
+        (content_with_data, content_without_data, summary) = \
+            self._filter_new_content(content)
 
         def add_to_objstorage():
             """Add to objstorage the new missing_content
 
             Returns:
                 Sum of all the content's data length pushed to the
-                objstorage. No filtering is done on contents here, so
-                we might send over multiple times the same content and
-                count as many times the contents' raw length bytes.
+                objstorage. Content present twice is only sent once.
 
             """
             content_bytes_added = 0
             data = {}
             for cont in content_with_data:
-                sha1 = cont['sha1']
-                seen = data.get(sha1)
-                if sha1 in missing_content and not seen:
-                    data[sha1] = cont['data']
-                    content_bytes_added += cont['length']
+                if cont['sha1'] not in data:
+                    data[cont['sha1']] = cont['data']
+                    content_bytes_added += max(0, cont['length'])
 
             # FIXME: Since we do the filtering anyway now, we might as
             # well make the objstorage's add_batch call return what we
@@ -201,49 +244,9 @@
         with db.transaction() as cur:
             with ThreadPoolExecutor(max_workers=1) as executor:
                 added_to_objstorage = executor.submit(add_to_objstorage)
-                if missing_content:
-                    # create temporary table for metadata injection
-                    db.mktemp('content', cur)
-
-                    content_filtered = (cont for cont in content_with_data
-                                        if cont['sha1'] in missing_content)
-
-                    db.copy_to(content_filtered, 'tmp_content',
-                               db.content_get_metadata_keys, cur)
-
-                    # move metadata in place
-                    try:
-                        db.content_add_from_temp(cur)
-                    except psycopg2.IntegrityError as e:
-                        from . import HashCollision
-                        if e.diag.sqlstate == '23505' and \
-                                e.diag.table_name == 'content':
-                            constraint_to_hash_name = {
-                                'content_pkey': 'sha1',
-                                'content_sha1_git_idx': 'sha1_git',
-                                'content_sha256_idx': 'sha256',
-                            }
-                            colliding_hash_name = constraint_to_hash_name \
-                                .get(e.diag.constraint_name)
-                            raise HashCollision(colliding_hash_name)
-                        else:
-                            raise
-
-                    summary['content:add'] = len(missing_content)
-
-                if missing_skipped:
-                    missing_filtered = (
-                        cont for cont in content_without_data
-                        if _unique_key(cont) in missing_skipped
-                    )
-
-                    db.mktemp('skipped_content', cur)
-                    db.copy_to(missing_filtered, 'tmp_skipped_content',
-                               db.skipped_content_keys, cur)
-
-                    # move metadata in place
-                    db.skipped_content_add_from_temp(cur)
-                    summary['skipped_content:add'] = len(missing_skipped)
+
+                self._content_add_metadata(
+                    db, cur, content_with_data, content_without_data)
 
                 # Wait for objstorage addition before returning from the
                 # transaction, bubbling up any exception
@@ -286,6 +289,46 @@
 
             db.content_update_from_temp(keys_to_update=keys,
                                         cur=cur)
 
+    def content_add_metadata(self, content):
+        """Add content metadata to the storage (like `content_add`, but
+        without inserting to the objstorage).
+
+        Args:
+            contents (iterable): iterable of dictionaries representing
+                individual pieces of content to add. Each dictionary has the
+                following keys:
+
+                - length (int): content length (default: -1)
+                - one key for each checksum algorithm in
+                  :data:`swh.model.hashutil.ALGORITHMS`, mapped to the
+                  corresponding checksum
+                - status (str): one of visible, hidden, absent
+                - reason (str): if status = absent, the reason why
+                - origin (int): if status = absent, the origin we saw the
+                  content in
+
+        Returns:
+            Summary dict with the following key and associated values:
+
+                content:add: New contents added
+                skipped_content:add: New skipped contents (no data) added
+        """
+        if self.journal_writer:
+            for item in content:
+                assert 'data' not in item
+                self.journal_writer.write_addition('content', item)
+
+        db = self.get_db()
+
+        (content_with_data, content_without_data, summary) = \
+            self._filter_new_content(content)
+
+        with db.transaction() as cur:
+            self._content_add_metadata(
+                db, cur, content_with_data, content_without_data)
+
+        return summary
+
     def content_get(self, content):
         """Retrieve in bulk contents and their data.
diff --git a/swh/storage/tests/test_in_memory.py b/swh/storage/tests/test_in_memory.py
--- a/swh/storage/tests/test_in_memory.py
+++ b/swh/storage/tests/test_in_memory.py
@@ -32,6 +32,10 @@
     def test_skipped_content_add(self):
         pass
 
+    @pytest.mark.skip('postgresql-specific test')
+    def test_content_add_metadata_db(self):
+        pass
+
 
 @pytest.mark.db
 @pytest.mark.property_based
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -632,6 +632,86 @@
         self.assertIn(cm.exception.args[0], ['sha1', 'sha1_git', 'blake2s256'])
 
+    def test_content_add_metadata(self):
+        cont = self.cont.copy()
+        del cont['data']
+
+        actual_result = self.storage.content_add_metadata([cont])
+        self.assertEqual(actual_result, {
+            'content:add': 1,
+            'skipped_content:add': 0
+        })
+
+        self.assertEqual(
+            list(self.storage.content_get_metadata([cont['sha1']])),
+            [cont])
+
+        self.assertEqual(list(self.journal_writer.objects),
+                         [('content', cont)])
+
+    def test_content_add_metadata_same_input(self):
+        cont = self.cont.copy()
+        del cont['data']
+
+        actual_result = self.storage.content_add_metadata([cont, cont])
+        self.assertEqual(actual_result, {
+            'content:add': 1,
+            'skipped_content:add': 0
+        })
+
+    def test_content_add_metadata_different_input(self):
+        cont = self.cont.copy()
+        del cont['data']
+        cont2 = self.cont2.copy()
+        del cont2['data']
+
+        actual_result = self.storage.content_add_metadata([cont, cont2])
+        self.assertEqual(actual_result, {
+            'content:add': 2,
+            'skipped_content:add': 0
+        })
+
+    def test_content_add_metadata_db(self):
+        cont = self.cont.copy()
+        del cont['data']
+
+        actual_result = self.storage.content_add_metadata([cont])
+
+        self.assertEqual(actual_result, {
+            'content:add': 1,
+            'skipped_content:add': 0
+        })
+
+        if hasattr(self.storage, 'objstorage'):
+            self.assertNotIn(cont['sha1'], self.storage.objstorage)
+        self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
+                            ' FROM content WHERE sha1 = %s',
+                            (cont['sha1'],))
+        datum = self.cursor.fetchone()
+        self.assertEqual(
+            (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
+             datum[3], datum[4]),
+            (cont['sha1'], cont['sha1_git'], cont['sha256'],
+             cont['length'], 'visible'))
+
+        self.assertEqual(list(self.journal_writer.objects),
+                         [('content', cont)])
+
+    def test_content_add_metadata_collision(self):
+        cont1 = self.cont.copy()
+        del cont1['data']
+
+        # create (corrupted) content with same sha1{,_git} but != sha256
+        cont1b = cont1.copy()
+        sha256_array = bytearray(cont1b['sha256'])
+        sha256_array[0] += 1
+        cont1b['sha256'] = bytes(sha256_array)
+
+        with self.assertRaises(HashCollision) as cm:
+            self.storage.content_add_metadata([cont1, cont1b])
+
+        self.assertIn(cm.exception.args[0], ['sha1', 'sha1_git', 'blake2s256'])
+
     def test_skipped_content_add(self):
         cont = self.skipped_cont.copy()
         cont2 = self.skipped_cont2.copy()