diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py index 8a145fc6d..7dfca46b9 100644 --- a/swh/web/ui/renderers.py +++ b/swh/web/ui/renderers.py @@ -1,284 +1,284 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re import yaml import json from docutils.core import publish_parts from docutils.writers.html4css1 import Writer, HTMLTranslator from inspect import cleandoc from jinja2 import escape, Markup from flask import request, Response, render_template from flask import g from pygments import highlight from pygments.lexers import guess_lexer from pygments.formatters import HtmlFormatter from swh.web.ui import utils class SWHFilterEnricher(): """Global filter on fields. """ @classmethod def filter_by_fields(cls, data): """Extract a request parameter 'fields' if it exists to permit the filtering on the data dict's keys. If such field is not provided, returns the data as is. """ fields = request.args.get('fields') if fields: fields = set(fields.split(',')) data = utils.filter_field_keys(data, fields) return data class SWHComputeLinkHeader: """Add link header to response. Mixin intended to be used for example in SWHMultiResponse """ @classmethod def compute_link_header(cls, rv, options): """Add Link header in returned value results. Expects rv to be a dict with 'results' and 'headers' key: 'results': the returned value expected to be shown 'headers': dictionary with link-next and link-prev Args: rv (dict): with keys: - 'headers': potential headers with 'link-next' and 'link-prev' keys - 'results': containing the result to return options (dict): the initial dict to update with result if any Returns: Dict with optional keys 'link-next' and 'link-prev'. """ link_headers = [] if 'headers' not in rv: return {} rv_headers = rv['headers'] if 'link-next' in rv_headers: link_headers.append('<%s>; rel="next"' % ( rv_headers['link-next'])) if 'link-prev' in rv_headers: link_headers.append('<%s>; rel="previous"' % ( rv_headers['link-prev'])) if link_headers: link_header_str = ','.join(link_headers) headers = options.get('headers', {}) headers.update({ 'Link': link_header_str }) return headers return {} class SWHTransformProcessor: """Transform an eventual returned value with multiple layer of information with only what's necessary. If the returned value rv contains the 'results' key, this is the associated value which is returned. Otherwise, return the initial dict without the potential 'headers' key. """ @classmethod def transform(cls, rv): if 'results' in rv: return rv['results'] if 'headers' in rv: rv.pop('headers') return rv class SWHMultiResponse(Response, SWHFilterEnricher, SWHComputeLinkHeader, SWHTransformProcessor): """ A Flask Response subclass. Override force_type to transform dict/list responses into callable Flask response objects whose mimetype matches the request's Accept header: HTML template render, YAML dump or default to a JSON dump. """ @classmethod def make_response_from_mimetype(cls, rv, options={}): options = options.copy() if not (isinstance(rv, list) or isinstance(rv, dict)): return rv def wants_html(best_match): return best_match == 'text/html' and \ request.accept_mimetypes[best_match] > \ request.accept_mimetypes['application/json'] def wants_yaml(best_match): return best_match == 'application/yaml' and \ request.accept_mimetypes[best_match] > \ request.accept_mimetypes['application/json'] - rv = cls.filter_by_fields(rv) acc_mime = ['application/json', 'application/yaml', 'text/html'] best_match = request.accept_mimetypes.best_match(acc_mime) options['headers'] = cls.compute_link_header(rv, options) rv = cls.transform(rv) + rv = cls.filter_by_fields(rv) if wants_html(best_match): data = json.dumps(rv, sort_keys=True, indent=4, separators=(',', ': ')) env = g.get('doc_env', {}) env['response_data'] = data env['headers_data'] = None if options and 'headers' in options: env['headers_data'] = options['headers'] env['request'] = request rv = Response(render_template('apidoc.html', **env), content_type='text/html', **options) elif wants_yaml(best_match): rv = Response( yaml.dump(rv), content_type='application/yaml', **options) else: # jsonify is unhappy with lists in Flask 0.10.1, use json.dumps rv = Response( json.dumps(rv), content_type='application/json', **options) return rv @classmethod def force_type(cls, rv, environ=None): if isinstance(rv, dict) or isinstance(rv, list): rv = cls.make_response_from_mimetype(rv) return super().force_type(rv, environ) def error_response(error_code, error): """Private function to create a custom error response. """ error_opts = {'status': error_code} error_data = {'error': str(error)} return SWHMultiResponse.make_response_from_mimetype(error_data, options=error_opts) def urlize_api_links(text): """Utility function for decorating api links in browsable api. Args: text: whose content matching links should be transformed into contextual API or Browse html links. Returns The text transformed if any link is found. The text as is otherwise. """ return re.sub(r'(/api/.*/|/browse/.*/)', r'\1', str(escape(text))) def urlize_header_links(text): """Utility function for decorating headers links in browsable api. Args text: Text whose content contains Link header value Returns: The text transformed with html link if any link is found. The text as is otherwise. """ return re.sub(r'<(/api/.*|/browse/.*)>', r'<\1>', text) class NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] def visit_bullet_list(self, node): self.context.append((self.compact_simple, self.compact_p)) self.compact_p = None self.compact_simple = self.is_compactable(node) self.body.append(self.starttag(node, 'ul', CLASS='docstring')) DOCSTRING_WRITER = Writer() DOCSTRING_WRITER.translator_class = NoHeaderHTMLTranslator def safe_docstring_display(docstring): """ Utility function to htmlize reST-formatted documentation in browsable api. """ docstring = cleandoc(docstring) return publish_parts(docstring, writer=DOCSTRING_WRITER)['html_body'] def revision_id_from_url(url): """Utility function to obtain a revision's ID from its browsing URL.""" return re.sub(r'/browse/revision/([0-9a-f]{40}|[0-9a-f]{64})/.*', r'\1', url) def highlight_source(source_code_as_text): """Leverage pygments to guess and highlight source code. Args source_code_as_text (str): source code in plain text Returns: Highlighted text if possible or plain text otherwise """ try: maybe_lexer = guess_lexer(source_code_as_text) if maybe_lexer: r = highlight( source_code_as_text, maybe_lexer, HtmlFormatter(linenos=True, lineanchors='l', anchorlinenos=True)) else: r = '
%s
' % source_code_as_text except: r = '
%s
' % source_code_as_text return Markup(r) diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 8cbed0ba3..ae765ac1f 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,939 +1,905 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @istest def prepare_data_for_view_default_encoding(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view_ko_cannot_decode(self): self.maxDiff = None # given inputs = { 'data': 'hé dude!'.encode('utf8'), } actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, { 'data': "Cannot decode the data bytes, try and set another " "encoding in the url (e.g. ?encoding=utf8) or " "download directly the " "content's raw data.", }) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_map(self): # when actual_res = utils.filter_field_keys( map(lambda x: {'i': x['i']+1, 'j': x['j']}, [{'i': 1, 'j': None}, {'i': 2, 'j': None}, {'i': 3, 'j': None}]), {'i'}) # then self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) - @istest - def filter_field_keys_complex_1(self): - actual_res = utils.filter_field_keys( - { - 'list': [ - {'directory': 1, 'file': 2, 'link': 3}, - {'dir': 1, 'fil': 2, 'lin': 3} - ] - }, - {'list', 'directory', 'dir'}) - # then - self.assertEqual(actual_res, - {'list': [{'directory': 1}, {'dir': 1}]}) - - @istest - def filter_field_keys_complex_2(self): - actual_res = utils.filter_field_keys( - { - 'list': [ - [{'directory': 1, 'file': 2, 'link': 3}], - [{'dir': 1, 'fil': 2, 'lin': 3}], - ] - }, - {'list', 'directory', 'dir'}) - # then - self.assertEqual( - actual_res, - { - 'list': [ - [{'directory': 1}], - [{'dir': 1}] - ] - }) - @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, None, 4], utils.fmap(lambda x: x+1, [1, 2, None, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) self.assertIsNone(utils.fmap(lambda x: x, None)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc), datetime.datetime(2016, 1, 12, 9, 39, 2, tzinfo=datetime.timezone.utc), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.ui.utils.flask') @istest def enrich_release_1(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/' # when actual_release = utils.enrich_release({'target': '123', 'target_type': 'content'}) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_release_2(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_flask.url_for.assert_called_once_with('api_directory', q='23') @patch('swh.web.ui.utils.flask') @istest def enrich_release_3(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_flask.url_for.assert_called_once_with('api_revision', sha1_git='3') @patch('swh.web.ui.utils.flask') @istest def enrich_release_4(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_flask.url_for.assert_called_once_with('api_release', sha1_git='4') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_no_type(self, mock_flask): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_file(self, mock_flask): # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:789') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_dir(self, mock_flask): # given mock_flask.url_for.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_flask.url_for.assert_called_once_with('api_directory', sha1_git='456') @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_content_with_hashes(self, mock_flask): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_flask.url_for.side_effect = [ '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' } ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_flask.url_for.assert_has_calls([ call('api_content_raw', q='%s:blahblah' % h), call('api_content_filetype', q='%s:blahblah' % h), call('api_content_language', q='%s:blahblah' % h), call('api_content_license', q='%s:blahblah' % h), ]) mock_flask.reset() @patch('swh.web.ui.utils.flask') @istest def enrich_content_with_hashes_and_top_level_url(self, mock_flask): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_flask.url_for.side_effect = [ '/api/content/%s:123/' % h, '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' }, top_url=True ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'content_url': '/api/content/%s:123/' % h, 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_flask.url_for.assert_has_calls([ call('api_content_metadata', q='%s:blahblah' % h), call('api_content_raw', q='%s:blahblah' % h), call('api_content_filetype', q='%s:blahblah' % h), call('api_content_language', q='%s:blahblah' % h), call('api_content_license', q='%s:blahblah' % h), ]) mock_flask.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_entity_with_sha1(self, mock_flask): # given def url_for_test(fn, **entity): return '/api/entity/' + entity['uuid'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid', uuid='uuid-1'), call('api_entity_by_uuid', uuid='uuid-parent')]) @nottest def _url_for_context_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] @patch('swh.web.ui.utils.flask') @istest def enrich_revision_without_children_or_parent(self, mock_flask): # given def url_for_test(fn, **data): if fn == 'api_revision': return '/api/revision/' + data['sha1_git'] + '/' elif fn == 'api_revision_log': return '/api/revision/' + data['sha1_git'] + '/log/' elif fn == 'api_directory': return '/api/directory/' + data['sha1_git'] + '/' elif fn == 'api_person': return '/api/person/' + data['person_id'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_person', person_id='1'), call('api_person', person_id='2'), call('api_directory', sha1_git='123')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_no_context(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_empty_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_some_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev0-rev', context='prev1-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev1-rev/prev0-rev'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) @nottest def _url_for_rev_message_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data and data['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] elif fn == 'api_revision_raw_message': return '/api/revision/' + data['sha1_git'] + '/raw/' else: return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_no_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_invalid_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123'), call('api_revision', sha1_git='456')]) diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index 990deee16..ed06c64ae 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,385 +1,384 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask import re from dateutil import parser def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def fmap(f, data): """Map f to data at each level. This must keep the origin data structure type: - map -> map - dict -> dict - list -> list - None -> None Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if not data: return data if isinstance(data, map): return map(lambda y: fmap(f, y), (x for x in data)) if isinstance(data, list): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): - return {k: filter_field_keys(v, field_keys) - for (k, v) in data.items() if k in field_keys} + return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: a timezone-aware datetime representing the parsed value. If the parsed value doesn't specify a timezone, UTC is assumed. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ default_timestamp = datetime.datetime.utcfromtimestamp(0).replace( tzinfo=datetime.timezone.utc) try: res = parser.parse(timestamp, ignoretz=False, fuzzy=True, default=default_timestamp) except: res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=datetime.timezone.utc) return res def enrich_object(object): """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) Returns: Object enriched with target_url pointing to the right swh.web.ui.api urls for the pointing object (revision, release, content, directory) """ obj = object.copy() if 'target' in obj and 'target_type' in obj: if obj['target_type'] == 'revision': obj['target_url'] = flask.url_for('api_revision', sha1_git=obj['target']) elif obj['target_type'] == 'release': obj['target_url'] = flask.url_for('api_release', sha1_git=obj['target']) elif obj['target_type'] == 'content': obj['target_url'] = flask.url_for( 'api_content_metadata', q='sha1_git:' + obj['target']) elif obj['target_type'] == 'directory': obj['target_url'] = flask.url_for('api_directory', q=obj['target']) return obj enrich_release = enrich_object def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = flask.url_for('api_content_metadata', q='sha1_git:%s' % target) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = flask.url_for('api_directory', sha1_git=target) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def enrich_metadata_endpoint(content): """Enrich metadata endpoint with link to the upper metadata endpoint. """ c = content.copy() c['content_url'] = flask.url_for('api_content_metadata', q='sha1:%s' % c['id']) return c def enrich_content(content, top_url=False): """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information """ for h in ['sha1', 'sha1_git', 'sha256']: if h in content: q = '%s:%s' % (h, content[h]) if top_url: content['content_url'] = flask.url_for('api_content_metadata', q=q) content['data_url'] = flask.url_for('api_content_raw', q=q) content['filetype_url'] = flask.url_for('api_content_filetype', q=q) content['language_url'] = flask.url_for('api_content_language', q=q) content['license_url'] = flask.url_for('api_content_license', q=q) break return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['uuid']) if 'parent' in entity and entity['parent']: entity['parent_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['parent']) return entity def _get_path_list(path_string): """Helper for enrich_revision: get a list of the sha1 id of the navigation breadcrumbs, ordered from the oldest to the most recent. Args: path_string: the path as a '/'-separated string Returns: The navigation context as a list of sha1 revision ids """ return path_string.split('/') def _get_revision_contexts(rev_id, context): """Helper for enrich_revision: retrieve for the revision id and potentially the navigation breadcrumbs the context to pass to parents and children of of the revision. Args: rev_id: the revision's sha1 id context: the current navigation context Returns: The context for parents, children and the url of the direct child as a tuple in that order. """ context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id, context=context_for_children) # This commit is the first commit in the path else: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(flask.url_for('api_revision', sha1_git=child)) elif not context: children.append(flask.url_for('api_revision', sha1_git=child)) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = flask.url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = flask.url_for('api_revision_log', sha1_git=revision['id']) if context: revision['history_context_url'] = flask.url_for( 'api_revision_log', sha1_git=revision['id'], prev_sha1s=context) if 'author' in revision: author = revision['author'] revision['author_url'] = flask.url_for('api_person', person_id=author['id']) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = flask.url_for('api_person', person_id=committer['id']) if 'directory' in revision: revision['directory_url'] = flask.url_for( 'api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(flask.url_for('api_revision', sha1_git=parent)) revision['parent_urls'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = flask.url_for( 'api_revision_raw_message', sha1_git=revision['id']) return revision