Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/objstorage_testing.py
Show First 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | def check_present(self): | ||||
content, obj_id = self.hash_content(b'check_missing') | content, obj_id = self.hash_content(b'check_missing') | ||||
self.storage.add(content) | self.storage.add(content) | ||||
try: | try: | ||||
self.storage.check(obj_id) | self.storage.check(obj_id) | ||||
except: | except: | ||||
self.fail('Integrity check failed') | self.fail('Integrity check failed') | ||||
@istest | @istest | ||||
def delete_missing(self): | |||||
self.storage.allow_delete = True | |||||
content, obj_id = self.hash_content(b'missing_content_to_delete') | |||||
with self.assertRaises(exc.Error): | |||||
self.storage.delete(obj_id) | |||||
@istest | |||||
def delete_present(self): | |||||
self.storage.allow_delete = True | |||||
content, obj_id = self.hash_content(b'content_to_delete') | |||||
self.storage.add(content, obj_id=obj_id) | |||||
self.assertTrue(self.storage.delete(obj_id)) | |||||
with self.assertRaises(exc.Error): | |||||
self.storage.get(obj_id) | |||||
@istest | |||||
def delete_not_allowed(self): | |||||
self.storage.allow_delete = False | |||||
content, obj_id = self.hash_content(b'content_to_delete') | |||||
self.storage.add(content, obj_id=obj_id) | |||||
with self.assertRaises(PermissionError): | |||||
self.assertTrue(self.storage.delete(obj_id)) | |||||
@istest | |||||
def delete_not_allowed_by_default(self): | |||||
content, obj_id = self.hash_content(b'content_to_delete') | |||||
self.storage.add(content, obj_id=obj_id) | |||||
with self.assertRaises(PermissionError): | |||||
self.assertTrue(self.storage.delete(obj_id)) | |||||
@istest | |||||
def add_stream(self): | def add_stream(self): | ||||
content = [b'chunk1', b'chunk2'] | content = [b'chunk1', b'chunk2'] | ||||
_, obj_id = self.hash_content(b''.join(content)) | _, obj_id = self.hash_content(b''.join(content)) | ||||
try: | try: | ||||
self.storage.add_stream(iter(content), obj_id=obj_id) | self.storage.add_stream(iter(content), obj_id=obj_id) | ||||
except NotImplementedError: | except NotImplementedError: | ||||
return | return | ||||
self.assertContentMatch(obj_id, b''.join(content)) | self.assertContentMatch(obj_id, b''.join(content)) | ||||
Show All 25 Lines |