diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -247,6 +247,7 @@
         """
         _check_duplicates(mimetypes, 'id')
+        mimetypes.sort(key=lambda m: m['id'])
 
         db.mktemp_content_mimetype(cur)
         db.copy_to(mimetypes, 'tmp_content_mimetype',
                    ['id', 'mimetype', 'encoding', 'indexer_configuration_id'],
@@ -332,6 +333,7 @@
         """
         _check_duplicates(languages, 'id')
+        languages.sort(key=lambda m: m['id'])
 
         db.mktemp_content_language(cur)
         # empty language is mapped to 'unknown'
         db.copy_to(
@@ -402,6 +404,7 @@
         """
         _check_duplicates(ctags, 'id')
+        ctags.sort(key=lambda m: m['id'])
 
         def _convert_ctags(__ctags):
             """Convert ctags dict to list of ctags.
 
@@ -484,6 +487,7 @@
         """
         _check_duplicates(licenses, 'id')
+        licenses.sort(key=lambda m: m['id'])
 
         db.mktemp_content_fossology_license(cur)
         db.copy_to(
             ({
@@ -582,6 +586,7 @@
         """
         _check_duplicates(metadata, 'id')
+        metadata.sort(key=lambda m: m['id'])
 
         db.mktemp_content_metadata(cur)
 
@@ -651,6 +656,7 @@
         """
         _check_duplicates(metadata, 'id')
+        metadata.sort(key=lambda m: m['id'])
 
         db.mktemp_revision_metadata(cur)
 
@@ -718,6 +724,7 @@
         """
         _check_duplicates(metadata, 'origin_id')
+        metadata.sort(key=lambda m: m['origin_id'])
 
         db.mktemp_origin_intrinsic_metadata(cur)
 
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -4,9 +4,10 @@
 # See top-level LICENSE file for more information
 
 import os
-import pytest
+import threading
 import unittest
 
+import pytest
 from hypothesis import given
 
 from swh.model.hashutil import hash_to_bytes
@@ -230,6 +231,84 @@
         # data did change as the v2 was used to overwrite v1
         self.assertEqual(actual_data, expected_data_v2)
 
+    @rename
+    def add__update_in_place_deadlock(self):
+        # given
+        tool_id = self.tools[tool_name]['id']
+
+        hashes = [
+            hash_to_bytes(
+                '34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}'.format(i))
+            for i in range(1000)]
+
+        data_v1 = [
+            {
+                'id': hash_,
+                **example_data1,
+                'indexer_configuration_id': tool_id,
+            }
+            for hash_ in hashes
+        ]
+        data_v2 = [
+            {
+                'id': hash_,
+                **example_data2,
+                'indexer_configuration_id': tool_id,
+            }
+            for hash_ in hashes
+        ]
+
+        # Remove one item from each, so that both queries have to succeed for
+        # all items to be in the DB.
+        data_v2a = data_v2[1:]
+        data_v2b = list(reversed(data_v2[0:-1]))
+
+        # given
+        endpoint(self, 'add')(data_v1)
+
+        # when
+        actual_data = list(endpoint(self, 'get')(hashes))
+
+        expected_data_v1 = [
+            {
+                'id': hash_,
+                **example_data1,
+                'tool': self.tools[tool_name],
+            }
+            for hash_ in hashes
+        ]
+
+        # then
+        self.assertEqual(actual_data, expected_data_v1)
+
+        # given
+        def f1():
+            endpoint(self, 'add')(data_v2a, conflict_update=True)
+
+        def f2():
+            endpoint(self, 'add')(data_v2b, conflict_update=True)
+
+        t1 = threading.Thread(target=f1)
+        t2 = threading.Thread(target=f2)
+        t2.start()
+        t1.start()
+
+        t1.join()
+        t2.join()
+
+        actual_data = list(endpoint(self, 'get')(hashes))
+
+        expected_data_v2 = [
+            {
+                'id': hash_,
+                **example_data2,
+                'tool': self.tools[tool_name],
+            }
+            for hash_ in hashes
+        ]
+
+        self.assertCountEqual(actual_data, expected_data_v2)
+
     def add__duplicate_twice(self):
         # given
         tool_id = self.tools[tool_name]['id']
@@ -335,6 +414,7 @@
         missing,
         add__drop_duplicate,
         add__update_in_place_duplicate,
+        add__update_in_place_deadlock,
         add__duplicate_twice,
         get,
         delete,
@@ -383,6 +463,7 @@
     test_content_mimetype_missing,
     test_content_mimetype_add__drop_duplicate,
     test_content_mimetype_add__update_in_place_duplicate,
+    test_content_mimetype_add__update_in_place_deadlock,
    test_content_mimetype_add__duplicate_twice,
     test_content_mimetype_get,
     _,  # content_mimetype_detete,
@@ -405,6 +486,7 @@
     test_content_language_missing,
     test_content_language_add__drop_duplicate,
     test_content_language_add__update_in_place_duplicate,
+    test_content_language_add__update_in_place_deadlock,
     test_content_language_add__duplicate_twice,
     test_content_language_get,
     _,  # test_content_language_delete,
@@ -426,6 +508,7 @@
     # the following tests are disabled because CTAGS behave differently
     _,  # test_content_ctags_add__drop_duplicate,
     _,  # test_content_ctags_add__update_in_place_duplicate,
+    _,  # test_content_ctags_add__update_in_place_deadlock,
     _,  # test_content_ctags_add__duplicate_twice,
     _,  # test_content_ctags_get,
     _,  # test_content_ctags_delete,
@@ -768,6 +851,7 @@
     # behave differently
     _,  # test_content_fossology_license_add__drop_duplicate,
     _,  # test_content_fossology_license_add__update_in_place_duplicate,
+    _,  # test_content_fossology_license_add__update_in_place_deadlock,
     _,  # test_content_metadata_add__duplicate_twice,
     _,  # test_content_fossology_license_get,
     _,  # test_content_fossology_license_delete,
@@ -838,6 +922,7 @@
     test_content_metadata_missing,
     test_content_metadata_add__drop_duplicate,
     test_content_metadata_add__update_in_place_duplicate,
+    test_content_metadata_add__update_in_place_deadlock,
     test_content_metadata_add__duplicate_twice,
     test_content_metadata_get,
     _,  # test_content_metadata_delete,
@@ -871,6 +956,7 @@
     test_revision_metadata_missing,
     test_revision_metadata_add__drop_duplicate,
     test_revision_metadata_add__update_in_place_duplicate,
+    test_revision_metadata_add__update_in_place_deadlock,
     test_revision_metadata_add__duplicate_twice,
     test_revision_metadata_get,
     test_revision_metadata_delete,
@@ -962,10 +1048,14 @@
             'mappings': ['mapping1'],
             'from_revision': self.revision_id_2,
         }
+        metadata_origin2 = metadata_origin.copy()
+        metadata_origin2['origin_id'] = self.origin_id_2
 
         # when
         self.storage.revision_metadata_add([metadata_rev])
-        self.storage.origin_intrinsic_metadata_add([metadata_origin])
+        self.storage.origin_intrinsic_metadata_add([
+            metadata_origin, metadata_origin2])
+
         self.storage.origin_intrinsic_metadata_delete([
             {
                 'origin_id': self.origin_id_1,
@@ -975,9 +1065,10 @@
 
         # then
         actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
-            [self.origin_id_1, 42]))
-
-        self.assertEqual(actual_metadata, [])
+            [self.origin_id_1, self.origin_id_2, 42]))
+        for item in actual_metadata:
+            item['indexer_configuration_id'] = item.pop('tool')['id']
+        self.assertEqual(actual_metadata, [metadata_origin2])
 
     def test_origin_intrinsic_metadata_delete_nonexisting(self):
         tool_id = self.tools['swh-metadata-detector']['id']
@@ -1119,6 +1210,113 @@
         # metadata did change as the v2 was used to overwrite v1
         self.assertEqual(actual_metadata, expected_metadata_v2)
 
+    def test_origin_intrinsic_metadata_add__update_in_place_deadlock(self):
+        # given
+        tool_id = self.tools['swh-metadata-detector']['id']
+
+        ids = list(range(1000))
+
+        example_data1 = {
+            'metadata': {
+                'version': None,
+                'name': None,
+            },
+            'mappings': [],
+        }
+        example_data2 = {
+            'metadata': {
+                'version': 'v1.1.1',
+                'name': 'foo',
+            },
+            'mappings': [],
+        }
+
+        metadata_rev_v1 = {
+            'id': self.revision_id_2,
+            'translated_metadata': {
+                'version': None,
+                'name': None,
+            },
+            'mappings': [],
+            'indexer_configuration_id': tool_id,
+        }
+
+        data_v1 = [
+            {
+                'origin_id': id_,
+                'from_revision': self.revision_id_2,
+                **example_data1,
+                'indexer_configuration_id': tool_id,
+            }
+            for id_ in ids
+        ]
+        data_v2 = [
+            {
+                'origin_id': id_,
+                'from_revision': self.revision_id_2,
+                **example_data2,
+                'indexer_configuration_id': tool_id,
+            }
+            for id_ in ids
+        ]
+
+        # Remove one item from each, so that both queries have to succeed for
+        # all items to be in the DB.
+        data_v2a = data_v2[1:]
+        data_v2b = list(reversed(data_v2[0:-1]))
+
+        # given
+        self.storage.revision_metadata_add([metadata_rev_v1])
+        self.storage.origin_intrinsic_metadata_add(data_v1)
+
+        # when
+        actual_data = list(self.storage.origin_intrinsic_metadata_get(ids))
+
+        expected_data_v1 = [
+            {
+                'origin_id': id_,
+                'from_revision': self.revision_id_2,
+                **example_data1,
+                'tool': self.tools['swh-metadata-detector'],
+            }
+            for id_ in ids
+        ]
+
+        # then
+        self.assertEqual(actual_data, expected_data_v1)
+
+        # given
+        def f1():
+            self.storage.origin_intrinsic_metadata_add(
+                data_v2a, conflict_update=True)
+
+        def f2():
+            self.storage.origin_intrinsic_metadata_add(
+                data_v2b, conflict_update=True)
+
+        t1 = threading.Thread(target=f1)
+        t2 = threading.Thread(target=f2)
+        t2.start()
+        t1.start()
+
+        t1.join()
+        t2.join()
+
+        actual_data = list(self.storage.origin_intrinsic_metadata_get(ids))
+
+        expected_data_v2 = [
+            {
+                'origin_id': id_,
+                'from_revision': self.revision_id_2,
+                **example_data2,
+                'tool': self.tools['swh-metadata-detector'],
+            }
+            for id_ in ids
+        ]
+
+        self.maxDiff = None
+        self.assertCountEqual(actual_data, expected_data_v2)
+
     def test_origin_intrinsic_metadata_add__duplicate_twice(self):
         # given
         tool_id = self.tools['swh-metadata-detector']['id']
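
Why the per-batch sort() fixes the deadlock (reviewer illustration, not part of the patch): with conflict_update=True each *_add endpoint updates existing rows inside a single transaction, and PostgreSQL holds every row-level write lock it takes until commit. If two concurrent transactions update an overlapping set of rows in different orders, each can end up waiting on a row the other has already locked, and PostgreSQL aborts one of them with "deadlock detected". Sorting every batch on its key before copy_to gives all writers one global lock-acquisition order, so no wait cycle can form. The pure-Python sketch below reproduces the scenario with threading.Lock objects standing in for row locks held until "commit"; all names in it are made up for the illustration:

    import threading

    rows = {i: 'v1' for i in range(1000)}
    locks = {i: threading.Lock() for i in rows}  # stand-ins for row-level locks


    def update_batch(batch):
        # The fix: one global ordering, like the *.sort() calls in the patch.
        # Remove this line and the two threads below will likely deadlock,
        # each holding a lock the other one needs.
        batch.sort(key=lambda item: item[0])
        held = []
        try:
            for key, value in batch:
                locks[key].acquire()  # a transaction keeps row locks until commit
                held.append(locks[key])
                rows[key] = value
        finally:
            for lock in reversed(held):
                lock.release()


    # Mirrors data_v2a/data_v2b in the tests: overlapping rows, opposite orders.
    batch_a = [(i, 'v2') for i in range(1, 1000)]
    batch_b = [(i, 'v2') for i in reversed(range(0, 999))]

    t1 = threading.Thread(target=update_batch, args=(batch_a,))
    t2 = threading.Thread(target=update_batch, args=(batch_b,))
    t2.start()
    t1.start()
    t1.join()
    t2.join()
    assert all(value == 'v2' for value in rows.values())

Unlike the plain-threading sketch, which would simply hang without the sort, PostgreSQL detects the cycle and aborts one of the two transactions, leaving some rows un-updated; that is why each new test drops one item from each batch, so assertCountEqual only passes when both concurrent writes succeed.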