Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_pathslicing.py
Show All 9 Lines | |||||
from swh.core import hashutil | from swh.core import hashutil | ||||
from swh.objstorage import exc | from swh.objstorage import exc | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from objstorage_testing import ObjStorageTestFixture | from objstorage_testing import ObjStorageTestFixture | ||||
class TestpathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): | class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.slicing = '0:2/2:4/4:6' | self.slicing = '0:2/2:4/4:6' | ||||
self.tmpdir = tempfile.mkdtemp() | self.tmpdir = tempfile.mkdtemp() | ||||
self.storage = get_objstorage( | self.storage = get_objstorage( | ||||
'pathslicing', | 'pathslicing', | ||||
{'root': self.tmpdir, 'slicing': self.slicing} | {'root': self.tmpdir, 'slicing': self.slicing} | ||||
) | ) | ||||
def content_path(self, obj_id): | def content_path(self, obj_id): | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
return self.storage._obj_path(hex_obj_id) | return self.storage._obj_path(hex_obj_id) | ||||
@istest | @istest | ||||
def contains(self): | |||||
content_p, obj_id_p = self.hash_content(b'contains_present') | |||||
content_m, obj_id_m = self.hash_content(b'contains_missing') | |||||
self.storage.add(content_p, obj_id=obj_id_p) | |||||
self.assertIn(obj_id_p, self.storage) | |||||
self.assertNotIn(obj_id_m, self.storage) | |||||
@istest | |||||
def iter(self): | def iter(self): | ||||
content, obj_id = self.hash_content(b'iter') | content, obj_id = self.hash_content(b'iter') | ||||
self.assertEqual(list(iter(self.storage)), []) | self.assertEqual(list(iter(self.storage)), []) | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
self.assertEqual(list(iter(self.storage)), [obj_id]) | self.assertEqual(list(iter(self.storage)), [obj_id]) | ||||
@istest | @istest | ||||
def len(self): | def len(self): | ||||
content, obj_id = self.hash_content(b'check_not_gzip') | content, obj_id = self.hash_content(b'len') | ||||
self.assertEqual(len(self.storage), 0) | self.assertEqual(len(self.storage), 0) | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
self.assertEqual(len(self.storage), 1) | self.assertEqual(len(self.storage), 1) | ||||
@istest | @istest | ||||
def check_not_gzip(self): | def check_not_gzip(self): | ||||
content, obj_id = self.hash_content(b'check_not_gzip') | content, obj_id = self.hash_content(b'check_not_gzip') | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
Show All 21 Lines |