diff --git a/swh/web/ui/exc.py b/swh/web/ui/exc.py index 43d32e54..b415bfdf 100644 --- a/swh/web/ui/exc.py +++ b/swh/web/ui/exc.py @@ -1,14 +1,12 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information class BadInputExc(ValueError): - def __init__(self, errorMsg): - super().__init__(errorMsg) + pass class NotFoundExc(Exception): - def __init__(self, errorMsg): - super().__init__(errorMsg) + pass diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index e8cd5991..dffc9a93 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,296 +1,296 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, main, query, upload from swh.web.ui.exc import BadInputExc, NotFoundExc def hash_and_search(filepath): """Hash the filepath's content as sha1, then search in storage if it exists. Args: Filepath of the file to hash and search. Returns: Tuple (hex sha1, found as True or false). The found boolean, according to whether the sha1 of the file is present or not. """ h = hashutil.hashfile(filepath) c = main.storage().content_find(h) if c: r = converters.from_content(c) r['found'] = True return r else: return {'sha1': hashutil.hash_to_hex(h['sha1']), 'found': False} def upload_and_search(file): """Upload a file and compute its hash. """ tmpdir, filename, filepath = upload.save_in_upload_folder(file) res = {'filename': filename} try: content = hash_and_search(filepath) res.update(content) return res finally: # clean up if tmpdir: upload.cleanup(tmpdir) def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ (algo, hash) = query.parse_hash(q) found = main.storage().content_find({algo: hash}) return {'found': found, 'algo': algo} def lookup_hash_origin(q): """Return information about the checksum contained in the query q. Args: query string of the form Returns: origin as dictionary if found for the given content. """ algo, h = query.parse_hash(q) origin = main.storage().content_find_occurrence({algo: h}) return converters.from_origin(origin) def lookup_origin(origin_id): """Return information about the origin with id origin_id. Args: origin_id as string Returns: origin information as dict. """ return main.storage().origin_get({'id': origin_id}) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ persons = main.storage().person_get([person_id]) if not persons: return None return converters.from_person(persons[0]) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ algo, hBinSha1 = query.parse_hash(sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') directory_entries = main.storage().directory_get(hBinSha1) if not directory_entries: return None return map(converters.from_directory_entry, directory_entries) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, hBinSha1 = query.parse_hash(release_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = main.storage().release_get([hBinSha1]) if res and len(res) >= 1: return converters.from_release(res[0]) return None def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, hBinSha1 = query.parse_hash(rev_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = main.storage().revision_get([hBinSha1]) if res and len(res) >= 1: return converters.from_revision(res[0]) return None def lookup_revision_log(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ - algo, hBinSha1 = query.parse_hash(rev_sha1_git) + algo, bin_sha1 = query.parse_hash(rev_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') - revision_entries = main.storage().revision_log(hBinSha1) + revision_entries = main.storage().revision_log(bin_sha1) return map(converters.from_revision, revision_entries) def lookup_revision_with_context(sha1_git_root, sha1_git): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history sha1_git: one of sha1_git_root's ancestors Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ revision = lookup_revision(sha1_git) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) revision_root = lookup_revision(sha1_git_root) if not revision_root: raise NotFoundExc('Revision %s not found' % sha1_git_root) - hBinRootSha1 = hashutil.hex_to_hash(sha1_git_root) + bin_sha1_root = hashutil.hex_to_hash(sha1_git_root) - revision_log = main.storage().revision_log(hBinRootSha1) + revision_log = main.storage().revision_log(bin_sha1_root) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = hashutil.hash_to_hex(rev['id']) parents[rev_id] = [] for parent_id in rev['parents']: parent_id = hashutil.hash_to_hex(parent_id) parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return revision def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ (algo, hash) = query.parse_hash(q) c = main.storage().content_find({algo: hash}) if c: return converters.from_content(c) return None def lookup_content_raw(q): """Lookup the content designed by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ (algo, hash) = query.parse_hash(q) c = main.storage().content_find({algo: hash}) if not c: return None sha1 = c['sha1'] contents = main.storage().content_get([sha1]) if contents and len(contents) >= 1: return converters.from_content(contents[0]) return None def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() diff --git a/swh/web/ui/tests/test_renderers.py b/swh/web/ui/tests/test_renderers.py index 8cce1a92..442430cc 100644 --- a/swh/web/ui/tests/test_renderers.py +++ b/swh/web/ui/tests/test_renderers.py @@ -1,219 +1,219 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from flask_api.mediatypes import MediaType from nose.tools import istest from unittest.mock import patch from swh.web.ui import renderers class RendererTestCase(unittest.TestCase): @patch('swh.web.ui.renderers.request') @istest - def SWHFilterRenderer_do_nothing(self, mock_request): + def swh_filter_renderer_do_nothing(self, mock_request): # given mock_request.args = {} - swhFilterRenderer = renderers.SWHFilterEnricher() + swh_filter_renderer = renderers.SWHFilterEnricher() input_data = {'a': 'some-data'} # when - actual_data = swhFilterRenderer.filter_by_fields(input_data) + actual_data = swh_filter_renderer.filter_by_fields(input_data) # then self.assertEquals(actual_data, input_data) @patch('swh.web.ui.renderers.utils') @patch('swh.web.ui.renderers.request') @istest - def SWHFilterRenderer_do_filter(self, mock_request, mock_utils): + def swh_filter_renderer_do_filter(self, mock_request, mock_utils): # given mock_request.args = {'fields': 'a,c'} mock_utils.filter_field_keys.return_value = {'a': 'some-data'} - swhFilterRenderer = renderers.SWHFilterEnricher() + swh_filter_renderer = renderers.SWHFilterEnricher() input_data = {'a': 'some-data', 'b': 'some-other-data'} # when - actual_data = swhFilterRenderer.filter_by_fields(input_data) + actual_data = swh_filter_renderer.filter_by_fields(input_data) # then self.assertEquals(actual_data, {'a': 'some-data'}) mock_utils.filter_field_keys.assert_called_once_with(input_data, {'a', 'c'}) @patch('swh.web.ui.renderers.request') @istest - def yamlRenderer_without_filter(self, mock_request): + def yaml_renderer_without_filter(self, mock_request): # given mock_request.args = {} - yamlRenderer = renderers.YAMLRenderer() + yaml_renderer = renderers.YAMLRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = input_data # when - actual_data = yamlRenderer.render(input_data, 'application/yaml') + actual_data = yaml_renderer.render(input_data, 'application/yaml') # then self.assertEqual(yaml.load(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest - def yamlRenderer(self, mock_request): + def yaml_renderer(self, mock_request): # given mock_request.args = {'fields': 'type,target'} - yamlRenderer = renderers.YAMLRenderer() + yaml_renderer = renderers.YAMLRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = {'target': 'sha1-dir', 'type': 'dir'} # when - actual_data = yamlRenderer.render(input_data, 'application/yaml') + actual_data = yaml_renderer.render(input_data, 'application/yaml') # then self.assertEqual(yaml.load(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest - def jsonRenderer_basic(self, mock_request): + def json_renderer_basic(self, mock_request): # given mock_request.args = {} - jsonRenderer = renderers.SWHJSONRenderer() + json_renderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = input_data # when - actual_data = jsonRenderer.render(input_data, MediaType( + actual_data = json_renderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(json.loads(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest - def jsonRenderer_basic_with_filter(self, mock_request): + def json_renderer_basic_with_filter(self, mock_request): # given mock_request.args = {'fields': 'target'} - jsonRenderer = renderers.SWHJSONRenderer() + json_renderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = {'target': 'sha1-dir'} # when - actual_data = jsonRenderer.render(input_data, MediaType( + actual_data = json_renderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(json.loads(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest - def jsonRenderer_basic_with_filter_and_jsonp(self, mock_request): + def json_renderer_basic_with_filter_and_jsonp(self, mock_request): # given mock_request.args = {'fields': 'target', 'callback': 'jsonpfn'} - jsonRenderer = renderers.SWHJSONRenderer() + json_renderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} # when - actual_data = jsonRenderer.render(input_data, MediaType( + actual_data = json_renderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(actual_data, 'jsonpfn({"target": "sha1-dir"})') @patch('swh.web.ui.renderers.request') @istest - def jsonpEnricher_basic_with_filter_and_jsonp(self, mock_request): + def jsonp_enricher_basic_with_filter_and_jsonp(self, mock_request): # given mock_request.args = {'callback': 'jsonpfn'} - jsonpEnricher = renderers.JSONPEnricher() + jsonp_enricher = renderers.JSONPEnricher() # when - actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'}) + actual_output = jsonp_enricher.enrich_with_jsonp({'output': 'test'}) # then self.assertEqual(actual_output, "jsonpfn({'output': 'test'})") @patch('swh.web.ui.renderers.request') @istest - def jsonpEnricher_do_nothing(self, mock_request): + def jsonp_enricher_do_nothing(self, mock_request): # given mock_request.args = {} - jsonpEnricher = renderers.JSONPEnricher() + jsonp_enricher = renderers.JSONPEnricher() # when - actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'}) + actual_output = jsonp_enricher.enrich_with_jsonp({'output': 'test'}) # then self.assertEqual(actual_output, {'output': 'test'}) @istest def urlize_api_links(self): # update api link with html links content with links content = '{"url": "/api/1/abc/"}' expected_content = '{"url": "/api/1/abc/"}' self.assertEquals(renderers.urlize_api_links(content), expected_content) # will do nothing since it's not an api url other_content = '{"url": "/something/api/1/other"}' self.assertEquals(renderers.urlize_api_links(other_content), other_content) @istest def safe_docstring_display(self): # update api link with html links content with links docstring = """

Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git.

