diff --git a/swh/web/ui/tests/test_app.py b/swh/web/ui/tests/test_app.py index 2b4f57d3..57b43c1d 100644 --- a/swh/web/ui/tests/test_app.py +++ b/swh/web/ui/tests/test_app.py @@ -1,52 +1,53 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information # Functions defined here are NOT DESIGNED FOR PRODUCTION from swh.web.ui import controller from swh.storage.api.client import RemoteStorage as Storage # Because the Storage's __init__ function does side effect at startup... class RemoteStorageAdapter(Storage): def __init__(self, base_url): self.base_url = base_url def _init_mock_storage(base_url='https://somewhere.org:4321'): """Instanciate a remote storage whose goal is to be mocked in a test context. NOT FOR PRODUCTION Returns: An instance of swh.storage.api.client.RemoteStorage destined to be mocked (it does not do any rest call) """ return RemoteStorageAdapter(base_url) # destined to be used as mock def init_app(base_url='https://somewhere.org:4321'): """Function to initiate a flask app with storage designed to be mocked. Returns: Tuple app and storage. NOT FOR PRODUCTION """ storage = _init_mock_storage(base_url) # inject the mock data conf = {'storage': storage, - 'upload_folder': '/some/upload-dir'} + 'upload_folder': '/some/upload-dir', + 'upload_allowed_extensions': ['txt']} controller.app.config['TESTING'] = True controller.app.config.update({'conf': conf}) app = controller.app.test_client() - return app, storage + return app, controller.app.config, storage diff --git a/swh/web/ui/tests/test_controller.py b/swh/web/ui/tests/test_controller.py index 70eeeb1b..cb762553 100644 --- a/swh/web/ui/tests/test_controller.py +++ b/swh/web/ui/tests/test_controller.py @@ -1,50 +1,50 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from swh.web.ui.tests import test_app class ApiTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - cls.app, _ = test_app.init_app() + cls.app, _, _ = test_app.init_app() @istest def info(self): # when rv = self.app.get('/about') self.assertEquals(rv.status_code, 200) self.assertIn(b'About', rv.data) # @istest def search_1(self): # when rv = self.app.get('/search') self.assertEquals(rv.status_code, 200) # check this api self.assertRegexpMatches(rv.data, b'name=q value=>') # @istest def search_2(self): # when rv = self.app.get('/search?q=one-hash-to-look-for:another-one') self.assertEquals(rv.status_code, 200) # check this api self.assertRegexpMatches( rv.data, b'name=q value=one-hash-to-look-for:another-one') # @istest def api_1_stat_counters(self): rv = self.app.get('/api/1/stat/counters') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index 2f1380a7..6f540755 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,153 +1,153 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from unittest.mock import MagicMock, patch from swh.core.hashutil import hex_to_hash from swh.web.ui import service from swh.web.ui.tests import test_app class ServiceTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - cls.app, cls.storage = test_app.init_app() + _, _, cls.storage = test_app.init_app() @istest def lookup_hash_does_not_exist(self): # given self.storage.content_exist = MagicMock(return_value=False) # when actual_lookup = service.lookup_hash( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertFalse(actual_lookup) # check the function has been called with parameters self.storage.content_exist.assert_called_with({ 'sha1': hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')}) @istest def lookup_hash_exist(self): # given self.storage.content_exist = MagicMock(return_value=True) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertTrue(actual_lookup) self.storage.content_exist.assert_called_with({ 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) @istest def lookup_hash_origin(self): # given self.storage.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) self.storage.content_find_occurrence.assert_called_with( {'sha1_git': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest def hash_and_search(self): # given self.storage.content_exist = MagicMock(return_value=False) bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') # when with patch( 'swh.core.hashutil.hashfile', return_value={'sha1': bhash}): actual_hash, actual_search = service.hash_and_search('/some/path') # then self.assertEqual(actual_hash, '456caf10e9535160d90e874b45aa426de762f19f') self.assertFalse(actual_search) self.storage.content_exist.assert_called_with({'sha1': bhash}) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search_upload_OK(self, mock_upload): # given (cf. decorators patch) mock_upload.save_in_upload_folder.return_value = ( '/tmp/blah', 'some-filename', None) mock_upload.cleanup.return_value = None # when actual_file, actual_hash, actual_search = service.upload_and_search( '/some/path/to/file') # then self.assertEqual(actual_file, 'some-filename') self.assertIsNone(actual_hash) self.assertIsNone(actual_search) mock_upload.save_in_upload_folder.assert_called_with( '/some/path/to/file') mock_upload.cleanup.assert_called_with('/tmp/blah') diff --git a/swh/web/ui/tests/test_upload.py b/swh/web/ui/tests/test_upload.py index c7b1f846..6f27752f 100644 --- a/swh/web/ui/tests/test_upload.py +++ b/swh/web/ui/tests/test_upload.py @@ -1,94 +1,154 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest -from unittest.mock import patch +from unittest.mock import patch, MagicMock from swh.web.ui import upload from swh.web.ui.tests import test_app class UploadTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - cls.app, cls.storage = test_app.init_app() + cls.app, cls.config, cls.storage = test_app.init_app() @istest def allowed_file_ok(self): # when actual_perm = upload.allowed_file('README') self.assertTrue(actual_perm) # when actual_perm2 = upload.allowed_file('README', []) self.assertTrue(actual_perm2) # when actual_perm3 = upload.allowed_file('README', ['README', 'LICENCE', 'BUGS']) self.assertTrue(actual_perm3) # when actual_perm4 = upload.allowed_file('some-filename.txt', ['txt', 'blah', 'gz']) self.assertTrue(actual_perm4) # when actual_perm5 = upload.allowed_file('something.tar.gz', ['gz', 'txt', 'tar.gz']) # then self.assertTrue(actual_perm5) @istest def allowed_file_denied(self): # when actual_perm = upload.allowed_file('some-filename', ['blah']) self.assertFalse(actual_perm) # when actual_perm = upload.allowed_file('something.tgz', ['gz', 'txt', 'tar.gz']) # then self.assertFalse(actual_perm) @patch('swh.web.ui.upload.os.path') @patch('swh.web.ui.upload.shutil') @istest def cleanup_ok(self, mock_shutil, mock_os_path): # given mock_os_path.commonprefix.return_value = '/some/upload-dir' mock_shutil.rmtree.return_value = True # when upload.cleanup('/some/upload-dir/some-dummy-path') # then mock_os_path.commonprefix.assert_called_with( ['/some/upload-dir', '/some/upload-dir/some-dummy-path']) mock_shutil.rmtree.assert_called_with( '/some/upload-dir/some-dummy-path') @patch('swh.web.ui.upload.os.path') @patch('swh.web.ui.upload.shutil') @istest def cleanup_should_fail(self, mock_shutil, mock_os_path): # given mock_os_path.commonprefix.return_value = '/somewhere/forbidden' mock_shutil.rmtree.return_value = True # when with self.assertRaises(AssertionError): upload.cleanup('/some/upload-dir/some-dummy-path') # then mock_os_path.commonprefix.assert_called_with( ['/some/upload-dir', '/some/upload-dir/some-dummy-path']) self.assertTrue(mock_shutil.rmtree.not_called) + + @istest + def save_in_upload_folder_no_file(self): + # when + act_tmpdir, act_name, act_path = upload.save_in_upload_folder(None) + + # then + self.assertIsNone(act_tmpdir) + self.assertIsNone(act_name) + self.assertIsNone(act_path) + + @istest + def save_in_upload_folder_file_not_allowed(self): + # given + file = MagicMock() + file.filename = 'some-non-file-allowed.ext' + + # when + with self.assertRaises(ValueError) as exc: + act_tmpdir, act_name, act_path = upload.save_in_upload_folder(file) + + # then + self.assertIn('Only', exc.exception.args[0]) + self.assertIn('extensions are valid for upload', exc.exception.args[0]) + + @patch('swh.web.ui.upload.werkzeug') + @patch('swh.web.ui.upload.tempfile') + @istest + def save_in_upload_folder_OK(self, mock_tempfile, mock_werkzeug): + # given + upload_folder = self.config['conf']['upload_folder'] + + # mock the dependencies + mock_werkzeug.secure_filename.return_value = 'some-allowed-file.txt' + tmpdir = upload_folder + '/foobar/' + mock_tempfile.mkdtemp.return_value = tmpdir + + # mock the input + file = MagicMock() + file.filename = 'some-allowed-file.txt' + + # when + act_tmpdir, act_name, act_path = upload.save_in_upload_folder(file) + + # then + expected_tmpdir = tmpdir + expected_filename = 'some-allowed-file.txt' + expected_filepath = tmpdir + 'some-allowed-file.txt' + + self.assertEqual(act_tmpdir, expected_tmpdir) + self.assertEqual(act_name, expected_filename) + self.assertEqual(act_path, expected_filepath) + + mock_werkzeug.secure_filename.assert_called_with(expected_filename) + file.save.assert_called_once_with(expected_filepath) + + mock_tempfile.mkdtemp.assert_called_with( + suffix='tmp', + prefix='swh.web.ui-', + dir=upload_folder) diff --git a/swh/web/ui/upload.py b/swh/web/ui/upload.py index 4af37249..a577b2da 100644 --- a/swh/web/ui/upload.py +++ b/swh/web/ui/upload.py @@ -1,80 +1,79 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import shutil - -from werkzeug import secure_filename +import werkzeug from swh.web.ui import main def allowed_file(filename, allowed_extensions=[]): """Filter on filename extension. The filename to check for permission. Args: filename. If no extension on the filename, the filename itself is checked against allowed extensions (example of current extensionless filenames: README, LICENCE, BUGS, etc...) Returns: True if allowed, False otherwise. """ if allowed_extensions == []: return True if '.' in filename: return filename.rsplit('.', 1)[1] in allowed_extensions return filename in allowed_extensions def save_in_upload_folder(file): """Persist uploaded file on server. Args: File object (as per Flask's submission form) Returns: a triplet: - the temporary directory holding the persisted file - the filename without any path from the file entry - the complete path filepath """ main_conf = main.app.config['conf'] upload_folder = main_conf['upload_folder'] allowed_extensions = main_conf['upload_allowed_extensions'] if not file: return None, None, None filename = file.filename if allowed_file(filename, allowed_extensions): - filename = secure_filename(filename) + filename = werkzeug.secure_filename(filename) tmpdir = tempfile.mkdtemp(suffix='tmp', prefix='swh.web.ui-', dir=upload_folder) filepath = os.path.join(tmpdir, filename) file.save(filepath) # persist on disk (not found how to avoid this) return tmpdir, filename, filepath else: raise ValueError( 'Only %s extensions are valid for upload.' % allowed_extensions) def cleanup(tmpdir): """Clean up after oneself. Args: The directory dir to destroy. """ upload_folder = main.app.config['conf']['upload_folder'] assert (os.path.commonprefix([upload_folder, tmpdir]) == upload_folder) shutil.rmtree(tmpdir)