Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9125174
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
85 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index ae765ac1f..7921c5474 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,905 +1,907 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import dateutil
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.ui import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.url_map = [dict(rule='/other/<slug>',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/<slug>',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/<int:id>',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
@istest
def filter_endpoints_1(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/some')
# then
self.assertEquals(actual_data, {
'/some/old/url/<slug>': {
'methods': ['GET', 'POST'],
'endpoint': 'blablafn'
}
})
@istest
def filter_endpoints_2(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/other',
blacklist=['/other2'])
# then
# rule /other is skipped because it's exactly the prefix url
# rules /other2 is skipped because it's blacklisted
self.assertEquals(actual_data, {
'/other/<slug>': {
'methods': ['GET', 'HEAD', 'POST'],
'endpoint': 'foo'
},
'/other/old/url/<int:id>': {
'methods': ['GET', 'HEAD'],
'endpoint': 'bar'
}
})
@istest
def prepare_data_for_view_default_encoding(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs)
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view_ko_cannot_decode(self):
self.maxDiff = None
# given
inputs = {
'data': 'hé dude!'.encode('utf8'),
}
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, {
'data': "Cannot decode the data bytes, try and set another "
"encoding in the url (e.g. ?encoding=utf8) or "
"download directly the "
"content's raw data.",
})
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_map(self):
# when
actual_res = utils.filter_field_keys(
map(lambda x: {'i': x['i']+1, 'j': x['j']},
[{'i': 1, 'j': None},
{'i': 2, 'j': None},
{'i': 3, 'j': None}]),
{'i'})
# then
self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
@istest
def fmap(self):
self.assertEquals([2, 3, None, 4],
utils.fmap(lambda x: x+1, [1, 2, None, 3]))
self.assertEquals([11, 12, 13],
list(utils.fmap(lambda x: x+10,
map(lambda x: x, [1, 2, 3]))))
self.assertEquals({'a': 2, 'b': 4},
utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
self.assertEquals(100,
utils.fmap(lambda x: x*10, 10))
self.assertEquals({'a': [2, 6], 'b': 4},
utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
self.assertIsNone(utils.fmap(lambda x: x, None))
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof <foo@bar>')
@istest
def parse_timestamp(self):
input_timestamps = [
+ None,
'2016-01-12',
'2016-01-12T09:19:12+0100',
'Today is January 1, 2047 at 8:21:00AM',
'1452591542',
]
output_dates = [
- datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
+ None,
+ datetime.datetime(2016, 1, 12, 0, 0),
datetime.datetime(2016, 1, 12, 9, 19, 12,
tzinfo=dateutil.tz.tzoffset(None, 3600)),
- datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc),
+ datetime.datetime(2047, 1, 1, 8, 21),
datetime.datetime(2016, 1, 12, 9, 39, 2,
tzinfo=datetime.timezone.utc),
]
for ts, exp_date in zip(input_timestamps, output_dates):
self.assertEquals(utils.parse_timestamp(ts), exp_date)
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_1(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/'
# when
actual_release = utils.enrich_release({'target': '123',
'target_type': 'content'})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_2(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
q='23')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_3(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_flask.url_for.assert_called_once_with('api_revision',
sha1_git='3')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_4(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_flask.url_for.assert_called_once_with('api_release',
sha1_git='4')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_no_type(self, mock_flask):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_file(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:789')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
sha1_git='456')
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
}
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes_and_top_level_url(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/' % h,
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
},
top_url=True
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'content_url': '/api/content/%s:123/' % h,
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_metadata', q='%s:blahblah' % h),
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_entity_with_sha1(self, mock_flask):
# given
def url_for_test(fn, **entity):
return '/api/entity/' + entity['uuid'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid',
uuid='uuid-1'),
call('api_entity_by_uuid',
uuid='uuid-parent')])
@nottest
def _url_for_context_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_without_children_or_parent(self, mock_flask):
# given
def url_for_test(fn, **data):
if fn == 'api_revision':
return '/api/revision/' + data['sha1_git'] + '/'
elif fn == 'api_revision_log':
return '/api/revision/' + data['sha1_git'] + '/log/'
elif fn == 'api_directory':
return '/api/directory/' + data['sha1_git'] + '/'
elif fn == 'api_person':
return '/api/person/' + data['person_id'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_person',
person_id='1'),
call('api_person',
person_id='2'),
call('api_directory',
sha1_git='123')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_no_context(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_empty_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_some_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev0-rev',
context='prev1-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev1-rev/prev0-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@nottest
def _url_for_rev_message_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data and data['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
elif fn == 'api_revision_raw_message':
return '/api/revision/' + data['sha1_git'] + '/raw/'
else:
return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_no_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_invalid_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py
index ed06c64ae..e6fe7a77d 100644
--- a/swh/web/ui/utils.py
+++ b/swh/web/ui/utils.py
@@ -1,384 +1,390 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
+
import flask
import re
+from datetime import datetime, timezone
from dateutil import parser
+from .exc import BadInputExc
+
def filter_endpoints(url_map, prefix_url_rule, blacklist=None):
    """Filter endpoints by prefix url rule.

    Args:
        - url_map: list of rule dicts (mirroring a werkzeug Map), each
          with 'rule', 'methods' and 'endpoint' keys
        - prefix_url_rule: prefix url string to filter on
        - blacklist: optional list of url rules to exclude

    Returns:
        Dictionary of url_rule with values methods and endpoint.
        The key is the url, the associated value is a dictionary of
        'methods' (possible http methods) and 'endpoint' (python function)

    """
    # Fix: the default used to be a mutable `[]`, shared across calls;
    # use None as sentinel to avoid accidental cross-call mutation.
    if blacklist is None:
        blacklist = []
    out = {}
    for r in url_map:
        rule = r['rule']
        # Skip the prefix url itself and anything explicitly blacklisted.
        if rule == prefix_url_rule or rule in blacklist:
            continue
        if rule.startswith(prefix_url_rule):
            out[rule] = {'methods': sorted(map(str, r['methods'])),
                         'endpoint': r['endpoint']}
    return out
def fmap(f, data):
"""Map f to data at each level.
This must keep the origin data structure type:
- map -> map
- dict -> dict
- list -> list
- None -> None
Args:
f: function that expects one argument.
data: data to traverse to apply the f function.
list, map, dict or bare value.
Returns:
The same data-structure with modified values by the f function.
"""
- if not data:
+ if data is None:
return data
if isinstance(data, map):
return map(lambda y: fmap(f, y), (x for x in data))
if isinstance(data, list):
return [fmap(f, x) for x in data]
if isinstance(data, dict):
return {k: fmap(f, v) for (k, v) in data.items()}
return f(data)
def prepare_data_for_view(data, encoding='utf-8'):
    """Prepare data for view rendering.

    Bytes values are decoded using `encoding`; api urls ('/api/1/')
    found in string values are rewritten to their browse equivalent
    ('/browse/').  Other values are left untouched.

    Args:
        data: list, dict, map or bare value to prepare.
        encoding: charset used to decode bytes values (default: utf-8).

    Returns:
        The same data structure with decoded/rewritten values.

    """
    def prepare_data(s):
        # Note: can only be 'data' key with bytes of raw content
        if isinstance(s, bytes):
            try:
                return s.decode(encoding)
            # Narrowed from a bare `except:` (which also swallowed
            # KeyboardInterrupt/SystemExit): decoding fails with
            # UnicodeDecodeError on bad bytes, or LookupError when an
            # unknown codec name is supplied in the url.
            except (UnicodeDecodeError, LookupError):
                return "Cannot decode the data bytes, try and set another " \
                       "encoding in the url (e.g. ?encoding=utf8) or " \
                       "download directly the " \
                       "content's raw data."
        if isinstance(s, str):
            return re.sub(r'/api/1/', r'/browse/', s)
        return s
    return fmap(prepare_data, data)
def filter_field_keys(data, field_keys):
    """Restrict an object to a given set of field keys.

    Dictionaries are filtered down to the keys listed in `field_keys`;
    lists and maps are processed element-wise, recursively.

    Note: returns the object as-is when it is an instance of a type
    other than dictionary, list or map.

    Args:
        - data: one object (dictionary, list...) to filter.
        - field_keys: csv or set of keys to filter the object on

    Returns:
        obj filtered on field_keys

    """
    if isinstance(data, dict):
        return {key: value
                for key, value in data.items()
                if key in field_keys}
    if isinstance(data, map):
        return map(lambda item: filter_field_keys(item, field_keys), data)
    if isinstance(data, list):
        return [filter_field_keys(item, field_keys) for item in data]
    return data
def person_to_string(person):
    """Render a person dict (author, committer, tagger, etc...) as the
    conventional "name <email>" string.
    """
    return '%s <%s>' % (person['name'], person['email'])
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as datetime.
Returns:
- a timezone-aware datetime representing the parsed value. If the parsed
- value doesn't specify a timezone, UTC is assumed.
+ a timezone-aware datetime representing the parsed value.
+ None if the parsing fails.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
+
"""
- default_timestamp = datetime.datetime.utcfromtimestamp(0).replace(
- tzinfo=datetime.timezone.utc)
+ if not timestamp:
+ return None
+
try:
- res = parser.parse(timestamp, ignoretz=False, fuzzy=True,
- default=default_timestamp)
+ return parser.parse(timestamp, ignoretz=False, fuzzy=True)
except:
- res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace(
- tzinfo=datetime.timezone.utc)
- return res
+ try:
+ return datetime.utcfromtimestamp(float(timestamp)).replace(
+ tzinfo=timezone.utc)
+ except (ValueError, OverflowError) as e:
+ raise BadInputExc(e)
def enrich_object(object):
    """Enrich an object (revision, release) with a link to the 'target' of
    type 'target_type'.

    Args:
        object: An object with target and target_type keys
        (e.g. release, revision)

    Returns:
        A copy of the object enriched with a target_url pointing to the
        right swh.web.ui.api url for the pointed object (revision,
        release, content, directory).  Objects without both target keys
        are returned as a plain copy.

    """
    obj = object.copy()
    # Guard clause: nothing to enrich without both target keys.
    if 'target' not in obj or 'target_type' not in obj:
        return obj
    target = obj['target']
    target_type = obj['target_type']
    if target_type == 'revision':
        obj['target_url'] = flask.url_for('api_revision', sha1_git=target)
    elif target_type == 'release':
        obj['target_url'] = flask.url_for('api_release', sha1_git=target)
    elif target_type == 'content':
        obj['target_url'] = flask.url_for('api_content_metadata',
                                          q='sha1_git:' + target)
    elif target_type == 'directory':
        obj['target_url'] = flask.url_for('api_directory', q=target)
    return obj


enrich_release = enrich_object
def enrich_directory(directory, context_url=None):
    """Enrich directory with url to content or directory.

    File entries get a 'target_url' to the content metadata endpoint
    (plus a 'file_url' when a context_url is given); other entries get
    a 'target_url' to the directory endpoint (plus a 'dir_url').
    Entries without a 'type' key are returned unchanged.  The input
    dict is mutated in place and returned.
    """
    # Guard clause: untyped entries cannot be enriched.
    if 'type' not in directory:
        return directory
    target = directory['target']
    if directory['type'] == 'file':
        directory['target_url'] = flask.url_for('api_content_metadata',
                                                q='sha1_git:%s' % target)
        if context_url:
            directory['file_url'] = context_url + directory['name'] + '/'
    else:
        directory['target_url'] = flask.url_for('api_directory',
                                                sha1_git=target)
        if context_url:
            directory['dir_url'] = context_url + directory['name'] + '/'
    return directory
def enrich_metadata_endpoint(content):
    """Enrich metadata endpoint with link to the upper metadata endpoint.

    Returns a copy of `content` with an added 'content_url' key built
    from the content's sha1 id; the input dict is left untouched.
    """
    enriched = dict(content)
    enriched['content_url'] = flask.url_for('api_content_metadata',
                                            q='sha1:%s' % enriched['id'])
    return enriched
def enrich_content(content, top_url=False):
    """Enrich content with links to:
        - data_url: its raw data
        - filetype_url: its filetype information
        - language_url / license_url: its language and license information
        - content_url: its metadata (only when top_url is True)

    Only the first checksum found among sha1, sha1_git and sha256 is
    used to build the urls; the input dict is mutated in place.
    """
    hash_algos = ('sha1', 'sha1_git', 'sha256')
    algo = next((a for a in hash_algos if a in content), None)
    if algo is not None:
        q = '%s:%s' % (algo, content[algo])
        if top_url:
            content['content_url'] = flask.url_for('api_content_metadata',
                                                   q=q)
        content['data_url'] = flask.url_for('api_content_raw', q=q)
        content['filetype_url'] = flask.url_for('api_content_filetype',
                                                q=q)
        content['language_url'] = flask.url_for('api_content_language',
                                                q=q)
        content['license_url'] = flask.url_for('api_content_license',
                                               q=q)
    return content
def enrich_entity(entity):
    """Enrich an entity with urls to itself and to its parent entity.

    Adds 'uuid_url' whenever a 'uuid' key is present, and 'parent_url'
    whenever a non-empty 'parent' uuid is present; the entity is
    mutated in place and returned.
    """
    if 'uuid' in entity:
        entity['uuid_url'] = flask.url_for('api_entity_by_uuid',
                                           uuid=entity['uuid'])
    # .get() covers both "key missing" and "falsy parent" at once.
    if entity.get('parent'):
        entity['parent_url'] = flask.url_for('api_entity_by_uuid',
                                             uuid=entity['parent'])
    return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id,
context=context_for_children)
# This commit is the first commit in the path
else:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id)
return (context_for_parents, context_for_children, url_direct_child)
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
rev_children: a list of revision id
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(flask.url_for('api_revision', sha1_git=child))
elif not context:
children.append(flask.url_for('api_revision', sha1_git=child))
return children
def enrich_revision(revision, context=None):
    """Enrich revision with links where it makes sense (directory, parents).
    Keep track of the navigation breadcrumbs if they are specified.

    The revision dict is mutated in place and returned.

    Args:
        revision: the revision as a dict
        context: the navigation breadcrumbs as a /-separated string of
            revision sha1_git
    """
    # Compute the contexts to propagate to parents/children, and the url
    # of the direct child taken from the breadcrumbs (None without context).
    ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
        revision['id'], context)
    revision['url'] = flask.url_for('api_revision', sha1_git=revision['id'])
    revision['history_url'] = flask.url_for('api_revision_log',
                                            sha1_git=revision['id'])
    if context:
        # Log url that preserves the navigation breadcrumbs.
        revision['history_context_url'] = flask.url_for(
            'api_revision_log',
            sha1_git=revision['id'],
            prev_sha1s=context)
    if 'author' in revision:
        author = revision['author']
        revision['author_url'] = flask.url_for('api_person',
                                               person_id=author['id'])
    if 'committer' in revision:
        committer = revision['committer']
        revision['committer_url'] = flask.url_for('api_person',
                                                  person_id=committer['id'])
    if 'directory' in revision:
        revision['directory_url'] = flask.url_for(
            'api_directory',
            sha1_git=revision['directory'])
    if 'parents' in revision:
        parents = []
        for parent in revision['parents']:
            parents.append(flask.url_for('api_revision',
                                         sha1_git=parent))
        revision['parent_urls'] = parents
    if 'children' in revision:
        # Children urls, with the breadcrumbs' direct child appended last
        # (matching the order the tests expect).
        children = _make_child_url(revision['children'], context)
        if url_direct_child:
            children.append(url_direct_child)
        revision['children_urls'] = children
    else:
        if url_direct_child:
            revision['children_urls'] = [url_direct_child]
    if 'message_decoding_failed' in revision:
        # The raw (undecoded) message stays reachable through this url.
        revision['message_url'] = flask.url_for(
            'api_revision_raw_message',
            sha1_git=revision['id'])
    return revision
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index 1717080af..7a5a3e921 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,1098 +1,1096 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
# Canned doc string snippets that are used in several doc strings below;
# they are interpolated into the @doc.* decorator arguments.

# Accepted formats of the "q" content-identifier argument.
_doc_arg_content_id = """A "[hash_type:]hash" content identifier, where
hash_type is one of "sha1" (the default), "sha1_git", "sha256", and hash is
a checksum obtained with the hash_type hashing algorithm."""
# Shared descriptions of the pagination query parameters.
_doc_arg_last_elt = 'element to start listing from, for pagination purposes'
_doc_arg_per_page = 'number of elements to list, for pagination purposes'
# Shared descriptions of the error conditions raised by the endpoints.
_doc_exc_bad_id = 'syntax error in the given identifier(s)'
_doc_exc_id_not_found = 'no object matching the given criteria could be found'
# Shared descriptions of revision endpoint return values.
_doc_ret_revision_meta = 'metadata of the revision identified by sha1_git'
_doc_ret_revision_log = """list of dictionaries representing the metadata of
each revision found in the commit log heading to revision sha1_git.
For each commit at least the following information are returned:
author/committer, authoring/commit timestamps, revision id, commit message,
parent (i.e., immediately preceding) commits, "root" directory id."""
# Description of the pagination Link response header.
_doc_header_link = """indicates that a subsequent result page is available,
pointing to it"""
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""dictionary mapping object types to the amount of
             corresponding objects currently available in the archive""")
def api_stats():
    """Get statistics about the content of the archive.

    Returns:
        The dict produced by service.stat_counters(), mapping object type
        names to the number of corresponding archived objects.
    """
    return service.stat_counters()
@app.route('/api/1/origin/<int:origin_id>/visits/')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='software origin identifier')
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_visit', default=None,
           argtype=doc.argtypes.int,
           doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
           argtype=doc.argtypes.int,
           doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""a list of dictionaries describing individual visits.
             For each visit, its identifier, timestamp (as UNIX time), outcome,
             and visit-specific URL for more information are given.""")
def api_origin_visits(origin_id):
    """Get information about all visits of a given software origin.

    Args:
        origin_id: software origin identifier.

    Returns:
        A dict with a 'results' list of enriched visit dicts, plus a
        'headers' entry carrying a 'link-next' URL when a further result
        page may exist.

    Raises:
        NotFoundExc: if the origin is not known (via _api_lookup).
    """
    result = {}
    per_page = int(request.args.get('per_page', '10'))
    last_visit = request.args.get('last_visit')
    if last_visit:
        last_visit = int(last_visit)

    def _lookup_origin_visits(
            origin_id, last_visit=last_visit, per_page=per_page):
        return service.lookup_origin_visits(
            origin_id, last_visit=last_visit, per_page=per_page)

    def _enrich_origin_visit(origin_visit):
        # Work on a copy so the service-layer result is left untouched.
        ov = origin_visit.copy()
        ov['origin_visit_url'] = url_for('api_origin_visit',
                                         origin_id=ov['origin'],
                                         visit_id=ov['visit'])
        return ov

    visits = _api_lookup(
        origin_id,
        _lookup_origin_visits,
        error_msg_if_not_found='No origin %s found' % origin_id,
        enrich_fn=_enrich_origin_visit)

    if visits:
        # A full page means more results may follow: emit a next-page link
        # starting after the last visit returned.
        if len(visits) == per_page:
            params = {
                'origin_id': origin_id,
                'last_visit': visits[-1]['visit']
            }
            if request.args.get('per_page'):
                params['per_page'] = per_page
            result['headers'] = {
                'link-next': url_for('api_origin_visits', **params)
            }
        result.update({
            'results': visits
        })
    return result
@app.route('/api/1/origin/<int:origin_id>/visit/<int:visit_id>/')
@doc.route('/api/1/origin/visit/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='software origin identifier')
@doc.arg('visit_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="""visit identifier, relative to the origin identified by
         origin_id""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""dictionary containing both metadata for the entire
             visit (e.g., timestamp as UNIX time, visit outcome, etc.) and what
             was at the software origin during the visit (i.e., a mapping from
             branches to other archive objects""")
def api_origin_visit(origin_id, visit_id):
    """Get information about a specific visit of a software origin.

    Args:
        origin_id: software origin identifier.
        visit_id: visit identifier, relative to origin_id.

    Raises:
        NotFoundExc: if no such visit is known (via _api_lookup).
    """
    def _enrich_origin_visit(origin_visit):
        # Work on a copy so the service-layer result is left untouched.
        ov = origin_visit.copy()
        ov['origin_url'] = url_for('api_origin', origin_id=ov['origin'])
        if 'occurrences' in ov:
            # Add navigation URLs to every object the visit points to.
            ov['occurrences'] = {
                k: utils.enrich_object(v)
                for k, v in ov['occurrences'].items()
            }
        return ov

    return _api_lookup(
        origin_id,
        service.lookup_origin_visit,
        'No visit %s for origin %s found' % (visit_id, origin_id),
        _enrich_origin_visit,
        visit_id)
@app.route('/api/1/content/symbol/', methods=['POST'])
@app.route('/api/1/content/symbol/<string:q>/')
@doc.route('/api/1/content/symbol/', tags=['upcoming'])
@doc.arg('q',
         default='hello',
         argtype=doc.argtypes.str,
         argdoc="""An expression string to lookup in swh's raw content""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_sha1', default=None,
           argtype=doc.argtypes.str,
           doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
           argtype=doc.argtypes.int,
           doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""A list of dict whose content matches the expression.
             Each dict has the following keys:
             - id (bytes): identifier of the content
             - name (text): symbol whose content match the expression
             - kind (text): kind of the symbol that matched
             - lang (text): Language for that entry
             - line (int): Number line for the symbol
             """)
def api_content_symbol(q=None):
    """Search content objects by `Ctags <http://ctags.sourceforge.net/>`_-style
    symbol (e.g., function name, data type, method, ...).

    Args:
        q: expression string to look up in the indexed raw contents.

    Returns:
        A dict with a 'results' list of matching symbol dicts, plus a
        'headers' entry carrying a 'link-next' URL when a further result
        page may exist.

    Raises:
        NotFoundExc: if no indexed content matches (via _api_lookup).
    """
    result = {}
    last_sha1 = request.args.get('last_sha1', None)
    per_page = int(request.args.get('per_page', '10'))

    def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page):
        return service.lookup_expression(exp, last_sha1, per_page)

    symbols = _api_lookup(
        q,
        lookup_fn=lookup_exp,
        error_msg_if_not_found='No indexed raw content match expression \''
                               '%s\'.' % q,
        enrich_fn=lambda x: utils.enrich_content(x, top_url=True))

    if symbols:
        # A full page means more results may follow: emit a next-page link
        # starting after the last sha1 returned.
        if len(symbols) == per_page:
            params = {
                'q': q,
                'last_sha1': symbols[-1]['sha1'],
            }
            if request.args.get('per_page'):
                params['per_page'] = per_page
            result['headers'] = {
                'link-next': url_for('api_content_symbol', **params),
            }
        result.update({
            'results': symbols
        })
    return result
@app.route('/api/1/content/known/', methods=['POST'])
@app.route('/api/1/content/known/<string:q>/')
@doc.route('/api/1/content/known/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.sha1,
         argdoc='content identifier as a sha1 checksum')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""a dictionary with results (found/not found for each given
             identifier) and statistics about how many identifiers were
             found""")
def api_check_content_known(q=None):
    """Check whether some content (AKA "blob") is present in the archive.

    Lookup can be performed by various means:

    - a GET request with one or several hashes, separated by ','
    - a POST request with one or several hashes, passed as (multiple) values
      for parameter 'q'

    Returns:
        A dict with 'search_res' (per-hash found/not-found results) and
        'search_stats' (number of files looked up and percentage found).
    """
    response = {'search_res': None,
                'search_stats': None}
    search_stats = {'nbfiles': 0, 'pct': 0}
    search_res = None

    # Build the list of hashes to look up, from either the URL or the form.
    queries = []
    if q:
        # GET: one or several ','-separated hash values.
        for v in q.split(','):
            queries.append({'filename': None, 'sha1': v})
    elif request.method == 'POST':
        # POST: hashes submitted as form values; skip inputs with no value.
        for k, v in request.form.items():
            if v is not None:
                if k == 'q' and len(v) > 0:
                    queries.append({'filename': None, 'sha1': v})
                elif v != '':
                    queries.append({'filename': k, 'sha1': v})

    if queries:
        lookup = service.lookup_multiple_hashes(queries)
        result = []
        nb_queries = len(queries)
        for el in lookup:
            res_d = {'sha1': el['sha1'],
                     'found': el['found']}
            if 'filename' in el and el['filename']:
                res_d['filename'] = el['filename']
            result.append(res_d)
        search_res = result
        nbfound = len([x for x in lookup if x['found']])
        search_stats['nbfiles'] = nb_queries
        search_stats['pct'] = (nbfound / nb_queries) * 100

    response['search_res'] = search_res
    response['search_stats'] = search_stats
    return response
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
return [enrich_fn(x) for x in res]
return enrich_fn(res)
@app.route('/api/1/origin/<int:origin_id>/')
@app.route('/api/1/origin/<string:origin_type>/url/<path:origin_url>')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='origin identifier (when looking up by ID)')
@doc.arg('origin_type',
         default='git',
         argtype=doc.argtypes.str,
         argdoc='origin type (when looking up by type+URL)')
@doc.arg('origin_url',
         default='https://github.com/hylang/hy',
         argtype=doc.argtypes.path,
         argdoc='origin URL (when looking up by type+URL')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the origin corresponding to the given
             criteria""")
def api_origin(origin_id=None, origin_type=None, origin_url=None):
    """Get information about a software origin.

    Software origins might be looked up by origin type and canonical URL (e.g.,
    "git" + a "git clone" URL), or by their unique (but otherwise meaningless)
    identifier.

    Raises:
        NotFoundExc: if no origin matches the criteria (via _api_lookup).
    """
    # Keep only the criteria the caller actually provided.
    ori_dict = {
        'id': origin_id,
        'type': origin_type,
        'url': origin_url
    }
    ori_dict = {k: v for k, v in ori_dict.items() if v}
    if 'id' in ori_dict:
        error_msg = 'Origin with id %s not found.' % ori_dict['id']
    else:
        error_msg = 'Origin with type %s and URL %s not found' % (
            ori_dict['type'], ori_dict['url'])

    def _enrich_origin(origin):
        if 'id' in origin:
            o = origin.copy()
            o['origin_visits_url'] = url_for('api_origin_visits',
                                             origin_id=o['id'])
            return o
        return origin

    return _api_lookup(
        ori_dict, lookup_fn=service.lookup_origin,
        error_msg_if_not_found=error_msg,
        enrich_fn=_enrich_origin)
@app.route('/api/1/person/<int:person_id>/')
@doc.route('/api/1/person/')
@doc.arg('person_id',
         default=42,
         argtype=doc.argtypes.int,
         argdoc='person identifier')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
    """Get information about a person.

    Args:
        person_id: person identifier.

    Raises:
        NotFoundExc: if the person is not known (via _api_lookup).
    """
    return _api_lookup(
        person_id, lookup_fn=service.lookup_person,
        error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release/<string:sha1_git>/')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
         default='7045404f3d1c54e6473c71bbb716529fbad4be24',
         argtype=doc.argtypes.sha1_git,
         argdoc='release identifier')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
    """Get information about a release.

    Releases are identified by SHA1 checksums, compatible with Git tag
    identifiers. See ``release_identifier`` in our `data model module
    <https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
    for details about how they are computed.

    Args:
        sha1_git: release identifier.

    Raises:
        NotFoundExc: if the release is not known (via _api_lookup).
    """
    error_msg = 'Release with sha1_git %s not found.' % sha1_git
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_release,
        error_msg_if_not_found=error_msg,
        enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
                           limit=100, with_data=False):
    """Compute the directory or content data of the revision matching the
    given criteria.

    Args:
        revision: dict of criteria identifying the revision to look up
            (e.g. origin/branch/timestamp, or a sha1_git).
        path: directory or content path to resolve inside the revision.
        request_path: original request path, used as the navigation context
            URL of the enriched directory entries.
        limit: optional bound on the number of revisions walked during the
            lookup (default 100). NOTE(review): per the original docstring,
            this limit could impede the transitivity conclusion about
            sha1_git not being an ancestor -- confirm against the service
            layer.
        with_data: whether to also retrieve the raw data when path resolves
            to a content.
    """
    def enrich_directory_local(dir_entry, context_url=request_path):
        # Bind request_path at definition time so map() below can call this
        # helper with a single argument.
        return utils.enrich_directory(dir_entry, context_url)

    rev_id, result = service.lookup_directory_through_revision(
        revision, path, limit=limit, with_data=with_data)

    content = result['content']
    if result['type'] == 'dir':  # list of directory entries
        result['content'] = list(map(enrich_directory_local, content))
    else:  # a single content object
        result['content'] = utils.enrich_content(content)
    return result
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/directory/<path:path>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/<path:path>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/<path:path>/')
@doc.route('/api/1/revision/origin/directory/', tags=['hidden'])
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default
         to master""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""Optional timestamp (default to the nearest time
         crawl of timestamp)""")
@doc.arg('path',
         default='Dockerfile',
         argtype=doc.argtypes.path,
         argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision corresponding to the
             given criteria""")
def api_directory_through_revision_origin(origin_id,
                                          branch_name="refs/heads/master",
                                          ts=None,
                                          path=None,
                                          with_data=False):
    """Display directory or content information through a revision identified
    by origin/branch/timestamp.

    Args:
        origin_id: the revision's origin identifier.
        branch_name: optional branch name, defaulting to the master branch.
        ts: optional timestamp string, parsed before use.
        path: optional path to resolve within the revision's directory.
        with_data: whether to also return raw content data.

    Raises:
        NotFoundExc: if no revision matches the criteria.
    """
    if ts:
        ts = utils.parse_timestamp(ts)

    # Delegate resolution and enrichment to the shared helper.
    return _revision_directory_by(
        {
            'origin_id': origin_id,
            'branch_name': branch_name,
            'ts': ts
        },
        path,
        request.path,
        with_data=with_data)
@app.route('/api/1/revision'
           '/origin/<int:origin_id>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc='software origin identifier')
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""(optional) fully-qualified branch name, e.g.,
         "refs/heads/master". Defaults to the master branch.""")
@doc.arg('ts',
         default=None,
         argtype=doc.argtypes.ts,
         argdoc="""(optional) timestamp close to which the revision pointed by
         the given branch should be looked up. Defaults to now.""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision_with_origin(origin_id,
                             branch_name="refs/heads/master",
                             ts=None):
    """Get information about a revision, searching for it based on software
    origin, branch name, and/or visit timestamp.

    This endpoint behaves like ``/revision``, but operates on the revision that
    has been found at a given software origin, close to a given point in time,
    pointed by a given branch.

    Raises:
        NotFoundExc: if no revision matches the criteria (via _api_lookup).
    """
    # NOTE(review): unconditional call is the "+" side of the leftover diff
    # markers in this hunk; it presumes parse_timestamp accepts None --
    # confirm against swh.web.ui.utils.
    ts = utils.parse_timestamp(ts)
    return _api_lookup(
        origin_id,
        service.lookup_revision_by,
        'Revision with (origin_id: %s, branch_name: %s'
        ', ts: %s) not found.' % (origin_id,
                                  branch_name,
                                  ts),
        utils.enrich_revision,
        branch_name,
        ts)
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:context>/')
@doc.route('/api/1/revision/prev/', tags=['hidden'])
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier")
@doc.arg('context',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the revision identified by sha1_git')
def api_revision_with_context(sha1_git, context):
    """Return information about revision with id sha1_git.

    The revision is enriched with the navigation breadcrumbs carried by
    context.

    Raises:
        NotFoundExc: if the revision is not known (via _api_lookup).
    """
    # Bind context into the enrichment callback passed to _api_lookup.
    return _api_lookup(
        sha1_git,
        service.lookup_revision,
        'Revision with sha1_git %s not found.' % sha1_git,
        lambda revision: utils.enrich_revision(revision, context))
@app.route('/api/1/revision/<string:sha1_git>/')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
         default='aafb16d69fd30ff58afdd69036a26047f3aebdc6',
         argtype=doc.argtypes.sha1_git,
         argdoc="revision identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision(sha1_git):
    """Get information about a revision.

    Revisions are identified by SHA1 checksums, compatible with Git commit
    identifiers. See ``revision_identifier`` in our `data model module
    <https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
    for details about how they are computed.

    Raises:
        NotFoundExc: if the revision is not known (via _api_lookup).
    """
    return _api_lookup(
        sha1_git,
        service.lookup_revision,
        'Revision with sha1_git %s not found.' % sha1_git,
        utils.enrich_revision)
@app.route('/api/1/revision/<string:sha1_git>/raw/')
@doc.route('/api/1/revision/raw/', tags=['hidden'])
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc="""The message of the revision identified by sha1_git
             as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
    """Return the raw data of the message of revision identified by sha1_git

    The message bytes are served as an attachment download.
    """
    raw = service.lookup_revision_message(sha1_git)

    # NOTE(review): the two string literals concatenate to
    # 'attachment;filename=...' with no space after the semicolon; browsers
    # accept this, but 'attachment; filename=...' is the canonical form --
    # same pattern is used by api_content_raw below.
    return app.response_class(raw['message'],
                              headers={'Content-disposition': 'attachment;'
                                       'filename=rev_%s_raw' % sha1_git},
                              mimetype='application/octet-stream')
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc='revision identifier')
@doc.arg('dir_path',
         default='Documentation/BUG-HUNTING',
         argtype=doc.argtypes.path,
         argdoc="""path relative to the root directory of revision identifier by
         sha1_git""")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""either a list of directory entries with their metadata,
             or the metadata of a single directory entry""")
def api_revision_directory(sha1_git,
                           dir_path=None,
                           with_data=False):
    """Get information about directory (entry) objects associated to revisions.

    Each revision is associated to a single "root" directory. This endpoint
    behaves like ``/directory/``, but operates on the root directory associated
    to a given revision.

    Args:
        sha1_git: revision identifier.
        dir_path: optional path relative to the revision's root directory.
        with_data: whether to also return raw content data.
    """
    # Delegate resolution and enrichment to the shared helper.
    return _revision_directory_by(
        {
            'sha1_git': sha1_git
        },
        dir_path,
        request.path,
        with_data=with_data)
@app.route('/api/1/revision/<string:sha1_git>/log/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
         default='37fc9e08d0c4b71807a4f1ecb06112e78d91c283',
         argtype=doc.argtypes.sha1_git,
         argdoc='revision identifier')
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
           argtype=doc.argtypes.int,
           doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log(sha1_git, prev_sha1s=None):
    """Get a list of all revisions heading to a given one, i.e., show the
    commit log.

    Args:
        sha1_git: revision identifier to start the log from.
        prev_sha1s: optional '/'-separated navigation breadcrumbs
            (descendant revisions previously visited), prepended to the
            results.

    Returns:
        A dict with a 'results' list of enriched revision dicts, plus a
        'headers' entry carrying a 'link-next' URL when a further result
        page exists.

    Raises:
        NotFoundExc: if the revision is not known (via _api_lookup).
    """
    result = {}
    per_page = int(request.args.get('per_page', '10'))

    # Ask for one extra revision to detect whether a next page exists.
    def lookup_revision_log_with_limit(s, limit=per_page+1):
        return service.lookup_revision_log(s, limit)

    error_msg = 'Revision with sha1_git %s not found.' % sha1_git
    rev_get = _api_lookup(sha1_git,
                          lookup_fn=lookup_revision_log_with_limit,
                          error_msg_if_not_found=error_msg,
                          enrich_fn=utils.enrich_revision)

    if len(rev_get) == per_page+1:
        # Drop the extra revision from the page; it becomes the starting
        # point of the next one.
        rev_backward = rev_get[:-1]
        params = {
            'sha1_git': rev_get[-1]['id'],
        }
        if request.args.get('per_page'):
            params['per_page'] = per_page
        result['headers'] = {
            'link-next': url_for('api_revision_log', **params)
        }
    else:
        rev_backward = rev_get

    if not prev_sha1s:  # no nav breadcrumbs, so we're done
        revisions = rev_backward
    else:
        # Prepend the breadcrumb revisions to the backward log.
        rev_forward_ids = prev_sha1s.split('/')
        rev_forward = _api_lookup(rev_forward_ids,
                                  lookup_fn=service.lookup_revision_multiple,
                                  error_msg_if_not_found=error_msg,
                                  enrich_fn=utils.enrich_revision)
        revisions = rev_forward + rev_backward

    result.update({
        'results': revisions
    })
    return result
@app.route('/api/1/revision'
           '/origin/<int:origin_id>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""(Optional) The revision's branch name within the origin specified.
         Defaults to 'refs/heads/master'.""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""(Optional) A time or timestamp string to parse""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
           argtype=doc.argtypes.int,
           doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log_by(origin_id,
                        branch_name='refs/heads/master',
                        ts=None):
    """Show the commit log for a revision, searching for it based on software
    origin, branch name, and/or visit timestamp.

    This endpoint behaves like ``/log``, but operates on the revision that
    has been found at a given software origin, close to a given point in time,
    pointed by a given branch.

    Returns:
        A dict with a 'results' list of enriched revision dicts, plus a
        'headers' entry carrying a 'link-next' URL when a further result
        page exists.

    Raises:
        NotFoundExc: if no matching revision is found (via _api_lookup).
    """
    result = {}
    per_page = int(request.args.get('per_page', '10'))

    if ts:
        ts = utils.parse_timestamp(ts)

    # Ask for one extra revision to detect whether a next page exists.
    def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1):
        return service.lookup_revision_log_by(o_id, br, ts, limit)

    error_msg = 'No revision matching origin %s ' % origin_id
    error_msg += ', branch name %s' % branch_name
    error_msg += (' and time stamp %s.' % ts) if ts else '.'

    rev_get = _api_lookup(origin_id,
                          lookup_revision_log_by_with_limit,
                          error_msg,
                          utils.enrich_revision,
                          branch_name,
                          ts)

    if len(rev_get) == per_page+1:
        # Drop the extra revision; its id seeds the next-page link.
        revisions = rev_get[:-1]
        params = {
            'origin_id': origin_id,
            'branch_name': branch_name,
            'ts': ts,
            # NOTE(review): 'sha1_git' is not a parameter of this endpoint's
            # routes, so url_for emits it as a query string argument --
            # confirm the paginated continuation actually consumes it.
            'sha1_git': rev_get[-1]['id'],
        }
        if request.args.get('per_page'):
            params['per_page'] = per_page
        result['headers'] = {
            'link-next': url_for('api_revision_log_by', **params),
        }
    else:
        revisions = rev_get

    result.update({'results': revisions})
    return result
@app.route('/api/1/directory/<string:sha1_git>/')
@app.route('/api/1/directory/<string:sha1_git>/<path:path>/')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
         default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8',
         argtype=doc.argtypes.sha1_git,
         argdoc='directory identifier')
@doc.arg('path',
         default='codec/demux',
         argtype=doc.argtypes.path,
         argdoc='path relative to directory identified by sha1_git')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""either a list of directory entries with their metadata,
             or the metadata of a single directory entry""")
