diff --git a/swh/web/ui/templates/revision-directory.html b/swh/web/ui/templates/revision-directory.html
new file mode 100644
index 00000000..774d083f
--- /dev/null
+++ b/swh/web/ui/templates/revision-directory.html
@@ -0,0 +1,29 @@
+{% extends "layout.html" %}
+{% block title %}Browse revision at path{% endblock %}
+{% block content %}
+
+{% if message is not none %}
+{{ message }}
+{% endif %}
+
+{% if result is not none %}
+
Browse revision '{{ sha1_git }}' with {{ result['type'] }}path '{{ path }}':
+
+ {% if result['type'] == 'dir' %}
+ {% if result['content'] is not none %}
+ {% for e in result['content'] %}
+
+ {% endfor %}
+ {% endif %}
+ {% else %}
+ {% if result['content'] is not none %}
+ {% for key in ['sha1', 'sha256', 'sha1_git', 'status', 'length', 'ctime'] %}
+
+
{{ key }}
+
{{ result['content'][key] }}
+
+ {% endfor %}
+ {% endif %}
+ {% endif %}
+{% endif %}
+{% endblock %}
diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index d6cb6a23..886f425d 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,267 +1,321 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import dateutil
import unittest
from unittest.mock import patch, call
from nose.tools import istest
from swh.web.ui import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.url_map = [dict(rule='/other/',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
@istest
def filter_endpoints_1(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/some')
# then
self.assertEquals(actual_data, {
'/some/old/url/': {
'methods': ['GET', 'POST'],
'endpoint': 'blablafn'
}
})
@istest
def filter_endpoints_2(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/other',
blacklist=['/other2'])
# then
# rules /other is skipped because its' exactly the prefix url
# rules /other2 is skipped because it's blacklisted
self.assertEquals(actual_data, {
'/other/': {
'methods': ['GET', 'HEAD', 'POST'],
'endpoint': 'foo'
},
'/other/old/url/': {
'methods': ['GET', 'HEAD'],
'endpoint': 'bar'
}
})
@patch('swh.web.ui.utils.flask')
@istest
def prepare_directory_listing(self, mock_flask):
# given
def mock_url_for(url_key, **kwds):
if url_key == 'browse_directory':
sha1_git = kwds['sha1_git']
return '/path/to/url/dir' + '/' + sha1_git
else:
sha1_git = kwds['q']
return '/path/to/url/file' + '/' + sha1_git
mock_flask.url_for.side_effect = mock_url_for
inputs = [{'type': 'dir',
'target': '123',
'name': 'some-dir-name'},
{'type': 'file',
'sha1': '654',
'name': 'some-filename'},
{'type': 'dir',
'target': '987',
'name': 'some-other-dirname'}]
expected_output = [{'link': '/path/to/url/dir/123',
'name': 'some-dir-name',
'type': 'dir'},
{'link': '/path/to/url/file/654',
'name': 'some-filename',
'type': 'file'},
{'link': '/path/to/url/dir/987',
'name': 'some-other-dirname',
'type': 'dir'}]
# when
actual_outputs = utils.prepare_directory_listing(inputs)
# then
self.assertEquals(actual_outputs, expected_output)
mock_flask.url_for.assert_has_calls([
call('browse_directory', sha1_git='123'),
call('browse_content_data', q='654'),
call('browse_directory', sha1_git='987'),
])
+ @patch('swh.web.ui.utils.flask')
+ @istest
+ def prepare_directory_listing_with_revision(self, mock_flask):
+ # given
+ def mock_url_for(url_key, **kwds):
+ if url_key == 'browse_revision_directory':
+ sha1_git = kwds['sha1_git']
+ path = kwds['path']
+ return '/browse/revision/' + sha1_git + '/directory/' + path
+
+ mock_flask.url_for.side_effect = mock_url_for
+
+ inputs = [{'type': 'dir',
+ 'target': '123',
+ 'name': 'some-dir-name'},
+ {'type': 'file',
+ 'sha1': '654',
+ 'name': 'some-filename'},
+ {'type': 'dir',
+ 'target': '987',
+ 'name': 'some-other-dirname'}]
+
+ expected_output = [{'link': '/browse/revision/revsha1/directory/'
+ 'some/path/some-dir-name',
+ 'name': 'some-dir-name',
+ 'type': 'dir'},
+ {'link': '/browse/revision/revsha1/directory/'
+ 'some/path/some-filename',
+ 'name': 'some-filename',
+ 'type': 'file'},
+ {'link': '/browse/revision/revsha1/directory/'
+ 'some/path/some-other-dirname',
+ 'name': 'some-other-dirname',
+ 'type': 'dir'}]
+
+ # when
+ actual_outputs = utils.prepare_directory_listing_with_revision(
+ 'revsha1', 'some/path', inputs)
+
+ # then
+ self.assertEquals(actual_outputs, expected_output)
+
+ mock_flask.url_for.assert_has_calls([
+ call('browse_revision_directory',
+ sha1_git='revsha1',
+ path='some/path/some-dir-name'),
+ call('browse_revision_directory',
+ sha1_git='revsha1',
+ path='some/path/some-filename'),
+ call('browse_revision_directory',
+ sha1_git='revsha1',
+ path='some/path/some-other-dirname'),
+ ])
+
@patch('swh.web.ui.utils.flask')
@istest
def prepare_revision_view(self, mock_flask):
# given
def mock_url_for(url_key, **kwds):
if url_key == 'browse_directory':
return '/browse/directory/' + kwds['sha1_git']
elif url_key == 'browse_revision':
return '/browse/revision/' + kwds['sha1_git']
mock_flask.url_for.side_effect = mock_url_for
rev_input = {
'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754',
'date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'committer': {
'email': 'torvalds@linux-foundation.org',
'name': 'Linus Torvalds'
},
'type': 'git',
'author': {
'email': 'torvalds@linux-foundation.org',
'name': 'Linus Torvalds'
},
'message': 'Linux 4.2-rc1\n',
'synthetic': False,
'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b',
'parents': [
'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'
],
'children': [
'b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'
],
}
expected_rev = {
'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754',
'date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'committer': 'Linus Torvalds ',
'type': 'git',
'author': 'Linus Torvalds ',
'message': 'Linux 4.2-rc1\n',
'synthetic': False,
'directory': '/browse/directory/'
'2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b',
'parents': [
'/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'
],
'children': [
'/browse/revision/b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'
],
}
# when
actual_rev = utils.prepare_revision_view(rev_input)
# then
self.assertEquals(actual_rev, expected_rev)
mock_flask.url_for.assert_has_calls([
call('browse_revision',
sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'),
call('browse_revision',
sha1_git='b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'),
call('browse_directory',
sha1_git='2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b'),
])
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
@istest
def fmap(self):
self.assertEquals([2, 3, 4],
utils.fmap(lambda x: x+1, [1, 2, 3]))
self.assertEquals({'a': 2, 'b': 4},
utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
self.assertEquals(100,
utils.fmap(lambda x: x*10, 10))
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof ')
@istest
def parse_timestamp(self):
input_timestamps = [
'2016-01-12',
'2016-01-12T09:19:12+0100',
'Today is January 1, 2047 at 8:21:00AM',
'1452591542',
]
output_dates = [
datetime.datetime(2016, 1, 12, 0, 0),
datetime.datetime(2016, 1, 12, 9, 19, 12,
tzinfo=dateutil.tz.tzoffset(None, 3600)),
datetime.datetime(2047, 1, 1, 8, 21),
datetime.datetime(2016, 1, 12, 10, 39, 2),
]
for ts, exp_date in zip(input_timestamps, output_dates):
self.assertEquals(utils.parse_timestamp(ts), exp_date)
diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py
index 9e26b182..34f266e0 100644
--- a/swh/web/ui/tests/test_views.py
+++ b/swh/web/ui/tests/test_views.py
@@ -1,965 +1,1070 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from nose.tools import istest
from swh.web.ui.tests import test_app
from unittest.mock import patch
from swh.web.ui.exc import BadInputExc, NotFoundExc
class FileMock():
+
def __init__(self, filename):
self.filename = filename
class ViewTestCase(test_app.SWHViewTestCase):
render_template = False
@patch('swh.web.ui.views.flask')
@istest
def homepage(self, mock_flask):
# given
mock_flask.flash.return_value = 'something'
# when
rv = self.client.get('/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('home.html')
mock_flask.flash.assert_called_once_with(
'This Web app is still work in progress, use at your own risk',
'warning')
@istest
def info(self):
# when
rv = self.client.get('/about/')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('about.html')
self.assertIn(b'About', rv.data)
@istest
def search_default(self):
# when
rv = self.client.get('/search/')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), '')
self.assertEqual(self.get_context_variable('messages'), [])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
@patch('swh.web.ui.views.service')
@istest
def search_get_query_hash_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': None}
# when
rv = self.client.get('/search/?q=sha1:456')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1:456')
self.assertEqual(self.get_context_variable('messages'),
['Content with hash sha1:456 not found!'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1:456')
@patch('swh.web.ui.views.service')
@istest
def search_get_query_hash_bad_input(self, mock_service):
# given
mock_service.lookup_hash.side_effect = BadInputExc('error msg')
# when
rv = self.client.get('/search/?q=sha1_git:789')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1_git:789')
self.assertEqual(self.get_context_variable('messages'),
['error msg'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1_git:789')
@patch('swh.web.ui.views.service')
@istest
def search_get_query_hash_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': True}
# when
rv = self.client.get('/search/?q=sha1:123')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1:123')
self.assertEqual(self.get_context_variable('messages'),
['Content with hash sha1:123 found!'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1:123')
@patch('swh.web.ui.views.service')
@istest
def search_post_query_hash_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': None}
# when
rv = self.client.get('/search/?q=sha1:456')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1:456')
self.assertEqual(self.get_context_variable('messages'),
['Content with hash sha1:456 not found!'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1:456')
@patch('swh.web.ui.views.service')
@istest
def search_post_query_hash_bad_input(self, mock_service):
# given
mock_service.lookup_hash.side_effect = BadInputExc('error msg!')
# when
rv = self.client.post('/search/', data=dict(q='sha1_git:987'))
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1_git:987')
self.assertEqual(self.get_context_variable('messages'),
['error msg!'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1_git:987')
@patch('swh.web.ui.views.service')
@istest
def search_post_query_hash_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': True}
# when
rv = self.client.post('/search/', data=dict(q='sha1:321'))
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), 'sha1:321')
self.assertEqual(self.get_context_variable('messages'),
['Content with hash sha1:321 found!'])
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('file'), None)
self.assert_template_used('upload_and_search.html')
mock_service.lookup_hash.assert_called_once_with('sha1:321')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def search_post_upload_and_hash_bad_input(self, mock_request,
mock_service):
# given
mock_request.data = {}
mock_request.method = 'POST'
mock_request.files = dict(filename=FileMock('foobar'))
mock_service.upload_and_search.side_effect = BadInputExc(
'error bad input')
# when (mock_request completes the post request)
rv = self.client.post('/search/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('messages'),
['error bad input'])
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def search_post_upload_and_hash_not_found(self, mock_request,
mock_service):
# given
mock_request.data = {}
mock_request.method = 'POST'
mock_request.files = dict(filename=FileMock('foobar'))
mock_service.upload_and_search.return_value = {'filename': 'foobar',
'sha1': 'blahhash',
'found': False}
# when (mock_request completes the post request)
rv = self.client.post('/search/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('messages'),
["File foobar with hash blahhash not found!"])
self.assertEqual(self.get_context_variable('filename'), 'foobar')
self.assertEqual(self.get_context_variable('sha1'), 'blahhash')
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def search_post_upload_and_hash_found(self, mock_request, mock_service):
# given
mock_request.data = {}
mock_request.method = 'POST'
mock_request.files = dict(filename=FileMock('foobar'))
mock_service.upload_and_search.return_value = {'filename': 'foobar',
'sha1': '123456789',
'found': True}
# when (mock_request completes the post request)
rv = self.client.post('/search/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('messages'),
["File foobar with hash 123456789 found!"])
self.assertEqual(self.get_context_variable('filename'), 'foobar')
self.assertEqual(self.get_context_variable('sha1'), '123456789')
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
@patch('swh.web.ui.views.service')
@istest
def browse_content_detail_not_found(self, mock_service):
# given
mock_service.lookup_content.return_value = None
# when
rv = self.client.get('/browse/content/sha1:sha1-hash/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertEqual(self.get_context_variable('message'),
'Content with sha1:sha1-hash not found.')
self.assertEqual(self.get_context_variable('content'),
None)
mock_service.lookup_content.assert_called_once_with(
'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def browse_content_detail_bad_input(self, mock_service):
# given
mock_service.lookup_content.side_effect = BadInputExc('Bad input!')
# when
rv = self.client.get('/browse/content/sha1:sha1-hash/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertEqual(self.get_context_variable('message'),
'Bad input!')
self.assertIsNone(self.get_context_variable('content'))
mock_service.lookup_content.assert_called_once_with(
'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def browse_content_detail(self, mock_service):
# given
stub_content = {'sha1': 'sha1_hash'}
mock_service.lookup_content.return_value = stub_content
# when
rv = self.client.get('/browse/content/sha1:sha1-hash/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertIsNone(self.get_context_variable('message'))
self.assertEqual(self.get_context_variable('content'),
{'sha1': 'sha1_hash'})
mock_service.lookup_content.assert_called_once_with(
'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def browse_content_data(self, mock_service):
# given
stub_content_raw = {
'sha1': 'sha1-hash',
'data': b'some-data'
}
mock_service.lookup_content_raw.return_value = stub_content_raw
# when
rv = self.client.get('/browse/content/sha1:sha1-hash/raw/')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-data.html')
self.assertEqual(self.get_context_variable('message'),
'Content sha1-hash')
self.assertEqual(self.get_context_variable('content'),
stub_content_raw)
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def browse_content_data_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.client.get('/browse/content/sha1:sha1-unknown/raw/')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-data.html')
self.assertEqual(self.get_context_variable('message'),
'Content with sha1:sha1-unknown not found.')
self.assertEqual(self.get_context_variable('content'), None)
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:sha1-unknown')
@patch('swh.web.ui.views.service')
@istest
def browse_content_data_invalid_hash(self, mock_service):
# given
mock_service.lookup_content_raw.side_effect = BadInputExc(
'Invalid hash')
# when
rv = self.client.get('/browse/content/sha2:sha1-invalid/raw/')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-data.html')
self.assertEqual(self.get_context_variable('message'),
'Invalid hash')
self.assertEqual(self.get_context_variable('content'), None)
mock_service.lookup_content_raw.assert_called_once_with(
'sha2:sha1-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_bad_input(self, mock_utils, mock_service):
# given
mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash')
# when
rv = self.client.get('/browse/directory/sha2-invalid/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Invalid hash')
self.assertEqual(self.get_context_variable('files'), [])
mock_service.lookup_directory.assert_called_once_with(
'sha2-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_empty_result(self, mock_utils, mock_service):
# given
mock_service.lookup_directory.return_value = None
# when
rv = self.client.get('/browse/directory/some-sha1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Directory some-sha1 not found.')
self.assertEqual(self.get_context_variable('files'), [])
mock_service.lookup_directory.assert_called_once_with(
'some-sha1')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.utils')
@istest
def browse_directory(self, mock_utils, mock_service):
# given
stub_directory_ls = [
{'type': 'dir',
'target': '123',
'name': 'some-dir-name'},
{'type': 'file',
'sha1': '654',
'name': 'some-filename'},
{'type': 'dir',
'target': '987',
'name': 'some-other-dirname'}
]
mock_service.lookup_directory.return_value = stub_directory_ls
stub_directory_map = [
{'link': '/path/to/url/dir/123',
'name': 'some-dir-name'},
{'link': '/path/to/url/file/654',
'name': 'some-filename'},
{'link': '/path/to/url/dir/987',
'name': 'some-other-dirname'}
]
mock_utils.prepare_directory_listing.return_value = stub_directory_map
# when
rv = self.client.get('/browse/directory/some-sha1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Listing for directory some-sha1:')
self.assertEqual(self.get_context_variable('files'),
stub_directory_map)
mock_service.lookup_directory.assert_called_once_with(
'some-sha1')
mock_utils.prepare_directory_listing.assert_called_once_with(
stub_directory_ls)
@patch('swh.web.ui.views.service')
# @istest
def browse_content_with_origin_content_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.client.get('/browse/content/sha256:some-sha256/origin/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-with-origin.html')
self.assertEqual(self.get_context_variable('message'),
'Hash sha256:some-sha256 was not found.')
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.views.service')
# @istest
def browse_content_with_origin_bad_input(self, mock_service):
# given
mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash')
# when
rv = self.client.get('/browse/content/sha256:some-sha256/origin/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-with-origin.html')
self.assertEqual(
self.get_context_variable('message'), 'Invalid hash')
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.views.service')
# @istest
def browse_content_with_origin(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': True}
mock_service.lookup_hash_origin.return_value = {
'origin_type': 'ftp',
'origin_url': '/some/url',
'revision': 'revision-hash',
'branch': 'master',
'path': '/path/to',
}
# when
rv = self.client.get('/browse/content/sha256:some-sha256/origin/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content-with-origin.html')
self.assertEqual(
self.get_context_variable('message'),
"The content with hash sha256:some-sha256 has been seen on " +
"origin with type 'ftp'\n" +
"at url '/some/url'. The revision was identified at " +
"'revision-hash' on branch 'master'.\n" +
"The file's path referenced was '/path/to'.")
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha256:some-sha256')
@patch('swh.web.ui.views.service')
@istest
def browse_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.client.get('/browse/origin/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('origin.html')
self.assertEqual(self.get_context_variable('origin_id'), 1)
self.assertEqual(
self.get_context_variable('message'),
'Origin 1 not found!')
mock_service.lookup_origin.assert_called_once_with(1)
@patch('swh.web.ui.views.service')
@istest
def browse_origin_found(self, mock_service):
# given
mock_origin = {'type': 'git',
'lister': None,
'project': None,
'url': 'rsync://some/url',
'id': 426}
mock_service.lookup_origin.return_value = mock_origin
# when
rv = self.client.get('/browse/origin/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('origin.html')
self.assertEqual(self.get_context_variable('origin_id'), 426)
self.assertEqual(self.get_context_variable('origin'), mock_origin)
mock_service.lookup_origin.assert_called_once_with(426)
@patch('swh.web.ui.views.service')
@istest
def browse_origin_bad_input(self, mock_service):
# given
mock_service.lookup_origin.side_effect = BadInputExc('wrong input')
# when
rv = self.client.get('/browse/origin/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('origin.html')
self.assertEqual(self.get_context_variable('origin_id'), 426)
mock_service.lookup_origin.assert_called_once_with(426)
@patch('swh.web.ui.views.service')
@istest
def browse_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.client.get('/browse/person/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('person.html')
self.assertEqual(self.get_context_variable('person_id'), 1)
self.assertEqual(
self.get_context_variable('message'),
'Person 1 not found!')
mock_service.lookup_person.assert_called_once_with(1)
@patch('swh.web.ui.views.service')
@istest
def browse_person_found(self, mock_service):
# given
mock_person = {'type': 'git',
'lister': None,
'project': None,
'url': 'rsync://some/url',
'id': 426}
mock_service.lookup_person.return_value = mock_person
# when
rv = self.client.get('/browse/person/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('person.html')
self.assertEqual(self.get_context_variable('person_id'), 426)
self.assertEqual(self.get_context_variable('person'), mock_person)
mock_service.lookup_person.assert_called_once_with(426)
@patch('swh.web.ui.views.service')
@istest
def browse_person_bad_input(self, mock_service):
# given
mock_service.lookup_person.side_effect = BadInputExc('wrong input')
# when
rv = self.client.get('/browse/person/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('person.html')
self.assertEqual(self.get_context_variable('person_id'), 426)
mock_service.lookup_person.assert_called_once_with(426)
@patch('swh.web.ui.views.service')
@istest
def browse_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.client.get('/browse/release/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('release.html')
self.assertEqual(self.get_context_variable('sha1_git'), '1')
self.assertEqual(
self.get_context_variable('message'),
'Release 1 not found!')
mock_service.lookup_release.assert_called_once_with('1')
@patch('swh.web.ui.views.service')
@istest
def browse_release_bad_input(self, mock_service):
# given
mock_service.lookup_release.side_effect = BadInputExc('wrong input')
# when
rv = self.client.get('/browse/release/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('release.html')
self.assertEqual(self.get_context_variable('sha1_git'), '426')
mock_service.lookup_release.assert_called_once_with('426')
@patch('swh.web.ui.views.service')
@istest
def browse_release(self, mock_service):
# given
mock_release = {
"date": "Sun, 05 Jul 2015 18:02:06 GMT",
"id": "1e951912027ea6873da6985b91e50c47f645ae1a",
"target": "d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754",
"synthetic": False,
"target_type": "revision",
"author": {
"email": "torvalds@linux-foundation.org",
"name": "Linus Torvalds"
},
"message": "Linux 4.2-rc1\n",
"name": "v4.2-rc1"
}
mock_service.lookup_release.return_value = mock_release
expected_release = {
"date": "Sun, 05 Jul 2015 18:02:06 GMT",
"id": "1e951912027ea6873da6985b91e50c47f645ae1a",
"target": '/browse/revision/d770e558e21961ad6cfdf0ff7df0'
'eb5d7d4f0754/',
"synthetic": False,
"target_type": "revision",
"author": "Linus Torvalds ",
"message": "Linux 4.2-rc1\n",
"name": "v4.2-rc1"
}
# when
rv = self.client.get('/browse/release/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('release.html')
self.assertEqual(self.get_context_variable('sha1_git'), '426')
self.assertEqual(self.get_context_variable('release'),
expected_release)
self.assertEqual(self.get_context_variable('keys'), [
'id', 'name', 'date', 'message', 'author', 'target',
'target_type'])
mock_service.lookup_release.assert_called_once_with('426')
@patch('swh.web.ui.views.service')
@istest
def browse_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.client.get('/browse/revision/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git'), '1')
self.assertEqual(
self.get_context_variable('message'),
'Revision 1 not found!')
mock_service.lookup_revision.assert_called_once_with('1')
@patch('swh.web.ui.views.service')
@istest
def browse_revision_bad_input(self, mock_service):
# given
mock_service.lookup_revision.side_effect = BadInputExc('wrong input')
# when
rv = self.client.get('/browse/revision/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git'), '426')
mock_service.lookup_revision.assert_called_once_with('426')
@patch('swh.web.ui.views.service')
@istest
def browse_revision_found(self, mock_service):
# given
mock_revision = {
'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754',
'date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'committer': {
'email': 'torvalds@linux-foundation.org',
'name': 'Linus Torvalds'
},
'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'metadata': None,
'type': 'git',
'author': {
'email': 'torvalds@linux-foundation.org',
'name': 'Linus Torvalds'
},
'message': 'Linux 4.2-rc1\n',
'synthetic': False,
'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b',
'parents': [
'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'
],
}
mock_service.lookup_revision.return_value = mock_revision
expected_revision = {
'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754',
'date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'committer': 'Linus Torvalds ',
'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT',
'type': 'git',
'author': 'Linus Torvalds ',
'message': 'Linux 4.2-rc1\n',
'synthetic': False,
'metadata': None,
'parents': [
'/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/'
],
'directory': '/browse/directory/2a1dbabeed4dcf1f4a4c441993b2f'
'fc9d972780b/',
}
# when
rv = self.client.get('/browse/revision/426/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git'), '426')
self.assertEqual(self.get_context_variable('revision'),
expected_revision)
self.assertEqual(self.get_context_variable('keys'),
set(expected_revision.keys()) - set(['directory',
'parents',
'children']))
mock_service.lookup_revision.assert_called_once_with('426')
@istest
def browse_revision_history_same_sha1(self):
# when
rv = self.client.get('/browse/revision/10/history/10/')
# then
self.assertEquals(rv.status_code, 302)
@patch('swh.web.ui.views.service')
@istest
def browse_revision_history_not_found(self, mock_service):
# given
mock_service.lookup_revision_with_context.return_value = None
# when
rv = self.client.get('/browse/revision/1/history/2/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git_root'), '1')
self.assertEqual(self.get_context_variable('sha1_git'), '2')
self.assertEqual(
self.get_context_variable('message'),
"Possibly sha1_git '2' is not an ancestor of sha1_git_root '1'")
mock_service.lookup_revision_with_context.assert_called_once_with(
'1', '2', 100)
@patch('swh.web.ui.views.service')
@istest
def browse_revision_history_root_not_found(self, mock_service):
# given
mock_service.lookup_revision_with_context.side_effect = NotFoundExc(
'Revision root 123 not found')
# when
rv = self.client.get('/browse/revision/123/history/456/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git_root'), '123')
self.assertEqual(self.get_context_variable('sha1_git'), '456')
self.assertEqual(
self.get_context_variable('message'),
"Revision root 123 not found")
mock_service.lookup_revision_with_context.assert_called_once_with(
'123', '456', 100)
@patch('swh.web.ui.views.service')
@istest
def browse_revision_history_root_bad_input(self, mock_service):
# given
mock_service.lookup_revision_with_context.side_effect = NotFoundExc(
'Input incorrect')
# when
rv = self.client.get('/browse/revision/321/history/654/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git_root'), '321')
self.assertEqual(self.get_context_variable('sha1_git'), '654')
self.assertEqual(
self.get_context_variable('message'),
"Input incorrect")
mock_service.lookup_revision_with_context.assert_called_once_with(
'321', '654', 100)
@patch('swh.web.ui.views.utils')
@patch('swh.web.ui.views.service')
@istest
def browse_revision_history(self, mock_service, mock_utils):
# given
stub_revision = {'id': 'some-rev'}
mock_service.lookup_revision_with_context.return_value = stub_revision
expected_revision = {'id': 'some-rev-id'}
mock_utils.prepare_revision_view.return_value = expected_revision
# when
rv = self.client.get('/browse/revision/426/history/789/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('revision.html')
self.assertEqual(self.get_context_variable('sha1_git_root'), '426')
self.assertEqual(self.get_context_variable('sha1_git'), '789')
self.assertEqual(self.get_context_variable('revision'),
expected_revision)
self.assertEqual(self.get_context_variable('keys'),
set(expected_revision.keys()) - set(['directory',
'parents',
'children']))
mock_service.lookup_revision_with_context.assert_called_once_with(
'426', '789', 100)
mock_utils.prepare_revision_view.assert_called_once_with(stub_revision)
+ @patch('swh.web.ui.views.service')
+ @istest
+ def browse_revision_directory_not_found(self, mock_service):
+ # given
+ mock_service.lookup_directory_with_revision.side_effect = NotFoundExc(
+ 'Not found!')
+
+ # when
+ rv = self.client.get('/browse/revision/1/directory/')
+
+ # then
+ self.assertEquals(rv.status_code, 200)
+ self.assert_template_used('revision-directory.html')
+ self.assertEqual(self.get_context_variable('sha1_git'), '1')
+ self.assertEqual(self.get_context_variable('path'), '.')
+ self.assertIsNone(self.get_context_variable('result'))
+ self.assertEqual(
+ self.get_context_variable('message'),
+ "Not found!")
+
+ mock_service.lookup_directory_with_revision.assert_called_once_with(
+ '1', None)
+
+ @patch('swh.web.ui.views.service')
+ @istest
+ def browse_revision_directory_bad_input(self, mock_service):
+ # given
+ mock_service.lookup_directory_with_revision.side_effect = BadInputExc(
+ 'Bad input!')
+
+ # when
+ rv = self.client.get('/browse/revision/10/directory/')
+
+ # then
+ self.assertEquals(rv.status_code, 200)
+ self.assert_template_used('revision-directory.html')
+ self.assertEqual(self.get_context_variable('sha1_git'), '10')
+ self.assertEqual(self.get_context_variable('path'), '.')
+ self.assertIsNone(self.get_context_variable('result'))
+ self.assertEqual(
+ self.get_context_variable('message'),
+ "Bad input!")
+
+ mock_service.lookup_directory_with_revision.assert_called_once_with(
+ '10', None)
+
+ @patch('swh.web.ui.views.service')
+ @istest
+ def browse_revision_directory_not_implemented(self, mock_service):
+ # given
+ mock_service.lookup_directory_with_revision.side_effect = NotImplementedError( # noqa
+ 'Oops! Not implemented!')
+
+ # when
+ rv = self.client.get('/browse/revision/10/directory/path/')
+
+ # then
+ self.assertEquals(rv.status_code, 200)
+ self.assert_template_used('revision-directory.html')
+ self.assertEqual(self.get_context_variable('sha1_git'), '10')
+ self.assertEqual(self.get_context_variable('path'), 'path')
+ self.assertIsNone(self.get_context_variable('result'))
+ self.assertEqual(
+ self.get_context_variable('message'),
+ 'Oops! Not implemented!')
+
+ mock_service.lookup_directory_with_revision.assert_called_once_with(
+ '10', 'path')
+
+ from nose.plugins.attrib import attr
+
+ @attr('one')
+ @patch('swh.web.ui.views.service')
+ @istest
+ def browse_revision_directory(self, mock_service):
+ # given
+ stub_result0 = {'type': 'dir',
+ 'content': [{'id': 'some-result',
+ 'type': 'file',
+ 'name': 'blah'}]}
+ mock_service.lookup_directory_with_revision.return_value = stub_result0
+ stub_result1 = {
+ 'type': 'dir',
+ 'content': [
+ {'type': 'file',
+ 'name': 'blah',
+ 'link': '/browse/revision/100/directory/some/path/blah/'}
+ ]
+ }
+
+ # when
+ rv = self.client.get('/browse/revision/100/directory/some/path/')
+
+ # then
+ self.assertEquals(rv.status_code, 200)
+ self.assert_template_used('revision-directory.html')
+ self.assertEqual(self.get_context_variable('sha1_git'), '100')
+ self.assertEqual(self.get_context_variable('path'), 'some/path')
+ self.assertIsNone(self.get_context_variable('message'))
+ self.assertEqual(self.get_context_variable('result'), stub_result1)
+
+ mock_service.lookup_directory_with_revision.assert_called_once_with(
+ '100', 'some/path')
+
@patch('swh.web.ui.views.service')
@istest
def browse_entity_not_found(self, mock_service):
# given
mock_service.lookup_entity_by_uuid.return_value = []
# when
rv = self.client.get('/browse/entity/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('entity.html')
self.assertEqual(self.get_context_variable('entities'), [])
self.assertEqual(
self.get_context_variable('message'),
"Entity '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not found!")
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
@patch('swh.web.ui.views.service')
@istest
def browse_entity_bad_input(self, mock_service):
# given
mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
'wrong input')
# when
rv = self.client.get('/browse/entity/blah-blah-uuid/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('entity.html')
self.assertEqual(self.get_context_variable('entities'), [])
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'blah-blah-uuid')
@patch('swh.web.ui.views.service')
@istest
def browse_entity(self, mock_service):
# given
stub_entities = [
{'id': '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba'}]
mock_service.lookup_entity_by_uuid.return_value = stub_entities
# when
rv = self.client.get('/browse/entity/'
'5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba/')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('entity.html')
self.assertEqual(self.get_context_variable('entities'), stub_entities)
self.assertIsNone(self.get_context_variable('message'))
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba')
diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py
index 22df9bca..4ad1dcf9 100644
--- a/swh/web/ui/utils.py
+++ b/swh/web/ui/utils.py
@@ -1,150 +1,185 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import flask
from dateutil import parser
def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
"""Filter endpoints by prefix url rule.
Args:
- url_map: Url Werkzeug.Map of rules
- prefix_url_rule: prefix url string
- blacklist: blacklist of some url
Returns:
Dictionary of url_rule with values methods and endpoint.
The key is the url, the associated value is a dictionary of
'methods' (possible http methods) and 'endpoint' (python function)
"""
out = {}
for r in url_map:
rule = r['rule']
if rule == prefix_url_rule or rule in blacklist:
continue
if rule.startswith(prefix_url_rule):
out[rule] = {'methods': sorted(map(str, r['methods'])),
'endpoint': r['endpoint']}
return out
def prepare_directory_listing(files):
- """Given a list of dictionary files, return a view ready dictionary.
+ """Given a list of dictionary files, return a dictionary ready for view.
+
+ Args:
+ files: List of files to enrich
+
+ Returns:
+ List of enriched files with urls to other resources
"""
ls = []
for entry in files:
new_entry = {'name': entry['name'],
'type': entry['type']}
if entry['type'] == 'dir':
new_entry['link'] = flask.url_for('browse_directory',
sha1_git=entry['target'])
else:
new_entry['link'] = flask.url_for('browse_content_data',
q=entry['sha1'])
ls.append(new_entry)
return ls
+def prepare_directory_listing_with_revision(rev_sha1_git, prefix_path, files):
+ """Given a list of dictionary files, return a dictionary ready for view.
+
+ Args:
+ rev_sha1_git: The revision identifier
+ prefix_path: the path to append (could be None)
+ files: List of files to enrich
+
+ Returns:
+ List of enriched files with urls to other resources
+
+ """
+ ls = []
+ for entry in files:
+ new_entry = {'name': entry['name'],
+ 'type': entry['type']}
+ new_path = ''.join([
+ '' if not prefix_path
+ else (prefix_path + '/'),
+ entry['name']])
+
+ new_entry['link'] = flask.url_for('browse_revision_directory',
+ sha1_git=rev_sha1_git,
+ path=new_path)
+ ls.append(new_entry)
+
+ return ls
+
+
def prepare_revision_view(revision):
- """Prepare revision for display.
+ """Given a revision, return a dictionary ready view.
"""
author = revision.get('author')
if author:
revision['author'] = person_to_string(author)
committer = revision.get('committer')
if committer:
revision['committer'] = person_to_string(committer)
revision['parents'] = list(map(lambda p: flask.url_for('browse_revision',
sha1_git=p),
revision.get('parents', [])))
if 'children' in revision:
revision['children'] = list(map(lambda child: flask.url_for(
'browse_revision',
sha1_git=child),
revision['children']))
directory = revision.get('directory')
if directory:
revision['directory'] = flask.url_for('browse_directory',
sha1_git=revision['directory'])
return revision
def filter_field_keys(obj, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- obj: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(obj, dict):
filt_dict = {}
for key, value in obj.items():
if key in field_keys:
filt_dict[key] = value
return filt_dict
elif isinstance(obj, list):
filt_list = []
for e in obj:
filt_list.append(filter_field_keys(e, field_keys))
return filt_list
return obj
def fmap(f, data):
if isinstance(data, list):
return [f(x) for x in data]
if isinstance(data, dict):
return {k: f(v) for (k, v) in data.items()}
return f(data)
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as datetime.
Returns:
datetime result of parsing values.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
try:
res = parser.parse(timestamp, ignoretz=False, fuzzy=True)
except:
res = datetime.datetime.fromtimestamp(float(timestamp))
return res
diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py
index 49df0642..bd72e71d 100644
--- a/swh/web/ui/views.py
+++ b/swh/web/ui/views.py
@@ -1,449 +1,477 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import flask
from flask import render_template, request, url_for, redirect
from flask.ext.api.decorators import set_renderers
from flask.ext.api.renderers import HTMLRenderer
from swh.core.hashutil import ALGORITHMS
from swh.web.ui import service, utils
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.main import app
hash_filter_keys = ALGORITHMS
@app.route('/')
@set_renderers(HTMLRenderer)
def homepage():
"""Home page
"""
flask.flash('This Web app is still work in progress, use at your own risk',
'warning')
return render_template('home.html')
@app.route('/about/')
@set_renderers(HTMLRenderer)
def about():
return render_template('about.html')
@app.route('/search/', methods=['GET', 'POST'])
@set_renderers(HTMLRenderer)
def search():
"""Search for hashes in swh-storage.
One form to submit either:
- hash query to look up in swh storage
- some file content to upload, compute its hash and look it up in swh
storage
- both
Returns:
dict representing data to look for in swh storage.
The following keys are returned:
- file: File submitted for upload
- filename: Filename submitted for upload
- q: Query on hash to look for
- message: Message detailing if data has been found or not.
"""
env = {'filename': None,
'q': None,
'file': None}
data = None
q = env['q']
file = env['file']
if request.method == 'GET':
data = request.args
elif request.method == 'POST':
data = request.data
# or hash and search a file
file = request.files.get('filename')
# could either be a query for sha1 hash
q = data.get('q')
messages = []
if q:
env['q'] = q
try:
r = service.lookup_hash(q)
messages.append('Content with hash %s%sfound!' % (
q, ' ' if r.get('found') else ' not '))
except BadInputExc as e:
messages.append(str(e))
if file and file.filename:
env['file'] = file
try:
uploaded_content = service.upload_and_search(file)
filename = uploaded_content['filename']
sha1 = uploaded_content['sha1']
found = uploaded_content['found']
messages.append('File %s with hash %s%sfound!' % (
filename, sha1, ' ' if found else ' not '))
env.update({
'filename': filename,
'sha1': sha1,
})
except BadInputExc as e:
messages.append(str(e))
env['q'] = q if q else ''
env['messages'] = messages
return render_template('upload_and_search.html', **env)
def _origin_seen(q, data):
"""Given an origin, compute a message string with the right information.
Args:
origin: a dictionary with keys:
- origin: a dictionary with type and url keys
- occurrence: a dictionary with a validity range
Returns:
Message as a string
"""
origin_type = data['origin_type']
origin_url = data['origin_url']
revision = data['revision']
branch = data['branch']
path = data['path']
return """The content with hash %s has been seen on origin with type '%s'
at url '%s'. The revision was identified at '%s' on branch '%s'.
The file's path referenced was '%s'.""" % (q,
origin_type,
origin_url,
revision,
branch,
path)
@app.route('/browse/content/')
@app.route('/browse/content//')
@set_renderers(HTMLRenderer)
def browse_content_detail(q='5d448a06f02d9de748b6b0b9620cba1bed8480da'):
"""Given a hash and a checksum, display the content's meta-data.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256)
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the content is not found.
"""
env = {}
message = None
content = None
try:
content = service.lookup_content(q)
if not content:
message = 'Content with %s not found.' % q
except BadInputExc as e:
message = str(e)
env['message'] = message
env['content'] = content
return render_template('content.html', **env)
@app.route('/browse/content//raw/')
@set_renderers(HTMLRenderer)
def browse_content_data(q):
"""Given a hash and a checksum, display the content's raw data.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256)
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the content is not found.
"""
env = {}
content = None
try:
content = service.lookup_content_raw(q)
if content:
# FIXME: will break if not utf-8
content['data'] = content['data'].decode('utf-8')
message = 'Content %s' % content['sha1']
else:
message = 'Content with %s not found.' % q
except BadInputExc as e:
message = str(e)
env['message'] = message
env['content'] = content
return render_template('content-data.html', **env)
# @app.route('/browse/content//origin/')
@set_renderers(HTMLRenderer)
def browse_content_with_origin(
q='sha1:4320781056e5a735a39de0b8c229aea224590052'):
"""Show content information.
Args:
- q: query string of the form with
`algo_hash` in sha1, sha1_git, sha256.
This means that several different URLs (at least one per
HASH_ALGO) will point to the same content sha: the sha with
'hash' format
Returns:
The content's information at for a given checksum.
"""
env = {'q': q}
try:
content = service.lookup_hash(q)
if not content.get('found'):
message = "Hash %s was not found." % q
else:
origin = service.lookup_hash_origin(q)
message = _origin_seen(q, origin)
except BadInputExc as e: # do not like it but do not duplicate code
message = str(e)
env['message'] = message
return render_template('content-with-origin.html', **env)
@app.route('/browse/directory/')
@app.route('/browse/directory//')
@set_renderers(HTMLRenderer)
def browse_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'):
"""Show directory information.
Args:
- sha1_git: the directory's sha1 git identifier.
Returns:
The content's information at sha1_git
"""
env = {'sha1_git': sha1_git}
files = []
try:
directory_files = service.lookup_directory(sha1_git)
if directory_files:
message = "Listing for directory %s:" % sha1_git
files = utils.prepare_directory_listing(directory_files)
else:
message = "Directory %s not found." % sha1_git
except BadInputExc as e: # do not like it but do not duplicate code
message = str(e)
env['message'] = message
env['files'] = files
return render_template('directory.html', **env)
@app.route('/browse/origin/')
@app.route('/browse/origin//')
@set_renderers(HTMLRenderer)
def browse_origin(origin_id=1):
"""Browse origin with id id.
"""
env = {'origin_id': origin_id,
'origin': None}
try:
ori = service.lookup_origin(origin_id)
if ori:
env.update({'origin': ori})
else:
env.update({'message': 'Origin %s not found!' % origin_id})
except BadInputExc as e:
env.update({'message': str(e)})
return render_template('origin.html', **env)
@app.route('/browse/person/')
@app.route('/browse/person//')
@set_renderers(HTMLRenderer)
def browse_person(person_id=1):
"""Browse person with id id.
"""
env = {'person_id': person_id,
'person': None}
try:
ori = service.lookup_person(person_id)
if ori:
env.update({'person': ori})
else:
env.update({'message': 'Person %s not found!' % person_id})
except BadInputExc as e:
env.update({'message': str(e)})
return render_template('person.html', **env)
@app.route('/browse/release/')
@app.route('/browse/release//')
@set_renderers(HTMLRenderer)
def browse_release(sha1_git='1e951912027ea6873da6985b91e50c47f645ae1a'):
"""Browse release with sha1_git.
"""
env = {'sha1_git': sha1_git,
'release': None}
try:
rel = service.lookup_release(sha1_git)
if rel:
author = rel.get('author')
if author:
rel['author'] = utils.person_to_string(author)
target_type = rel.get('target_type')
if target_type == 'revision':
rel['target'] = url_for('browse_revision',
sha1_git=rel['target'])
env.update({'release': rel,
'keys': ['id', 'name', 'date', 'message', 'author',
'target', 'target_type']})
else:
env.update({'message': 'Release %s not found!' % sha1_git})
except BadInputExc as e:
env.update({'message': str(e)})
return render_template('release.html', **env)
@app.route('/browse/revision/')
@app.route('/browse/revision//')
@set_renderers(HTMLRenderer)
def browse_revision(sha1_git='d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754'):
"""Browse revision with sha1_git.
"""
env = {'sha1_git': sha1_git,
'keys': [],
'revision': None}
try:
rev = service.lookup_revision(sha1_git)
if rev:
rev = utils.prepare_revision_view(rev)
env.update({
'revision': rev,
'keys': set(rev.keys()) - set(['directory', 'parents',
'children'])
})
else:
env.update({'message': 'Revision %s not found!' % sha1_git})
except BadInputExc as e:
env.update({'message': str(e)})
return render_template('revision.html', **env)
@app.route('/browse/revision//history//')
@set_renderers(HTMLRenderer)
def browse_revision_history(sha1_git_root, sha1_git):
"""Display information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history.
sha1_git: one of sha1_git_root's ancestors.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root.
"""
limit = int(request.args.get('limit', '100'))
env = {'sha1_git_root': sha1_git_root,
'sha1_git': sha1_git,
'message': None,
'keys': [],
'revision': None}
if sha1_git == sha1_git_root:
return redirect(url_for('browse_revision',
sha1_git=sha1_git,
limit=limit))
try:
revision = service.lookup_revision_with_context(sha1_git_root,
sha1_git,
limit)
if revision:
revision = utils.prepare_revision_view(revision)
env.update({
'revision': revision,
'keys': set(revision.keys()) - set(['directory', 'parents',
'children'])
})
else:
env['message'] = "Possibly sha1_git '%s' is not an ancestor " \
"of sha1_git_root '%s'" % (sha1_git,
sha1_git_root)
except (BadInputExc, NotFoundExc) as e:
env['message'] = str(e)
return render_template('revision.html', **env)
+@app.route('/browse/revision//directory/')
+@app.route('/browse/revision//directory//')
+@set_renderers(HTMLRenderer)
+def browse_revision_directory(sha1_git, path=None):
+ """Browse directory from revision with sha1_git.
+
+ """
+ env = {
+ 'sha1_git': sha1_git,
+ 'path': '.' if not path else path,
+ 'message': None,
+ 'result': None
+ }
+
+ try:
+ result = service.lookup_directory_with_revision(sha1_git, path)
+ if result['type'] == 'dir': # dir_entries
+ result['content'] = utils.prepare_directory_listing_with_revision(
+ sha1_git,
+ path,
+ result['content'])
+ env['result'] = result
+ except (BadInputExc, NotFoundExc, NotImplementedError) as e:
+ env['message'] = str(e)
+
+ return render_template('revision-directory.html', **env)
+
+
@app.route('/browse/entity/')
@app.route('/browse/entity//')
@set_renderers(HTMLRenderer)
def browse_entity(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'):
env = {'entities': [],
'message': None}
entities = env['entities']
try:
entities = service.lookup_entity_by_uuid(uuid)
if not entities:
env['message'] = "Entity '%s' not found!" % uuid
except BadInputExc as e:
env.update({'message': str(e)})
env['entities'] = entities
return render_template('entity.html', **env)