diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index e0b6e723..650bff81 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,954 +1,961 @@
# Copyright (C) 2015  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import dateutil
import unittest

from unittest.mock import patch, call
from nose.tools import istest, nottest

from swh.web.ui import utils


class UtilsTestCase(unittest.TestCase):
    def setUp(self):
        self.url_map = [dict(rule='/other/',
                             methods=set(['GET', 'POST', 'HEAD']),
                             endpoint='foo'),
                        dict(rule='/some/old/url/',
                             methods=set(['GET', 'POST']),
                             endpoint='blablafn'),
                        dict(rule='/other/old/url/',
                             methods=set(['GET', 'HEAD']),
                             endpoint='bar'),
                        dict(rule='/other',
                             methods=set([]),
                             endpoint=None),
                        dict(rule='/other2',
                             methods=set([]),
                             endpoint=None)]

    @istest
    def filter_endpoints_1(self):
        # when
        actual_data = utils.filter_endpoints(self.url_map, '/some')

        # then
        self.assertEquals(actual_data, {
            '/some/old/url/': {
                'methods': ['GET', 'POST'],
                'endpoint': 'blablafn'
            }
        })

    @istest
    def filter_endpoints_2(self):
        # when
        actual_data = utils.filter_endpoints(self.url_map, '/other',
                                             blacklist=['/other2'])

        # then
        # rule /other is skipped because it's exactly the prefix url
        # rule /other2 is skipped because it's blacklisted
        self.assertEquals(actual_data, {
            '/other/': {
                'methods': ['GET', 'HEAD', 'POST'],
                'endpoint': 'foo'
            },
            '/other/old/url/': {
                'methods': ['GET', 'HEAD'],
                'endpoint': 'bar'
            }
        })

    @istest
    def prepare_data_for_view_default_encoding(self):
        self.maxDiff = None
        # given
        inputs = [
            {
                'data': b'some blah data'
            },
            {
                'data': 1,
                'data_url': '/api/1/some/api/call',
            },
            {
                'blah': 'foobar',
                'blah_url': '/some/non/changed/api/call'
            }]

        # when
        actual_result = utils.prepare_data_for_view(inputs)

        # then
        self.assertEquals(actual_result, [
            {
                'data': 'some blah data',
            },
            {
                'data': 1,
                'data_url': '/browse/some/api/call',
            },
            {
                'blah': 'foobar',
                'blah_url': '/some/non/changed/api/call'
            }
        ])

    @istest
    def prepare_data_for_view(self):
        self.maxDiff = None
        # given
        inputs = [
            {
                'data': b'some blah data'
            },
            {
                'data': 1,
                'data_url': '/api/1/some/api/call',
            },
            {
                'blah': 'foobar',
                'blah_url': '/some/non/changed/api/call'
            }]

        # when
        actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')

        # then
        self.assertEquals(actual_result, [
            {
                'data': 'some blah data',
            },
            {
                'data': 1,
                'data_url': '/browse/some/api/call',
            },
            {
                'blah': 'foobar',
                'blah_url': '/some/non/changed/api/call'
            }
        ])

    @istest
    def prepare_data_for_view_ko_cannot_decode(self):
        self.maxDiff = None
        # given
        inputs = {
            'data': 'hé dude!'.encode('utf8'),
        }

        actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')

        # then
        self.assertEquals(actual_result, {
            'data': "Cannot decode the data bytes, try and set another "
                    "encoding in the url (e.g. ?encoding=utf8) or "
                    "download directly the "
                    "content's raw data.",
        })

    @istest
    def filter_field_keys_dict_unknown_keys(self):
        # when
        actual_res = utils.filter_field_keys(
            {'directory': 1, 'file': 2, 'link': 3},
            {'directory1', 'file2'})

        # then
        self.assertEqual(actual_res, {})

    @istest
    def filter_field_keys_dict(self):
        # when
        actual_res = utils.filter_field_keys(
            {'directory': 1, 'file': 2, 'link': 3},
            {'directory', 'link'})

        # then
        self.assertEqual(actual_res, {'directory': 1, 'link': 3})

    @istest
    def filter_field_keys_list_unknown_keys(self):
        # when
        actual_res = utils.filter_field_keys(
            [{'directory': 1, 'file': 2, 'link': 3},
             {'1': 1, '2': 2, 'link': 3}],
            {'d'})

        # then
        self.assertEqual(actual_res, [{}, {}])

    @istest
    def filter_field_keys_map(self):
        # when
        actual_res = utils.filter_field_keys(
            map(lambda x: {'i': x['i']+1, 'j': x['j']},
                [{'i': 1, 'j': None},
                 {'i': 2, 'j': None},
                 {'i': 3, 'j': None}]),
            {'i'})

        # then
        self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}])

    @istest
    def filter_field_keys_list(self):
        # when
        actual_res = utils.filter_field_keys(
            [{'directory': 1, 'file': 2, 'link': 3},
             {'dir': 1, 'fil': 2, 'lin': 3}],
            {'directory', 'dir'})

        # then
        self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])

    @istest
    def filter_field_keys_complex_1(self):
        actual_res = utils.filter_field_keys(
            {
                'list': [
                    {'directory': 1, 'file': 2, 'link': 3},
                    {'dir': 1, 'fil': 2, 'lin': 3}
                ]
            },
            {'list', 'directory', 'dir'})

        # then
        self.assertEqual(actual_res, {'list': [{'directory': 1}, {'dir': 1}]})

    @istest
    def filter_field_keys_complex_2(self):
        actual_res = utils.filter_field_keys(
            {
                'list': [
                    [{'directory': 1, 'file': 2, 'link': 3}],
                    [{'dir': 1, 'fil': 2, 'lin': 3}],
                ]
            },
            {'list', 'directory', 'dir'})

        # then
        self.assertEqual(
            actual_res,
            {
                'list': [
                    [{'directory': 1}],
                    [{'dir': 1}]
                ]
            })

    @istest
    def filter_field_keys_other(self):
        # given
        input_set = {1, 2}

        # when
        actual_res = utils.filter_field_keys(input_set, {'a', '1'})

        # then
        self.assertEqual(actual_res, input_set)

    @istest
    def fmap(self):
        self.assertEquals([2, 3, None, 4],
                          utils.fmap(lambda x: x+1, [1, 2, None, 3]))
        self.assertEquals([11, 12, 13],
                          list(utils.fmap(lambda x: x+10,
                                          map(lambda x: x, [1, 2, 3]))))
        self.assertEquals({'a': 2, 'b': 4},
                          utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
        self.assertEquals(100,
                          utils.fmap(lambda x: x*10, 10))
        self.assertEquals({'a': [2, 6], 'b': 4},
                          utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
        self.assertIsNone(utils.fmap(lambda x: x, None))

    @istest
    def person_to_string(self):
        self.assertEqual(utils.person_to_string(dict(name='raboof',
                                                     email='foo@bar')),
                         'raboof <foo@bar>')

    @istest
    def parse_timestamp(self):
        input_timestamps = [
            '2016-01-12',
            '2016-01-12T09:19:12+0100',
            'Today is January 1, 2047 at 8:21:00AM',
            '1452591542',
        ]

        output_dates = [
            datetime.datetime(2016, 1, 12, 0, 0,
                              tzinfo=datetime.timezone.utc),
            datetime.datetime(2016, 1, 12, 9, 19, 12,
                              tzinfo=dateutil.tz.tzoffset(None, 3600)),
            datetime.datetime(2047, 1, 1, 8, 21,
                              tzinfo=datetime.timezone.utc),
            datetime.datetime(2016, 1, 12, 9, 39, 2,
                              tzinfo=datetime.timezone.utc),
        ]

        for ts, exp_date in zip(input_timestamps, output_dates):
            self.assertEquals(utils.parse_timestamp(ts), exp_date)

    @istest
    def enrich_release_0(self):
        # when
        actual_release = utils.enrich_release({})

        # then
        self.assertEqual(actual_release, {})

    @patch('swh.web.ui.utils.flask')
    @istest
    def enrich_release_1(self, mock_flask):
        # given
        mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/'

        # when
        actual_release = utils.enrich_release({'target': '123',
                                               'target_type': 'content'})
# then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_release_2(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_flask.url_for.assert_called_once_with('api_directory', q='23') @patch('swh.web.ui.utils.flask') @istest def enrich_release_3(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_flask.url_for.assert_called_once_with('api_revision', sha1_git='3') @patch('swh.web.ui.utils.flask') @istest def enrich_release_4(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_flask.url_for.assert_called_once_with('api_release', sha1_git='4') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_no_type(self, mock_flask): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_file(self, mock_flask): # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:789') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_dir(self, mock_flask): # given mock_flask.url_for.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_flask.url_for.assert_called_once_with('api_directory', sha1_git='456') @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) 
@patch('swh.web.ui.utils.flask') @istest def enrich_content_with_hashes(self, mock_flask): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_flask.url_for.side_effect = [ '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' } ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_flask.url_for.assert_has_calls([ call('api_content_raw', q='%s:blahblah' % h), call('api_content_filetype', q='%s:blahblah' % h), call('api_content_language', q='%s:blahblah' % h), call('api_content_license', q='%s:blahblah' % h), ]) mock_flask.reset() @patch('swh.web.ui.utils.flask') @istest def enrich_content_with_hashes_and_top_level_url(self, mock_flask): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_flask.url_for.side_effect = [ '/api/content/%s:123/' % h, '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' }, top_url=True ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'content_url': '/api/content/%s:123/' % h, 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_flask.url_for.assert_has_calls([ call('api_content_metadata', q='%s:blahblah' % h), call('api_content_raw', q='%s:blahblah' % h), call('api_content_filetype', q='%s:blahblah' % h), call('api_content_language', q='%s:blahblah' % h), call('api_content_license', q='%s:blahblah' % h), ]) mock_flask.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_entity_with_sha1(self, mock_flask): # given def url_for_test(fn, **entity): return '/api/entity/' + entity['uuid'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid', uuid='uuid-1'), call('api_entity_by_uuid', uuid='uuid-parent')]) @nottest def _url_for_context_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] @patch('swh.web.ui.utils.flask') @istest def enrich_revision_without_children_or_parent(self, mock_flask): # given def url_for_test(fn, **data): if fn == 'api_revision': return '/api/revision/' + data['sha1_git'] + '/' elif fn == 'api_revision_log': return '/api/revision/' + data['sha1_git'] + '/log/' elif fn == 'api_directory': return '/api/directory/' + data['sha1_git'] + '/' elif fn == 
'api_person': return '/api/person/' + data['person_id'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_person', person_id='1'), call('api_person', person_id='2'), call('api_directory', sha1_git='123')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_no_context(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision', sha1_git='123', context='rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_empty_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), 
call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_some_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/' 'prev1-rev/prev0-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev0-rev', context='prev1-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev1-rev/prev0-rev'), call('api_revision', sha1_git='123', context='prev1-rev/prev0-rev/rev-id'), call('api_revision', sha1_git='456')]) @nottest def _url_for_rev_message_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data and data['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] elif fn == 'api_revision_raw_message': return '/api/revision/' + data['sha1_git'] + '/raw/' else: return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_no_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_invalid_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': 
'/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @istest def to_url(self): self.assertEquals(utils.to_url('some-url?foo=bar', (('param', 10),)), 'some-url?foo=bar¶m=10') self.assertEquals(utils.to_url('some-url', (('foo', 'bar'), ('bar', 'barfoo'),)), 'some-url?foo=bar&bar=barfoo') + + @istest + def to_url_no_param(self): + self.assertEquals(utils.to_url('some-url?foo=bar', None), + 'some-url?foo=bar') + self.assertEquals(utils.to_url('some-url', None), + 'some-url') diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py index f8c0e211..03caebbf 100644 --- a/swh/web/ui/tests/views/test_api.py +++ b/swh/web/ui/tests/views/test_api.py @@ -1,2396 +1,2386 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app from swh.web.ui import exc from swh.web.ui.views import api from swh.web.ui.exc import NotFoundExc, BadInputExc from swh.storage.exc import StorageDBError, StorageAPIError class ApiTestCase(test_app.SWHApiTestCase): def setUp(self): self.origin_visit1 = { 'date': 1104616800.0, 'origin': 10, 'visit': 100, 'metadata': None, 'status': 'full', } self.origin1 = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } @istest def generic_api_lookup_nothing_is_found(self): # given def test_generic_lookup_fn(sha1, another_unused_arg): assert another_unused_arg == 'unused arg' assert sha1 == 'sha1' return None # when with self.assertRaises(NotFoundExc) as cm: api._api_lookup('sha1', test_generic_lookup_fn, 'This will be raised because None is returned.', lambda x: x, 'unused arg') self.assertIn('This will be raised because None is returned.', cm.exception.args[0]) @istest def generic_api_map_are_enriched_and_transformed_to_list(self): # given def test_generic_lookup_fn_1(criteria0, param0, param1): assert criteria0 == 'something' return map(lambda x: x + 1, [1, 2, 3]) # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_1, 'This is not the error message you are looking for. Move along.', lambda x: x * 2, 'some param 0', 'some param 1') self.assertEqual(actual_result, [4, 6, 8]) @istest def generic_api_list_are_enriched_too(self): # given def test_generic_lookup_fn_2(crit): assert crit == 'something' return ['a', 'b', 'c'] # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_2, 'Not the error message you are looking for, it is. ' 'Along, you move!', lambda x: ''. 
join(['=', x, '='])) self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) @istest def generic_api_generator_are_enriched_and_returned_as_list(self): # given def test_generic_lookup_fn_3(crit): assert crit == 'crit' return (i for i in [4, 5, 6]) # when actual_result = api._api_lookup( 'crit', test_generic_lookup_fn_3, 'Move!', lambda x: x - 1) self.assertEqual(actual_result, [3, 4, 5]) @istest def generic_api_simple_data_are_enriched_and_returned_too(self): # given def test_generic_lookup_fn_4(crit): assert crit == '123' return {'a': 10} def test_enrich_data(x): x['a'] = x['a'] * 10 return x # when actual_result = api._api_lookup( '123', test_generic_lookup_fn_4, 'Nothing to do', test_enrich_data) self.assertEqual(actual_result, {'a': 100}) @patch('swh.web.ui.views.api.service') @istest def api_content_filetype(self, mock_service): stub_filetype = { 'mimetype': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_filetype.return_value = stub_filetype # when rv = self.app.get( '/api/1/filetype/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'mimetype': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_filetype_sha_not_found(self, mock_service): # given mock_service.lookup_content_filetype.return_value = None # when rv = self.app.get( '/api/1/filetype/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No filetype information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' 
}) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_language(self, mock_service): stub_language = { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_language.return_value = stub_language # when rv = self.app.get( '/api/1/language/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_language.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_language_sha_not_found(self, mock_service): # given mock_service.lookup_content_language.return_value = None # when rv = self.app.get( '/api/1/language/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No language information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_language.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_symbol(self, mock_service): stub_ctag = [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get('/api/1/content/symbol/foo/?last_sha1=sha1') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/license/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'filetype_url': '/api/1/filetype/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }]) actual_headers = dict(rv.headers) self.assertFalse('Link' in actual_headers) mock_service.lookup_expression.assert_called_once_with( 'foo', 'sha1', 10) @patch('swh.web.ui.views.api.service') @istest def api_content_symbol_2(self, mock_service): stub_ctag = [{ 'sha1': '12371b8614fcd89ccd17ca2b1d9e66c5b00a6456', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }, { 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6678', 'name': 'foo', 'kind': 'Lisp', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get( '/api/1/content/symbol/foo/?last_sha1=prev-sha1&per_page=2') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_ctag) actual_headers = dict(rv.headers) self.assertEquals( actual_headers['Link'], '; rel="next"') # noqa 
mock_service.lookup_expression.assert_called_once_with( 'foo', 'prev-sha1', 2) @patch('swh.web.ui.views.api.service') # @istest def api_content_symbol_3(self, mock_service): stub_ctag = [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get('/api/1/content/symbol/foo/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, 'content_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/license/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'filetype_url': '/api/1/filetype/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }]) actual_headers = dict(rv.headers) self.assertEquals( actual_headers['Link'], '') mock_service.lookup_expression.assert_called_once_with('foo', None, 10) @patch('swh.web.ui.views.api.service') @istest def api_content_symbol_not_found(self, mock_service): # given mock_service.lookup_expression.return_value = [] # when rv = self.app.get('/api/1/content/symbol/bar/?last_sha1=hash') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No indexed raw content match expression \'bar\'.' }) actual_headers = dict(rv.headers) self.assertFalse('Link' in actual_headers) mock_service.lookup_expression.assert_called_once_with( 'bar', 'hash', 10) @patch('swh.web.ui.views.api.service') @istest def api_content_ctags(self, mock_service): stub_ctags = { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [] } mock_service.lookup_content_ctags.return_value = stub_ctags # when rv = self.app.get( '/api/1/ctags/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [], 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_ctags.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_license(self, mock_service): stub_license = { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', } mock_service.lookup_content_license.return_value = stub_license # when rv = self.app.get( '/api/1/license/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_license.assert_called_once_with( 
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_license_sha_not_found(self, mock_service): # given mock_service.lookup_content_license.return_value = None # when rv = self.app.get( '/api/1/license/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No license information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_license.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_provenance(self, mock_service): stub_provenances = [{ 'origin': 1, 'visit': 2, 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] mock_service.lookup_content_provenance.return_value = stub_provenances # when rv = self.app.get( '/api/1/provenance/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'origin': 1, 'visit': 2, 'origin_url': '/api/1/origin/1/', 'origin_visits_url': '/api/1/origin/1/visits/', 'origin_visit_url': '/api/1/origin/1/visit/2/', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'revision_url': '/api/1/revision/' 'b04caf10e9535160d90e874b45aa426de762f19f/', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }]) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_provenance_sha_not_found(self, mock_service): # given mock_service.lookup_content_provenance.return_value = None # when rv = self.app.get( '/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' 
}) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_metadata(self, mock_service): # given mock_service.lookup_content.return_value = { 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'filetype_url': '/api/1/filetype/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'license_url': '/api/1/license/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_raw_ko_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' 
}) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-type': 'application/octet-stream', 'Content-disposition': 'attachment'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_check_content_known(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:blah'} ] expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 100}, 'search_res': [{'filename': None, 'sha1': 'sha1:blah', 'found': True}] } # when rv = self.app.get('/api/1/content/known/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:blah'}]) @patch('swh.web.ui.views.api.service') @istest def api_check_content_known_as_yaml(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:halb'}, {'found': False, 'filename': None, 'sha1': 'sha1_git:hello'} ] expected_result = { 'search_stats': {'nbfiles': 2, 'pct': 50}, 'search_res': [{'filename': None, 'sha1': 'sha1:halb', 'found': True}, {'filename': None, 'sha1': 'sha1_git:hello', 'found': False}] } # when rv = self.app.get('/api/1/content/known/sha1:halb,sha1_git:hello/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}, {'filename': None, 'sha1': 'sha1_git:hello'}]) @patch('swh.web.ui.views.api.service') @istest def api_check_content_known_post_as_yaml(self, mock_service): # given stub_result = [{'filename': None, 'sha1': '7e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filepath', 'sha1': '8e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filename', 'sha1': '64025b5d1520c615061842a6ce6a456cad962a3f', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 3, 'pct': 2/3 * 100}, 'search_res': stub_result } # when rv = self.app.post( '/api/1/content/known/', headers={'Accept': 'application/yaml'}, data=dict( q='7e62b1fe10c88a3eddbba930b156bee2956b2435', filepath='8e62b1fe10c88a3eddbba930b156bee2956b2435', filename='64025b5d1520c615061842a6ce6a456cad962a3f') ) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) @patch('swh.web.ui.views.api.service') @istest def api_check_content_known_not_found(self, mock_service): # given stub_result = [{'filename': 
None, 'sha1': 'sha1:halb', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 0}, 'search_res': stub_result } # when rv = self.app.get('/api/1/content/known/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}]) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service): # given mock_service.stat_counters.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service): # given mock_service.stat_counters.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' 
}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_error(self, mock_service): # given mock_service.lookup_origin_visits.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_db( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_api( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' 
}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits(self, mock_service): # given stub_visits = [ { 'date': 1293919200.0, 'origin': 1, 'visit': 2 }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3 } ] mock_service.lookup_origin_visits.return_value = stub_visits # when rv = self.app.get('/api/1/origin/2/visits/?per_page=2&last_visit=1') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [ { 'date': 1293919200.0, 'origin': 1, 'visit': 2, 'origin_visit_url': '/api/1/origin/1/visit/2/', }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3, 'origin_visit_url': '/api/1/origin/1/visit/3/', } ]) mock_service.lookup_origin_visits.assert_called_once_with( 2, last_visit=1, per_page=2) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit(self, mock_service): # given origin_visit = self.origin_visit1.copy() origin_visit.update({ 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', } } }) mock_service.lookup_origin_visit.return_value = origin_visit expected_origin_visit = self.origin_visit1.copy() expected_origin_visit.update({ 'origin_url': '/api/1/origin/10/', 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', 'target_url': '/api/1/revision/revision-id/' } } }) # when rv = self.app.get('/api/1/origin/10/visit/100/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin_visit) mock_service.lookup_origin_visit.assert_called_once_with(10, 100) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit_not_found(self, mock_service): # given mock_service.lookup_origin_visit.return_value = None # when rv = self.app.get('/api/1/origin/1/visit/1000/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No visit 1000 for origin 1 found' }) mock_service.lookup_origin_visit.assert_called_once_with(1, 1000) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_id(self, mock_service): # given mock_service.lookup_origin.return_value = self.origin1 expected_origin = self.origin1.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/1234/visits/' }) # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with({'id': 1234}) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_type_url(self, mock_service): # given stub_origin = self.origin1.copy() stub_origin.update({ 'id': 987 }) mock_service.lookup_origin.return_value = stub_origin expected_origin = stub_origin.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/987/visits/' }) # when rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with( {'url': 'ftp://some/url/to/origin/0', 'type': 'ftp'}) 
@patch('swh.web.ui.views.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with({'id': 4321}) @patch('swh.web.ui.views.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' 
}) @patch('swh.web.ui.views.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok(self, mock_service): # given stub_revision = {'message': 'synthetic revision message'} mock_service.lookup_revision_message.return_value = stub_revision # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2' '33eff7371d5/raw/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, b'synthetic revision message') mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok_no_msg(self, 
                                      mock_service):
        # given
        mock_service.lookup_revision_message.side_effect = NotFoundExc(
            'No message for revision')

        # when
        rv = self.app.get('/api/1/revision/'
                          '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

        # then
        self.assertEquals(rv.status_code, 404)
        self.assertEquals(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEquals(response_data, {
            'error': 'No message for revision'})

        mock_service.lookup_revision_message.assert_called_once_with(
            '18d8be353ed3480476f032475e7c233eff7371d5')

    @patch('swh.web.ui.views.api.service')
    @istest
    def api_revision_raw_ko_no_rev(self, mock_service):
        # given
        mock_service.lookup_revision_message.side_effect = NotFoundExc(
            'No revision found')

        # when
        rv = self.app.get('/api/1/revision/'
                          '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

        # then
        self.assertEquals(rv.status_code, 404)
        self.assertEquals(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEquals(response_data, {
            'error': 'No revision found'})

        mock_service.lookup_revision_message.assert_called_once_with(
            '18d8be353ed3480476f032475e7c233eff7371d5')

    @patch('swh.web.ui.views.api.service')
    @istest
    def api_revision_with_origin_not_found(self, mock_service):
        mock_service.lookup_revision_by.return_value = None

        rv = self.app.get('/api/1/revision/origin/123/')

        # then
        self.assertEquals(rv.status_code, 404)
        self.assertEquals(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertIn('Revision with (origin_id: 123',
                      response_data['error'])
        self.assertIn('not found', response_data['error'])

        mock_service.lookup_revision_by.assert_called_once_with(
            123, 'refs/heads/master', None)

    @patch('swh.web.ui.views.api.service')
    @istest
    def api_revision_with_origin(self, mock_service):
        mock_revision = {
            'id': '32',
            'directory': '21',
            'message': 'message 1',
            'type': 'deb',
        }
        expected_revision = {
            'id': '32',
            'url': '/api/1/revision/32/',
            'history_url': '/api/1/revision/32/log/',
            'directory': '21',
            'directory_url': '/api/1/directory/21/',
            'message': 'message 1',
            'type': 'deb',
        }
        mock_service.lookup_revision_by.return_value = mock_revision

        rv = self.app.get('/api/1/revision/origin/1/')

        # then
        self.assertEquals(rv.status_code, 200)
        self.assertEquals(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, expected_revision)

        mock_service.lookup_revision_by.assert_called_once_with(
            1, 'refs/heads/master', None)

    @patch('swh.web.ui.views.api.service')
    @istest
    def api_revision_with_origin_and_branch_name(self, mock_service):
        mock_revision = {
            'id': '12',
            'directory': '23',
            'message': 'message 2',
            'type': 'tar',
        }
        mock_service.lookup_revision_by.return_value = mock_revision

        expected_revision = {
            'id': '12',
            'url': '/api/1/revision/12/',
            'history_url': '/api/1/revision/12/log/',
            'directory': '23',
            'directory_url': '/api/1/directory/23/',
            'message': 'message 2',
            'type': 'tar',
        }

        rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')

        # then
        self.assertEquals(rv.status_code, 200)
        self.assertEquals(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, expected_revision)

        mock_service.lookup_revision_by.assert_called_once_with(
            1, 'refs/origin/dev', None)

    @patch('swh.web.ui.views.api.service')
    @patch('swh.web.ui.views.api.utils')
    @istest
    def api_revision_with_origin_and_branch_name_and_timestamp(self,
                                                               mock_utils,
                                                               mock_service):
        mock_revision = {
            'id': '123',
'directory': '456', 'message': 'message 3', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '123', 'url': '/api/1/revision/123/', 'history_url': '/api/1/revision/123/log/', 'directory': '456', 'directory_url': '/api/1/directory/456/', 'message': 'message 3', 'type': 'tar', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs/origin/dev' '/ts/1452591542/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with('1452591542') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @patch('swh.web.ui.views.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes( self, mock_utils, mock_service): mock_revision = { 'id': '999', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '999', 'url': '/api/1/revision/999/', 'history_url': '/api/1/revision/999/log/', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs%2Forigin%2Fdev' '/ts/Today%20is%20' 'January%201,%202047%20at%208:21:00AM/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with( 'Today is January 1, 2047 at 8:21:00AM') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_ko_raise(self, mock_service): # given mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa # when with self.assertRaises(NotFoundExc): api._revision_directory_by( {'sha1_git': 'id'}, None, '/api/1/revision/sha1/directory/') # then mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'id'}, None, limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_dir(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'blah-id'}, 'some/path', '/api/1/revision/sha1/directory/') # then self.assertEquals(actual_dir_content, { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'blah-id'}, 'some/path', limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_file(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'sha1'}, 
'some/path', '/api/1/revision/origin/2/directory/', limit=1000, with_data=True) # then self.assertEquals(actual_dir_content, { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'sha1'}, 'some/path', limit=1000, with_data=True) @patch('swh.web.ui.views.api.utils') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin_ko_not_found(self, mock_rev_dir, mock_utils): mock_rev_dir.side_effect = NotFoundExc('not found') mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00' rv = self.app.get('/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': 'not found'}) mock_rev_dir.assert_called_once_with( {'origin_id': 10, 'branch_name': 'refs/remote/origin/dev', 'ts': '2012-10-20 00:00:00'}, None, '/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin(self, mock_revision_dir): expected_res = [{ 'id': '123' }] mock_revision_dir.return_value = expected_res rv = self.app.get('/api/1/revision/origin/3/directory/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_res) mock_revision_dir.assert_called_once_with({ 'origin_id': 3, 'branch_name': 'refs/heads/master', 'ts': None}, None, '/api/1/revision/origin/3/directory/', with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_revision_log(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] - expected_response = { - 'revisions': expected_revisions, - 'next_revs_url': None - } - # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then 
self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) - self.assertEquals(response_data, expected_response) + self.assertEquals(response_data, expected_revisions) + self.assertIsNone(rv.headers.get('Link')) mock_service.lookup_revision_log.assert_called_once_with( - '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) + '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] - expected_response = { - 'revisions': expected_revisions, - 'next_revs_url': '/api/1/revision/25/log/' - } - # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' - 'b7e2a44e6/log/') + 'b7e2a44e6/log/?per_page=25') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) - self.assertEquals(response_data, expected_response) + self.assertEquals(response_data, expected_revisions) + self.assertEquals(rv.headers['Link'], + '; rel="next"') mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7' 'e2a44e6/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) + self.assertIsNone(rv.headers.get('Link')) mock_service.lookup_revision_log.assert_called_once_with( - '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) + '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_context(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions mock_service.lookup_revision_multiple.return_value = [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'type': 'tar', 'synthetic': True, }] expected_revisions = [ { 'url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'history_url': '/api/1/revision/' 
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/', 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory_url': '/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'parent_urls': [ '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' '/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/' ], 'type': 'tar', 'synthetic': True, }, { 'url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/log/', 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] - expected_response = { - 'revisions': expected_revisions, - 'next_revs_url': None - } - # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f0' '32475e7c233eff7371d5/prev/prev-rev/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) - self.assertEquals(response_data, expected_response) + self.assertEquals(response_data, expected_revisions) + self.assertIsNone(rv.headers.get('Link')) mock_service.lookup_revision_log.assert_called_once_with( - '18d8be353ed3480476f032475e7c233eff7371d5', 26) + '18d8be353ed3480476f032475e7c233eff7371d5', 11) mock_service.lookup_revision_multiple.assert_called_once_with( ['prev-rev']) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log_by.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ 
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_result = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log_by.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_norev(self, mock_service): # given mock_service.lookup_revision_log_by.side_effect = NotFoundExc( 'No revision') # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'error': 'No revision'}) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision.return_value = stub_revision # then rv = self.app.get('/api/1/revision/883/prev/999/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'history_context_url': '/api/1/revision/883/prev/999/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/777/', '/api/1/revision/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision.assert_called_once_with('883') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) 
self.assertEquals(response_data, { 'error': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.app.get(url) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.views.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': 
'8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_found(self, mock_service): # given expected_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'name': 'bla', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', } mock_service.lookup_directory_with_path.return_value = expected_dir # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/bla/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_directory_with_path.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 'bla') @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_not_found(self, mock_service): # given mock_service.lookup_directory_with_path.return_value = None path = 'some/path/to/dir/' # when rv = self.app.get(('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/%s') % path) path = path.strip('/') # Path stripped of lead/trail separators # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': (('Entry with path %s relative to ' 'directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.') % path)}) @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.app.get('/api/1/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = self.app.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) 
self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.app.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index 678d61fc..3303af0c 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,397 +1,399 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask import re from dateutil import parser from urllib.parse import urlencode def to_url(url, params): """Compute a url with a new param. """ + if not params: + return url end_url = urlencode(params) if '?' in url: return '%s&%s' % (url, end_url) return '%s?%s' % (url, end_url) def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def fmap(f, data): """Map f to data at each level. 
This must keep the origin data structure type: - map -> map - dict -> dict - list -> list - None -> None Args: f: function that expects one argument. data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if not data: return data if isinstance(data, map): return map(lambda y: fmap(f, y), (x for x in data)) if isinstance(data, list): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: filter_field_keys(v, field_keys) for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: a timezone-aware datetime representing the parsed value. If the parsed value doesn't specify a timezone, UTC is assumed. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ default_timestamp = datetime.datetime.utcfromtimestamp(0).replace( tzinfo=datetime.timezone.utc) try: res = parser.parse(timestamp, ignoretz=False, fuzzy=True, default=default_timestamp) except: res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace( tzinfo=datetime.timezone.utc) return res def enrich_object(object): """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) Returns: Object enriched with target_url pointing to the right swh.web.ui.api urls for the pointing object (revision, release, content, directory) """ obj = object.copy() if 'target' in obj and 'target_type' in obj: if obj['target_type'] == 'revision': obj['target_url'] = flask.url_for('api_revision', sha1_git=obj['target']) elif obj['target_type'] == 'release': obj['target_url'] = flask.url_for('api_release', sha1_git=obj['target']) elif obj['target_type'] == 'content': obj['target_url'] = flask.url_for( 'api_content_metadata', q='sha1_git:' + obj['target']) elif obj['target_type'] == 'directory': obj['target_url'] = flask.url_for('api_directory', q=obj['target']) return obj enrich_release = enrich_object def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. 
""" if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = flask.url_for('api_content_metadata', q='sha1_git:%s' % target) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = flask.url_for('api_directory', sha1_git=target) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def enrich_metadata_endpoint(content): """Enrich metadata endpoint with link to the upper metadata endpoint. """ c = content.copy() c['content_url'] = flask.url_for('api_content_metadata', q='sha1:%s' % c['id']) return c def enrich_content(content, top_url=False): """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information """ for h in ['sha1', 'sha1_git', 'sha256']: if h in content: q = '%s:%s' % (h, content[h]) if top_url: content['content_url'] = flask.url_for('api_content_metadata', q=q) content['data_url'] = flask.url_for('api_content_raw', q=q) content['filetype_url'] = flask.url_for('api_content_filetype', q=q) content['language_url'] = flask.url_for('api_content_language', q=q) content['license_url'] = flask.url_for('api_content_license', q=q) break return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['uuid']) if 'parent' in entity and entity['parent']: entity['parent_url'] = flask.url_for('api_entity_by_uuid', uuid=entity['parent']) return entity def _get_path_list(path_string): """Helper for enrich_revision: get a list of the sha1 id of the navigation breadcrumbs, ordered from the oldest to the most recent. Args: path_string: the path as a '/'-separated string Returns: The navigation context as a list of sha1 revision ids """ return path_string.split('/') def _get_revision_contexts(rev_id, context): """Helper for enrich_revision: retrieve for the revision id and potentially the navigation breadcrumbs the context to pass to parents and children of of the revision. Args: rev_id: the revision's sha1 id context: the current navigation context Returns: The context for parents, children and the url of the direct child as a tuple in that order. """ context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id, context=context_for_children) # This commit is the first commit in the path else: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. 
Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(flask.url_for('api_revision', sha1_git=child)) elif not context: children.append(flask.url_for('api_revision', sha1_git=child)) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = flask.url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = flask.url_for('api_revision_log', sha1_git=revision['id']) if context: revision['history_context_url'] = flask.url_for( 'api_revision_log', sha1_git=revision['id'], prev_sha1s=context) if 'author' in revision: author = revision['author'] revision['author_url'] = flask.url_for('api_person', person_id=author['id']) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = flask.url_for('api_person', person_id=committer['id']) if 'directory' in revision: revision['directory_url'] = flask.url_for( 'api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(flask.url_for('api_revision', sha1_git=parent, context=ctx_parents)) revision['parent_urls'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = flask.url_for( 'api_revision_raw_message', sha1_git=revision['id']) return revision diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index ba5c86d0..bb996376 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,1077 +1,1089 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() @app.route('/api/1/origin//visits/') @doc.route('/api/1/origin/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.header('Link', doc="""Optional 'Link' header proposed to the api consumer for navigation purpose. 
            Possible value is 'next'.""")
@doc.param('last_visit', default=None,
           doc="""Last visit from which to start the next result page.""")
@doc.param('per_page', default=10,
           doc="""Return the number of results for one page.""")
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""All instances of visits of the origin pointed by
             origin_id as POSIX time since epoch (if visit_id is not
             defined)""")
def api_origin_visits(origin_id):
    """Return a list of origin visits (dict) for that particular origin,
    including date (visit date as posix timestamp), target, target_type,
    status, ...

    """
    result = {}
    per_page = int(request.args.get('per_page', '10'))
    last_visit = request.args.get('last_visit')
    if last_visit:
        last_visit = int(last_visit)

    def _lookup_origin_visits(
            origin_id, last_visit=last_visit, per_page=per_page):
        return service.lookup_origin_visits(
            origin_id, last_visit=last_visit, per_page=per_page)

    def _enrich_origin_visit(origin_visit):
        ov = origin_visit.copy()
        ov['origin_visit_url'] = url_for('api_origin_visit',
                                         origin_id=ov['origin'],
                                         visit_id=ov['visit'])
        return ov

    r = _api_lookup(
        origin_id,
        _lookup_origin_visits,
        error_msg_if_not_found='No origin %s found' % origin_id,
        enrich_fn=_enrich_origin_visit)

    if r:
        l = len(r)
        url = url_for('api_origin_visits', origin_id=origin_id)
        headers = {}
        if l == per_page:
            new_last_visit = r[-1]['visit']
            params = [('last_visit', new_last_visit)]

            nb_per_page = request.args.get('per_page')
            if nb_per_page:
                params.append(('per_page', nb_per_page))

            headers['link-next'] = utils.to_url(url, params)
        result['headers'] = headers

    result.update({
        'results': r
    })

    return result


@app.route('/api/1/origin/<int:origin_id>/visit/<int:visit_id>/')
@doc.route('/api/1/origin/visit/')
@doc.arg('origin_id', default=1,
         argtype=doc.argtypes.int,
         argdoc='The requested SWH origin identifier')
@doc.arg('visit_id', default=1,
         argtype=doc.argtypes.int,
         argdoc='The requested SWH origin visit identifier')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if no visit matching the query is found')
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""The single visit visit_id of the origin pointed by
             origin_id, as POSIX time since epoch""")
def api_origin_visit(origin_id, visit_id):
    """Return an origin visit (dict) for that particular origin, including
    (but not limited to) date (visit date as posix timestamp), target,
    target_type, status, ...

    """
    def _enrich_origin_visit(origin_visit):
        ov = origin_visit.copy()
        ov['origin_url'] = url_for('api_origin', origin_id=ov['origin'])
        if 'occurrences' in ov:
            ov['occurrences'] = {
                k: utils.enrich_object(v)
                for k, v in ov['occurrences'].items()
            }
        return ov

    return _api_lookup(
        origin_id,
        service.lookup_origin_visit,
        'No visit %s for origin %s found' % (visit_id, origin_id),
        _enrich_origin_visit,
        visit_id)


@app.route('/api/1/content/symbol/', methods=['POST'])
@app.route('/api/1/content/symbol/<string:q>/')
@doc.route('/api/1/content/symbol/', tags=['upcoming'])
@doc.arg('q', default='hello',
         argtype=doc.argtypes.str,
         argdoc="""An expression string to lookup in swh's raw content""")
@doc.header('Link',
            doc="""Optional 'Link' header proposed to the api consumer
            for navigation purpose. Possible values are the 'next' or
            'previous' page.""")
@doc.param('last_sha1', default=None,
           doc="""Optional parameter to start returning page results
           from.""")
@doc.param('per_page', default=10,
           doc="""Optional parameter to limit the number of results per page
           to retrieve on a per request basis.
The default is 10, up to 50 max.""") @doc.returns(rettype=doc.rettypes.list, retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol The result is paginated by page of 10 results by default. The 'Link' header gives the link to follow for the next page. """) def api_content_symbol(q=None): """Search symbol in indexed content's data. The result is paginated and and Link header will give the next and previous link to have the next result. """ result = {} last_sha1 = request.args.get('last_sha1', None) per_page = int(request.args.get('per_page', '10')) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): return service.lookup_expression(exp, last_sha1, per_page) symbols = _api_lookup( q, lookup_fn=lookup_exp, error_msg_if_not_found='No indexed raw content match expression \'' '%s\'.' % q, enrich_fn=lambda x: utils.enrich_content(x, top_url=True)) if symbols: l = len(symbols) url = url_for('api_content_symbol', q=q) headers = {} if l == per_page: new_last_sha1 = symbols[-1]['sha1'] params = [('last_sha1', new_last_sha1)] nb_per_page = request.args.get('per_page') if nb_per_page: params.append(('per_page', nb_per_page)) headers['link-next'] = utils.to_url(url, params) result['headers'] = headers result.update({ 'results': symbols }) return result @app.route('/api/1/content/known/', methods=['POST']) @app.route('/api/1/content/known//') @doc.route('/api/1/content/known/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.param('q', default=None, doc="""(POST request) An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.returns(rettype=doc.rettypes.dict, retdoc="""A dict with keys: - search_res: a list of dicts corresponding to queried content with key 'found' to True if found, 'False' if not - search_stats: a dict containing number of files searched and percentage of files found """) def api_check_content_known(q=None): """Search a content per hash. This may take the form of: - a GET request with many hashes (limited to the size of parameter we can pass in url) a POST request with many hashes, with the - request body containing identifiers (typically filenames) as - keys and corresponding hashes as values. 
""" response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(',') for v in hashes: queries.append({'filename': None, 'sha1': v}) # POST: Many hash requests in post form submission elif request.method == 'POST': data = request.form # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if queries: lookup = service.lookup_multiple_hashes(queries) result = [] l = len(queries) for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = l search_stats['pct'] = (nbfound / l) * 100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @app.route('/api/1/origin//') @app.route('/api/1/origin//url//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.arg('origin_type', default='git', argtype=doc.argtypes.str, argdoc="The origin's type (git, svn..)") @doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=doc.argtypes.path, argdoc="The origin's URL.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id=None, origin_type=None, origin_url=None): """Return information about the origin matching the passed criteria. Criteria may be: - An SWH-specific ID, if you already know it - An origin type and its URL, if you do not have the origin's SWH identifier """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' 
% ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = url_for('api_origin_visits', origin_id=o['id']) return o return origin return _api_lookup( ori_dict, lookup_fn=service.lookup_origin, error_msg_if_not_found=error_msg, enrich_fn=_enrich_origin) @app.route('/api/1/person//') @doc.route('/api/1/person/') @doc.arg('person_id', default=1, argtype=doc.argtypes.int, argdoc="The person's SWH identifier") @doc.raises(exc=doc.excs.notfound, doc='Raised if person_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the person identified by person_id') def api_person(person_id): """Return information about person with identifier person_id. """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release//') @doc.route('/api/1/release/') @doc.arg('sha1_git', default='97d8dcd0c589b1d94a5d26cf0c1e8f2f44b92bfd', argtype=doc.argtypes.sha1_git, argdoc="The release's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if the argument is not a sha1') @doc.raises(exc=doc.excs.notfound, doc='Raised if sha1_git does not correspond to a release in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the release identified by sha1_git') def api_release(sha1_git): """Return information about release with id sha1_git. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_release) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. 
""" def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') @doc.route('/api/1/revision/origin/directory/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @doc.arg('path', default='Dockerfile', argtype=doc.argtypes.path, argdoc='The path to the directory or file to display') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the passed criteria was not found""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the passed criteria""") def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') @doc.route('/api/1/revision/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The queried revision's origin identifier in SWH") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master)""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="The time at which the queried revision should be constrained") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision identified by the given criteria""") def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Display revision information through its identification by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' 
% (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') @doc.route('/api/1/revision/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' % sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') @doc.route('/api/1/revision/raw/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) return app.response_class(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') @doc.route('/api/1/revision/directory/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier.") @doc.arg('dir_path', default='Documentation/BUG-HUNTING', argtype=doc.argtypes.path, argdoc='The path from the top level directory') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""") def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). 
""" return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') @doc.route('/api/1/revision/log/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's identifier queried") @doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc="""(Optional) Navigation breadcrumbs (descendant revisions previously visited). If multiple values, use / as delimiter. """) +@doc.header('Link', + doc="""Optional 'Link' header proposed to the api consumer + for navigation purpose. Possible value is 'next'.""") +@doc.param('per_page', default=10, + doc="""Default parameter used to group result by page of the specified +number.""") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git or prev_sha1s is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""") def api_revision_log(sha1_git, prev_sha1s=None): - """Show all revisions (~git log) starting from sha1_git. - The first element returned is the given sha1_git, or the first - breadcrumb, if any. + """Return all revisions (~git log) starting from sha1_git. The first + element returned is the given sha1_git, or the first breadcrumb, + if any. The result is paginated. To browse for the following revisions, - use the link mentioned in the 'next_revs_url' key. + follow the link mentioned in the responder header 'Link'. """ - limit = app.config['conf']['max_log_revs'] - - response = {'revisions': None, 'next_revs_url': None} - revisions = None - next_revs_url = None + result = {} + per_page = int(request.args.get('per_page', '10')) - def lookup_revision_log_with_limit(s, limit=limit+1): + def lookup_revision_log_with_limit(s, limit=per_page+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' 
% sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) - if len(rev_get) == limit+1: + l = len(rev_get) + if l == per_page+1: rev_backward = rev_get[:-1] - next_revs_url = url_for('api_revision_log', - sha1_git=rev_get[-1]['id']) + url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) + + nb_per_page = request.args.get('per_page') + params = [] + if nb_per_page: + params.append(('per_page', per_page)) + + headers = {} + headers['link-next'] = utils.to_url(url, params) + result['headers'] = headers else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward - response['revisions'] = revisions - response['next_revs_url'] = next_revs_url - - return response + result.update({ + 'results': revisions + }) + return result @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') @doc.route('/api/1/revision/origin/log/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's SWH origin identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""(Optional) The revision's branch name within the origin specified. Defaults to 'refs/heads/master'.""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""(Optional) A time or timestamp string to parse""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""") def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision targeted by the origin_id provided and optionally a branch name or/and a timestamp. The result is paginated. To browse the following revisions, use the link mentioned in the 'next_revs_url' key. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} next_revs_url = None if ts: ts = utils.parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' 
rev_get = _api_lookup(origin_id, lookup_revision_log_by_with_limit, error_msg, utils.enrich_revision, branch_name, ts) if len(rev_get) == limit+1: revisions = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: revisions = rev_get response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/directory//') @app.route('/api/1/directory///') @doc.route('/api/1/directory/') @doc.arg('sha1_git', default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', argtype=doc.argtypes.sha1_git, argdoc="The queried directory's corresponding sha1_git hash") @doc.arg('path', default='codec/demux', argtype=doc.argtypes.path, argdoc="A path relative to the queried directory's top level") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a directory matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata and contents of the directory identified by sha1_git""") def api_directory(sha1_git, path=None): """Return information about directory with id sha1_git. """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( sha1_git, service.lookup_directory_with_path, error_msg_path, utils.enrich_directory, path) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, service.lookup_directory, error_msg_nopath, utils.enrich_directory) @app.route('/api/1/provenance//') @doc.route('/api/1/provenance/', tags=['hidden']) @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""List of provenance information (dict) for the matched content.""") def api_content_provenance(q): """Return content's provenance information if any. """ def _enrich_revision(provenance): p = provenance.copy() p['revision_url'] = url_for('api_revision', sha1_git=provenance['revision']) p['content_url'] = url_for('api_content_metadata', q='sha1_git:%s' % provenance['content']) p['origin_url'] = url_for('api_origin', origin_id=provenance['origin']) p['origin_visits_url'] = url_for('api_origin_visits', origin_id=provenance['origin']) p['origin_visit_url'] = url_for('api_origin_visit', origin_id=provenance['origin'], visit_id=provenance['visit']) return p return _api_lookup( q, lookup_fn=service.lookup_content_provenance, error_msg_if_not_found='Content with %s not found.' 
% q, enrich_fn=_enrich_revision) @app.route('/api/1/filetype//') @doc.route('/api/1/filetype/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Filetype information (dict) for the matched content.""") def api_content_filetype(q): """Return content's filetype information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_filetype, error_msg_if_not_found='No filetype information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/language//') @doc.route('/api/1/language/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Language information (dict) for the matched content.""") def api_content_language(q): """Return content's language information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_language, error_msg_if_not_found='No language information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/license//') @doc.route('/api/1/license/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""License information (dict) for the matched content.""") def api_content_license(q): """Return content's license information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_license, error_msg_if_not_found='No license information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/ctags//') @doc.route('/api/1/ctags/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Ctags symbol (dict) for the matched content.""") def api_content_ctags(q): """Return content's ctags symbols if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_ctags, error_msg_if_not_found='No ctags symbol found ' 'for content %s.' 
% q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/content//raw/') @doc.route('/api/1/content/raw/', tags=['hidden']) @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(q): """Return content's raw data if content is found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return app.response_class(generate(content), headers={'Content-disposition': 'attachment;' 'filename=content_%s_raw' % q}, mimetype='application/octet-stream') @app.route('/api/1/content//') @doc.route('/api/1/content/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""") def api_content_metadata(q): """Return content information if content is found. """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity//') @doc.route('/api/1/entity/', tags=['hidden']) @doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=doc.argtypes.uuid, argdoc="The entity's uuid identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if uuid is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if an entity matching uuid was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(uuid): """Return content information if content is found. """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity)
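
A note for readers of this excerpt: nearly every endpoint above funnels through _api_lookup, which is defined earlier in this module and is not part of this diff. The sketch below only illustrates the pattern those endpoints rely on (look the object up, raise NotFoundExc on an empty result, then apply the enrichment callback); it is an assumption about that helper's behaviour, not its actual implementation.

import types


def _api_lookup_sketch(criteria, lookup_fn, error_msg_if_not_found,
                       enrich_fn=lambda x: x, *args):
    """Illustrative only: look up, 404 when nothing is found, then enrich."""
    res = lookup_fn(criteria, *args)
    if not res:
        # NotFoundExc is the same exception raised by api_content_raw above
        raise NotFoundExc(error_msg_if_not_found)
    if isinstance(res, (list, map, types.GeneratorType)):
        return [enrich_fn(x) for x in res]
    return enrich_fn(res)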
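
The paginated log endpoint above builds its 'link-next' value with utils.to_url(url, params), whose implementation is not shown in this diff. A plausible sketch, assuming it does nothing more than append the given (key, value) pairs to the URL as a query string:

from urllib.parse import urlencode


def to_url_sketch(url, query_params):
    """Assumed behaviour of utils.to_url: extend url with a query string."""
    if not query_params:
        return url
    return '%s?%s' % (url, urlencode(query_params))


# to_url_sketch('/api/1/revision/ec72c666fb345ea5f21359b7bc063710ce558e39/log/',
#               [('per_page', 10)])
# -> '/api/1/revision/ec72c666fb345ea5f21359b7bc063710ce558e39/log/?per_page=10'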
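
Finally, a minimal client-side sketch of how a consumer might walk the paginated revision log. The base URL is hypothetical, and the sketch assumes the 'link-next' entry placed in the result's 'headers' dict is ultimately exposed to HTTP clients as a standard Link: <url>; rel="next" response header; if a deployment exposes it differently, the header handling below needs adjusting.

from urllib.parse import urljoin

import requests


def iter_revision_log(base_url, sha1_git, per_page=10):
    """Yield every revision of the log, following 'next' links (sketch)."""
    url = urljoin(base_url,
                  '/api/1/revision/%s/log/?per_page=%d' % (sha1_git, per_page))
    while url:
        response = requests.get(url)
        response.raise_for_status()
        # 'results' is the key set by api_revision_log above
        for revision in response.json().get('results', []):
            yield revision
        # requests exposes a parsed Link header, when present, as response.links
        next_link = response.links.get('next')
        url = urljoin(base_url, next_link['url']) if next_link else None


# for revision in iter_revision_log('https://archive.example.org',
#                                   'ec72c666fb345ea5f21359b7bc063710ce558e39'):
#     print(revision['id'])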