diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 04af353e..75a5fe63 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,416 +1,300 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': sha1_git = kwds['sha1_git'] return '/path/to/url/dir' + '/' + sha1_git else: sha1_git = kwds['q'] return '/path/to/url/file' + '/' + sha1_git mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/path/to/url/dir/123', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/path/to/url/file/654', 'name': 'some-filename', 'type': 'file'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing(inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_directory', sha1_git='123'), call('browse_content_data', q='654'), call('browse_directory', sha1_git='987'), ]) - @patch('swh.web.ui.utils.flask') - @istest - def prepare_directory_listing_with_revision(self, mock_flask): - # given - def mock_url_for(url_key, **kwds): - if url_key == 'browse_revision_directory': - sha1_git = kwds['sha1_git'] - path = kwds['path'] - return '/browse/revision/' + sha1_git + '/directory/' + path - - mock_flask.url_for.side_effect = mock_url_for - - inputs = [{'type': 'dir', - 'target': '123', - 'name': 'some-dir-name'}, - {'type': 'file', - 'sha1': '654', - 'name': 'some-filename'}, - {'type': 'dir', - 'target': '987', - 'name': 'some-other-dirname'}] - - expected_output = [{'link': '/browse/revision/revsha1/directory/' - 'some/path/some-dir-name', - 'name': 'some-dir-name', - 'type': 'dir'}, - {'link': '/browse/revision/revsha1/directory/' - 'some/path/some-filename', - 'name': 'some-filename', - 'type': 'file'}, - {'link': '/browse/revision/revsha1/directory/' - 'some/path/some-other-dirname', - 'name': 'some-other-dirname', - 'type': 'dir'}] - - # when - actual_outputs = utils.prepare_directory_listing_with_revision( - 'revsha1', 'some/path', inputs) - - # then - self.assertEquals(actual_outputs, expected_output) - - mock_flask.url_for.assert_has_calls([ - call('browse_revision_directory', - sha1_git='revsha1', - path='some/path/some-dir-name'), - call('browse_revision_directory', - sha1_git='revsha1', - path='some/path/some-filename'), - call('browse_revision_directory', - sha1_git='revsha1', - path='some/path/some-other-dirname'), - ]) - @patch('swh.web.ui.utils.flask') @istest def prepare_revision_view(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': return '/browse/directory/' + kwds['sha1_git'] elif url_key == 'browse_revision': return '/browse/revision/' + kwds['sha1_git'] mock_flask.url_for.side_effect = mock_url_for rev_input = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ 'b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } expected_rev = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '/browse/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ '/browse/revision/b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } # when actual_rev = utils.prepare_revision_view(rev_input) # then self.assertEquals(actual_rev, expected_rev) mock_flask.url_for.assert_has_calls([ call('browse_revision', sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_revision', sha1_git='b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_directory', sha1_git='2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b'), ]) - @patch('swh.web.ui.utils.flask') - @istest - def prepare_directory_listing_with_revision_history(self, mock_flask): - # given - def mock_url_for(url_key, **kwds): - if url_key == 'browse_revision_history_directory': - sha1_git_root = kwds['sha1_git_root'] - sha1_git = kwds['sha1_git'] - path = kwds['path'] - return '/browse/revision/' + sha1_git_root + \ - '/history/' + sha1_git + '/directory/' + path - - mock_flask.url_for.side_effect = mock_url_for - - inputs = [{'type': 'dir', - 'target': '123', - 'name': 'some-dir-name'}, - {'type': 'file', - 'sha1': '654', - 'name': 'some-filename'}, - {'type': 'dir', - 'target': '987', - 'name': 'some-other-dirname'}] - - expected_output = [{'link': '/browse/revision/revsha1root/history/' - 'revsha1/directory/' - 'some/path/some-dir-name', - 'name': 'some-dir-name', - 'type': 'dir'}, - {'link': '/browse/revision/revsha1root/history/' - 'revsha1/directory/' - 'some/path/some-filename', - 'name': 'some-filename', - 'type': 'file'}, - {'link': '/browse/revision/revsha1root/history/' - 'revsha1/directory/' - 'some/path/some-other-dirname', - 'name': 'some-other-dirname', - 'type': 'dir'}] - - # when - actual_outputs = utils.prepare_directory_listing_with_revision_history( - 'revsha1root', 'revsha1', 'some/path', inputs) - - # then - self.assertEquals(actual_outputs, expected_output) - - mock_flask.url_for.assert_has_calls([ - call('browse_revision_history_directory', - sha1_git_root='revsha1root', - sha1_git='revsha1', - path='some/path/some-dir-name'), - call('browse_revision_history_directory', - sha1_git_root='revsha1root', - sha1_git='revsha1', - path='some/path/some-filename'), - call('browse_revision_history_directory', - sha1_git_root='revsha1root', - sha1_git='revsha1', - path='some/path/some-other-dirname'), - ]) - @istest def prepare_data_for_view(self): # given inputs = [ { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0), datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21), datetime.datetime(2016, 1, 12, 10, 39, 2), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index adb7301b..46d71600 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,240 +1,178 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask import re from dateutil import parser def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def prepare_directory_listing(files): """Given a list of dictionary files, return a dictionary ready for view. Args: files: List of files to enrich Returns: List of enriched files with urls to other resources """ ls = [] for entry in files: new_entry = {'name': entry['name'], 'type': entry['type']} if entry['type'] == 'dir': new_entry['link'] = flask.url_for('browse_directory', sha1_git=entry['target']) else: new_entry['link'] = flask.url_for('browse_content_data', q=entry['sha1']) ls.append(new_entry) return ls -def prepare_directory_listing_with_revision(rev_sha1_git, prefix_path, files): - """Given a list of dictionary files, return a dictionary ready for view. - - Args: - rev_sha1_git: The revision identifier - prefix_path: the path to append (could be None) - files: List of files to enrich - - Returns: - List of enriched files with urls to other resources - - """ - ls = [] - for entry in files: - new_entry = {'name': entry['name'], - 'type': entry['type']} - new_path = ''.join([ - '' if not prefix_path - else (prefix_path + '/'), - entry['name']]) - - new_entry['link'] = flask.url_for('browse_revision_directory', - sha1_git=rev_sha1_git, - path=new_path) - ls.append(new_entry) - - return ls - - -def prepare_directory_listing_with_revision_history(sha1_git_root, - sha1_git, - prefix_path, - files): - """Given a list of dictionary files, return a dictionary ready for view. - - Args: - rev_sha1_git: The revision identifier - prefix_path: the path to append (could be None) - files: List of files to enrich - - Returns: - List of enriched files with urls to other resources - - """ - ls = [] - for entry in files: - new_entry = {'name': entry['name'], - 'type': entry['type']} - new_path = ''.join([ - '' if not prefix_path - else (prefix_path + '/'), - entry['name']]) - - new_entry['link'] = flask.url_for('browse_revision_history_directory', - sha1_git_root=sha1_git_root, - sha1_git=sha1_git, - path=new_path) - ls.append(new_entry) - - return ls - - def prepare_revision_view(revision): """Given a revision, return a dictionary ready view. """ author = revision.get('author') if author: revision['author'] = person_to_string(author) committer = revision.get('committer') if committer: revision['committer'] = person_to_string(committer) revision['parents'] = list(map(lambda p: flask.url_for('browse_revision', sha1_git=p), revision.get('parents', []))) if 'children' in revision: revision['children'] = list(map(lambda child: flask.url_for( 'browse_revision', sha1_git=child), revision['children'])) directory = revision.get('directory') if directory: revision['directory'] = flask.url_for('browse_directory', sha1_git=revision['directory']) return revision def fmap(f, data): """Map f to data. Keep the initial data structure as original but map function f to each level. Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if isinstance(data, (list, map)): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def prepare_data_for_view(data): def replace_api_url(s): if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(replace_api_url, data) def filter_field_keys(obj, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - obj: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(obj, dict): filt_dict = {} for key, value in obj.items(): if key in field_keys: filt_dict[key] = value return filt_dict elif isinstance(obj, list): filt_list = [] for e in obj: filt_list.append(filter_field_keys(e, field_keys)) return filt_list return obj def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: datetime result of parsing values. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ try: res = parser.parse(timestamp, ignoretz=False, fuzzy=True) except: res = datetime.datetime.fromtimestamp(float(timestamp)) return res