Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9336819
D523.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D523.diff
View Options
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -5,8 +5,6 @@
import time
-from nose.tools import istest
-
from swh.model import hashutil
from swh.objstorage import exc
@@ -24,42 +22,36 @@
content = self.storage.get(obj_id)
self.assertEqual(content, expected_content)
- @istest
- def check_config(self):
+ def test_check_config(self):
self.assertTrue(self.storage.check_config(check_write=False))
self.assertTrue(self.storage.check_config(check_write=True))
- @istest
- def contains(self):
+ def test_contains(self):
content_p, obj_id_p = self.hash_content(b'contains_present')
content_m, obj_id_m = self.hash_content(b'contains_missing')
self.storage.add(content_p, obj_id=obj_id_p)
self.assertIn(obj_id_p, self.storage)
self.assertNotIn(obj_id_m, self.storage)
- @istest
- def add_get_w_id(self):
+ def test_add_get_w_id(self):
content, obj_id = self.hash_content(b'add_get_w_id')
r = self.storage.add(content, obj_id=obj_id)
self.assertEqual(obj_id, r)
self.assertContentMatch(obj_id, content)
- @istest
- def add_big(self):
+ def test_add_big(self):
content, obj_id = self.hash_content(b'add_big' * 1024 * 1024)
r = self.storage.add(content, obj_id=obj_id)
self.assertEqual(obj_id, r)
self.assertContentMatch(obj_id, content)
- @istest
- def add_get_wo_id(self):
+ def test_add_get_wo_id(self):
content, obj_id = self.hash_content(b'add_get_wo_id')
r = self.storage.add(content)
self.assertEqual(obj_id, r)
self.assertContentMatch(obj_id, content)
- @istest
- def add_get_batch(self):
+ def test_add_get_batch(self):
content1, obj_id1 = self.hash_content(b'add_get_batch_1')
content2, obj_id2 = self.hash_content(b'add_get_batch_2')
self.storage.add(content1, obj_id1)
@@ -68,15 +60,13 @@
self.assertEqual(cr1, content1)
self.assertEqual(cr2, content2)
- @istest
- def get_batch_unexisting_content(self):
+ def test_get_batch_unexisting_content(self):
content, obj_id = self.hash_content(b'get_batch_unexisting_content')
result = list(self.storage.get_batch([obj_id]))
self.assertTrue(len(result) == 1)
self.assertIsNone(result[0])
- @istest
- def restore_content(self):
+ def test_restore_content(self):
valid_content, valid_obj_id = self.hash_content(b'restore_content')
invalid_content = b'unexpected content'
id_adding = self.storage.add(invalid_content, valid_obj_id)
@@ -87,22 +77,19 @@
self.assertEqual(id_restore, valid_obj_id)
self.assertContentMatch(valid_obj_id, valid_content)
- @istest
- def get_missing(self):
+ def test_get_missing(self):
content, obj_id = self.hash_content(b'get_missing')
with self.assertRaises(exc.ObjNotFoundError) as e:
self.storage.get(obj_id)
self.assertIn(obj_id, e.exception.args)
- @istest
- def check_missing(self):
+ def test_check_missing(self):
content, obj_id = self.hash_content(b'check_missing')
with self.assertRaises(exc.Error):
self.storage.check(obj_id)
- @istest
- def check_present(self):
+ def test_check_present(self):
content, obj_id = self.hash_content(b'check_present')
self.storage.add(content, obj_id)
try:
@@ -110,15 +97,13 @@
except exc.Error:
self.fail('Integrity check failed')
- @istest
- def delete_missing(self):
+ def test_delete_missing(self):
self.storage.allow_delete = True
content, obj_id = self.hash_content(b'missing_content_to_delete')
with self.assertRaises(exc.Error):
self.storage.delete(obj_id)
- @istest
- def delete_present(self):
+ def test_delete_present(self):
self.storage.allow_delete = True
content, obj_id = self.hash_content(b'content_to_delete')
self.storage.add(content, obj_id=obj_id)
@@ -126,23 +111,20 @@
with self.assertRaises(exc.Error):
self.storage.get(obj_id)
- @istest
- def delete_not_allowed(self):
+ def test_delete_not_allowed(self):
self.storage.allow_delete = False
content, obj_id = self.hash_content(b'content_to_delete')
self.storage.add(content, obj_id=obj_id)
with self.assertRaises(PermissionError):
self.assertTrue(self.storage.delete(obj_id))
- @istest
- def delete_not_allowed_by_default(self):
+ def test_delete_not_allowed_by_default(self):
content, obj_id = self.hash_content(b'content_to_delete')
self.storage.add(content, obj_id=obj_id)
with self.assertRaises(PermissionError):
self.assertTrue(self.storage.delete(obj_id))
- @istest
- def add_stream(self):
+ def test_add_stream(self):
content = [b'chunk1', b'chunk2']
_, obj_id = self.hash_content(b''.join(content))
try:
@@ -151,8 +133,7 @@
return
self.assertContentMatch(obj_id, b''.join(content))
- @istest
- def add_stream_sleep(self):
+ def test_add_stream_sleep(self):
def gen_content():
yield b'chunk1'
time.sleep(0.5)
@@ -164,8 +145,7 @@
return
self.assertContentMatch(obj_id, b'chunk1chunk2')
- @istest
- def get_stream(self):
+ def test_get_stream(self):
content_l = [b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9']
content = b''.join(content_l)
_, obj_id = self.hash_content(content)
@@ -176,8 +156,7 @@
return
self.assertEqual(r, content_l)
- @istest
- def add_batch(self):
+ def test_add_batch(self):
contents = {}
for i in range(50):
content = b'Test content %02d' % i
diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py
--- a/swh/objstorage/tests/test_multiplexer_filter.py
+++ b/swh/objstorage/tests/test_multiplexer_filter.py
@@ -7,15 +7,12 @@
import shutil
import tempfile
import unittest
-
from string import ascii_lowercase
-from nose.tools import istest
-
from swh.model import hashutil
-from swh.objstorage.exc import ObjNotFoundError, Error
from swh.objstorage import get_objstorage
-from swh.objstorage.multiplexer.filter import read_only, id_prefix, id_regex
+from swh.objstorage.exc import Error, ObjNotFoundError
+from swh.objstorage.multiplexer.filter import id_prefix, id_regex, read_only
def get_random_content():
@@ -52,50 +49,42 @@
super().tearDown()
shutil.rmtree(self.tmpdir)
- @istest
- def can_contains(self):
+ def test_can_contains(self):
self.assertTrue(self.valid_id in self.storage)
self.assertTrue(self.invalid_id in self.storage)
self.assertFalse(self.absent_id in self.storage)
- @istest
- def can_iter(self):
+ def test_can_iter(self):
self.assertIn(self.valid_id, iter(self.storage))
self.assertIn(self.invalid_id, iter(self.storage))
- @istest
- def can_len(self):
+ def test_can_len(self):
self.assertEqual(2, len(self.storage))
- @istest
- def can_get(self):
+ def test_can_get(self):
self.assertEqual(self.valid_content, self.storage.get(self.valid_id))
self.assertEqual(self.invalid_content,
self.storage.get(self.invalid_id))
- @istest
- def can_check(self):
+ def test_can_check(self):
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.absent_id)
with self.assertRaises(Error):
self.storage.check(self.invalid_id)
self.storage.check(self.valid_id)
- @istest
- def can_get_random(self):
+ def test_can_get_random(self):
self.assertEqual(1, len(list(self.storage.get_random(1))))
self.assertEqual(len(list(self.storage)),
len(set(self.storage.get_random(1000))))
- @istest
- def cannot_add(self):
+ def test_cannot_add(self):
new_id = self.storage.add(b'New content')
result = self.storage.add(self.valid_content, self.valid_id)
self.assertIsNone(new_id, self.storage)
self.assertIsNone(result)
- @istest
- def cannot_restore(self):
+ def test_cannot_restore(self):
result = self.storage.restore(self.valid_content, self.valid_id)
self.assertIsNone(result)
@@ -200,8 +189,7 @@
content = get_random_content()
return content
- @istest
- def contains(self):
+ def test_contains(self):
# Both contents are present, but the invalid one should be ignored.
self.assertTrue(self.present_valid_id in self.storage)
self.assertFalse(self.present_invalid_id in self.storage)
@@ -212,8 +200,7 @@
self.assertFalse(self.missing_corrupted_valid_id in self.storage)
self.assertFalse(self.missing_corrupted_invalid_id in self.storage)
- @istest
- def iter(self):
+ def test_iter(self):
self.assertIn(self.present_valid_id, iter(self.storage))
self.assertNotIn(self.present_invalid_id, iter(self.storage))
self.assertNotIn(self.missing_valid_id, iter(self.storage))
@@ -223,13 +210,11 @@
self.assertNotIn(self.missing_corrupted_valid_id, iter(self.storage))
self.assertNotIn(self.missing_corrupted_invalid_id, iter(self.storage))
- @istest
- def len(self):
+ def test_len(self):
# Four contents are present, but only two should be valid.
self.assertEqual(2, len(self.storage))
- @istest
- def get(self):
+ def test_get(self):
self.assertEqual(self.present_valid_content,
self.storage.get(self.present_valid_id))
with self.assertRaises(ObjNotFoundError):
@@ -247,8 +232,7 @@
with self.assertRaises(ObjNotFoundError):
self.storage.get(self.missing_corrupted_invalid_id)
- @istest
- def check(self):
+ def test_check(self):
self.storage.check(self.present_valid_id)
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.present_invalid_id)
@@ -265,8 +249,7 @@
with self.assertRaises(ObjNotFoundError):
self.storage.check(self.missing_corrupted_invalid_id)
- @istest
- def get_random(self):
+ def test_get_random(self):
self.assertEqual(0, len(list(self.storage.get_random(0))))
random_content = list(self.storage.get_random(1000))
@@ -279,8 +262,7 @@
self.assertNotIn(self.missing_corrupted_valid_id, random_content)
self.assertNotIn(self.missing_corrupted_invalid_id, random_content)
- @istest
- def add(self):
+ def test_add(self):
# Add valid and invalid contents to the storage and check their
# presence with the unfiltered storage.
valid_content = self.ensure_valid(b'ulepsrjbgt')
@@ -292,8 +274,7 @@
self.assertTrue(valid_id in self.base_storage)
self.assertFalse(invalid_id in self.base_storage)
- @istest
- def restore(self):
+ def test_restore(self):
# Add corrupted content to the storage and the try to restore it
valid_content = self.ensure_valid(b'ulepsrjbgt')
valid_id = self.base_storage.id(valid_content)
diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py
--- a/swh/objstorage/tests/test_objstorage_api.py
+++ b/swh/objstorage/tests/test_objstorage_api.py
@@ -8,10 +8,9 @@
import unittest
from swh.core.tests.server_testing import ServerTestFixtureAsync
-
from swh.objstorage import get_objstorage
-from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
from swh.objstorage.api.server import app
+from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
class TestRemoteObjStorage(ServerTestFixtureAsync, ObjStorageTestFixture,
diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py
--- a/swh/objstorage/tests/test_objstorage_azure.py
+++ b/swh/objstorage/tests/test_objstorage_azure.py
@@ -3,14 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections import defaultdict
import unittest
+from collections import defaultdict
from unittest.mock import patch
-from nose.tools import istest
-
from azure.common import AzureMissingResourceHttpError
-
from swh.model.hashutil import hash_to_hex
from swh.objstorage import get_objstorage
@@ -106,8 +103,7 @@
'accounts': self.accounts
})
- @istest
- def prefixedazure_instantiation_missing_prefixes(self):
+ def test_prefixedazure_instantiation_missing_prefixes(self):
del self.accounts['d']
del self.accounts['e']
@@ -116,8 +112,7 @@
'accounts': self.accounts
})
- @istest
- def prefixedazure_instantiation_inconsistent_prefixes(self):
+ def test_prefixedazure_instantiation_inconsistent_prefixes(self):
self.accounts['00'] = self.accounts['0']
with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'):
@@ -125,8 +120,7 @@
'accounts': self.accounts
})
- @istest
- def prefixedazure_sharding_behavior(self):
+ def test_prefixedazure_sharding_behavior(self):
for i in range(100):
content, obj_id = self.hash_content(b'test_content_%02d' % i)
self.storage.add(content, obj_id=obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_cloud.py b/swh/objstorage/tests/test_objstorage_cloud.py
--- a/swh/objstorage/tests/test_objstorage_cloud.py
+++ b/swh/objstorage/tests/test_objstorage_cloud.py
@@ -5,14 +5,13 @@
import unittest
-from swh.objstorage.cloud.objstorage_cloud import CloudObjStorage
-from libcloud.storage.types import (ObjectDoesNotExistError,
- ContainerDoesNotExistError)
from libcloud.common.types import InvalidCredsError
+from libcloud.storage.types import (ContainerDoesNotExistError,
+ ObjectDoesNotExistError)
+from swh.objstorage.cloud.objstorage_cloud import CloudObjStorage
from .objstorage_testing import ObjStorageTestFixture
-
API_KEY = 'API_KEY'
API_SECRET_KEY = 'API SECRET KEY'
CONTAINER_NAME = 'test_container'
diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py
--- a/swh/objstorage/tests/test_objstorage_instantiation.py
+++ b/swh/objstorage/tests/test_objstorage_instantiation.py
@@ -7,11 +7,9 @@
import tempfile
import unittest
-from nose.tools import istest
-
from swh.objstorage import get_objstorage
-from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
+from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage
class TestObjStorageInitialization(unittest.TestCase):
@@ -32,8 +30,7 @@
shutil.rmtree(self.path)
shutil.rmtree(self.path2)
- @istest
- def pathslicing_objstorage(self):
+ def test_pathslicing_objstorage(self):
conf = {
'cls': 'pathslicing',
'args': {'root': self.path, 'slicing': '0:2/0:5'}
@@ -41,8 +38,7 @@
st = get_objstorage(**conf)
self.assertTrue(isinstance(st, PathSlicingObjStorage))
- @istest
- def remote_objstorage(self):
+ def test_remote_objstorage(self):
conf = {
'cls': 'remote',
'args': {
diff --git a/swh/objstorage/tests/test_objstorage_multiplexer.py b/swh/objstorage/tests/test_objstorage_multiplexer.py
--- a/swh/objstorage/tests/test_objstorage_multiplexer.py
+++ b/swh/objstorage/tests/test_objstorage_multiplexer.py
@@ -8,8 +8,6 @@
import tempfile
import unittest
-from nose.tools import istest
-
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.multiplexer import MultiplexerObjStorage
from swh.objstorage.multiplexer.filter import add_filter, read_only
@@ -37,36 +35,31 @@
super().tearDown()
shutil.rmtree(self.tmpdir)
- @istest
- def contains(self):
+ def test_contains(self):
content_p, obj_id_p = self.hash_content(b'contains_present')
content_m, obj_id_m = self.hash_content(b'contains_missing')
self.storage.add(content_p, obj_id=obj_id_p)
self.assertIn(obj_id_p, self.storage)
self.assertNotIn(obj_id_m, self.storage)
- @istest
- def delete_missing(self):
+ def test_delete_missing(self):
self.storage_v1.allow_delete = True
self.storage_v2.allow_delete = True
- super().delete_missing()
+ super().test_delete_missing()
- @istest
- def delete_present(self):
+ def test_delete_present(self):
self.storage_v1.allow_delete = True
self.storage_v2.allow_delete = True
- super().delete_present()
+ super().test_delete_present()
- @istest
- def get_random_contents(self):
+ def test_get_random_contents(self):
content, obj_id = self.hash_content(b'get_random_content')
self.storage.add(content)
random_contents = list(self.storage.get_random(1))
self.assertEqual(1, len(random_contents))
self.assertIn(obj_id, random_contents)
- @istest
- def access_readonly(self):
+ def test_access_readonly(self):
# Add a content to the readonly storage
content, obj_id = self.hash_content(b'content in read-only')
self.storage_v1.add(content)
diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py
--- a/swh/objstorage/tests/test_objstorage_pathslicing.py
+++ b/swh/objstorage/tests/test_objstorage_pathslicing.py
@@ -7,11 +7,8 @@
import tempfile
import unittest
-from nose.tools import istest
-
from swh.model import hashutil
-from swh.objstorage import exc
-from swh.objstorage import get_objstorage
+from swh.objstorage import exc, get_objstorage
from .objstorage_testing import ObjStorageTestFixture
@@ -35,22 +32,19 @@
hex_obj_id = hashutil.hash_to_hex(obj_id)
return self.storage._obj_path(hex_obj_id)
- @istest
- def iter(self):
+ def test_iter(self):
content, obj_id = self.hash_content(b'iter')
self.assertEqual(list(iter(self.storage)), [])
self.storage.add(content, obj_id=obj_id)
self.assertEqual(list(iter(self.storage)), [obj_id])
- @istest
- def len(self):
+ def test_len(self):
content, obj_id = self.hash_content(b'len')
self.assertEqual(len(self.storage), 0)
self.storage.add(content, obj_id=obj_id)
self.assertEqual(len(self.storage), 1)
- @istest
- def check_not_gzip(self):
+ def test_check_not_gzip(self):
content, obj_id = self.hash_content(b'check_not_gzip')
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), 'ab') as f: # Add garbage.
@@ -58,8 +52,7 @@
with self.assertRaises(exc.Error):
self.storage.check(obj_id)
- @istest
- def check_id_mismatch(self):
+ def test_check_id_mismatch(self):
content, obj_id = self.hash_content(b'check_id_mismatch')
self.storage.add(content, obj_id=obj_id)
with open(self.content_path(obj_id), 'wb') as f:
@@ -67,8 +60,7 @@
with self.assertRaises(exc.Error):
self.storage.check(obj_id)
- @istest
- def get_random_contents(self):
+ def test_get_random_contents(self):
content, obj_id = self.hash_content(b'get_random_content')
self.storage.add(content, obj_id=obj_id)
random_contents = list(self.storage.get_random(1))
diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py
--- a/swh/objstorage/tests/test_objstorage_striping.py
+++ b/swh/objstorage/tests/test_objstorage_striping.py
@@ -8,9 +8,8 @@
import tempfile
import unittest
-from nose.tools import istest
-
from swh.objstorage import get_objstorage
+
from .objstorage_testing import ObjStorageTestFixture
@@ -49,12 +48,10 @@
def tearDown(self):
shutil.rmtree(self.base_dir)
- @istest
- def add_get_wo_id(self):
+ def test_add_get_wo_id(self):
self.skipTest("can't add without id in the multiplexer storage")
- @istest
- def add_striping_behavior(self):
+ def test_add_striping_behavior(self):
exp_storage_counts = [0, 0]
storage_counts = [0, 0]
for i in range(100):
@@ -72,8 +69,7 @@
self.assertEqual(count, 1)
self.assertEqual(storage_counts, exp_storage_counts)
- @istest
- def get_striping_behavior(self):
+ def test_get_striping_behavior(self):
# Make sure we can read objects that are available in any backend
# storage
content, obj_id = self.hash_content(b'striping_behavior_test')
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Jul 3 2025, 7:45 AM (10 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221683
Attached To
D523: Rename test methods to test_ to allow py.test collection
Event Timeline
Log In to Comment