diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py
index 8a145fc6d..7dfca46b9 100644
--- a/swh/web/ui/renderers.py
+++ b/swh/web/ui/renderers.py
@@ -1,284 +1,284 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
import yaml
import json
from docutils.core import publish_parts
from docutils.writers.html4css1 import Writer, HTMLTranslator
from inspect import cleandoc
from jinja2 import escape, Markup
from flask import request, Response, render_template
from flask import g
from pygments import highlight
from pygments.lexers import guess_lexer
from pygments.formatters import HtmlFormatter
from swh.web.ui import utils
class SWHFilterEnricher():
"""Global filter on fields.
"""
@classmethod
def filter_by_fields(cls, data):
"""Extract a request parameter 'fields' if it exists to permit the
filtering on the data dict's keys.
If such field is not provided, returns the data as is.
"""
fields = request.args.get('fields')
if fields:
fields = set(fields.split(','))
data = utils.filter_field_keys(data, fields)
return data
class SWHComputeLinkHeader:
"""Add link header to response.
Mixin intended to be used for example in SWHMultiResponse
"""
@classmethod
def compute_link_header(cls, rv, options):
"""Add Link header in returned value results.
Expects rv to be a dict with 'results' and 'headers' key:
'results': the returned value expected to be shown
'headers': dictionary with link-next and link-prev
Args:
rv (dict): with keys:
- 'headers': potential headers with 'link-next'
and 'link-prev' keys
- 'results': containing the result to return
options (dict): the initial dict to update with result if any
Returns:
Dict with optional keys 'link-next' and 'link-prev'.
"""
link_headers = []
if 'headers' not in rv:
return {}
rv_headers = rv['headers']
if 'link-next' in rv_headers:
link_headers.append('<%s>; rel="next"' % (
rv_headers['link-next']))
if 'link-prev' in rv_headers:
link_headers.append('<%s>; rel="previous"' % (
rv_headers['link-prev']))
if link_headers:
link_header_str = ','.join(link_headers)
headers = options.get('headers', {})
headers.update({
'Link': link_header_str
})
return headers
return {}
class SWHTransformProcessor:
"""Transform an eventual returned value with multiple layer of
information with only what's necessary.
If the returned value rv contains the 'results' key, this is the
associated value which is returned.
Otherwise, return the initial dict without the potential 'headers'
key.
"""
@classmethod
def transform(cls, rv):
if 'results' in rv:
return rv['results']
if 'headers' in rv:
rv.pop('headers')
return rv
class SWHMultiResponse(Response, SWHFilterEnricher,
SWHComputeLinkHeader, SWHTransformProcessor):
"""
A Flask Response subclass.
Override force_type to transform dict/list responses into callable Flask
response objects whose mimetype matches the request's Accept header: HTML
template render, YAML dump or default to a JSON dump.
"""
@classmethod
def make_response_from_mimetype(cls, rv, options={}):
options = options.copy()
if not (isinstance(rv, list) or isinstance(rv, dict)):
return rv
def wants_html(best_match):
return best_match == 'text/html' and \
request.accept_mimetypes[best_match] > \
request.accept_mimetypes['application/json']
def wants_yaml(best_match):
return best_match == 'application/yaml' and \
request.accept_mimetypes[best_match] > \
request.accept_mimetypes['application/json']
- rv = cls.filter_by_fields(rv)
acc_mime = ['application/json', 'application/yaml', 'text/html']
best_match = request.accept_mimetypes.best_match(acc_mime)
options['headers'] = cls.compute_link_header(rv, options)
rv = cls.transform(rv)
+ rv = cls.filter_by_fields(rv)
if wants_html(best_match):
data = json.dumps(rv, sort_keys=True,
indent=4, separators=(',', ': '))
env = g.get('doc_env', {})
env['response_data'] = data
env['headers_data'] = None
if options and 'headers' in options:
env['headers_data'] = options['headers']
env['request'] = request
rv = Response(render_template('apidoc.html', **env),
content_type='text/html',
**options)
elif wants_yaml(best_match):
rv = Response(
yaml.dump(rv),
content_type='application/yaml',
**options)
else:
# jsonify is unhappy with lists in Flask 0.10.1, use json.dumps
rv = Response(
json.dumps(rv),
content_type='application/json',
**options)
return rv
@classmethod
def force_type(cls, rv, environ=None):
if isinstance(rv, dict) or isinstance(rv, list):
rv = cls.make_response_from_mimetype(rv)
return super().force_type(rv, environ)
def error_response(error_code, error):
"""Private function to create a custom error response.
"""
error_opts = {'status': error_code}
error_data = {'error': str(error)}
return SWHMultiResponse.make_response_from_mimetype(error_data,
options=error_opts)
def urlize_api_links(text):
"""Utility function for decorating api links in browsable api.
Args:
text: whose content matching links should be transformed into
contextual API or Browse html links.
Returns
The text transformed if any link is found.
The text as is otherwise.
"""
return re.sub(r'(/api/.*/|/browse/.*/)',
r'\1',
str(escape(text)))
def urlize_header_links(text):
"""Utility function for decorating headers links in browsable api.
Args
text: Text whose content contains Link header value
Returns:
The text transformed with html link if any link is found.
The text as is otherwise.
"""
return re.sub(r'<(/api/.*|/browse/.*)>', r'<\1>',
text)
class NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
def visit_bullet_list(self, node):
self.context.append((self.compact_simple, self.compact_p))
self.compact_p = None
self.compact_simple = self.is_compactable(node)
self.body.append(self.starttag(node, 'ul', CLASS='docstring'))
DOCSTRING_WRITER = Writer()
DOCSTRING_WRITER.translator_class = NoHeaderHTMLTranslator
def safe_docstring_display(docstring):
"""
Utility function to htmlize reST-formatted documentation in browsable
api.
"""
docstring = cleandoc(docstring)
return publish_parts(docstring, writer=DOCSTRING_WRITER)['html_body']
def revision_id_from_url(url):
"""Utility function to obtain a revision's ID from its browsing URL."""
return re.sub(r'/browse/revision/([0-9a-f]{40}|[0-9a-f]{64})/.*',
r'\1', url)
def highlight_source(source_code_as_text):
"""Leverage pygments to guess and highlight source code.
Args
source_code_as_text (str): source code in plain text
Returns:
Highlighted text if possible or plain text otherwise
"""
try:
maybe_lexer = guess_lexer(source_code_as_text)
if maybe_lexer:
r = highlight(
source_code_as_text, maybe_lexer,
HtmlFormatter(linenos=True,
lineanchors='l',
anchorlinenos=True))
else:
r = '
%s
' % source_code_as_text
except:
r = '%s
' % source_code_as_text
return Markup(r)
diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index 8cbed0ba3..ae765ac1f 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,939 +1,905 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import dateutil
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.ui import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.url_map = [dict(rule='/other/',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
@istest
def filter_endpoints_1(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/some')
# then
self.assertEquals(actual_data, {
'/some/old/url/': {
'methods': ['GET', 'POST'],
'endpoint': 'blablafn'
}
})
@istest
def filter_endpoints_2(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/other',
blacklist=['/other2'])
# then
# rules /other is skipped because its' exactly the prefix url
# rules /other2 is skipped because it's blacklisted
self.assertEquals(actual_data, {
'/other/': {
'methods': ['GET', 'HEAD', 'POST'],
'endpoint': 'foo'
},
'/other/old/url/': {
'methods': ['GET', 'HEAD'],
'endpoint': 'bar'
}
})
@istest
def prepare_data_for_view_default_encoding(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs)
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view_ko_cannot_decode(self):
self.maxDiff = None
# given
inputs = {
'data': 'hé dude!'.encode('utf8'),
}
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, {
'data': "Cannot decode the data bytes, try and set another "
"encoding in the url (e.g. ?encoding=utf8) or "
"download directly the "
"content's raw data.",
})
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_map(self):
# when
actual_res = utils.filter_field_keys(
map(lambda x: {'i': x['i']+1, 'j': x['j']},
[{'i': 1, 'j': None},
{'i': 2, 'j': None},
{'i': 3, 'j': None}]),
{'i'})
# then
self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
- @istest
- def filter_field_keys_complex_1(self):
- actual_res = utils.filter_field_keys(
- {
- 'list': [
- {'directory': 1, 'file': 2, 'link': 3},
- {'dir': 1, 'fil': 2, 'lin': 3}
- ]
- },
- {'list', 'directory', 'dir'})
- # then
- self.assertEqual(actual_res,
- {'list': [{'directory': 1}, {'dir': 1}]})
-
- @istest
- def filter_field_keys_complex_2(self):
- actual_res = utils.filter_field_keys(
- {
- 'list': [
- [{'directory': 1, 'file': 2, 'link': 3}],
- [{'dir': 1, 'fil': 2, 'lin': 3}],
- ]
- },
- {'list', 'directory', 'dir'})
- # then
- self.assertEqual(
- actual_res,
- {
- 'list': [
- [{'directory': 1}],
- [{'dir': 1}]
- ]
- })
-
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
@istest
def fmap(self):
self.assertEquals([2, 3, None, 4],
utils.fmap(lambda x: x+1, [1, 2, None, 3]))
self.assertEquals([11, 12, 13],
list(utils.fmap(lambda x: x+10,
map(lambda x: x, [1, 2, 3]))))
self.assertEquals({'a': 2, 'b': 4},
utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
self.assertEquals(100,
utils.fmap(lambda x: x*10, 10))
self.assertEquals({'a': [2, 6], 'b': 4},
utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
self.assertIsNone(utils.fmap(lambda x: x, None))
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof ')
@istest
def parse_timestamp(self):
input_timestamps = [
'2016-01-12',
'2016-01-12T09:19:12+0100',
'Today is January 1, 2047 at 8:21:00AM',
'1452591542',
]
output_dates = [
datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
datetime.datetime(2016, 1, 12, 9, 19, 12,
tzinfo=dateutil.tz.tzoffset(None, 3600)),
datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc),
datetime.datetime(2016, 1, 12, 9, 39, 2,
tzinfo=datetime.timezone.utc),
]
for ts, exp_date in zip(input_timestamps, output_dates):
self.assertEquals(utils.parse_timestamp(ts), exp_date)
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_1(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/'
# when
actual_release = utils.enrich_release({'target': '123',
'target_type': 'content'})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_2(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
q='23')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_3(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_flask.url_for.assert_called_once_with('api_revision',
sha1_git='3')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_4(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_flask.url_for.assert_called_once_with('api_release',
sha1_git='4')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_no_type(self, mock_flask):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_file(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:789')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
sha1_git='456')
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
}
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes_and_top_level_url(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/' % h,
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
},
top_url=True
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'content_url': '/api/content/%s:123/' % h,
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_metadata', q='%s:blahblah' % h),
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_entity_with_sha1(self, mock_flask):
# given
def url_for_test(fn, **entity):
return '/api/entity/' + entity['uuid'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid',
uuid='uuid-1'),
call('api_entity_by_uuid',
uuid='uuid-parent')])
@nottest
def _url_for_context_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_without_children_or_parent(self, mock_flask):
# given
def url_for_test(fn, **data):
if fn == 'api_revision':
return '/api/revision/' + data['sha1_git'] + '/'
elif fn == 'api_revision_log':
return '/api/revision/' + data['sha1_git'] + '/log/'
elif fn == 'api_directory':
return '/api/directory/' + data['sha1_git'] + '/'
elif fn == 'api_person':
return '/api/person/' + data['person_id'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_person',
person_id='1'),
call('api_person',
person_id='2'),
call('api_directory',
sha1_git='123')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_no_context(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_empty_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_some_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev0-rev',
context='prev1-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev1-rev/prev0-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@nottest
def _url_for_rev_message_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data and data['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
elif fn == 'api_revision_raw_message':
return '/api/revision/' + data['sha1_git'] + '/raw/'
else:
return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_no_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_invalid_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py
index 990deee16..ed06c64ae 100644
--- a/swh/web/ui/utils.py
+++ b/swh/web/ui/utils.py
@@ -1,385 +1,384 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import flask
import re
from dateutil import parser
def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
"""Filter endpoints by prefix url rule.
Args:
- url_map: Url Werkzeug.Map of rules
- prefix_url_rule: prefix url string
- blacklist: blacklist of some url
Returns:
Dictionary of url_rule with values methods and endpoint.
The key is the url, the associated value is a dictionary of
'methods' (possible http methods) and 'endpoint' (python function)
"""
out = {}
for r in url_map:
rule = r['rule']
if rule == prefix_url_rule or rule in blacklist:
continue
if rule.startswith(prefix_url_rule):
out[rule] = {'methods': sorted(map(str, r['methods'])),
'endpoint': r['endpoint']}
return out
def fmap(f, data):
"""Map f to data at each level.
This must keep the origin data structure type:
- map -> map
- dict -> dict
- list -> list
- None -> None
Args:
f: function that expects one argument.
data: data to traverse to apply the f function.
list, map, dict or bare value.
Returns:
The same data-structure with modified values by the f function.
"""
if not data:
return data
if isinstance(data, map):
return map(lambda y: fmap(f, y), (x for x in data))
if isinstance(data, list):
return [fmap(f, x) for x in data]
if isinstance(data, dict):
return {k: fmap(f, v) for (k, v) in data.items()}
return f(data)
def prepare_data_for_view(data, encoding='utf-8'):
def prepare_data(s):
# Note: can only be 'data' key with bytes of raw content
if isinstance(s, bytes):
try:
return s.decode(encoding)
except:
return "Cannot decode the data bytes, try and set another " \
"encoding in the url (e.g. ?encoding=utf8) or " \
"download directly the " \
"content's raw data."
if isinstance(s, str):
return re.sub(r'/api/1/', r'/browse/', s)
return s
return fmap(prepare_data, data)
def filter_field_keys(data, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- data: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(data, map):
return map(lambda x: filter_field_keys(x, field_keys), data)
if isinstance(data, list):
return [filter_field_keys(x, field_keys) for x in data]
if isinstance(data, dict):
- return {k: filter_field_keys(v, field_keys)
- for (k, v) in data.items() if k in field_keys}
+ return {k: v for (k, v) in data.items() if k in field_keys}
return data
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as datetime.
Returns:
a timezone-aware datetime representing the parsed value. If the parsed
value doesn't specify a timezone, UTC is assumed.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
default_timestamp = datetime.datetime.utcfromtimestamp(0).replace(
tzinfo=datetime.timezone.utc)
try:
res = parser.parse(timestamp, ignoretz=False, fuzzy=True,
default=default_timestamp)
except:
res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace(
tzinfo=datetime.timezone.utc)
return res
def enrich_object(object):
"""Enrich an object (revision, release) with link to the 'target' of
type 'target_type'.
Args:
object: An object with target and target_type keys
(e.g. release, revision)
Returns:
Object enriched with target_url pointing to the right
swh.web.ui.api urls for the pointing object (revision,
release, content, directory)
"""
obj = object.copy()
if 'target' in obj and 'target_type' in obj:
if obj['target_type'] == 'revision':
obj['target_url'] = flask.url_for('api_revision',
sha1_git=obj['target'])
elif obj['target_type'] == 'release':
obj['target_url'] = flask.url_for('api_release',
sha1_git=obj['target'])
elif obj['target_type'] == 'content':
obj['target_url'] = flask.url_for(
'api_content_metadata',
q='sha1_git:' + obj['target'])
elif obj['target_type'] == 'directory':
obj['target_url'] = flask.url_for('api_directory',
q=obj['target'])
return obj
enrich_release = enrich_object
def enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = flask.url_for('api_content_metadata',
q='sha1_git:%s' % target)
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = flask.url_for('api_directory',
sha1_git=target)
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
return directory
def enrich_metadata_endpoint(content):
"""Enrich metadata endpoint with link to the upper metadata endpoint.
"""
c = content.copy()
c['content_url'] = flask.url_for('api_content_metadata',
q='sha1:%s' % c['id'])
return c
def enrich_content(content, top_url=False):
"""Enrich content with links to:
- data_url: its raw data
- filetype_url: its filetype information
"""
for h in ['sha1', 'sha1_git', 'sha256']:
if h in content:
q = '%s:%s' % (h, content[h])
if top_url:
content['content_url'] = flask.url_for('api_content_metadata',
q=q)
content['data_url'] = flask.url_for('api_content_raw', q=q)
content['filetype_url'] = flask.url_for('api_content_filetype',
q=q)
content['language_url'] = flask.url_for('api_content_language',
q=q)
content['license_url'] = flask.url_for('api_content_license',
q=q)
break
return content
def enrich_entity(entity):
"""Enrich entity with
"""
if 'uuid' in entity:
entity['uuid_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['uuid'])
if 'parent' in entity and entity['parent']:
entity['parent_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['parent'])
return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id,
context=context_for_children)
# This commit is the first commit in the path
else:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id)
return (context_for_parents, context_for_children, url_direct_child)
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
rev_children: a list of revision id
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(flask.url_for('api_revision', sha1_git=child))
elif not context:
children.append(flask.url_for('api_revision', sha1_git=child))
return children
def enrich_revision(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
Keep track of the navigation breadcrumbs if they are specified.
Args:
revision: the revision as a dict
context: the navigation breadcrumbs as a /-separated string of revision
sha1_git
"""
ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
revision['id'], context)
revision['url'] = flask.url_for('api_revision', sha1_git=revision['id'])
revision['history_url'] = flask.url_for('api_revision_log',
sha1_git=revision['id'])
if context:
revision['history_context_url'] = flask.url_for(
'api_revision_log',
sha1_git=revision['id'],
prev_sha1s=context)
if 'author' in revision:
author = revision['author']
revision['author_url'] = flask.url_for('api_person',
person_id=author['id'])
if 'committer' in revision:
committer = revision['committer']
revision['committer_url'] = flask.url_for('api_person',
person_id=committer['id'])
if 'directory' in revision:
revision['directory_url'] = flask.url_for(
'api_directory',
sha1_git=revision['directory'])
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append(flask.url_for('api_revision',
sha1_git=parent))
revision['parent_urls'] = parents
if 'children' in revision:
children = _make_child_url(revision['children'], context)
if url_direct_child:
children.append(url_direct_child)
revision['children_urls'] = children
else:
if url_direct_child:
revision['children_urls'] = [url_direct_child]
if 'message_decoding_failed' in revision:
revision['message_url'] = flask.url_for(
'api_revision_raw_message',
sha1_git=revision['id'])
return revision