Args: sha1_git: the revision's hash

Returns: Information on the revision if found.

Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the revision is not found.

""" expected_docstring = """

Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git.

Args:
   sha1_git: the revision's hash

Returns:
   Information on the revision if found.

Raises:
   BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the revision is not found.

""" self.assertEquals(renderers.safe_docstring_display(docstring), expected_docstring) diff --git a/swh/web/ui/tests/test_upload.py b/swh/web/ui/tests/test_upload.py index 331486f5..88668d6d 100644 --- a/swh/web/ui/tests/test_upload.py +++ b/swh/web/ui/tests/test_upload.py @@ -1,148 +1,148 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui import upload from swh.web.ui.tests import test_app class UploadTestCase(test_app.SWHApiTestCase): @istest def allowed_file_ok(self): # when actual_perm = upload.allowed_file('README') self.assertTrue(actual_perm) # when actual_perm2 = upload.allowed_file('README', []) self.assertTrue(actual_perm2) # when actual_perm3 = upload.allowed_file('README', ['README', 'LICENCE', 'BUGS']) self.assertTrue(actual_perm3) # when actual_perm4 = upload.allowed_file('some-filename.txt', ['txt', 'blah', 'gz']) self.assertTrue(actual_perm4) # when actual_perm5 = upload.allowed_file('something.tar.gz', ['gz', 'txt', 'tar.gz']) # then self.assertTrue(actual_perm5) @istest def allowed_file_denied(self): # when actual_perm = upload.allowed_file('some-filename', ['blah']) self.assertFalse(actual_perm) # when actual_perm = upload.allowed_file('something.tgz', ['gz', 'txt', 'tar.gz']) # then self.assertFalse(actual_perm) @patch('swh.web.ui.upload.os.path') @patch('swh.web.ui.upload.shutil') @istest def cleanup_ok(self, mock_shutil, mock_os_path): # given mock_os_path.commonprefix.return_value = '/some/upload-dir' mock_shutil.rmtree.return_value = True # when upload.cleanup('/some/upload-dir/some-dummy-path') # then mock_os_path.commonprefix.assert_called_with( ['/some/upload-dir', '/some/upload-dir/some-dummy-path']) mock_shutil.rmtree.assert_called_with( '/some/upload-dir/some-dummy-path') @patch('swh.web.ui.upload.os.path') @patch('swh.web.ui.upload.shutil') @istest def cleanup_should_fail(self, mock_shutil, mock_os_path): # given mock_os_path.commonprefix.return_value = '/somewhere/forbidden' mock_shutil.rmtree.return_value = True # when with self.assertRaises(AssertionError): upload.cleanup('/some/upload-dir/some-dummy-path') # then mock_os_path.commonprefix.assert_called_with( ['/some/upload-dir', '/some/upload-dir/some-dummy-path']) self.assertTrue(mock_shutil.rmtree.not_called) @istest def save_in_upload_folder_no_file(self): # when act_tmpdir, act_name, act_path = upload.save_in_upload_folder(None) # then self.assertIsNone(act_tmpdir) self.assertIsNone(act_name) self.assertIsNone(act_path) @istest def save_in_upload_folder_file_not_allowed(self): # given file = MagicMock() file.filename = 'some-non-file-allowed.ext' # when with self.assertRaises(ValueError) as exc: act_tmpdir, act_name, act_path = upload.save_in_upload_folder(file) # then self.assertIn('Only', exc.exception.args[0]) self.assertIn('extensions are valid for upload', exc.exception.args[0]) @patch('swh.web.ui.upload.werkzeug') @patch('swh.web.ui.upload.tempfile') @istest - def save_in_upload_folder_OK(self, mock_tempfile, mock_werkzeug): + def save_in_upload_folder_ok(self, mock_tempfile, mock_werkzeug): # given upload_folder = self.app_config['conf']['upload_folder'] # mock the dependencies mock_werkzeug.secure_filename.return_value = 'some-allowed-file.txt' tmpdir = upload_folder + '/foobar/' mock_tempfile.mkdtemp.return_value = tmpdir # mock the input file = MagicMock() file.filename = 'some-allowed-file.txt' # when act_tmpdir, act_name, act_path = upload.save_in_upload_folder(file) # then expected_tmpdir = tmpdir expected_filename = 'some-allowed-file.txt' expected_filepath = tmpdir + 'some-allowed-file.txt' self.assertEqual(act_tmpdir, expected_tmpdir) self.assertEqual(act_name, expected_filename) self.assertEqual(act_path, expected_filepath) mock_werkzeug.secure_filename.assert_called_with(expected_filename) file.save.assert_called_once_with(expected_filepath) mock_tempfile.mkdtemp.assert_called_with( suffix='tmp', prefix='swh.web.ui-', dir=upload_folder) diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index ab2d50b5..37476db6 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,171 +1,171 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from nose.tools import istest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': sha1_git = kwds['sha1_git'] return '/path/to/url/dir' + '/' + sha1_git else: sha1_git = kwds['q'] return '/path/to/url/file' + '/' + sha1_git mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/path/to/url/dir/123', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/path/to/url/file/654', 'name': 'some-filename', 'type': 'file'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing(inputs) # then self.assertEquals(actual_outputs, expected_output) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given - inputSet = {1, 2} + input_set = {1, 2} # when - actual_res = utils.filter_field_keys(inputSet, {'a', '1'}) + actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then - self.assertEqual(actual_res, inputSet) + self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ')