Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/objstorage_testing.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections.abc import Iterator | |||||
import inspect | import inspect | ||||
import time | |||||
from swh.objstorage import exc | from swh.objstorage import exc | ||||
from swh.objstorage.interface import ObjStorageInterface | from swh.objstorage.interface import ObjStorageInterface | ||||
from swh.objstorage.objstorage import compute_hash | from swh.objstorage.objstorage import compute_hash | ||||
class ObjStorageTestFixture: | class ObjStorageTestFixture: | ||||
def test_types(self): | def test_types(self): | ||||
▲ Show 20 Lines • Show All 142 Lines • ▼ Show 20 Lines | def test_delete_not_allowed(self): | ||||
self.storage.delete(obj_id) | self.storage.delete(obj_id) | ||||
def test_delete_not_allowed_by_default(self): | def test_delete_not_allowed_by_default(self): | ||||
content, obj_id = self.hash_content(b"content_to_delete") | content, obj_id = self.hash_content(b"content_to_delete") | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
with self.assertRaises(PermissionError): | with self.assertRaises(PermissionError): | ||||
self.assertTrue(self.storage.delete(obj_id)) | self.assertTrue(self.storage.delete(obj_id)) | ||||
def test_add_stream(self): | |||||
content = [b"chunk1", b"chunk2"] | |||||
_, obj_id = self.hash_content(b"".join(content)) | |||||
try: | |||||
self.storage.add_stream(iter(content), obj_id=obj_id) | |||||
except NotImplementedError: | |||||
return | |||||
self.assertContentMatch(obj_id, b"".join(content)) | |||||
def test_add_stream_sleep(self): | |||||
def gen_content(): | |||||
yield b"chunk1" | |||||
time.sleep(0.5) | |||||
yield b"chunk42" | |||||
_, obj_id = self.hash_content(b"placeholder_id") | |||||
try: | |||||
self.storage.add_stream(gen_content(), obj_id=obj_id) | |||||
except NotImplementedError: | |||||
return | |||||
self.assertContentMatch(obj_id, b"chunk1chunk42") | |||||
def test_get_stream(self): | |||||
content = b"123456789" | |||||
_, obj_id = self.hash_content(content) | |||||
self.storage.add(content, obj_id=obj_id) | |||||
r = self.storage.get(obj_id) | |||||
self.assertEqual(r, content) | |||||
try: | |||||
r = self.storage.get_stream(obj_id, chunk_size=1) | |||||
except NotImplementedError: | |||||
return | |||||
self.assertTrue(isinstance(r, Iterator)) | |||||
r = list(r) | |||||
self.assertEqual(b"".join(r), content) | |||||
def test_add_batch(self): | def test_add_batch(self): | ||||
contents = {} | contents = {} | ||||
expected_content_add = 0 | expected_content_add = 0 | ||||
expected_content_add_bytes = 0 | expected_content_add_bytes = 0 | ||||
for i in range(50): | for i in range(50): | ||||
content = b"Test content %02d" % i | content = b"Test content %02d" % i | ||||
content, obj_id = self.hash_content(content) | content, obj_id = self.hash_content(content) | ||||
contents[obj_id] = content | contents[obj_id] = content | ||||
▲ Show 20 Lines • Show All 53 Lines • Show Last 20 Lines |