diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py index e3947ab9..fc639fda 100644 --- a/swh/web/ui/backend.py +++ b/swh/web/ui/backend.py @@ -1,262 +1,261 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.web.ui import main def content_get(sha1_bin): """Lookup the content designed by {algo: hash_bin}. Args: sha1_bin: content's binary sha1. Returns: Content as dict with 'sha1' and 'data' keys. data representing its raw data. """ contents = main.storage().content_get([sha1_bin]) if contents and len(contents) >= 1: return contents[0] return None def content_find(algo, hash_bin): """Retrieve the content with binary hash hash_bin Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: A triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. """ return main.storage().content_find({algo: hash_bin}) def content_find_occurrence(algo, hash_bin): """Find the content's occurrence. Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: The occurrence of the content. """ return main.storage().content_find_occurrence({algo: hash_bin}) def content_missing_per_sha1(sha1list): """List content missing from storage based on sha1 Args: sha1s: Iterable of sha1 to check for absence Returns: an iterable of sha1s missing from the storage """ return main.storage().content_missing_per_sha1(sha1list) def directory_get(sha1_bin): """Retrieve information on one directory. Args: sha1_bin: Directory's identifier Returns: The directory's information. """ res = main.storage().directory_get([sha1_bin]) if res and len(res) >= 1: return res[0] def origin_get(origin_id): """Return information about the origin with id origin_id. Args: origin_id: origin's identifier Returns: Origin information as dict. """ return main.storage().origin_get({'id': origin_id}) def person_get(person_id): """Return information about the person with id person_id. Args: person_id: person's identifier.v Returns: Person information as dict. """ res = main.storage().person_get([person_id]) if res and len(res) >= 1: return res[0] def directory_ls(sha1_git_bin, recursive=False): """Return information about the directory with id sha1_git. Args: sha1_git: directory's identifier. recursive: Optional recursive flag default to False Returns: Directory information as dict. """ directory_entries = main.storage().directory_ls(sha1_git_bin, recursive) if not directory_entries: return [] return directory_entries def release_get(sha1_git_bin): """Return information about the release with sha1 sha1_git_bin. Args: sha1_git_bin: The release's sha1 as bytes. Returns: Release information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().release_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get(sha1_git_bin): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get_multiple(sha1_git_bin_list): """Return information about the revisions in sha1_git_bin_list Args: sha1_git_bin_list: The revisions' sha1s as a list of bytes. Returns: Revisions' information as an iterable of dicts if any found, an empty list otherwise Raises: ValueError if the identifier provided is not of sha1 nature. """ - print(sha1_git_bin_list) res = main.storage().revision_get(sha1_git_bin_list) if res and len(res) >= 1: return res return [] def revision_log(sha1_git_bin, limit=100): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. limit: the maximum number of revisions returned. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ return main.storage().revision_log([sha1_git_bin], limit) def revision_log_by(origin_id, branch_name, ts, limit=100): """Return information about the revision matching the timestamp ts, from origin origin_id, in branch branch_name. Args: origin_id: origin of the revision - branch_name: revision's branch. - timestamp: revision's time frame. Returns: Information for the revision matching the criterions. """ return main.storage().revision_log_by(origin_id, branch_name, ts) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() def revision_get_by(origin_id, branch_name, timestamp): """Return occurrence information matching the criterions origin_id, branch_name, ts. """ res = main.storage().revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1) if not res: return None return res[0] def directory_entry_get_by_path(directory, path): """Return a directory entry by its path. """ paths = path.strip(os.path.sep).split(os.path.sep) return main.storage().directory_entry_get_by_path( directory, list(map(lambda p: p.encode('utf-8'), paths))) def entity_get(uuid): """Retrieve the entity per its uuid. """ return main.storage().entity_get(uuid) diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 33fbbca4..edbeec49 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,833 +1,829 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @istest def prepare_data_for_view_default_encoding(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view_ko_cannot_decode(self): self.maxDiff = None # given inputs = { 'data': 'hé dude!'.encode('utf8'), } actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, { 'data': "Cannot decode the data bytes, try and set another " "encoding in the url (e.g. ?encoding=utf8) or " "download directly the " "content's raw data.", }) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0), datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21), datetime.datetime(2016, 1, 12, 10, 39, 2), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.ui.utils.flask') @istest def enrich_release_1(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/' # when actual_release = utils.enrich_release({'target': '123', 'target_type': 'content'}) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_release_2(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_flask.url_for.assert_called_once_with('api_directory', q='23') @patch('swh.web.ui.utils.flask') @istest def enrich_release_3(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_flask.url_for.assert_called_once_with('api_revision', sha1_git='3') @patch('swh.web.ui.utils.flask') @istest def enrich_release_4(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_flask.url_for.assert_called_once_with('api_release', sha1_git='4') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_no_type(self, mock_flask): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_file(self, mock_flask): # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:789') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_dir(self, mock_flask): # given mock_flask.url_for.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_flask.url_for.assert_called_once_with('api_directory', sha1_git='456') @istest def enrich_content_without_sha1(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_content_with_sha1(self, mock_flask): # given mock_flask.url_for.return_value = '/api/content/sha1:123/raw/' # when/then self.assertEqual(utils.enrich_content( {'id': '123', 'sha1': 'blahblah'}), {'id': '123', 'sha1': 'blahblah', 'data_url': '/api/content/sha1:123/raw/'}) mock_flask.url_for.assert_called_once_with('api_content_raw', q='blahblah') @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_entity_with_sha1(self, mock_flask): # given def url_for_test(fn, **entity): return '/api/entity/' + entity['uuid'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid', uuid='uuid-1'), call('api_entity_by_uuid', uuid='uuid-parent')]) @nottest - def url_for_context_test(self, fn, **data): + def _url_for_context_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] @patch('swh.web.ui.utils.flask') @istest def enrich_revision_without_children_or_parent(self, mock_flask): # given def url_for_test(fn, **data): - if fn == 'api_revision' or fn == 'api_revision_log': - return self.url_for_context_test(fn, **data) + if fn == 'api_revision': + return '/api/revision/' + data['sha1_git'] + '/' + elif fn == 'api_revision_log': + return '/api/revision/' + data['sha1_git'] + '/log/' elif fn == 'api_directory': return '/api/directory/' + data['sha1_git'] + '/' elif fn == 'api_person': return '/api/person/' + data['person_id'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_person', person_id='1'), call('api_person', person_id='2'), call('api_directory', sha1_git='123')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_flask): # given def url_for_test(fn, **data): - if fn == 'api_revision' or fn == 'api_revision_log': - return self.url_for_context_test(fn, **data) - else: - return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa + return self._url_for_context_test(fn, **data) mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_no_context(self, mock_flask): # given def url_for_test(fn, **data): - if fn == 'api_revision' or fn == 'api_revision_log': - return self.url_for_context_test(fn, **data) - else: - return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa + return self._url_for_context_test(fn, **data) mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision', sha1_git='123', context='rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_empty_prev_list(self, mock_flask): # given - mock_flask.url_for.side_effect = self.url_for_context_test + mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_some_prev_list(self, mock_flask): # given - mock_flask.url_for.side_effect = self.url_for_context_test + mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/' 'prev1-rev/prev0-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev0-rev', context='prev1-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev1-rev/prev0-rev'), call('api_revision', sha1_git='123', context='prev1-rev/prev0-rev/rev-id'), call('api_revision', sha1_git='456')]) @nottest def _url_for_rev_message_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data and data['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] elif fn == 'api_revision_raw_message': return '/api/revision/' + data['sha1_git'] + '/raw/' else: return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_no_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_invalid_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index 8328da21..55cc76e6 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,290 +1,290 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask import re from dateutil import parser def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def fmap(f, data): """Map f to data. Keep the initial data structure as original but map function f to each level. Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if isinstance(data, map): return (fmap(f, x) for x in data) if isinstance(data, list): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return (filter_field_keys(x, field_keys) for x in data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: datetime result of parsing values. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ try: res = parser.parse(timestamp, ignoretz=False, fuzzy=True) except: res = datetime.datetime.fromtimestamp(float(timestamp)) return res def enrich_release(release): """Enrich a release with link to the 'target' of 'type' revision. """ if 'target' in release and 'target_type' in release: if release['target_type'] == 'revision': release['target_url'] = flask.url_for('api_revision', sha1_git=release['target']) elif release['target_type'] == 'release': release['target_url'] = flask.url_for('api_release', sha1_git=release['target']) elif release['target_type'] == 'content': release['target_url'] = flask.url_for( 'api_content_metadata', q='sha1_git:' + release['target']) elif release['target_type'] == 'directory': release['target_url'] = flask.url_for('api_directory', q=release['target']) return release def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = flask.url_for('api_content_metadata', q='sha1_git:%s' % target) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = flask.url_for('api_directory', sha1_git=target) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def enrich_content(content): """Enrich content with 'data', a link to its raw content. """ if 'sha1' in content: content['data_url'] = flask.url_for('api_content_raw', q=content['sha1']) return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['uuid']) if 'parent' in entity and entity['parent']: entity['parent_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['parent']) return entity def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ context_for_parents = None context_for_children = None - url_imm_child = None + url_immediate_child = None if not context: context_for_parents = revision['id'] else: context_for_parents = '%s/%s' % (context, revision['id']) prev_for_children = context.split('/')[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) # This commit is not the first commit in the path if context_for_children: - url_imm_child = flask.url_for( + url_immediate_child = flask.url_for( 'api_revision', sha1_git=context.split('/')[-1], context=context_for_children) # This commit is the first commit in the path else: - url_imm_child = flask.url_for( + url_immediate_child = flask.url_for( 'api_revision', sha1_git=context.split('/')[-1]) revision['url'] = flask.url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = flask.url_for('api_revision_log', sha1_git=revision['id']) if context is not None: revision['history_context_url'] = flask.url_for( 'api_revision_log', sha1_git=revision['id'], prev_sha1s=context) if 'author' in revision: author = revision['author'] revision['author_url'] = flask.url_for('api_person', person_id=author['id']) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = flask.url_for('api_person', person_id=committer['id']) if 'directory' in revision: revision['directory_url'] = flask.url_for( 'api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(flask.url_for('api_revision', sha1_git=parent, context=context_for_parents)) revision['parent_urls'] = parents if 'children' in revision: children = [] for child in revision['children']: # child not the breadcrumb-indicated child > new breadcrumb start if context: if child != context.split('/')[-1]: children.append(flask.url_for('api_revision', sha1_git=child)) else: children.append(flask.url_for('api_revision', sha1_git=child)) - if url_imm_child: - children.append(url_imm_child) + if url_immediate_child: + children.append(url_immediate_child) revision['children_urls'] = children else: - if url_imm_child: - revision['children_urls'] = [url_imm_child] + if url_immediate_child: + revision['children_urls'] = [url_immediate_child] if 'message_decoding_failed' in revision: revision['message_url'] = flask.url_for( 'api_revision_raw_message', sha1_git=revision['id']) return revision