diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/server_testing.py similarity index 59% copy from swh/storage/tests/test_api_client.py copy to swh/storage/tests/server_testing.py index 2af6b87..61277d3 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/server_testing.py @@ -1,77 +1,80 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import multiprocessing import socket import time -import unittest + from urllib.request import urlopen -from swh.storage.tests.test_storage import AbstractTestStorage -from swh.storage.api.client import RemoteStorage -from swh.storage.api.server import app +class ServerTestFixture(): + """ Base class for http client/server testing. + + Mix this in a test class in order to have access to an http flask + server running in background. + + Note that the subclass should define a dictionary in self.config + that contains the flask server config. + And a flask application in self.app that corresponds to the type of + server the tested client needs. -class TestRemoteStorage(AbstractTestStorage, unittest.TestCase): - """Test the remote storage API. + To ensure test isolation, each test will run in a different server + and a different repertory. - This class doesn't define any tests as we want identical - functionality between local and remote storage. All the tests are - therefore defined in AbstractTestStorage. + In order to correctly work, the subclass must call the parents class's + setUp() and tearDown() methods. """ def setUp(self): super().setUp() - self.start_server() - self.storage = RemoteStorage(self.url()) def tearDown(self): self.stop_server() - super().tearDown() def url(self): return 'http://127.0.0.1:%d/' % self.port def start_server(self): - """Spawn the API server using multiprocessing""" + """ Spawn the API server using multiprocessing. + """ self.process = None # WSGI app configuration - self.app = app - self.app.config['db'] = 'dbname=%s' % self.dbname - self.app.config['storage_base'] = self.objroot - + for key, value in self.config.items(): + self.app.config[key] = value # Get an available port number sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 0)) self.port = sock.getsockname()[1] sock.close() - # We need a worker function for multiprocessing + # Worker function for multiprocessing def worker(app, port): return app.run(port=port, use_reloader=False) self.process = multiprocessing.Process( target=worker, args=(self.app, self.port) ) self.process.start() - # Wait max. 5 seconds for server to spawn + # Wait max 5 seconds for server to spawn i = 0 while i < 20: try: urlopen(self.url()) except Exception: i += 1 time.sleep(0.25) else: - break + return def stop_server(self): - """Terminate the API server""" + """ Terminate the API server's process. + """ if self.process: self.process.terminate() diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py index 2af6b87..10d2125 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -1,77 +1,36 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import multiprocessing -import socket -import time import unittest -from urllib.request import urlopen +import tempfile from swh.storage.tests.test_storage import AbstractTestStorage +from swh.storage.tests.server_testing import ServerTestFixture from swh.storage.api.client import RemoteStorage from swh.storage.api.server import app -class TestRemoteStorage(AbstractTestStorage, unittest.TestCase): +class TestRemoteStorage(AbstractTestStorage, ServerTestFixture, + unittest.TestCase): """Test the remote storage API. This class doesn't define any tests as we want identical functionality between local and remote storage. All the tests are therefore defined in AbstractTestStorage. """ def setUp(self): + # ServerTestFixture needs to have self.objroot for + # setUp() method, but this field is defined in + # AbstractTestStorage's setUp() + # To avoid confusion, override the self.objroot to a + # one choosen in this class. + storage_base = tempfile.mkdtemp() + self.config = {'db': 'dbname=%s' % self.dbname, + 'storage_base': storage_base} + self.app = app super().setUp() - - self.start_server() self.storage = RemoteStorage(self.url()) - - def tearDown(self): - self.stop_server() - - super().tearDown() - - def url(self): - return 'http://127.0.0.1:%d/' % self.port - - def start_server(self): - """Spawn the API server using multiprocessing""" - self.process = None - - # WSGI app configuration - self.app = app - self.app.config['db'] = 'dbname=%s' % self.dbname - self.app.config['storage_base'] = self.objroot - - # Get an available port number - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('127.0.0.1', 0)) - self.port = sock.getsockname()[1] - sock.close() - - # We need a worker function for multiprocessing - def worker(app, port): - return app.run(port=port, use_reloader=False) - - self.process = multiprocessing.Process( - target=worker, args=(self.app, self.port) - ) - self.process.start() - - # Wait max. 5 seconds for server to spawn - i = 0 - while i < 20: - try: - urlopen(self.url()) - except Exception: - i += 1 - time.sleep(0.25) - else: - break - - def stop_server(self): - """Terminate the API server""" - if self.process: - self.process.terminate() + self.objroot = storage_base diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py new file mode 100644 index 0000000..e1f4968 --- /dev/null +++ b/swh/storage/tests/test_objstorage_api.py @@ -0,0 +1,83 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.storage.exc import ObjNotFoundError, Error +from swh.storage.tests.server_testing import ServerTestFixture +from swh.storage.objstorage.objstorage import _obj_path +from swh.storage.objstorage.api.client import RemoteObjStorage +from swh.storage.objstorage.api.server import app + + +@attr('!db') +class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): + """ Test the remote archive API. + """ + + def setUp(self): + self.config = {'storage_base': tempfile.mkdtemp(), + 'storage_depth': 3} + self.app = app + super().setUp() + self.objstorage = RemoteObjStorage(self.url()) + + def tearDown(self): + super().tearDown() + + @istest + def content_add(self): + content = bytes('Test content', 'utf8') + id = self.objstorage.content_add(content) + self.assertEquals(self.objstorage.content_get(id), content) + + @istest + def content_get_present(self): + content = bytes('content_get_present', 'utf8') + content_hash = hashutil.hashdata(content) + id = self.objstorage.content_add(content) + self.assertEquals(content_hash['sha1'], id) + + @istest + def content_get_missing(self): + content = bytes('content_get_missing', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_get(content_hash['sha1']) + + @istest + def content_check_invalid(self): + content = bytes('content_check_invalid', 'utf8') + id = self.objstorage.content_add(content) + path = _obj_path(hashutil.hash_to_hex(id), + self.app.config['storage_base'], + self.app.config['storage_depth']) + content = list(content) + with open(path, 'bw') as f: + content[0] = (content[0] + 1) % 128 + f.write(bytes(content)) + with self.assertRaises(Error): + self.objstorage.content_check(id) + + @istest + def content_check_valid(self): + content = bytes('content_check_valid', 'utf8') + id = self.objstorage.content_add(content) + try: + self.objstorage.content_check(id) + except: + self.fail('Integrity check failed') + + @istest + def content_check_missing(self): + content = bytes('content_check_valid', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_check(content_hash['sha1'])