Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index ae765ac1f..7921c5474 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,905 +1,907 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import dateutil
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.ui import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.url_map = [dict(rule='/other/<slug>',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/<slug>',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/<int:id>',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
@istest
def filter_endpoints_1(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/some')
# then
self.assertEquals(actual_data, {
'/some/old/url/<slug>': {
'methods': ['GET', 'POST'],
'endpoint': 'blablafn'
}
})
@istest
def filter_endpoints_2(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/other',
blacklist=['/other2'])
# then
# rules /other is skipped because its' exactly the prefix url
# rules /other2 is skipped because it's blacklisted
self.assertEquals(actual_data, {
'/other/<slug>': {
'methods': ['GET', 'HEAD', 'POST'],
'endpoint': 'foo'
},
'/other/old/url/<int:id>': {
'methods': ['GET', 'HEAD'],
'endpoint': 'bar'
}
})
@istest
def prepare_data_for_view_default_encoding(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs)
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view_ko_cannot_decode(self):
self.maxDiff = None
# given
inputs = {
'data': 'hé dude!'.encode('utf8'),
}
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, {
'data': "Cannot decode the data bytes, try and set another "
"encoding in the url (e.g. ?encoding=utf8) or "
"download directly the "
"content's raw data.",
})
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_map(self):
# when
actual_res = utils.filter_field_keys(
map(lambda x: {'i': x['i']+1, 'j': x['j']},
[{'i': 1, 'j': None},
{'i': 2, 'j': None},
{'i': 3, 'j': None}]),
{'i'})
# then
self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
@istest
def fmap(self):
self.assertEquals([2, 3, None, 4],
utils.fmap(lambda x: x+1, [1, 2, None, 3]))
self.assertEquals([11, 12, 13],
list(utils.fmap(lambda x: x+10,
map(lambda x: x, [1, 2, 3]))))
self.assertEquals({'a': 2, 'b': 4},
utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
self.assertEquals(100,
utils.fmap(lambda x: x*10, 10))
self.assertEquals({'a': [2, 6], 'b': 4},
utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
self.assertIsNone(utils.fmap(lambda x: x, None))
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof <foo@bar>')
@istest
def parse_timestamp(self):
input_timestamps = [
+ None,
'2016-01-12',
'2016-01-12T09:19:12+0100',
'Today is January 1, 2047 at 8:21:00AM',
'1452591542',
]
output_dates = [
- datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
+ None,
+ datetime.datetime(2016, 1, 12, 0, 0),
datetime.datetime(2016, 1, 12, 9, 19, 12,
tzinfo=dateutil.tz.tzoffset(None, 3600)),
- datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc),
+ datetime.datetime(2047, 1, 1, 8, 21),
datetime.datetime(2016, 1, 12, 9, 39, 2,
tzinfo=datetime.timezone.utc),
]
for ts, exp_date in zip(input_timestamps, output_dates):
self.assertEquals(utils.parse_timestamp(ts), exp_date)
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_1(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/'
# when
actual_release = utils.enrich_release({'target': '123',
'target_type': 'content'})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_2(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
q='23')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_3(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_flask.url_for.assert_called_once_with('api_revision',
sha1_git='3')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_4(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_flask.url_for.assert_called_once_with('api_release',
sha1_git='4')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_no_type(self, mock_flask):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_file(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:789')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
sha1_git='456')
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
}
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes_and_top_level_url(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/' % h,
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
},
top_url=True
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'content_url': '/api/content/%s:123/' % h,
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_metadata', q='%s:blahblah' % h),
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_entity_with_sha1(self, mock_flask):
# given
def url_for_test(fn, **entity):
return '/api/entity/' + entity['uuid'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid',
uuid='uuid-1'),
call('api_entity_by_uuid',
uuid='uuid-parent')])
@nottest
def _url_for_context_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_without_children_or_parent(self, mock_flask):
# given
def url_for_test(fn, **data):
if fn == 'api_revision':
return '/api/revision/' + data['sha1_git'] + '/'
elif fn == 'api_revision_log':
return '/api/revision/' + data['sha1_git'] + '/log/'
elif fn == 'api_directory':
return '/api/directory/' + data['sha1_git'] + '/'
elif fn == 'api_person':
return '/api/person/' + data['person_id'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_person',
person_id='1'),
call('api_person',
person_id='2'),
call('api_directory',
sha1_git='123')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_no_context(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_empty_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_some_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev0-rev',
context='prev1-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev1-rev/prev0-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@nottest
def _url_for_rev_message_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data and data['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
elif fn == 'api_revision_raw_message':
return '/api/revision/' + data['sha1_git'] + '/raw/'
else:
return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_no_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_invalid_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123'),
call('api_revision',
sha1_git='456')])
diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py
index ed06c64ae..e6fe7a77d 100644
--- a/swh/web/ui/utils.py
+++ b/swh/web/ui/utils.py
@@ -1,384 +1,390 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
+
import flask
import re
+from datetime import datetime, timezone
from dateutil import parser
+from .exc import BadInputExc
+
def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
"""Filter endpoints by prefix url rule.
Args:
- url_map: Url Werkzeug.Map of rules
- prefix_url_rule: prefix url string
- blacklist: blacklist of some url
Returns:
Dictionary of url_rule with values methods and endpoint.
The key is the url, the associated value is a dictionary of
'methods' (possible http methods) and 'endpoint' (python function)
"""
out = {}
for r in url_map:
rule = r['rule']
if rule == prefix_url_rule or rule in blacklist:
continue
if rule.startswith(prefix_url_rule):
out[rule] = {'methods': sorted(map(str, r['methods'])),
'endpoint': r['endpoint']}
return out
def fmap(f, data):
"""Map f to data at each level.
This must keep the origin data structure type:
- map -> map
- dict -> dict
- list -> list
- None -> None
Args:
f: function that expects one argument.
data: data to traverse to apply the f function.
list, map, dict or bare value.
Returns:
The same data-structure with modified values by the f function.
"""
- if not data:
+ if data is None:
return data
if isinstance(data, map):
return map(lambda y: fmap(f, y), (x for x in data))
if isinstance(data, list):
return [fmap(f, x) for x in data]
if isinstance(data, dict):
return {k: fmap(f, v) for (k, v) in data.items()}
return f(data)
def prepare_data_for_view(data, encoding='utf-8'):
def prepare_data(s):
# Note: can only be 'data' key with bytes of raw content
if isinstance(s, bytes):
try:
return s.decode(encoding)
except:
return "Cannot decode the data bytes, try and set another " \
"encoding in the url (e.g. ?encoding=utf8) or " \
"download directly the " \
"content's raw data."
if isinstance(s, str):
return re.sub(r'/api/1/', r'/browse/', s)
return s
return fmap(prepare_data, data)
def filter_field_keys(data, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- data: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(data, map):
return map(lambda x: filter_field_keys(x, field_keys), data)
if isinstance(data, list):
return [filter_field_keys(x, field_keys) for x in data]
if isinstance(data, dict):
return {k: v for (k, v) in data.items() if k in field_keys}
return data
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as datetime.
Returns:
- a timezone-aware datetime representing the parsed value. If the parsed
- value doesn't specify a timezone, UTC is assumed.
+ a timezone-aware datetime representing the parsed value.
+ None if the parsing fails.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
+
"""
- default_timestamp = datetime.datetime.utcfromtimestamp(0).replace(
- tzinfo=datetime.timezone.utc)
+ if not timestamp:
+ return None
+
try:
- res = parser.parse(timestamp, ignoretz=False, fuzzy=True,
- default=default_timestamp)
+ return parser.parse(timestamp, ignoretz=False, fuzzy=True)
except:
- res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace(
- tzinfo=datetime.timezone.utc)
- return res
+ try:
+ return datetime.utcfromtimestamp(float(timestamp)).replace(
+ tzinfo=timezone.utc)
+ except (ValueError, OverflowError) as e:
+ raise BadInputExc(e)
def enrich_object(object):
"""Enrich an object (revision, release) with link to the 'target' of
type 'target_type'.
Args:
object: An object with target and target_type keys
(e.g. release, revision)
Returns:
Object enriched with target_url pointing to the right
swh.web.ui.api urls for the pointing object (revision,
release, content, directory)
"""
obj = object.copy()
if 'target' in obj and 'target_type' in obj:
if obj['target_type'] == 'revision':
obj['target_url'] = flask.url_for('api_revision',
sha1_git=obj['target'])
elif obj['target_type'] == 'release':
obj['target_url'] = flask.url_for('api_release',
sha1_git=obj['target'])
elif obj['target_type'] == 'content':
obj['target_url'] = flask.url_for(
'api_content_metadata',
q='sha1_git:' + obj['target'])
elif obj['target_type'] == 'directory':
obj['target_url'] = flask.url_for('api_directory',
q=obj['target'])
return obj
enrich_release = enrich_object
def enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = flask.url_for('api_content_metadata',
q='sha1_git:%s' % target)
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = flask.url_for('api_directory',
sha1_git=target)
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
return directory
def enrich_metadata_endpoint(content):
"""Enrich metadata endpoint with link to the upper metadata endpoint.
"""
c = content.copy()
c['content_url'] = flask.url_for('api_content_metadata',
q='sha1:%s' % c['id'])
return c
def enrich_content(content, top_url=False):
"""Enrich content with links to:
- data_url: its raw data
- filetype_url: its filetype information
"""
for h in ['sha1', 'sha1_git', 'sha256']:
if h in content:
q = '%s:%s' % (h, content[h])
if top_url:
content['content_url'] = flask.url_for('api_content_metadata',
q=q)
content['data_url'] = flask.url_for('api_content_raw', q=q)
content['filetype_url'] = flask.url_for('api_content_filetype',
q=q)
content['language_url'] = flask.url_for('api_content_language',
q=q)
content['license_url'] = flask.url_for('api_content_license',
q=q)
break
return content
def enrich_entity(entity):
"""Enrich entity with
"""
if 'uuid' in entity:
entity['uuid_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['uuid'])
if 'parent' in entity and entity['parent']:
entity['parent_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['parent'])
return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id,
context=context_for_children)
# This commit is the first commit in the path
else:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id)
return (context_for_parents, context_for_children, url_direct_child)
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
rev_children: a list of revision id
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(flask.url_for('api_revision', sha1_git=child))
elif not context:
children.append(flask.url_for('api_revision', sha1_git=child))
return children
def enrich_revision(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
Keep track of the navigation breadcrumbs if they are specified.
Args:
revision: the revision as a dict
context: the navigation breadcrumbs as a /-separated string of revision
sha1_git
"""
ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
revision['id'], context)
revision['url'] = flask.url_for('api_revision', sha1_git=revision['id'])
revision['history_url'] = flask.url_for('api_revision_log',
sha1_git=revision['id'])
if context:
revision['history_context_url'] = flask.url_for(
'api_revision_log',
sha1_git=revision['id'],
prev_sha1s=context)
if 'author' in revision:
author = revision['author']
revision['author_url'] = flask.url_for('api_person',
person_id=author['id'])
if 'committer' in revision:
committer = revision['committer']
revision['committer_url'] = flask.url_for('api_person',
person_id=committer['id'])
if 'directory' in revision:
revision['directory_url'] = flask.url_for(
'api_directory',
sha1_git=revision['directory'])
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append(flask.url_for('api_revision',
sha1_git=parent))
revision['parent_urls'] = parents
if 'children' in revision:
children = _make_child_url(revision['children'], context)
if url_direct_child:
children.append(url_direct_child)
revision['children_urls'] = children
else:
if url_direct_child:
revision['children_urls'] = [url_direct_child]
if 'message_decoding_failed' in revision:
revision['message_url'] = flask.url_for(
'api_revision_raw_message',
sha1_git=revision['id'])
return revision
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index 1717080af..7a5a3e921 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,1098 +1,1096 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
# canned doc string snippets that are used in several doc strings
_doc_arg_content_id = """A "[hash_type:]hash" content identifier, where
hash_type is one of "sha1" (the default), "sha1_git", "sha256", and hash is
a checksum obtained with the hash_type hashing algorithm."""
_doc_arg_last_elt = 'element to start listing from, for pagination purposes'
_doc_arg_per_page = 'number of elements to list, for pagination purposes'
_doc_exc_bad_id = 'syntax error in the given identifier(s)'
_doc_exc_id_not_found = 'no object matching the given criteria could be found'
_doc_ret_revision_meta = 'metadata of the revision identified by sha1_git'
_doc_ret_revision_log = """list of dictionaries representing the metadata of
each revision found in the commit log heading to revision sha1_git.
For each commit at least the following information are returned:
author/committer, authoring/commit timestamps, revision id, commit message,
parent (i.e., immediately preceding) commits, "root" directory id."""
_doc_header_link = """indicates that a subsequent result page is available,
pointing to it"""
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""dictionary mapping object types to the amount of
corresponding objects currently available in the archive""")
def api_stats():
"""Get statistics about the content of the archive.
"""
return service.stat_counters()
@app.route('/api/1/origin/<int:origin_id>/visits/')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_visit', default=None,
argtype=doc.argtypes.int,
doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
retdoc="""a list of dictionaries describing individual visits.
For each visit, its identifier, timestamp (as UNIX time), outcome,
and visit-specific URL for more information are given.""")
def api_origin_visits(origin_id):
"""Get information about all visits of a given software origin.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
last_visit = request.args.get('last_visit')
if last_visit:
last_visit = int(last_visit)
def _lookup_origin_visits(
origin_id, last_visit=last_visit, per_page=per_page):
return service.lookup_origin_visits(
origin_id, last_visit=last_visit, per_page=per_page)
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_visit_url'] = url_for('api_origin_visit',
origin_id=ov['origin'],
visit_id=ov['visit'])
return ov
r = _api_lookup(
origin_id,
_lookup_origin_visits,
error_msg_if_not_found='No origin %s found' % origin_id,
enrich_fn=_enrich_origin_visit)
if r:
l = len(r)
if l == per_page:
new_last_visit = r[-1]['visit']
params = {
'origin_id': origin_id,
'last_visit': new_last_visit
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_origin_visits', **params)
}
result.update({
'results': r
})
return result
@app.route('/api/1/origin/<int:origin_id>/visit/<int:visit_id>/')
@doc.route('/api/1/origin/visit/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.arg('visit_id',
default=1,
argtype=doc.argtypes.int,
argdoc="""visit identifier, relative to the origin identified by
origin_id""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""dictionary containing both metadata for the entire
visit (e.g., timestamp as UNIX time, visit outcome, etc.) and what
was at the software origin during the visit (i.e., a mapping from
branches to other archive objects""")
def api_origin_visit(origin_id, visit_id):
"""Get information about a specific visit of a software origin.
"""
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_url'] = url_for('api_origin', origin_id=ov['origin'])
if 'occurrences' in ov:
ov['occurrences'] = {
k: utils.enrich_object(v)
for k, v in ov['occurrences'].items()
}
return ov
return _api_lookup(
origin_id,
service.lookup_origin_visit,
'No visit %s for origin %s found' % (visit_id, origin_id),
_enrich_origin_visit,
visit_id)
@app.route('/api/1/content/symbol/', methods=['POST'])
@app.route('/api/1/content/symbol/<string:q>/')
@doc.route('/api/1/content/symbol/', tags=['upcoming'])
@doc.arg('q',
default='hello',
argtype=doc.argtypes.str,
argdoc="""An expression string to lookup in swh's raw content""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_sha1', default=None,
argtype=doc.argtypes.str,
doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
retdoc="""A list of dict whose content matches the expression.
Each dict has the following keys:
- id (bytes): identifier of the content
- name (text): symbol whose content match the expression
- kind (text): kind of the symbol that matched
- lang (text): Language for that entry
- line (int): Number line for the symbol
""")
def api_content_symbol(q=None):
"""Search content objects by `Ctags <http://ctags.sourceforge.net/>`_-style
symbol (e.g., function name, data type, method, ...).
"""
result = {}
last_sha1 = request.args.get('last_sha1', None)
per_page = int(request.args.get('per_page', '10'))
def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page):
return service.lookup_expression(exp, last_sha1, per_page)
symbols = _api_lookup(
q,
lookup_fn=lookup_exp,
error_msg_if_not_found='No indexed raw content match expression \''
'%s\'.' % q,
enrich_fn=lambda x: utils.enrich_content(x, top_url=True))
if symbols:
l = len(symbols)
if l == per_page:
new_last_sha1 = symbols[-1]['sha1']
params = {
'q': q,
'last_sha1': new_last_sha1,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_content_symbol', **params),
}
result.update({
'results': symbols
})
return result
@app.route('/api/1/content/known/', methods=['POST'])
@app.route('/api/1/content/known/<string:q>/')
@doc.route('/api/1/content/known/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.sha1,
argdoc='content identifier as a sha1 checksum')
# @doc.param('q', default=None,
# argtype=doc.argtypes.str,
# doc="""(POST request) An algo_hash:hash string, where algo_hash
# is one of sha1, sha1_git or sha256 and hash is the hash to
# search for in SWH""")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""a dictionary with results (found/not found for each given
identifier) and statistics about how many identifiers were
found""")
def api_check_content_known(q=None):
"""Check whether some content (AKA "blob") is present in the archive.
Lookup can be performed by various means:
- a GET request with one or several hashes, separated by ','
- a POST request with one or several hashes, passed as (multiple) values
for parameter 'q'
"""
response = {'search_res': None,
'search_stats': None}
search_stats = {'nbfiles': 0, 'pct': 0}
search_res = None
queries = []
# GET: Many hash separated values request
if q:
hashes = q.split(',')
for v in hashes:
queries.append({'filename': None, 'sha1': v})
# POST: Many hash requests in post form submission
elif request.method == 'POST':
data = request.form
# Remove potential inputs with no associated value
for k, v in data.items():
if v is not None:
if k == 'q' and len(v) > 0:
queries.append({'filename': None, 'sha1': v})
elif v != '':
queries.append({'filename': k, 'sha1': v})
if queries:
lookup = service.lookup_multiple_hashes(queries)
result = []
l = len(queries)
for el in lookup:
res_d = {'sha1': el['sha1'],
'found': el['found']}
if 'filename' in el and el['filename']:
res_d['filename'] = el['filename']
result.append(res_d)
search_res = result
nbfound = len([x for x in lookup if x['found']])
search_stats['nbfiles'] = l
search_stats['pct'] = (nbfound / l) * 100
response['search_res'] = search_res
response['search_stats'] = search_stats
return response
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
return [enrich_fn(x) for x in res]
return enrich_fn(res)
@app.route('/api/1/origin/<int:origin_id>/')
@app.route('/api/1/origin/<string:origin_type>/url/<path:origin_url>')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='origin identifier (when looking up by ID)')
@doc.arg('origin_type',
default='git',
argtype=doc.argtypes.str,
argdoc='origin type (when looking up by type+URL)')
@doc.arg('origin_url',
default='https://github.com/hylang/hy',
argtype=doc.argtypes.path,
argdoc='origin URL (when looking up by type+URL')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the origin corresponding to the given
criteria""")
def api_origin(origin_id=None, origin_type=None, origin_url=None):
"""Get information about a software origin.
Software origins might be looked up by origin type and canonical URL (e.g.,
"git" + a "git clone" URL), or by their unique (but otherwise meaningless)
identifier.
"""
ori_dict = {
'id': origin_id,
'type': origin_type,
'url': origin_url
}
ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]}
if 'id' in ori_dict:
error_msg = 'Origin with id %s not found.' % ori_dict['id']
else:
error_msg = 'Origin with type %s and URL %s not found' % (
ori_dict['type'], ori_dict['url'])
def _enrich_origin(origin):
if 'id' in origin:
o = origin.copy()
o['origin_visits_url'] = url_for('api_origin_visits',
origin_id=o['id'])
return o
return origin
return _api_lookup(
ori_dict, lookup_fn=service.lookup_origin,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_origin)
@app.route('/api/1/person/<int:person_id>/')
@doc.route('/api/1/person/')
@doc.arg('person_id',
default=42,
argtype=doc.argtypes.int,
argdoc='person identifier')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
"""Get information about a person.
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release/<string:sha1_git>/')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
default='7045404f3d1c54e6473c71bbb716529fbad4be24',
argtype=doc.argtypes.sha1_git,
argdoc='release identifier')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
"""Get information about a release.
Releases are identified by SHA1 checksums, compatible with Git tag
identifiers. See ``release_identifier`` in our `data model module
<https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
for details about how they are computed.
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
limit=100, with_data=False):
"""Compute the revision matching criterion's directory or content data.
Args:
revision: dictionary of criterions representing a revision to lookup
path: directory's path to lookup
request_path: request path which holds the original context to
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
"""
def enrich_directory_local(dir, context_url=request_path):
return utils.enrich_directory(dir, context_url)
rev_id, result = service.lookup_directory_through_revision(
revision, path, limit=limit, with_data=with_data)
content = result['content']
if result['type'] == 'dir': # dir_entries
result['content'] = list(map(enrich_directory_local, content))
else: # content
result['content'] = utils.enrich_content(content)
return result
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/<path:path>/')
@doc.route('/api/1/revision/origin/directory/', tags=['hidden'])
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""Optional timestamp (default to the nearest time
crawl of timestamp)""")
@doc.arg('path',
default='Dockerfile',
argtype=doc.argtypes.path,
argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision corresponding to the
given criteria""")
def api_directory_through_revision_origin(origin_id,
branch_name="refs/heads/master",
ts=None,
path=None,
with_data=False):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _revision_directory_by(
{
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts
},
path,
request.path,
with_data=with_data)
@app.route('/api/1/revision'
'/origin/<int:origin_id>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""(optional) fully-qualified branch name, e.g.,
"refs/heads/master". Defaults to the master branch.""")
@doc.arg('ts',
- default='2000-01-17T11:23:54+00:00',
+ default=None,
argtype=doc.argtypes.ts,
argdoc="""(optional) timestamp close to which the revision pointed by
the given branch should be looked up. Defaults to now.""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision_with_origin(origin_id,
branch_name="refs/heads/master",
ts=None):
"""Get information about a revision, searching for it based on software
origin, branch name, and/or visit timestamp.
This endpoint behaves like ``/revision``, but operates on the revision that
has been found at a given software origin, close to a given point in time,
pointed by a given branch.
"""
- if ts:
- ts = utils.parse_timestamp(ts)
-
+ ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
utils.enrich_revision,
branch_name,
ts)
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:context>/')
@doc.route('/api/1/revision/prev/', tags=['hidden'])
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier")
@doc.arg('context',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the revision identified by sha1_git')
def api_revision_with_context(sha1_git, context):
"""Return information about revision with id sha1_git.
"""
def _enrich_revision(revision, context=context):
return utils.enrich_revision(revision, context)
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
_enrich_revision)
@app.route('/api/1/revision/<string:sha1_git>/')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
default='aafb16d69fd30ff58afdd69036a26047f3aebdc6',
argtype=doc.argtypes.sha1_git,
argdoc="revision identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision(sha1_git):
"""Get information about a revision.
Revisions are identified by SHA1 checksums, compatible with Git commit
identifiers. See ``revision_identifier`` in our `data model module
<https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
for details about how they are computed.
"""
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
utils.enrich_revision)
@app.route('/api/1/revision/<string:sha1_git>/raw/')
@doc.route('/api/1/revision/raw/', tags=['hidden'])
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
"""Return the raw data of the message of revision identified by sha1_git
"""
raw = service.lookup_revision_message(sha1_git)
return app.response_class(raw['message'],
headers={'Content-disposition': 'attachment;'
'filename=rev_%s_raw' % sha1_git},
mimetype='application/octet-stream')
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc='revision identifier')
@doc.arg('dir_path',
default='Documentation/BUG-HUNTING',
argtype=doc.argtypes.path,
argdoc="""path relative to the root directory of revision identifier by
sha1_git""")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""either a list of directory entries with their metadata,
or the metadata of a single directory entry""")
def api_revision_directory(sha1_git,
dir_path=None,
with_data=False):
"""Get information about directory (entry) objects associated to revisions.
Each revision is associated to a single "root" directory. This endpoint
behaves like ``/directory/``, but operates on the root directory associated
to a given revision.
"""
return _revision_directory_by(
{
'sha1_git': sha1_git
},
dir_path,
request.path,
with_data=with_data)
@app.route('/api/1/revision/<string:sha1_git>/log/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
default='37fc9e08d0c4b71807a4f1ecb06112e78d91c283',
argtype=doc.argtypes.sha1_git,
argdoc='revision identifier')
# @doc.arg('prev_sha1s',
# default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
# argtype=doc.argtypes.path,
# argdoc="""(Optional) Navigation breadcrumbs (descendant revisions
# previously visited). If multiple values, use / as delimiter. """)
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log(sha1_git, prev_sha1s=None):
"""Get a list of all revisions heading to a given one, i.e., show the
commit log.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
def lookup_revision_log_with_limit(s, limit=per_page+1):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
rev_get = _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
l = len(rev_get)
if l == per_page+1:
rev_backward = rev_get[:-1]
new_last_sha1 = rev_get[-1]['id']
params = {
'sha1_git': new_last_sha1,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_revision_log', **params)
}
else:
rev_backward = rev_get
if not prev_sha1s: # no nav breadcrumbs, so we're done
revisions = rev_backward
else:
rev_forward_ids = prev_sha1s.split('/')
rev_forward = _api_lookup(rev_forward_ids,
lookup_fn=service.lookup_revision_multiple,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
revisions = rev_forward + rev_backward
result.update({
'results': revisions
})
return result
@app.route('/api/1/revision'
'/origin/<int:origin_id>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""(Optional) The revision's branch name within the origin specified.
Defaults to 'refs/heads/master'.""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""(Optional) A time or timestamp string to parse""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log_by(origin_id,
branch_name='refs/heads/master',
ts=None):
"""Show the commit log for a revision, searching for it based on software
origin, branch name, and/or visit timestamp.
This endpoint behaves like ``/log``, but operates on the revision that
has been found at a given software origin, close to a given point in time,
pointed by a given branch.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
if ts:
ts = utils.parse_timestamp(ts)
def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1):
return service.lookup_revision_log_by(o_id, br, ts, limit)
error_msg = 'No revision matching origin %s ' % origin_id
error_msg += ', branch name %s' % branch_name
error_msg += (' and time stamp %s.' % ts) if ts else '.'
rev_get = _api_lookup(origin_id,
lookup_revision_log_by_with_limit,
error_msg,
utils.enrich_revision,
branch_name,
ts)
l = len(rev_get)
if l == per_page+1:
revisions = rev_get[:-1]
last_sha1_git = rev_get[-1]['id']
params = {
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts,
'sha1_git': last_sha1_git,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_revision_log_by', **params),
}
else:
revisions = rev_get
result.update({'results': revisions})
return result
@app.route('/api/1/directory/<string:sha1_git>/')
@app.route('/api/1/directory/<string:sha1_git>/<path:path>/')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8',
argtype=doc.argtypes.sha1_git,
argdoc='directory identifier')
@doc.arg('path',
default='codec/demux',
argtype=doc.argtypes.path,
argdoc='path relative to directory identified by sha1_git')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""either a list of directory entries with their metadata,
or the metadata of a single directory entry""")
def api_directory(sha1_git,
path=None):
"""Get information about directory or directory entry objects.
Directories are identified by SHA1 checksums, compatible with Git directory
identifiers. See ``directory_identifier`` in our `data model module
<https://forge.softwareheritage.org/source/swh-model/browse/master/swh/model/identifiers.py>`_
for details about how they are computed.
When given only a directory identifier, this endpoint returns information
about the directory itself, returning its content (usually a list of
directory entries). When given a directory identifier and a path, this
endpoint returns information about the directory entry pointed by the
relative path, starting path resolution from the given directory.
"""
if path:
error_msg_path = ('Entry with path %s relative to directory '
'with sha1_git %s not found.') % (path, sha1_git)
return _api_lookup(
sha1_git,
service.lookup_directory_with_path,
error_msg_path,
utils.enrich_directory,
path)
else:
error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
service.lookup_directory,
error_msg_nopath,
utils.enrich_directory)
@app.route('/api/1/provenance/<string:q>/')
@doc.route('/api/1/provenance/', tags=['hidden'])
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""List of provenance information (dict) for the matched
content.""")
def api_content_provenance(q):
"""Return content's provenance information if any.
"""
def _enrich_revision(provenance):
p = provenance.copy()
p['revision_url'] = url_for('api_revision',
sha1_git=provenance['revision'])
p['content_url'] = url_for('api_content_metadata',
q='sha1_git:%s' % provenance['content'])
p['origin_url'] = url_for('api_origin',
origin_id=provenance['origin'])
p['origin_visits_url'] = url_for('api_origin_visits',
origin_id=provenance['origin'])
p['origin_visit_url'] = url_for('api_origin_visit',
origin_id=provenance['origin'],
visit_id=provenance['visit'])
return p
return _api_lookup(
q,
lookup_fn=service.lookup_content_provenance,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_revision)
@app.route('/api/1/filetype/<string:q>/')
@doc.route('/api/1/filetype/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Filetype information (dict) for the matched
content.""")
def api_content_filetype(q):
"""Get information about the detected MIME type of a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_filetype,
error_msg_if_not_found='No filetype information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/language/<string:q>/')
@doc.route('/api/1/language/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Language information (dict) for the matched
content.""")
def api_content_language(q):
"""Get information about the detected (programming) language of a content
object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_language,
error_msg_if_not_found='No language information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/license/<string:q>/')
@doc.route('/api/1/license/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""License information (dict) for the matched
content.""")
def api_content_license(q):
"""Get information about the detected license of a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_license,
error_msg_if_not_found='No license information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/ctags/<string:q>/')
@doc.route('/api/1/ctags/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Ctags symbol (dict) for the matched
content.""")
def api_content_ctags(q):
"""Get information about all `Ctags <http://ctags.sourceforge.net/>`_-style
symbols defined in a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_ctags,
error_msg_if_not_found='No ctags symbol found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/content/<string:q>/raw/')
@doc.route('/api/1/content/raw/', tags=['upcoming'])
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc='The raw content data as an octet stream')
def api_content_raw(q):
"""Get the raw content of a content object (AKA "blob"), as a byte sequence.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return app.response_class(generate(content),
headers={'Content-disposition': 'attachment;'
'filename=content_%s_raw' % q},
mimetype='application/octet-stream')
@app.route('/api/1/content/<string:q>/')
@doc.route('/api/1/content/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""known metadata for content identified by q""")
def api_content_metadata(q):
"""Get information about a content (AKA "blob") object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=utils.enrich_content)
@app.route('/api/1/entity/<string:uuid>/')
@doc.route('/api/1/entity/', tags=['hidden'])
@doc.arg('uuid',
default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
argtype=doc.argtypes.uuid,
argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
"""Return content information if content is found.
"""
return _api_lookup(
uuid,
lookup_fn=service.lookup_entity_by_uuid,
error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
enrich_fn=utils.enrich_entity)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 8:12 PM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3321620

Event Timeline