Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show All 28 Lines | |||||
from .generate_data_test import gen_contents | from .generate_data_test import gen_contents | ||||
@pytest.mark.db | @pytest.mark.db | ||||
class StorageTestDbFixture(StorageTestFixture): | class StorageTestDbFixture(StorageTestFixture): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
db = self.test_db[self.TEST_DB_NAME] | |||||
self.conn = db.conn | |||||
self.cursor = db.cursor | |||||
self.maxDiff = None | self.maxDiff = None | ||||
def tearDown(self): | def tearDown(self): | ||||
self.reset_storage() | self.reset_storage() | ||||
if hasattr(self.storage, '_pool') and self.storage._pool: | if hasattr(self.storage, '_pool') and self.storage._pool: | ||||
self.storage._pool.closeall() | self.storage._pool.closeall() | ||||
super().tearDown() | super().tearDown() | ||||
▲ Show 20 Lines • Show All 656 Lines • ▼ Show 20 Lines | def test_content_add_db(self): | ||||
self.assertEqual(actual_result, { | self.assertEqual(actual_result, { | ||||
'content:add': 1, | 'content:add': 1, | ||||
'content:add:bytes': cont['length'], | 'content:add:bytes': cont['length'], | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
}) | }) | ||||
if hasattr(self.storage, 'objstorage'): | if hasattr(self.storage, 'objstorage'): | ||||
self.assertIn(cont['sha1'], self.storage.objstorage) | self.assertIn(cont['sha1'], self.storage.objstorage) | ||||
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' | |||||
with self.db_transaction() as (_, cur): | |||||
cur.execute('SELECT sha1, sha1_git, sha256, length, status' | |||||
' FROM content WHERE sha1 = %s', | ' FROM content WHERE sha1 = %s', | ||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = self.cursor.fetchone() | datum = cur.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | datum, | ||||
datum[3], datum[4]), | |||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible')) | cont['length'], 'visible')) | ||||
expected_cont = cont.copy() | expected_cont = cont.copy() | ||||
del expected_cont['data'] | del expected_cont['data'] | ||||
journal_objects = list(self.journal_writer.objects) | journal_objects = list(self.journal_writer.objects) | ||||
for (obj_type, obj) in journal_objects: | for (obj_type, obj) in journal_objects: | ||||
del obj['ctime'] | del obj['ctime'] | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def test_content_add_metadata_db(self): | ||||
self.assertEqual(actual_result, { | self.assertEqual(actual_result, { | ||||
'content:add': 1, | 'content:add': 1, | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
}) | }) | ||||
if hasattr(self.storage, 'objstorage'): | if hasattr(self.storage, 'objstorage'): | ||||
self.assertNotIn(cont['sha1'], self.storage.objstorage) | self.assertNotIn(cont['sha1'], self.storage.objstorage) | ||||
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' | |||||
with self.db_transaction() as (_, cur): | |||||
cur.execute('SELECT sha1, sha1_git, sha256, length, status' | |||||
' FROM content WHERE sha1 = %s', | ' FROM content WHERE sha1 = %s', | ||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = self.cursor.fetchone() | datum = cur.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | datum, | ||||
datum[3], datum[4]), | |||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible')) | cont['length'], 'visible')) | ||||
self.assertEqual(list(self.journal_writer.objects), | self.assertEqual(list(self.journal_writer.objects), | ||||
[('content', cont)]) | [('content', cont)]) | ||||
def test_content_add_metadata_collision(self): | def test_content_add_metadata_collision(self): | ||||
cont1 = self.cont.copy() | cont1 = self.cont.copy() | ||||
Show All 19 Lines | def test_skipped_content_add_db(self): | ||||
actual_result = self.storage.content_add([cont, cont, cont2]) | actual_result = self.storage.content_add([cont, cont, cont2]) | ||||
self.assertEqual(actual_result, { | self.assertEqual(actual_result, { | ||||
'content:add': 0, | 'content:add': 0, | ||||
'content:add:bytes': 0, | 'content:add:bytes': 0, | ||||
'skipped_content:add': 2, | 'skipped_content:add': 2, | ||||
}) | }) | ||||
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' | with self.db_transaction() as (_, cur): | ||||
cur.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' | |||||
'length, status, reason ' | 'length, status, reason ' | ||||
'FROM skipped_content ORDER BY sha1_git') | 'FROM skipped_content ORDER BY sha1_git') | ||||
datums = self.cursor.fetchall() | data = cur.fetchall() | ||||
self.assertEqual(2, len(datums)) | self.assertEqual(2, len(data)) | ||||
datum = datums[0] | |||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | data[0], | ||||
datum[3].tobytes(), datum[4], datum[5], datum[6]), | |||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['blake2s256'], cont['length'], 'absent', | cont['blake2s256'], cont['length'], 'absent', | ||||
'Content too long') | 'Content too long') | ||||
) | ) | ||||
datum2 = datums[1] | |||||
self.assertEqual( | self.assertEqual( | ||||
(datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(), | data[1], | ||||
datum2[3], datum2[4], datum2[5], datum2[6]), | |||||
(cont2['sha1'], cont2['sha1_git'], cont2['sha256'], | (cont2['sha1'], cont2['sha1_git'], cont2['sha256'], | ||||
cont2['blake2s256'], cont2['length'], 'absent', | cont2['blake2s256'], cont2['length'], 'absent', | ||||
'Content too long') | 'Content too long') | ||||
) | ) | ||||
def test_skipped_content_add(self): | def test_skipped_content_add(self): | ||||
cont = self.skipped_cont.copy() | cont = self.skipped_cont.copy() | ||||
cont2 = self.skipped_cont2.copy() | cont2 = self.skipped_cont2.copy() | ||||
▲ Show 20 Lines • Show All 3,176 Lines • ▼ Show 20 Lines | |||||
class TestStorageRaceConditions(TestStorageData, StorageTestDbFixture, | class TestStorageRaceConditions(TestStorageData, StorageTestDbFixture, | ||||
unittest.TestCase): | unittest.TestCase): | ||||
@pytest.mark.xfail | @pytest.mark.xfail | ||||
def test_content_add_race(self): | def test_content_add_race(self): | ||||
results = queue.Queue() | results = queue.Queue() | ||||
def thread(): | def thread(): | ||||
db = None | |||||
try: | try: | ||||
db = self.storage.get_db() | with self.db_transaction() as (db, cur): | ||||
with db.transaction() as cur: | |||||
ret = self.storage.content_add([self.cont], db=db, | ret = self.storage.content_add([self.cont], db=db, | ||||
cur=cur) | cur=cur) | ||||
results.put((threading.get_ident(), 'data', ret)) | results.put((threading.get_ident(), 'data', ret)) | ||||
except Exception as e: | except Exception as e: | ||||
results.put((threading.get_ident(), 'exc', e)) | results.put((threading.get_ident(), 'exc', e)) | ||||
finally: | |||||
if db: | |||||
self.storage.put_db(db) | |||||
t1 = threading.Thread(target=thread) | t1 = threading.Thread(target=thread) | ||||
t2 = threading.Thread(target=thread) | t2 = threading.Thread(target=thread) | ||||
t1.start() | t1.start() | ||||
# this avoids the race condition | # this avoids the race condition | ||||
# import time | # import time | ||||
# time.sleep(1) | # time.sleep(1) | ||||
t2.start() | t2.start() | ||||
Show All 32 Lines | def test_content_update(self): | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
# alter the sha1_git for example | # alter the sha1_git for example | ||||
cont['sha1_git'] = hash_to_bytes( | cont['sha1_git'] = hash_to_bytes( | ||||
'3a60a5275d0333bf13468e8b3dcab90f4046e654') | '3a60a5275d0333bf13468e8b3dcab90f4046e654') | ||||
self.storage.content_update([cont], keys=['sha1_git']) | self.storage.content_update([cont], keys=['sha1_git']) | ||||
with self.storage.get_db().transaction() as cur: | with self.db_transaction() as (_, cur): | ||||
cur.execute('SELECT sha1, sha1_git, sha256, length, status' | cur.execute('SELECT sha1, sha1_git, sha256, length, status' | ||||
' FROM content WHERE sha1 = %s', | ' FROM content WHERE sha1 = %s', | ||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = cur.fetchone() | datum = cur.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0], datum[1], datum[2], | (datum[0], datum[1], datum[2], | ||||
datum[3], datum[4]), | datum[3], datum[4]), | ||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible')) | cont['length'], 'visible')) | ||||
def test_content_update_with_new_cols(self): | def test_content_update_with_new_cols(self): | ||||
self.storage.journal_writer = None # TODO, not supported | self.storage.journal_writer = None # TODO, not supported | ||||
with self.storage.get_db().transaction() as cur: | with self.db_transaction() as (db, cur): | ||||
cur.execute("""alter table content | cur.execute("""alter table content | ||||
add column test text default null, | add column test text default null, | ||||
add column test2 text default null""") | add column test2 text default null""") | ||||
cont = copy.deepcopy(self.cont2) | cont = copy.deepcopy(self.cont2) | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
cont['test'] = 'value-1' | cont['test'] = 'value-1' | ||||
cont['test2'] = 'value-2' | cont['test2'] = 'value-2' | ||||
self.storage.content_update([cont], keys=['test', 'test2']) | self.storage.content_update([cont], keys=['test', 'test2']) | ||||
with self.storage.get_db().transaction() as cur: | with self.db_transaction() as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
'SELECT sha1, sha1_git, sha256, length, status, test, test2' | '''SELECT sha1, sha1_git, sha256, length, status, | ||||
' FROM content WHERE sha1 = %s', | test, test2 | ||||
FROM content WHERE sha1 = %s''', | |||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = cur.fetchone() | datum = cur.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0], datum[1], datum[2], | (datum[0], datum[1], datum[2], | ||||
datum[3], datum[4], datum[5], datum[6]), | datum[3], datum[4], datum[5], datum[6]), | ||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible', cont['test'], cont['test2'])) | cont['length'], 'visible', cont['test'], cont['test2'])) | ||||
with self.storage.get_db().transaction() as cur: | with self.db_transaction() as (_, cur): | ||||
cur.execute("""alter table content drop column test, | cur.execute("""alter table content drop column test, | ||||
drop column test2""") | drop column test2""") |