diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py index bd02a31..79835f2 100644 --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -1,190 +1,169 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time -from nose.tools import istest - from swh.model import hashutil from swh.objstorage import exc class ObjStorageTestFixture(): def setUp(self): super().setUp() def hash_content(self, content): obj_id = hashutil.hash_data(content)['sha1'] return content, obj_id def assertContentMatch(self, obj_id, expected_content): # noqa content = self.storage.get(obj_id) self.assertEqual(content, expected_content) - @istest - def check_config(self): + def test_check_config(self): self.assertTrue(self.storage.check_config(check_write=False)) self.assertTrue(self.storage.check_config(check_write=True)) - @istest - def contains(self): + def test_contains(self): content_p, obj_id_p = self.hash_content(b'contains_present') content_m, obj_id_m = self.hash_content(b'contains_missing') self.storage.add(content_p, obj_id=obj_id_p) self.assertIn(obj_id_p, self.storage) self.assertNotIn(obj_id_m, self.storage) - @istest - def add_get_w_id(self): + def test_add_get_w_id(self): content, obj_id = self.hash_content(b'add_get_w_id') r = self.storage.add(content, obj_id=obj_id) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) - @istest - def add_big(self): + def test_add_big(self): content, obj_id = self.hash_content(b'add_big' * 1024 * 1024) r = self.storage.add(content, obj_id=obj_id) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) - @istest - def add_get_wo_id(self): + def test_add_get_wo_id(self): content, obj_id = self.hash_content(b'add_get_wo_id') r = self.storage.add(content) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) - @istest - def add_get_batch(self): + def test_add_get_batch(self): content1, obj_id1 = self.hash_content(b'add_get_batch_1') content2, obj_id2 = self.hash_content(b'add_get_batch_2') self.storage.add(content1, obj_id1) self.storage.add(content2, obj_id2) cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2]) self.assertEqual(cr1, content1) self.assertEqual(cr2, content2) - @istest - def get_batch_unexisting_content(self): + def test_get_batch_unexisting_content(self): content, obj_id = self.hash_content(b'get_batch_unexisting_content') result = list(self.storage.get_batch([obj_id])) self.assertTrue(len(result) == 1) self.assertIsNone(result[0]) - @istest - def restore_content(self): + def test_restore_content(self): valid_content, valid_obj_id = self.hash_content(b'restore_content') invalid_content = b'unexpected content' id_adding = self.storage.add(invalid_content, valid_obj_id) self.assertEqual(id_adding, valid_obj_id) with self.assertRaises(exc.Error): self.storage.check(id_adding) id_restore = self.storage.restore(valid_content, valid_obj_id) self.assertEqual(id_restore, valid_obj_id) self.assertContentMatch(valid_obj_id, valid_content) - @istest - def get_missing(self): + def test_get_missing(self): content, obj_id = self.hash_content(b'get_missing') with self.assertRaises(exc.ObjNotFoundError) as e: self.storage.get(obj_id) self.assertIn(obj_id, e.exception.args) - @istest - def check_missing(self): + def test_check_missing(self): content, obj_id = self.hash_content(b'check_missing') with self.assertRaises(exc.Error): self.storage.check(obj_id) - @istest - def check_present(self): + def test_check_present(self): content, obj_id = self.hash_content(b'check_present') self.storage.add(content, obj_id) try: self.storage.check(obj_id) except exc.Error: self.fail('Integrity check failed') - @istest - def delete_missing(self): + def test_delete_missing(self): self.storage.allow_delete = True content, obj_id = self.hash_content(b'missing_content_to_delete') with self.assertRaises(exc.Error): self.storage.delete(obj_id) - @istest - def delete_present(self): + def test_delete_present(self): self.storage.allow_delete = True content, obj_id = self.hash_content(b'content_to_delete') self.storage.add(content, obj_id=obj_id) self.assertTrue(self.storage.delete(obj_id)) with self.assertRaises(exc.Error): self.storage.get(obj_id) - @istest - def delete_not_allowed(self): + def test_delete_not_allowed(self): self.storage.allow_delete = False content, obj_id = self.hash_content(b'content_to_delete') self.storage.add(content, obj_id=obj_id) with self.assertRaises(PermissionError): self.assertTrue(self.storage.delete(obj_id)) - @istest - def delete_not_allowed_by_default(self): + def test_delete_not_allowed_by_default(self): content, obj_id = self.hash_content(b'content_to_delete') self.storage.add(content, obj_id=obj_id) with self.assertRaises(PermissionError): self.assertTrue(self.storage.delete(obj_id)) - @istest - def add_stream(self): + def test_add_stream(self): content = [b'chunk1', b'chunk2'] _, obj_id = self.hash_content(b''.join(content)) try: self.storage.add_stream(iter(content), obj_id=obj_id) except NotImplementedError: return self.assertContentMatch(obj_id, b''.join(content)) - @istest - def add_stream_sleep(self): + def test_add_stream_sleep(self): def gen_content(): yield b'chunk1' time.sleep(0.5) yield b'chunk2' _, obj_id = self.hash_content(b'placeholder_id') try: self.storage.add_stream(gen_content(), obj_id=obj_id) except NotImplementedError: return self.assertContentMatch(obj_id, b'chunk1chunk2') - @istest - def get_stream(self): + def test_get_stream(self): content_l = [b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9'] content = b''.join(content_l) _, obj_id = self.hash_content(content) self.storage.add(content, obj_id=obj_id) try: r = list(self.storage.get_stream(obj_id, chunk_size=1)) except NotImplementedError: return self.assertEqual(r, content_l) - @istest - def add_batch(self): + def test_add_batch(self): contents = {} for i in range(50): content = b'Test content %02d' % i content, obj_id = self.hash_content(content) contents[obj_id] = content ret = self.storage.add_batch(contents) self.assertEqual(len(contents), ret) for obj_id in contents: self.assertIn(obj_id, self.storage) diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py index 64c55db..6e8e4e3 100644 --- a/swh/objstorage/tests/test_multiplexer_filter.py +++ b/swh/objstorage/tests/test_multiplexer_filter.py @@ -1,342 +1,323 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import shutil import tempfile import unittest - from string import ascii_lowercase -from nose.tools import istest - from swh.model import hashutil -from swh.objstorage.exc import ObjNotFoundError, Error from swh.objstorage import get_objstorage -from swh.objstorage.multiplexer.filter import read_only, id_prefix, id_regex +from swh.objstorage.exc import Error, ObjNotFoundError +from swh.objstorage.multiplexer.filter import id_prefix, id_regex, read_only def get_random_content(): return bytes(''.join(random.sample(ascii_lowercase, 10)), 'utf8') class MixinTestReadFilter(unittest.TestCase): # Read only filter should not allow writing def setUp(self): super().setUp() self.tmpdir = tempfile.mkdtemp() pstorage = {'cls': 'pathslicing', 'args': {'root': self.tmpdir, 'slicing': '0:5'}} base_storage = get_objstorage(**pstorage) base_storage.id = lambda cont: hashutil.hash_data(cont)['sha1'] self.storage = get_objstorage('filtered', {'storage_conf': pstorage, 'filters_conf': [read_only()]}) self.valid_content = b'pre-existing content' self.invalid_content = b'invalid_content' self.true_invalid_content = b'Anything that is not correct' self.absent_content = b'non-existent content' # Create a valid content. self.valid_id = base_storage.add(self.valid_content) # Create an invalid id and add a content with it. self.invalid_id = base_storage.id(self.true_invalid_content) base_storage.add(self.invalid_content, obj_id=self.invalid_id) # Compute an id for a non-existing content. self.absent_id = base_storage.id(self.absent_content) def tearDown(self): super().tearDown() shutil.rmtree(self.tmpdir) - @istest - def can_contains(self): + def test_can_contains(self): self.assertTrue(self.valid_id in self.storage) self.assertTrue(self.invalid_id in self.storage) self.assertFalse(self.absent_id in self.storage) - @istest - def can_iter(self): + def test_can_iter(self): self.assertIn(self.valid_id, iter(self.storage)) self.assertIn(self.invalid_id, iter(self.storage)) - @istest - def can_len(self): + def test_can_len(self): self.assertEqual(2, len(self.storage)) - @istest - def can_get(self): + def test_can_get(self): self.assertEqual(self.valid_content, self.storage.get(self.valid_id)) self.assertEqual(self.invalid_content, self.storage.get(self.invalid_id)) - @istest - def can_check(self): + def test_can_check(self): with self.assertRaises(ObjNotFoundError): self.storage.check(self.absent_id) with self.assertRaises(Error): self.storage.check(self.invalid_id) self.storage.check(self.valid_id) - @istest - def can_get_random(self): + def test_can_get_random(self): self.assertEqual(1, len(list(self.storage.get_random(1)))) self.assertEqual(len(list(self.storage)), len(set(self.storage.get_random(1000)))) - @istest - def cannot_add(self): + def test_cannot_add(self): new_id = self.storage.add(b'New content') result = self.storage.add(self.valid_content, self.valid_id) self.assertIsNone(new_id, self.storage) self.assertIsNone(result) - @istest - def cannot_restore(self): + def test_cannot_restore(self): result = self.storage.restore(self.valid_content, self.valid_id) self.assertIsNone(result) class MixinTestIdFilter(): """ Mixin class that tests the filters based on filter.IdFilter Methods "make_valid", "make_invalid" and "filter_storage" must be implemented by subclasses. """ def setUp(self): super().setUp() # Use a hack here : as the mock uses the content as id, it is easy to # create contents that are filtered or not. self.prefix = '71' self.tmpdir = tempfile.mkdtemp() # Make the storage filtered self.sconf = {'cls': 'pathslicing', 'args': {'root': self.tmpdir, 'slicing': '0:5'}} storage = get_objstorage(**self.sconf) self.base_storage = storage self.storage = self.filter_storage(self.sconf) # Set the id calculators storage.id = lambda cont: hashutil.hash_data(cont)['sha1'] # Present content with valid id self.present_valid_content = self.ensure_valid(b'yroqdtotji') self.present_valid_id = storage.id(self.present_valid_content) # Present content with invalid id self.present_invalid_content = self.ensure_invalid(b'glxddlmmzb') self.present_invalid_id = storage.id(self.present_invalid_content) # Missing content with valid id self.missing_valid_content = self.ensure_valid(b'rmzkdclkez') self.missing_valid_id = storage.id(self.missing_valid_content) # Missing content with invalid id self.missing_invalid_content = self.ensure_invalid(b'hlejfuginh') self.missing_invalid_id = storage.id(self.missing_invalid_content) # Present corrupted content with valid id self.present_corrupted_valid_content = self.ensure_valid(b'cdsjwnpaij') self.true_present_corrupted_valid_content = self.ensure_valid( b'mgsdpawcrr') self.present_corrupted_valid_id = storage.id( self.true_present_corrupted_valid_content) # Present corrupted content with invalid id self.present_corrupted_invalid_content = self.ensure_invalid( b'pspjljnrco') self.true_present_corrupted_invalid_content = self.ensure_invalid( b'rjocbnnbso') self.present_corrupted_invalid_id = storage.id( self.true_present_corrupted_invalid_content) # Missing (potentially) corrupted content with valid id self.missing_corrupted_valid_content = self.ensure_valid( b'zxkokfgtou') self.true_missing_corrupted_valid_content = self.ensure_valid( b'royoncooqa') self.missing_corrupted_valid_id = storage.id( self.true_missing_corrupted_valid_content) # Missing (potentially) corrupted content with invalid id self.missing_corrupted_invalid_content = self.ensure_invalid( b'hxaxnrmnyk') self.true_missing_corrupted_invalid_content = self.ensure_invalid( b'qhbolyuifr') self.missing_corrupted_invalid_id = storage.id( self.true_missing_corrupted_invalid_content) # Add the content that are supposed to be present self.storage.add(self.present_valid_content) self.storage.add(self.present_invalid_content) self.storage.add(self.present_corrupted_valid_content, obj_id=self.present_corrupted_valid_id) self.storage.add(self.present_corrupted_invalid_content, obj_id=self.present_corrupted_invalid_id) def tearDown(self): super().tearDown() shutil.rmtree(self.tmpdir) def filter_storage(self, sconf): raise NotImplementedError( 'Id_filter test class must have a filter_storage method') def ensure_valid(self, content=None): if content is None: content = get_random_content() while not self.storage.is_valid(self.base_storage.id(content)): content = get_random_content() return content def ensure_invalid(self, content=None): if content is None: content = get_random_content() while self.storage.is_valid(self.base_storage.id(content)): content = get_random_content() return content - @istest - def contains(self): + def test_contains(self): # Both contents are present, but the invalid one should be ignored. self.assertTrue(self.present_valid_id in self.storage) self.assertFalse(self.present_invalid_id in self.storage) self.assertFalse(self.missing_valid_id in self.storage) self.assertFalse(self.missing_invalid_id in self.storage) self.assertTrue(self.present_corrupted_valid_id in self.storage) self.assertFalse(self.present_corrupted_invalid_id in self.storage) self.assertFalse(self.missing_corrupted_valid_id in self.storage) self.assertFalse(self.missing_corrupted_invalid_id in self.storage) - @istest - def iter(self): + def test_iter(self): self.assertIn(self.present_valid_id, iter(self.storage)) self.assertNotIn(self.present_invalid_id, iter(self.storage)) self.assertNotIn(self.missing_valid_id, iter(self.storage)) self.assertNotIn(self.missing_invalid_id, iter(self.storage)) self.assertIn(self.present_corrupted_valid_id, iter(self.storage)) self.assertNotIn(self.present_corrupted_invalid_id, iter(self.storage)) self.assertNotIn(self.missing_corrupted_valid_id, iter(self.storage)) self.assertNotIn(self.missing_corrupted_invalid_id, iter(self.storage)) - @istest - def len(self): + def test_len(self): # Four contents are present, but only two should be valid. self.assertEqual(2, len(self.storage)) - @istest - def get(self): + def test_get(self): self.assertEqual(self.present_valid_content, self.storage.get(self.present_valid_id)) with self.assertRaises(ObjNotFoundError): self.storage.get(self.present_invalid_id) with self.assertRaises(ObjNotFoundError): self.storage.get(self.missing_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.get(self.missing_invalid_id) self.assertEqual(self.present_corrupted_valid_content, self.storage.get(self.present_corrupted_valid_id)) with self.assertRaises(ObjNotFoundError): self.storage.get(self.present_corrupted_invalid_id) with self.assertRaises(ObjNotFoundError): self.storage.get(self.missing_corrupted_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.get(self.missing_corrupted_invalid_id) - @istest - def check(self): + def test_check(self): self.storage.check(self.present_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.present_invalid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.missing_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.missing_invalid_id) with self.assertRaises(Error): self.storage.check(self.present_corrupted_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.present_corrupted_invalid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.missing_corrupted_valid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(self.missing_corrupted_invalid_id) - @istest - def get_random(self): + def test_get_random(self): self.assertEqual(0, len(list(self.storage.get_random(0)))) random_content = list(self.storage.get_random(1000)) self.assertIn(self.present_valid_id, random_content) self.assertNotIn(self.present_invalid_id, random_content) self.assertNotIn(self.missing_valid_id, random_content) self.assertNotIn(self.missing_invalid_id, random_content) self.assertIn(self.present_corrupted_valid_id, random_content) self.assertNotIn(self.present_corrupted_invalid_id, random_content) self.assertNotIn(self.missing_corrupted_valid_id, random_content) self.assertNotIn(self.missing_corrupted_invalid_id, random_content) - @istest - def add(self): + def test_add(self): # Add valid and invalid contents to the storage and check their # presence with the unfiltered storage. valid_content = self.ensure_valid(b'ulepsrjbgt') valid_id = self.base_storage.id(valid_content) invalid_content = self.ensure_invalid(b'znvghkjked') invalid_id = self.base_storage.id(invalid_content) self.storage.add(valid_content) self.storage.add(invalid_content) self.assertTrue(valid_id in self.base_storage) self.assertFalse(invalid_id in self.base_storage) - @istest - def restore(self): + def test_restore(self): # Add corrupted content to the storage and the try to restore it valid_content = self.ensure_valid(b'ulepsrjbgt') valid_id = self.base_storage.id(valid_content) corrupted_content = self.ensure_valid(b'ltjkjsloyb') corrupted_id = self.base_storage.id(corrupted_content) self.storage.add(corrupted_content, obj_id=valid_id) with self.assertRaises(ObjNotFoundError): self.storage.check(corrupted_id) with self.assertRaises(Error): self.storage.check(valid_id) self.storage.restore(valid_content) self.storage.check(valid_id) class TestPrefixFilter(MixinTestIdFilter, unittest.TestCase): def setUp(self): self.prefix = b'71' super().setUp() def ensure_valid(self, content): obj_id = hashutil.hash_data(content)['sha1'] hex_obj_id = hashutil.hash_to_hex(obj_id) self.assertTrue(hex_obj_id.startswith(self.prefix)) return content def ensure_invalid(self, content): obj_id = hashutil.hash_data(content)['sha1'] hex_obj_id = hashutil.hash_to_hex(obj_id) self.assertFalse(hex_obj_id.startswith(self.prefix)) return content def filter_storage(self, sconf): return get_objstorage('filtered', {'storage_conf': sconf, 'filters_conf': [id_prefix(self.prefix)]}) class TestRegexFilter(MixinTestIdFilter, unittest.TestCase): def setUp(self): self.regex = r'[a-f][0-9].*' super().setUp() def filter_storage(self, sconf): return get_objstorage('filtered', {'storage_conf': sconf, 'filters_conf': [id_regex(self.regex)]}) diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py index 418c8b4..3dce298 100644 --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -1,43 +1,42 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import shutil import tempfile import unittest from swh.core.tests.server_testing import ServerTestFixtureAsync - from swh.objstorage import get_objstorage -from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture from swh.objstorage.api.server import app +from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture class TestRemoteObjStorage(ServerTestFixtureAsync, ObjStorageTestFixture, unittest.TestCase): """ Test the remote archive API. """ def setUp(self): self.tmpdir = tempfile.mkdtemp() self.config = { 'cls': 'pathslicing', 'args': { 'root': self.tmpdir, 'slicing': '0:1/0:5', 'allow_delete': True, }, 'client_max_size': 8 * 1024 * 1024, } self.app = app self.app['config'] = self.config super().setUp() self.storage = get_objstorage('remote', { 'url': self.url() }) def tearDown(self): super().tearDown() shutil.rmtree(self.tmpdir) diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py index 01bc0b3..dc25ba2 100644 --- a/swh/objstorage/tests/test_objstorage_azure.py +++ b/swh/objstorage/tests/test_objstorage_azure.py @@ -1,138 +1,132 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict import unittest +from collections import defaultdict from unittest.mock import patch -from nose.tools import istest - from azure.common import AzureMissingResourceHttpError - from swh.model.hashutil import hash_to_hex from swh.objstorage import get_objstorage from .objstorage_testing import ObjStorageTestFixture class MockBlob(): """ Libcloud object mock that replicates its API """ def __init__(self, name, content): self.name = name self.content = content class MockBlockBlobService(): """Mock internal azure library which AzureCloudObjStorage depends upon. """ data = {} def __init__(self, account_name, account_key, **kwargs): # do not care for the account_name and the api_secret_key here self.data = defaultdict(dict) def get_container_properties(self, container_name): self.data[container_name] return container_name in self.data def create_blob_from_bytes(self, container_name, blob_name, blob): self.data[container_name][blob_name] = blob def get_blob_to_bytes(self, container_name, blob_name): if blob_name not in self.data[container_name]: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return MockBlob(name=blob_name, content=self.data[container_name][blob_name]) def delete_blob(self, container_name, blob_name): try: self.data[container_name].pop(blob_name) except KeyError: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return True def exists(self, container_name, blob_name): return blob_name in self.data[container_name] def list_blobs(self, container_name): for blob_name, content in self.data[container_name].items(): yield MockBlob(name=blob_name, content=content) class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.cloud.objstorage_azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.storage = get_objstorage('azure', { 'account_name': 'account-name', 'api_secret_key': 'api-secret-key', 'container_name': 'container-name', }) class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.cloud.objstorage_azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.accounts = {} for prefix in '0123456789abcdef': self.accounts[prefix] = { 'account_name': 'account_%s' % prefix, 'api_secret_key': 'secret_key_%s' % prefix, 'container_name': 'container_%s' % prefix, } self.storage = get_objstorage('azure-prefixed', { 'accounts': self.accounts }) - @istest - def prefixedazure_instantiation_missing_prefixes(self): + def test_prefixedazure_instantiation_missing_prefixes(self): del self.accounts['d'] del self.accounts['e'] with self.assertRaisesRegex(ValueError, 'Missing prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) - @istest - def prefixedazure_instantiation_inconsistent_prefixes(self): + def test_prefixedazure_instantiation_inconsistent_prefixes(self): self.accounts['00'] = self.accounts['0'] with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) - @istest - def prefixedazure_sharding_behavior(self): + def test_prefixedazure_sharding_behavior(self): for i in range(100): content, obj_id = self.hash_content(b'test_content_%02d' % i) self.storage.add(content, obj_id=obj_id) hex_obj_id = hash_to_hex(obj_id) prefix = hex_obj_id[0] self.assertTrue( self.storage.prefixes[prefix][0].exists( self.accounts[prefix]['container_name'], hex_obj_id )) diff --git a/swh/objstorage/tests/test_objstorage_cloud.py b/swh/objstorage/tests/test_objstorage_cloud.py index 5dbdfdf..66cea6a 100644 --- a/swh/objstorage/tests/test_objstorage_cloud.py +++ b/swh/objstorage/tests/test_objstorage_cloud.py @@ -1,97 +1,96 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from swh.objstorage.cloud.objstorage_cloud import CloudObjStorage -from libcloud.storage.types import (ObjectDoesNotExistError, - ContainerDoesNotExistError) from libcloud.common.types import InvalidCredsError +from libcloud.storage.types import (ContainerDoesNotExistError, + ObjectDoesNotExistError) +from swh.objstorage.cloud.objstorage_cloud import CloudObjStorage from .objstorage_testing import ObjStorageTestFixture - API_KEY = 'API_KEY' API_SECRET_KEY = 'API SECRET KEY' CONTAINER_NAME = 'test_container' class MockLibcloudObject(): """ Libcloud object mock that replicates its API """ def __init__(self, name, content): self.name = name self.content = list(content) def as_stream(self): yield from iter(self.content) class MockLibcloudDriver(): """ Mock driver that replicates the used LibCloud API """ def __init__(self, api_key, api_secret_key): self.containers = {CONTAINER_NAME: {}} # Storage is initialized self.api_key = api_key self.api_secret_key = api_secret_key def _check_credentials(self): # Private method may be known as another name in Libcloud but is used # to replicate libcloud behavior (i.e. check credential at each # request) if self.api_key != API_KEY or self.api_secret_key != API_SECRET_KEY: raise InvalidCredsError() def get_container(self, container_name): try: return self.containers[container_name] except KeyError: raise ContainerDoesNotExistError(container_name=container_name, driver=self, value=None) def iterate_container_objects(self, container): self._check_credentials() yield from container.values() def get_object(self, container_name, obj_id): self._check_credentials() try: container = self.get_container(container_name) return container[obj_id] except KeyError: raise ObjectDoesNotExistError(object_name=obj_id, driver=self, value=None) def delete_object(self, obj): self._check_credentials() try: container = self.get_container(CONTAINER_NAME) container.pop(obj.name) return True except KeyError: raise ObjectDoesNotExistError(object_name=obj.name, driver=self, value=None) def upload_object_via_stream(self, content, container, obj_id): self._check_credentials() obj = MockLibcloudObject(obj_id, content) container[obj_id] = obj class MockCloudObjStorage(CloudObjStorage): """ Cloud object storage that uses a mocked driver """ def _get_driver(self, api_key, api_secret_key): return MockLibcloudDriver(api_key, api_secret_key) def _get_provider(self): # Implement this for the abc requirement, but behavior is defined in # _get_driver. pass class TestCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.storage = MockCloudObjStorage(API_KEY, API_SECRET_KEY, CONTAINER_NAME) diff --git a/swh/objstorage/tests/test_objstorage_instantiation.py b/swh/objstorage/tests/test_objstorage_instantiation.py index 4a1e10a..674f9be 100644 --- a/swh/objstorage/tests/test_objstorage_instantiation.py +++ b/swh/objstorage/tests/test_objstorage_instantiation.py @@ -1,53 +1,49 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import shutil import tempfile import unittest -from nose.tools import istest - from swh.objstorage import get_objstorage -from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage from swh.objstorage.api.client import RemoteObjStorage +from swh.objstorage.objstorage_pathslicing import PathSlicingObjStorage class TestObjStorageInitialization(unittest.TestCase): """ Test that the methods for ObjStorage initializations with `get_objstorage` works properly. """ def setUp(self): self.path = tempfile.mkdtemp() self.path2 = tempfile.mkdtemp() # Server is launched at self.url() self.config = {'storage_base': self.path2, 'storage_slicing': '0:1/0:5'} super().setUp() def tearDown(self): super().tearDown() shutil.rmtree(self.path) shutil.rmtree(self.path2) - @istest - def pathslicing_objstorage(self): + def test_pathslicing_objstorage(self): conf = { 'cls': 'pathslicing', 'args': {'root': self.path, 'slicing': '0:2/0:5'} } st = get_objstorage(**conf) self.assertTrue(isinstance(st, PathSlicingObjStorage)) - @istest - def remote_objstorage(self): + def test_remote_objstorage(self): conf = { 'cls': 'remote', 'args': { 'url': 'http://127.0.0.1:4242/' } } st = get_objstorage(**conf) self.assertTrue(isinstance(st, RemoteObjStorage)) diff --git a/swh/objstorage/tests/test_objstorage_multiplexer.py b/swh/objstorage/tests/test_objstorage_multiplexer.py index 1404460..843d07e 100644 --- a/swh/objstorage/tests/test_objstorage_multiplexer.py +++ b/swh/objstorage/tests/test_objstorage_multiplexer.py @@ -1,74 +1,67 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest -from nose.tools import istest - from swh.objstorage import PathSlicingObjStorage from swh.objstorage.multiplexer import MultiplexerObjStorage from swh.objstorage.multiplexer.filter import add_filter, read_only from .objstorage_testing import ObjStorageTestFixture class TestMultiplexerObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.tmpdir = tempfile.mkdtemp() os.mkdir(os.path.join(self.tmpdir, 'root1')) os.mkdir(os.path.join(self.tmpdir, 'root2')) self.storage_v1 = PathSlicingObjStorage( os.path.join(self.tmpdir, 'root1'), '0:2/2:4') self.storage_v2 = PathSlicingObjStorage( os.path.join(self.tmpdir, 'root2'), '0:1/0:5') self.r_storage = add_filter(self.storage_v1, read_only()) self.w_storage = self.storage_v2 self.storage = MultiplexerObjStorage([self.r_storage, self.w_storage]) def tearDown(self): super().tearDown() shutil.rmtree(self.tmpdir) - @istest - def contains(self): + def test_contains(self): content_p, obj_id_p = self.hash_content(b'contains_present') content_m, obj_id_m = self.hash_content(b'contains_missing') self.storage.add(content_p, obj_id=obj_id_p) self.assertIn(obj_id_p, self.storage) self.assertNotIn(obj_id_m, self.storage) - @istest - def delete_missing(self): + def test_delete_missing(self): self.storage_v1.allow_delete = True self.storage_v2.allow_delete = True - super().delete_missing() + super().test_delete_missing() - @istest - def delete_present(self): + def test_delete_present(self): self.storage_v1.allow_delete = True self.storage_v2.allow_delete = True - super().delete_present() + super().test_delete_present() - @istest - def get_random_contents(self): + def test_get_random_contents(self): content, obj_id = self.hash_content(b'get_random_content') self.storage.add(content) random_contents = list(self.storage.get_random(1)) self.assertEqual(1, len(random_contents)) self.assertIn(obj_id, random_contents) - @istest - def access_readonly(self): + def test_access_readonly(self): # Add a content to the readonly storage content, obj_id = self.hash_content(b'content in read-only') self.storage_v1.add(content) # Try to retrieve it on the main storage self.assertIn(obj_id, self.storage) diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py index e209c8d..d9dac4d 100644 --- a/swh/objstorage/tests/test_objstorage_pathslicing.py +++ b/swh/objstorage/tests/test_objstorage_pathslicing.py @@ -1,76 +1,68 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import shutil import tempfile import unittest -from nose.tools import istest - from swh.model import hashutil -from swh.objstorage import exc -from swh.objstorage import get_objstorage +from swh.objstorage import exc, get_objstorage from .objstorage_testing import ObjStorageTestFixture class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.slicing = '0:2/2:4/4:6' self.tmpdir = tempfile.mkdtemp() self.storage = get_objstorage( 'pathslicing', {'root': self.tmpdir, 'slicing': self.slicing} ) def tearDown(self): super().tearDown() shutil.rmtree(self.tmpdir) def content_path(self, obj_id): hex_obj_id = hashutil.hash_to_hex(obj_id) return self.storage._obj_path(hex_obj_id) - @istest - def iter(self): + def test_iter(self): content, obj_id = self.hash_content(b'iter') self.assertEqual(list(iter(self.storage)), []) self.storage.add(content, obj_id=obj_id) self.assertEqual(list(iter(self.storage)), [obj_id]) - @istest - def len(self): + def test_len(self): content, obj_id = self.hash_content(b'len') self.assertEqual(len(self.storage), 0) self.storage.add(content, obj_id=obj_id) self.assertEqual(len(self.storage), 1) - @istest - def check_not_gzip(self): + def test_check_not_gzip(self): content, obj_id = self.hash_content(b'check_not_gzip') self.storage.add(content, obj_id=obj_id) with open(self.content_path(obj_id), 'ab') as f: # Add garbage. f.write(b'garbage') with self.assertRaises(exc.Error): self.storage.check(obj_id) - @istest - def check_id_mismatch(self): + def test_check_id_mismatch(self): content, obj_id = self.hash_content(b'check_id_mismatch') self.storage.add(content, obj_id=obj_id) with open(self.content_path(obj_id), 'wb') as f: f.write(b'unexpected content') with self.assertRaises(exc.Error): self.storage.check(obj_id) - @istest - def get_random_contents(self): + def test_get_random_contents(self): content, obj_id = self.hash_content(b'get_random_content') self.storage.add(content, obj_id=obj_id) random_contents = list(self.storage.get_random(1)) self.assertEqual(1, len(random_contents)) self.assertIn(obj_id, random_contents) diff --git a/swh/objstorage/tests/test_objstorage_striping.py b/swh/objstorage/tests/test_objstorage_striping.py index 6c2beb7..6bf214c 100644 --- a/swh/objstorage/tests/test_objstorage_striping.py +++ b/swh/objstorage/tests/test_objstorage_striping.py @@ -1,84 +1,80 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest -from nose.tools import istest - from swh.objstorage import get_objstorage + from .objstorage_testing import ObjStorageTestFixture class TestStripingObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.base_dir = tempfile.mkdtemp() os.mkdir(os.path.join(self.base_dir, 'root1')) os.mkdir(os.path.join(self.base_dir, 'root2')) storage_config = { 'cls': 'striping', 'args': { 'objstorages': [ { 'cls': 'pathslicing', 'args': { 'root': os.path.join(self.base_dir, 'root1'), 'slicing': '0:2', 'allow_delete': True, } }, { 'cls': 'pathslicing', 'args': { 'root': os.path.join(self.base_dir, 'root2'), 'slicing': '0:2', 'allow_delete': True, } }, ] } } self.storage = get_objstorage(**storage_config) def tearDown(self): shutil.rmtree(self.base_dir) - @istest - def add_get_wo_id(self): + def test_add_get_wo_id(self): self.skipTest("can't add without id in the multiplexer storage") - @istest - def add_striping_behavior(self): + def test_add_striping_behavior(self): exp_storage_counts = [0, 0] storage_counts = [0, 0] for i in range(100): content, obj_id = self.hash_content( b'striping_behavior_test%02d' % i ) self.storage.add(content, obj_id) exp_storage_counts[self.storage.get_storage_index(obj_id)] += 1 count = 0 for i, storage in enumerate(self.storage.storages): if obj_id not in storage: continue count += 1 storage_counts[i] += 1 self.assertEqual(count, 1) self.assertEqual(storage_counts, exp_storage_counts) - @istest - def get_striping_behavior(self): + def test_get_striping_behavior(self): # Make sure we can read objects that are available in any backend # storage content, obj_id = self.hash_content(b'striping_behavior_test') for storage in self.storage.storages: storage.add(content, obj_id) self.assertIn(obj_id, self.storage) storage.delete(obj_id) self.assertNotIn(obj_id, self.storage)