def api_directory(sha1_git,
                  path=None):
    """Get information about directory or directory entry objects.

    Directories are identified by SHA1 checksums, compatible with Git directory
    identifiers. See ``directory_identifier`` in our `data model module
    <https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
    for details about how they are computed.

    Given only a directory identifier, return the directory's content
    (usually a list of directory entries); given a directory identifier and
    a path, return the metadata of the entry that the relative path resolves
    to, starting from the given directory.

    Raises:
        NotFoundExc: if the directory or entry is not known.
    """
    # Path lookup and plain directory lookup share everything but the
    # service function and the error message.
    if path:
        message = ('Entry with path %s relative to directory '
                   'with sha1_git %s not found.') % (path, sha1_git)
        return _api_lookup(
            sha1_git,
            service.lookup_directory_with_path,
            message,
            utils.enrich_directory,
            path)
    message = 'Directory with sha1_git %s not found.' % sha1_git
    return _api_lookup(
        sha1_git,
        service.lookup_directory,
        message,
        utils.enrich_directory)
@app.route('/api/1/provenance/<string:q>/')
@doc.route('/api/1/provenance/', tags=['hidden'])
@doc.arg('q',
         default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""List of provenance information (dict) for the matched
             content.""")
def api_content_provenance(q):
    """Return content's provenance information if any.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if the content is not known (via _api_lookup).
    """
    def _enrich_revision(provenance):
        # Add navigation URLs for every object referenced by a provenance
        # entry: revision, content, origin and its visits.
        p = provenance.copy()
        p['revision_url'] = url_for('api_revision',
                                    sha1_git=provenance['revision'])
        p['content_url'] = url_for('api_content_metadata',
                                   q='sha1_git:%s' % provenance['content'])
        p['origin_url'] = url_for('api_origin',
                                  origin_id=provenance['origin'])
        p['origin_visits_url'] = url_for('api_origin_visits',
                                         origin_id=provenance['origin'])
        p['origin_visit_url'] = url_for('api_origin_visit',
                                        origin_id=provenance['origin'],
                                        visit_id=provenance['visit'])
        return p

    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_provenance,
        error_msg_if_not_found='Content with %s not found.' % q,
        enrich_fn=_enrich_revision)
@app.route('/api/1/filetype/<string:q>/')
@doc.route('/api/1/filetype/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Filetype information (dict) for the matched
             content.""")
def api_content_filetype(q):
    """Get information about the detected MIME type of a content object.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if no filetype information exists (via _api_lookup).
    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_filetype,
        error_msg_if_not_found='No filetype information found '
                               'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/language/<string:q>/')
@doc.route('/api/1/language/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Language information (dict) for the matched
             content.""")
def api_content_language(q):
    """Get information about the detected (programming) language of a content
    object.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if no language information exists (via _api_lookup).
    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_language,
        error_msg_if_not_found='No language information found '
                               'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/license/<string:q>/')
@doc.route('/api/1/license/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""License information (dict) for the matched
             content.""")
def api_content_license(q):
    """Get information about the detected license of a content object.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if no license information exists (via _api_lookup).
    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_license,
        error_msg_if_not_found='No license information found '
                               'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/ctags/<string:q>/')
@doc.route('/api/1/ctags/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Ctags symbol (dict) for the matched
             content.""")
def api_content_ctags(q):
    """Get information about all `Ctags <http://ctags.sourceforge.net/>`_-style
    symbols defined in a content object.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if no ctags symbol exists (via _api_lookup).
    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_ctags,
        error_msg_if_not_found='No ctags symbol found '
                               'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/content/<string:q>/raw/')
@doc.route('/api/1/content/raw/', tags=['upcoming'])
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc='The raw content data as an octet stream')
def api_content_raw(q):
    """Get the raw content of a content object (AKA "blob"), as a byte sequence.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if the content is not known.
    """
    # Serve the data through a generator so the response is streamed.
    def generate(content):
        yield content['data']

    content = service.lookup_content_raw(q)
    if not content:
        raise NotFoundExc('Content with %s not found.' % q)

    return app.response_class(generate(content),
                              headers={'Content-disposition': 'attachment;'
                                       'filename=content_%s_raw' % q},
                              mimetype='application/octet-stream')
@app.route('/api/1/content/<string:q>/')
@doc.route('/api/1/content/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""known metadata for content identified by q""")
def api_content_metadata(q):
    """Get information about a content (AKA "blob") object.

    Args:
        q: content identifier (see _doc_arg_content_id for accepted forms).

    Raises:
        NotFoundExc: if the content is not known (via _api_lookup).
    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content,
        error_msg_if_not_found='Content with %s not found.' % q,
        enrich_fn=utils.enrich_content)
@app.route('/api/1/entity/<string:uuid>/')
@doc.route('/api/1/entity/', tags=['hidden'])
@doc.arg('uuid',
         default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
         argtype=doc.argtypes.uuid,
         argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
    """Get information about the entity identified by uuid, if any.

    (The previous docstring wrongly described this as a content lookup.)

    Args:
        uuid: entity identifier.

    Raises:
        NotFoundExc: if the entity is not known (via _api_lookup).
    """
    return _api_lookup(
        uuid,
        lookup_fn=service.lookup_entity_by_uuid,
        error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
        enrich_fn=utils.enrich_entity)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 8:12 PM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3321620
Attached To
R65 Staging repository
Event Timeline
Log In to Comment