diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py index 413e6b51b..079f74b57 100644 --- a/swh/web/ui/tests/views/test_api.py +++ b/swh/web/ui/tests/views/test_api.py @@ -1,2416 +1,2416 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app from swh.web.ui import exc from swh.web.ui.views import api from swh.web.ui.exc import NotFoundExc, BadInputExc from swh.storage.exc import StorageDBError, StorageAPIError class ApiTestCase(test_app.SWHApiTestCase): def setUp(self): self.origin_visit1 = { 'date': 1104616800.0, 'origin': 10, 'visit': 100, 'metadata': None, 'status': 'full', } self.origin1 = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } @istest def generic_api_lookup_nothing_is_found(self): # given def test_generic_lookup_fn(sha1, another_unused_arg): assert another_unused_arg == 'unused arg' assert sha1 == 'sha1' return None # when with self.assertRaises(NotFoundExc) as cm: api._api_lookup('sha1', test_generic_lookup_fn, 'This will be raised because None is returned.', lambda x: x, 'unused arg') self.assertIn('This will be raised because None is returned.', cm.exception.args[0]) @istest def generic_api_map_are_enriched_and_transformed_to_list(self): # given def test_generic_lookup_fn_1(criteria0, param0, param1): assert criteria0 == 'something' return map(lambda x: x + 1, [1, 2, 3]) # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_1, 'This is not the error message you are looking for. Move along.', lambda x: x * 2, 'some param 0', 'some param 1') self.assertEqual(actual_result, [4, 6, 8]) @istest def generic_api_list_are_enriched_too(self): # given def test_generic_lookup_fn_2(crit): assert crit == 'something' return ['a', 'b', 'c'] # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_2, 'Not the error message you are looking for, it is. ' 'Along, you move!', lambda x: ''. join(['=', x, '='])) self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) @istest def generic_api_generator_are_enriched_and_returned_as_list(self): # given def test_generic_lookup_fn_3(crit): assert crit == 'crit' return (i for i in [4, 5, 6]) # when actual_result = api._api_lookup( 'crit', test_generic_lookup_fn_3, 'Move!', lambda x: x - 1) self.assertEqual(actual_result, [3, 4, 5]) @istest def generic_api_simple_data_are_enriched_and_returned_too(self): # given def test_generic_lookup_fn_4(crit): assert crit == '123' return {'a': 10} def test_enrich_data(x): x['a'] = x['a'] * 10 return x # when actual_result = api._api_lookup( '123', test_generic_lookup_fn_4, 'Nothing to do', test_enrich_data) self.assertEqual(actual_result, {'a': 100}) @patch('swh.web.ui.views.api.service') @istest def api_content_filetype(self, mock_service): stub_filetype = { 'mimetype': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_filetype.return_value = stub_filetype # when rv = self.app.get( '/api/1/filetype/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'mimetype': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_filetype_sha_not_found(self, mock_service): # given mock_service.lookup_content_filetype.return_value = None # when rv = self.app.get( '/api/1/filetype/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No filetype information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_language(self, mock_service): stub_language = { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_language.return_value = stub_language # when rv = self.app.get( '/api/1/language/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_language.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_language_sha_not_found(self, mock_service): # given mock_service.lookup_content_language.return_value = None # when rv = self.app.get( '/api/1/language/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No language information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_language.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_symbol(self, mock_service): stub_ctag = [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get('/api/1/content/symbol/foo/?last_sha1=sha1') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/license/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'filetype_url': '/api/1/filetype/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }]) actual_headers = dict(rv.headers) self.assertFalse('Link' in actual_headers) mock_service.lookup_expression.assert_called_once_with( 'foo', 'sha1', 10) @patch('swh.web.ui.views.api.service') @istest def api_content_symbol_2(self, mock_service): stub_ctag = [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get( '/api/1/content/symbol/foo/?last_sha1=prev-sha1&per_page=1') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/license/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'filetype_url': '/api/1/filetype/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }]) actual_headers = dict(rv.headers) self.assertEquals( actual_headers['Link'], '; rel="next"') # noqa mock_service.lookup_expression.assert_called_once_with( 'foo', 'prev-sha1', 1) @patch('swh.web.ui.views.api.service') # @istest def api_content_symbol_3(self, mock_service): stub_ctag = [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.app.get('/api/1/content/symbol/foo/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, 'content_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/license/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'filetype_url': '/api/1/filetype/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }]) actual_headers = dict(rv.headers) self.assertEquals( actual_headers['Link'], '') mock_service.lookup_expression.assert_called_once_with('foo', None, 10) @patch('swh.web.ui.views.api.service') @istest def api_content_symbol_not_found(self, mock_service): # given mock_service.lookup_expression.return_value = [] # when rv = self.app.get('/api/1/content/symbol/bar/?last_sha1=hash') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No indexed raw content match expression \'bar\'.' }) actual_headers = dict(rv.headers) self.assertFalse('Link' in actual_headers) mock_service.lookup_expression.assert_called_once_with( 'bar', 'hash', 10) @patch('swh.web.ui.views.api.service') @istest def api_content_ctags(self, mock_service): stub_ctags = { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [] } mock_service.lookup_content_ctags.return_value = stub_ctags # when rv = self.app.get( '/api/1/ctags/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [], 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_ctags.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_license(self, mock_service): stub_license = { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', } mock_service.lookup_content_license.return_value = stub_license # when rv = self.app.get( '/api/1/license/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_license.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.ui.views.api.service') @istest def api_content_license_sha_not_found(self, mock_service): # given mock_service.lookup_content_license.return_value = None # when rv = self.app.get( '/api/1/license/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No license information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_license.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_provenance(self, mock_service): stub_provenances = [{ 'origin': 1, 'visit': 2, 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] mock_service.lookup_content_provenance.return_value = stub_provenances # when rv = self.app.get( '/api/1/provenance/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'origin': 1, 'visit': 2, 'origin_url': '/api/1/origin/1/', 'origin_visits_url': '/api/1/origin/1/visits/', 'origin_visit_url': '/api/1/origin/1/visit/2/', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'revision_url': '/api/1/revision/' 'b04caf10e9535160d90e874b45aa426de762f19f/', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }]) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_provenance_sha_not_found(self, mock_service): # given mock_service.lookup_content_provenance.return_value = None # when rv = self.app.get( '/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_metadata(self, mock_service): # given mock_service.lookup_content.return_value = { 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'filetype_url': '/api/1/filetype/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'language_url': '/api/1/language/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'license_url': '/api/1/license/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_raw_ko_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-type': 'application/octet-stream', 'Content-disposition': 'attachment'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest - def api_search(self, mock_service): + def api_check_content_known(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:blah'} ] expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 100}, 'search_res': [{'filename': None, 'sha1': 'sha1:blah', 'found': True}] } # when - rv = self.app.get('/api/1/content/search/sha1:blah/') + rv = self.app.get('/api/1/content/known/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:blah'}]) @patch('swh.web.ui.views.api.service') @istest - def api_search_as_yaml(self, mock_service): + def api_check_content_known_as_yaml(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:halb'}, {'found': False, 'filename': None, 'sha1': 'sha1_git:hello'} ] expected_result = { 'search_stats': {'nbfiles': 2, 'pct': 50}, 'search_res': [{'filename': None, 'sha1': 'sha1:halb', 'found': True}, {'filename': None, 'sha1': 'sha1_git:hello', 'found': False}] } # when - rv = self.app.get('/api/1/content/search/sha1:halb,sha1_git:hello/', + rv = self.app.get('/api/1/content/known/sha1:halb,sha1_git:hello/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}, {'filename': None, 'sha1': 'sha1_git:hello'}]) @patch('swh.web.ui.views.api.service') @istest - def api_search_post_as_yaml(self, mock_service): + def api_check_content_known_post_as_yaml(self, mock_service): # given stub_result = [{'filename': None, 'sha1': '7e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filepath', 'sha1': '8e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filename', 'sha1': '64025b5d1520c615061842a6ce6a456cad962a3f', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 3, 'pct': 2/3 * 100}, 'search_res': stub_result } # when rv = self.app.post( - '/api/1/content/search/', + '/api/1/content/known/', headers={'Accept': 'application/yaml'}, data=dict( q='7e62b1fe10c88a3eddbba930b156bee2956b2435', filepath='8e62b1fe10c88a3eddbba930b156bee2956b2435', filename='64025b5d1520c615061842a6ce6a456cad962a3f') ) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) @patch('swh.web.ui.views.api.service') @istest - def api_search_not_found(self, mock_service): + def api_check_content_known_not_found(self, mock_service): # given stub_result = [{'filename': None, 'sha1': 'sha1:halb', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 0}, 'search_res': stub_result } # when - rv = self.app.get('/api/1/content/search/sha1:halb/') + rv = self.app.get('/api/1/content/known/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}]) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service): # given mock_service.stat_counters.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service): # given mock_service.stat_counters.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_error(self, mock_service): # given mock_service.lookup_origin_visits.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_db( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_api( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits(self, mock_service): # given stub_visits = [ { 'date': 1104616800.0, 'origin': 1, 'visit': 1 }, { 'date': 1293919200.0, 'origin': 1, 'visit': 2 }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3 } ] mock_service.lookup_origin_visits.return_value = stub_visits # when rv = self.app.get('/api/1/origin/2/visits/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [ { 'date': 1104616800.0, 'origin': 1, 'visit': 1, 'origin_visit_url': '/api/1/origin/1/visit/1/', }, { 'date': 1293919200.0, 'origin': 1, 'visit': 2, 'origin_visit_url': '/api/1/origin/1/visit/2/', }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3, 'origin_visit_url': '/api/1/origin/1/visit/3/', } ]) mock_service.lookup_origin_visits.assert_called_once_with(2) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit(self, mock_service): # given origin_visit = self.origin_visit1.copy() origin_visit.update({ 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', } } }) mock_service.lookup_origin_visit.return_value = origin_visit expected_origin_visit = self.origin_visit1.copy() expected_origin_visit.update({ 'origin_url': '/api/1/origin/10/', 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', 'target_url': '/api/1/revision/revision-id/' } } }) # when rv = self.app.get('/api/1/origin/10/visit/100/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin_visit) mock_service.lookup_origin_visit.assert_called_once_with(10, 100) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit_not_found(self, mock_service): # given mock_service.lookup_origin_visit.return_value = None # when rv = self.app.get('/api/1/origin/1/visit/1000/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No visit 1000 for origin 1 found' }) mock_service.lookup_origin_visit.assert_called_once_with(1, 1000) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_id(self, mock_service): # given mock_service.lookup_origin.return_value = self.origin1 expected_origin = self.origin1.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/1234/visits/' }) # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with({'id': 1234}) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_type_url(self, mock_service): # given stub_origin = self.origin1.copy() stub_origin.update({ 'id': 987 }) mock_service.lookup_origin.return_value = stub_origin expected_origin = stub_origin.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/987/visits/' }) # when rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with( {'url': 'ftp://some/url/to/origin/0', 'type': 'ftp'}) @patch('swh.web.ui.views.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with({'id': 4321}) @patch('swh.web.ui.views.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' }) @patch('swh.web.ui.views.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok(self, mock_service): # given stub_revision = {'message': 'synthetic revision message'} mock_service.lookup_revision_message.return_value = stub_revision # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2' '33eff7371d5/raw/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, b'synthetic revision message') mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok_no_msg(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No message for revision') # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No message for revision'}) self.assertEquals mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ko_no_rev(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No revision found') # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No revision found'}) mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin_not_found(self, mock_service): mock_service.lookup_revision_by.return_value = None rv = self.app.get('/api/1/revision/origin/123/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertIn('Revision with (origin_id: 123', response_data['error']) self.assertIn('not found', response_data['error']) mock_service.lookup_revision_by.assert_called_once_with( 123, 'refs/heads/master', None) @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin(self, mock_service): mock_revision = { 'id': '32', 'directory': '21', 'message': 'message 1', 'type': 'deb', } expected_revision = { 'id': '32', 'url': '/api/1/revision/32/', 'history_url': '/api/1/revision/32/log/', 'directory': '21', 'directory_url': '/api/1/directory/21/', 'message': 'message 1', 'type': 'deb', } mock_service.lookup_revision_by.return_value = mock_revision rv = self.app.get('/api/1/revision/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin_and_branch_name(self, mock_service): mock_revision = { 'id': '12', 'directory': '23', 'message': 'message 2', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '12', 'url': '/api/1/revision/12/', 'history_url': '/api/1/revision/12/log/', 'directory': '23', 'directory_url': '/api/1/directory/23/', 'message': 'message 2', 'type': 'tar', } rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', None) @patch('swh.web.ui.views.api.service') @patch('swh.web.ui.views.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp(self, mock_utils, mock_service): mock_revision = { 'id': '123', 'directory': '456', 'message': 'message 3', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '123', 'url': '/api/1/revision/123/', 'history_url': '/api/1/revision/123/log/', 'directory': '456', 'directory_url': '/api/1/directory/456/', 'message': 'message 3', 'type': 'tar', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs/origin/dev' '/ts/1452591542/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with('1452591542') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @patch('swh.web.ui.views.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes( self, mock_utils, mock_service): mock_revision = { 'id': '999', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '999', 'url': '/api/1/revision/999/', 'history_url': '/api/1/revision/999/log/', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs%2Forigin%2Fdev' '/ts/Today%20is%20' 'January%201,%202047%20at%208:21:00AM/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with( 'Today is January 1, 2047 at 8:21:00AM') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_ko_raise(self, mock_service): # given mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa # when with self.assertRaises(NotFoundExc): api._revision_directory_by( {'sha1_git': 'id'}, None, '/api/1/revision/sha1/directory/') # then mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'id'}, None, limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_dir(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'blah-id'}, 'some/path', '/api/1/revision/sha1/directory/') # then self.assertEquals(actual_dir_content, { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'blah-id'}, 'some/path', limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_file(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'sha1'}, 'some/path', '/api/1/revision/origin/2/directory/', limit=1000, with_data=True) # then self.assertEquals(actual_dir_content, { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'sha1'}, 'some/path', limit=1000, with_data=True) @patch('swh.web.ui.views.api.utils') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin_ko_not_found(self, mock_rev_dir, mock_utils): mock_rev_dir.side_effect = NotFoundExc('not found') mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00' rv = self.app.get('/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': 'not found'}) mock_rev_dir.assert_called_once_with( {'origin_id': 10, 'branch_name': 'refs/remote/origin/dev', 'ts': '2012-10-20 00:00:00'}, None, '/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin(self, mock_revision_dir): expected_res = [{ 'id': '123' }] mock_revision_dir.return_value = expected_res rv = self.app.get('/api/1/revision/origin/3/directory/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_res) mock_revision_dir.assert_called_once_with({ 'origin_id': 3, 'branch_name': 'refs/heads/master', 'ts': None}, None, '/api/1/revision/origin/3/directory/', with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_revision_log(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_response = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7' 'e2a44e6/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_context(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions mock_service.lookup_revision_multiple.return_value = [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'type': 'tar', 'synthetic': True, }] expected_revisions = [ { 'url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'history_url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/', 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory_url': '/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'parent_urls': [ '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' '/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/' ], 'type': 'tar', 'synthetic': True, }, { 'url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/log/', 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_response = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f0' '32475e7c233eff7371d5/prev/prev-rev/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 26) mock_service.lookup_revision_multiple.assert_called_once_with( ['prev-rev']) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log_by.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_result = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log_by.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_norev(self, mock_service): # given mock_service.lookup_revision_log_by.side_effect = NotFoundExc( 'No revision') # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'error': 'No revision'}) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision.return_value = stub_revision # then rv = self.app.get('/api/1/revision/883/prev/999/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'history_context_url': '/api/1/revision/883/prev/999/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/777/', '/api/1/revision/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision.assert_called_once_with('883') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.app.get(url) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.views.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_found(self, mock_service): # given expected_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'name': 'bla', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', } mock_service.lookup_directory_with_path.return_value = expected_dir # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/bla/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_directory_with_path.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 'bla') @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_not_found(self, mock_service): # given mock_service.lookup_directory_with_path.return_value = None path = 'some/path/to/dir/' # when rv = self.app.get(('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/%s') % path) path = path.strip('/') # Path stripped of lead/trail separators # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': (('Entry with path %s relative to ' 'directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.') % path)}) @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.app.get('/api/1/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = self.app.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.app.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/tests/views/test_browse.py b/swh/web/ui/tests/views/test_browse.py index d1ced636b..19ac830a1 100644 --- a/swh/web/ui/tests/views/test_browse.py +++ b/swh/web/ui/tests/views/test_browse.py @@ -1,2103 +1,2104 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from unittest import TestCase from unittest.mock import patch from flask import url_for from swh.web.ui.views import browse from swh.web.ui.exc import BadInputExc, NotFoundExc from .. import test_app class FileMock(): def __init__(self, filename): self.filename = filename class StaticViews(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.apidoc.APIUrls') @istest def browse_api_endpoints(self, mock_api_urls): # given endpoints = { '/a/doc/endpoint/': 'relevant documentation', '/some/other/endpoint/': 'more docstrings'} mock_api_urls.apidoc_routes = endpoints # when rv = self.client.get('/api/1/') # then self.assertEquals(rv.status_code, 200) self.assertIsNotNone( self.get_context_variable('doc_routes'), sorted(endpoints.items()) ) self.assert_template_used('api-endpoints.html') @istest def browse_api_doc(self): # given # when rv = self.client.get('/api/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('api.html') @istest def browse_archive(self): # when rv = self.client.get('/browse/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('browse.html') class SearchRedirectsView(test_app.SWHViewTestCase): render_template = False @istest def search_origin_simple(self): # when rv = self.client.get('/origin/search/?origin_id=1&meaningless_arg=42') # then self.assertRedirects(rv, url_for('browse_origin', origin_id=1)) @istest def search_origin_type_url(self): # when rv = self.client.get('/origin/search/?origin_type=git' '&origin_url=http://cool/project/url' '&meaningless_arg=42') # then self.assertRedirects(rv, url_for('browse_origin', origin_type='git', origin_url='http://cool/project/url')) @istest def search_directory_dir_sha1(self): # when rv = self.client.get('/directory/search/?sha1_git=some_sha1' '&path=some/path/in/folder' '&meaningless_arg=gandalf') # then self.assertRedirects(rv, url_for('browse_directory', sha1_git='some_sha1', path='some/path/in/folder')) @istest def search_directory_dir_sha1_nopath(self): # when rv = self.client.get('/directory/search/?sha1_git=some_sha1' '&meaningless_arg=gandalf') # then self.assertRedirects(rv, url_for('browse_directory', sha1_git='some_sha1')) @istest def search_directory_rev_sha1(self): # when rv = self.client.get('/directory/search/?sha1_git=some_sha1' '&dir_path=some/path/in/folder' '&meaningless_arg=gandalf') # then self.assertRedirects(rv, url_for('browse_revision_directory', sha1_git='some_sha1', dir_path='some/path/in/folder')) @istest def search_directory_rev_sha1_nopath(self): # when rv = self.client.get('/directory/search/?sha1_git=some_sha1' '&dir_path=' '&meaningless_arg=gandalf') # then self.assertRedirects(rv, url_for('browse_revision_directory', sha1_git='some_sha1')) @istest def search_directory_dir_time_place(self): # when rv = self.client.get('/directory/search/?origin_id=42' '&branch_name=refs/heads/tail' '&meaningless_arg=gandalf' '&path=some/path') # then self.assertRedirects(rv, url_for( 'browse_revision_directory_through_origin', origin_id=42, branch_name='refs/heads/tail', path='some/path', ts=None)) @istest def search_revision_sha1(self): # when rv = self.client.get('/revision/search/?sha1_git=some_sha1') # then self.assertRedirects(rv, url_for('browse_revision', sha1_git='some_sha1')) @istest def search_revision_time_place(self): # when rv = self.client.get('/revision/search/?origin_id=42' '&branch_name=big/branch/on/tree' '&ts=meaningful_ts') # then self.assertRedirects(rv, url_for('browse_revision_with_origin', origin_id=42, branch_name='big/branch/on/tree', ts='meaningful_ts')) class SearchSymbolView(test_app.SWHViewTestCase): render_template = False @istest def search_symbol(self): # when rv = self.client.get('/content/symbol/') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('result'), None) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(self.get_context_variable('linknext'), None) self.assertEqual(self.get_context_variable('linkprev'), None) self.assert_template_used('symbols.html') @patch('swh.web.ui.views.browse.api') @istest def search_symbol_with_result(self, mock_api): # given stub_results = [ { 'kind': 'function', 'name': 'hy', 'sha1': 'some-hash', }, ] mock_api.api_content_symbol.return_value = { 'results': stub_results, } # when rv = self.client.get('/content/symbol/?q=hy') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('result'), stub_results) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(self.get_context_variable('linknext'), None) self.assertEqual(self.get_context_variable('linkprev'), None) self.assert_template_used('symbols.html') mock_api.api_content_symbol.assert_called_once_with('hy') @patch('swh.web.ui.views.browse.api') @istest def search_symbol_with_result_and_pages(self, mock_api): # given stub_results = [ { 'kind': 'function', 'name': 'hy', 'sha1': 'some-hash', } ] mock_api.api_content_symbol.return_value = { 'results': stub_results, 'headers': { 'link-next': 'some-link', } } # when rv = self.client.get('/content/symbol/?q=hy&per_page=1') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('result'), stub_results) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual( self.get_context_variable('linknext'), '/content/symbol/?q=hy&last_sha1=some-hash&per_page=1') self.assertEqual(self.get_context_variable('linkprev'), None) self.assert_template_used('symbols.html') mock_api.api_content_symbol.assert_called_once_with('hy') @patch('swh.web.ui.views.browse.api') @istest def search_symbol_bad_input(self, mock_api): # given mock_api.api_content_symbol.side_effect = BadInputExc('error msg') # when rv = self.client.get('/content/symbol/?q=hello|hy') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), 'error msg') self.assertEqual(self.get_context_variable('result'), None) self.assertEqual(self.get_context_variable('linknext'), None) self.assertEqual(self.get_context_variable('linkprev'), None) self.assert_template_used('symbols.html') mock_api.api_content_symbol.assert_called_once_with('hello|hy') class SearchView(test_app.SWHViewTestCase): render_template = False @istest def search_default(self): # when rv = self.client.get('/content/search/') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(self.get_context_variable('search_res'), None) self.assert_template_used('search.html') @patch('swh.web.ui.views.browse.api') @istest def search_get_query_hash_not_found(self, mock_api): # given - mock_api.api_search.return_value = { + mock_api.api_check_content_known.return_value = { 'search_res': [{ 'filename': None, 'sha1': 'sha1:456', 'found': False}], 'search_stats': {'nbfiles': 1, 'pct': 100}} # when rv = self.client.get('/content/search/?q=sha1:456') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(self.get_context_variable('search_res'), [ {'filename': None, 'sha1': 'sha1:456', 'found': False}]) self.assert_template_used('search.html') - mock_api.api_search.assert_called_once_with('sha1:456') + mock_api.api_check_content_known.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.browse.api') @istest def search_get_query_hash_bad_input(self, mock_api): # given - mock_api.api_search.side_effect = BadInputExc('error msg') + mock_api.api_check_content_known.side_effect = BadInputExc('error msg') # when rv = self.client.get('/content/search/?q=sha1_git:789') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), 'error msg') self.assertEqual(self.get_context_variable('search_res'), None) self.assert_template_used('search.html') - mock_api.api_search.assert_called_once_with('sha1_git:789') + mock_api.api_check_content_known.assert_called_once_with( + 'sha1_git:789') @patch('swh.web.ui.views.browse.api') @istest def search_get_query_hash_found(self, mock_api): # given - mock_api.api_search.return_value = { + mock_api.api_check_content_known.return_value = { 'search_res': [{ 'filename': None, 'sha1': 'sha1:123', 'found': True}], 'search_stats': {'nbfiles': 1, 'pct': 100}} # when rv = self.client.get('/content/search/?q=sha1:123') self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(len(self.get_context_variable('search_res')), 1) resp = self.get_context_variable('search_res')[0] self.assertTrue(resp is not None) self.assertEqual(resp['sha1'], 'sha1:123') self.assertEqual(resp['found'], True) self.assert_template_used('search.html') - mock_api.api_search.assert_called_once_with('sha1:123') + mock_api.api_check_content_known.assert_called_once_with('sha1:123') @patch('swh.web.ui.views.browse.request') @patch('swh.web.ui.views.browse.api') @istest def search_post_hashes_bad_input(self, mock_api, mock_request): # given mock_request.form = {'a': ['456caf10e9535160d90e874b45aa426de762f19f'], 'b': ['745bab676c8f3cec8016e0c39ea61cf57e518865']} mock_request.method = 'POST' - mock_api.api_search.side_effect = BadInputExc( + mock_api.api_check_content_known.side_effect = BadInputExc( 'error bad input') # when (mock_request completes the post request) rv = self.client.post('/content/search/') # then self.assertEqual(rv.status_code, 200) self.assertEqual(self.get_context_variable('search_stats'), {'nbfiles': 0, 'pct': 0}) self.assertEqual(self.get_context_variable('search_res'), None) self.assertEqual(self.get_context_variable('message'), 'error bad input') self.assert_template_used('search.html') @patch('swh.web.ui.views.browse.request') @patch('swh.web.ui.views.browse.api') @istest def search_post_hashes_none(self, mock_api, mock_request): # given mock_request.form = {'a': ['456caf10e9535160d90e874b45aa426de762f19f'], 'b': ['745bab676c8f3cec8016e0c39ea61cf57e518865']} mock_request.method = 'POST' - mock_api.api_search.return_value = { + mock_api.api_check_content_known.return_value = { 'search_stats': {'nbfiles': 2, 'pct': 0}, 'search_res': [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': False}]} # when (mock_request completes the post request) rv = self.client.post('/content/search/') # then self.assertEqual(rv.status_code, 200) self.assertIsNotNone(self.get_context_variable('search_res')) self.assertTrue(self.get_context_variable('search_stats') is not None) self.assertEqual(len(self.get_context_variable('search_res')), 2) stats = self.get_context_variable('search_stats') self.assertEqual(stats['nbfiles'], 2) self.assertEqual(stats['pct'], 0) a, b = self.get_context_variable('search_res') self.assertEqual(a['found'], False) self.assertEqual(b['found'], False) self.assertEqual(self.get_context_variable('message'), '') self.assert_template_used('search.html') @patch('swh.web.ui.views.browse.request') @patch('swh.web.ui.views.browse.api') @istest def search_post_hashes_some(self, mock_api, mock_request): # given mock_request.form = {'a': '456caf10e9535160d90e874b45aa426de762f19f', 'b': '745bab676c8f3cec8016e0c39ea61cf57e518865'} mock_request.method = 'POST' - mock_api.api_search.return_value = { + mock_api.api_check_content_known.return_value = { 'search_stats': {'nbfiles': 2, 'pct': 50}, 'search_res': [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True}]} # when (mock_request completes the post request) rv = self.client.post('/content/search/') # then self.assertEqual(rv.status_code, 200) self.assertIsNotNone(self.get_context_variable('search_res')) self.assertEqual(len(self.get_context_variable('search_res')), 2) self.assertTrue(self.get_context_variable('search_stats') is not None) stats = self.get_context_variable('search_stats') self.assertEqual(stats['nbfiles'], 2) self.assertEqual(stats['pct'], 50) self.assertEqual(self.get_context_variable('message'), '') a, b = self.get_context_variable('search_res') self.assertEqual(a['found'], False) self.assertEqual(b['found'], True) self.assert_template_used('search.html') class ContentView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_content_ko_not_found(self, mock_api): # given mock_api.api_content_metadata.side_effect = NotFoundExc( 'Not found!') # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Not found!') self.assertIsNone(self.get_context_variable('content')) mock_api.api_content_metadata.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.browse.api') @istest def browse_content_ko_bad_input(self, mock_api): # given mock_api.api_content_metadata.side_effect = BadInputExc( 'Bad input!') # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Bad input!') self.assertIsNone(self.get_context_variable('content')) mock_api.api_content_metadata.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.browse.service') @patch('swh.web.ui.views.browse.api') @istest def browse_content(self, mock_api, mock_service): # given stub_content = { 'sha1': 'sha1-hash' } mock_api.api_content_metadata.return_value = stub_content mock_api.api_content_filetype.return_value = { 'mimetype': 'text/plain', } mock_api.api_content_language.return_value = { 'lang': 'Hy', } mock_api.api_content_license.return_value = { 'licenses': ['MIT', 'BSD'], } mock_service.lookup_content_raw.return_value = { 'data': b'blah' } mock_api.api_content_ctags.return_value = [ { 'line': 12, }, { 'line': 14, } ] expected_content = { 'sha1': 'sha1-hash', 'data': 'blah', 'encoding': None, 'mimetype': 'text/plain', 'language': 'Hy', 'licenses': "MIT, BSD", } # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNone(self.get_context_variable('message')) actual_content = self.get_context_variable('content') actual_content.pop('ctags') self.assertEqual(actual_content, expected_content) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-hash') mock_api.api_content_language.assert_called_once_with('sha1:sha1-hash') mock_api.api_content_filetype.assert_called_once_with('sha1:sha1-hash') mock_api.api_content_license.assert_called_once_with('sha1:sha1-hash') mock_api.api_content_metadata.assert_called_once_with('sha1:sha1-hash') mock_api.api_content_ctags.assert_called_once_with('sha1:sha1-hash') @patch('swh.web.ui.views.browse.service') @patch('swh.web.ui.views.browse.api') @istest def browse_content_less_data(self, mock_api, mock_service): # given stub_content = { 'sha1': 'ha1', } mock_api.api_content_metadata.return_value = stub_content mock_api.api_content_filetype.return_value = None mock_api.api_content_language.return_value = None mock_api.api_content_license.return_value = None mock_service.lookup_content_raw.return_value = None mock_api.api_content_ctags.return_value = [] expected_content = { 'sha1': 'ha1', 'data': None, 'encoding': None, 'mimetype': None, 'language': None, 'licenses': None, 'ctags': None, } # when rv = self.client.get('/browse/content/sha1:ha1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNone(self.get_context_variable('message')) actual_content = self.get_context_variable('content') self.assertEqual(actual_content, expected_content) mock_service.lookup_content_raw.assert_called_once_with('sha1:ha1') mock_api.api_content_language.assert_called_once_with('sha1:ha1') mock_api.api_content_filetype.assert_called_once_with('sha1:ha1') mock_api.api_content_license.assert_called_once_with('sha1:ha1') mock_api.api_content_metadata.assert_called_once_with('sha1:ha1') mock_api.api_content_ctags.assert_called_once_with('sha1:ha1') @patch('swh.web.ui.views.browse.redirect') @patch('swh.web.ui.views.browse.url_for') @istest def browse_content_raw(self, mock_urlfor, mock_redirect): # given stub_content_raw = b'some-data' mock_urlfor.return_value = '/api/content/sha1:sha1-hash/raw/' mock_redirect.return_value = stub_content_raw # when rv = self.client.get('/browse/content/sha1:sha1-hash/raw/') self.assertEqual(rv.status_code, 200) self.assertEqual(rv.data, stub_content_raw) mock_urlfor.assert_called_once_with('api_content_raw', q='sha1:sha1-hash') mock_redirect.assert_called_once_with( '/api/content/sha1:sha1-hash/raw/') class DirectoryView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_directory_ko_bad_input(self, mock_api): # given mock_api.api_directory.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/directory/sha2-invalid/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('files'), []) mock_api.api_directory.assert_called_once_with( 'sha2-invalid') @patch('swh.web.ui.views.browse.api') @istest def browse_directory_empty_result(self, mock_api): # given mock_api.api_directory.return_value = [] # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), []) mock_api.api_directory.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.views.browse.service') @patch('swh.web.ui.views.browse.api') @istest def browse_directory_relative_file(self, mock_api, mock_service): # given stub_entry = { 'sha256': '240', 'type': 'file' } mock_service.lookup_directory_with_path.return_value = stub_entry stub_file = { 'sha1_git': '123', 'sha1': '456', 'status': 'visible', 'data_url': '/api/1/content/890', 'length': 42, 'ctime': 'Thu, 01 Oct 2015 12:13:53 GMT', 'target': 'file.txt', 'sha256': '148' } mock_api.api_content_metadata.return_value = stub_file mock_service.lookup_content_raw.return_value = { 'data': 'this is my file'} # when rv = self.client.get('/browse/directory/sha1/path/to/file/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNotNone(self.get_context_variable('content')) content = self.get_context_variable('content') # change caused by call to prepare_data_for_view self.assertEqual(content['data_url'], '/browse/content/890') self.assertEqual(content['data'], 'this is my file') mock_api.api_content_metadata.assert_called_once_with('sha256:240') mock_service.lookup_content_raw.assert_called_once_with('sha256:240') @patch('swh.web.ui.views.browse.service') @patch('swh.web.ui.views.browse.api') @istest def browse_directory_relative_dir(self, mock_api, mock_service): # given mock_service.lookup_directory_with_path.return_value = { 'sha256': '240', 'target': 'abcd', 'type': 'dir' } stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_api.api_directory.return_value = stub_directory_ls # when rv = self.client.get('/browse/directory/sha1/path/to/dir/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('directory.html') self.assertIsNotNone(self.get_context_variable('files')) self.assertEqual(len(self.get_context_variable('files')), len(stub_directory_ls)) mock_api.api_directory.assert_called_once_with('abcd') @patch('swh.web.ui.views.browse.service') @patch('swh.web.ui.views.browse.api') @istest def browse_directory_relative_not_found(self, mock_api, mock_service): # given mock_service.lookup_directory_with_path.side_effect = NotFoundExc( 'Directory entry not found.') # when rv = self.client.get('/browse/directory/some-sha1/some/path/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Directory entry not found.') @patch('swh.web.ui.views.browse.api') @patch('swh.web.ui.views.browse.utils') @istest def browse_directory(self, mock_utils, mock_api): # given stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_api.api_directory.return_value = stub_directory_ls stub_directory_map = [ {'link': '/path/to/url/dir/123', 'name': 'some-dir-name'}, {'link': '/path/to/url/file/654', 'name': 'some-filename'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname'} ] mock_utils.prepare_data_for_view.return_value = stub_directory_map # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), stub_directory_map) mock_api.api_directory.assert_called_once_with( 'some-sha1') mock_utils.prepare_data_for_view.assert_called_once_with( stub_directory_ls) class ContentWithOriginView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') # @istest def browse_content_with_origin_content_ko_not_found(self, mock_api): # given mock_api.api_content_checksum_to_origin.side_effect = NotFoundExc( 'Not found!') # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual(self.get_context_variable('message'), 'Not found!') mock_api.api_content_checksum_to_origin.assert_called_once_with( 'sha256:some-sha256') @patch('swh.web.ui.views.browse.api') # @istest def browse_content_with_origin_ko_bad_input(self, mock_api): # given mock_api.api_content_checksum_to_origin.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), 'Invalid hash') mock_api.api_content_checksum_to_origin.assert_called_once_with( 'sha256:some-sha256') @patch('swh.web.ui.views.browse.api') # @istest def browse_content_with_origin(self, mock_api): # given mock_api.api_content_checksum_to_origin.return_value = { 'origin_type': 'ftp', 'origin_url': '/some/url', 'revision': 'revision-hash', 'branch': 'master', 'path': '/path/to', } # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), "The content with hash sha256:some-sha256 has been seen on " + "origin with type 'ftp'\n" + "at url '/some/url'. The revision was identified at " + "'revision-hash' on branch 'master'.\n" + "The file's path referenced was '/path/to'.") mock_api.api_content_checksum_to_origin.assert_called_once_with( 'sha256:some-sha256') class OriginView(test_app.SWHViewTestCase): render_template = False def setUp(self): def url_for_test(fn, **args): if fn == 'browse_revision_with_origin': return '/browse/revision/origin/%s/' % args['origin_id'] elif fn == 'api_origin_visits': return '/api/1/stat/visits/%s/' % args['origin_id'] self.url_for_test = url_for_test self.stub_origin = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} @patch('swh.web.ui.views.browse.api') @istest def browse_origin_ko_not_found(self, mock_api): # given mock_api.api_origin.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/origin/1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('origin.html') self.assertIsNone(self.get_context_variable('origin')) self.assertEqual( self.get_context_variable('message'), 'Not found!') mock_api.api_origin.assert_called_once_with(1, None, None) @patch('swh.web.ui.views.browse.api') @istest def browse_origin_ko_bad_input(self, mock_api): # given mock_api.api_origin.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/origin/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('origin.html') self.assertIsNone(self.get_context_variable('origin')) mock_api.api_origin.assert_called_once_with(426, None, None) @patch('swh.web.ui.views.browse.api') @patch('swh.web.ui.views.browse.url_for') @istest def browse_origin_found_id(self, mock_url_for, mock_api): # given mock_url_for.side_effect = self.url_for_test mock_api.api_origin.return_value = self.stub_origin # when rv = self.client.get('/browse/origin/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin'), self.stub_origin) self.assertEqual(self.get_context_variable('browse_url'), '/browse/revision/origin/426/') self.assertEqual(self.get_context_variable('visit_url'), '/api/1/stat/visits/426/') mock_api.api_origin.assert_called_once_with(426, None, None) @patch('swh.web.ui.views.browse.api') @patch('swh.web.ui.views.browse.url_for') @istest def browse_origin_found_url_type(self, mock_url_for, mock_api): # given mock_url_for.side_effect = self.url_for_test mock_api.api_origin.return_value = self.stub_origin # when rv = self.client.get('/browse/origin/git/url/rsync://some/url/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin'), self.stub_origin) self.assertEqual(self.get_context_variable('browse_url'), '/browse/revision/origin/426/') self.assertEqual(self.get_context_variable('visit_url'), '/api/1/stat/visits/426/') mock_api.api_origin.assert_called_once_with(None, 'git', 'rsync://some/url') class PersonView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_person_ko_not_found(self, mock_api): # given mock_api.api_person.side_effect = NotFoundExc('not found') # when rv = self.client.get('/browse/person/1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 1) self.assertEqual( self.get_context_variable('message'), 'not found') mock_api.api_person.assert_called_once_with(1) @patch('swh.web.ui.views.browse.api') @istest def browse_person_ko_bad_input(self, mock_api): # given mock_api.api_person.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/person/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) mock_api.api_person.assert_called_once_with(426) @patch('swh.web.ui.views.browse.api') @istest def browse_person(self, mock_api): # given mock_person = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_api.api_person.return_value = mock_person # when rv = self.client.get('/browse/person/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) self.assertEqual(self.get_context_variable('person'), mock_person) mock_api.api_person.assert_called_once_with(426) class ReleaseView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_release_ko_not_found(self, mock_api): # given mock_api.api_release.side_effect = NotFoundExc('not found!') # when rv = self.client.get('/browse/release/1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'not found!') mock_api.api_release.assert_called_once_with('1') @patch('swh.web.ui.views.browse.api') @istest def browse_release_ko_bad_input(self, mock_api): # given mock_api.api_release.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/release/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_api.api_release.assert_called_once_with('426') @patch('swh.web.ui.views.browse.api') @istest def browse_release(self, mock_api): # given self.maxDiff = None mock_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": "d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754", "target_url": '/browse/revision/d770e558e21961ad6cfdf0ff7df0' 'eb5d7d4f0754/', "synthetic": False, "target_type": "revision", "author": { "email": "torvalds@linux-foundation.org", "name": "Linus Torvalds" }, "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } mock_api.api_release.return_value = mock_release expected_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target_url": '/browse/revision/d770e558e21961ad6cfdf0ff7df0' 'eb5d7d4f0754/', "target": 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', "synthetic": False, "target_type": "revision", "author": { "email": "torvalds@linux-foundation.org", "name": "Linus Torvalds" }, "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } # when rv = self.client.get('/browse/release/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('release'), expected_release) mock_api.api_release.assert_called_once_with('426') class RevisionView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_revision_ko_not_found(self, mock_api): # given mock_api.api_revision.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/revision/1/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Not found!') self.assertIsNone(self.get_context_variable('revision')) mock_api.api_revision.assert_called_once_with('1', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_ko_bad_input(self, mock_api): # given mock_api.api_revision.side_effect = BadInputExc('wrong input!') # when rv = self.client.get('/browse/revision/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual( self.get_context_variable('message'), 'wrong input!') self.assertIsNone(self.get_context_variable('revision')) mock_api.api_revision.assert_called_once_with('426', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision(self, mock_api): # given stub_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory_url': '/api/1/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b/', 'parent_url': [ '/api/1/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], } mock_api.api_revision.return_value = stub_revision expected_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'parent_url': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], 'directory_url': '/browse/directory/2a1dbabeed4dcf1f4a4c441993b2f' 'fc9d972780b/', } # when rv = self.client.get('/browse/revision/426/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('revision'), expected_revision) self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision.assert_called_once_with('426', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_raw_message(self, mock_api): # given sha1 = 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754' # when rv = self.client.get('/browse/revision/' 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754/raw/') self.assertRedirects( rv, '/api/1/revision/%s/raw/' % sha1) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log_ko_not_found(self, mock_api): # given mock_api.api_revision_log.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/revision/sha1/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('sha1_git'), 'sha1') self.assertEqual( self.get_context_variable('message'), 'Not found!') self.assertEqual(self.get_context_variable('revisions'), []) mock_api.api_revision_log.assert_called_once_with('sha1', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log_ko_bad_input(self, mock_api): # given mock_api.api_revision_log.side_effect = BadInputExc('wrong input!') # when rv = self.client.get('/browse/revision/426/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual( self.get_context_variable('message'), 'wrong input!') self.assertEqual(self.get_context_variable('revisions'), []) mock_api.api_revision_log.assert_called_once_with('426', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log(self, mock_api): # given stub_revisions = { 'revisions': [{ 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory_url': '/api/1/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b/', 'parent_url': [ '/api/1/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], }], 'next_revs_url': '/api/1/revision/1234/log/' } mock_api.api_revision_log.return_value = stub_revisions # when rv = self.client.get('/browse/revision/426/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertTrue( isinstance(self.get_context_variable('revisions'), map)) self.assertEqual( self.get_context_variable('next_revs_url'), '/browse/revision/1234/log/') self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision_log.assert_called_once_with('426', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log_by_ko_not_found(self, mock_api): # given mock_api.api_revision_log_by.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/revision/origin/9/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('origin_id'), 9) self.assertEqual( self.get_context_variable('message'), 'Not found!') self.assertEqual(self.get_context_variable('revisions'), []) mock_api.api_revision_log_by.assert_called_once_with( 9, 'refs/heads/master', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log_by_ko_bad_input(self, mock_api): # given mock_api.api_revision_log.side_effect = BadInputExc('wrong input!') # when rv = self.client.get('/browse/revision/abcd/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('sha1_git'), 'abcd') self.assertEqual( self.get_context_variable('message'), 'wrong input!') self.assertEqual(self.get_context_variable('revisions'), []) mock_api.api_revision_log.assert_called_once_with('abcd', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_log_by(self, mock_api): # given stub_revisions = [{ 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory_url': '/api/1/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b/', 'parent_url': [ '/api/1/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], }] mock_api.api_revision_log_by.return_value = stub_revisions # when rv = self.client.get('/browse/revision/origin/2/log/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-log.html') self.assertEqual(self.get_context_variable('origin_id'), 2) self.assertTrue( isinstance(self.get_context_variable('revisions'), map)) self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision_log_by.assert_called_once_with( 2, 'refs/heads/master', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_ko_not_found(self, mock_api): # given mock_api.api_revision_history.side_effect = NotFoundExc( 'Not found') # when rv = self.client.get('/browse/revision/1/history/2/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '1') self.assertEqual(self.get_context_variable('sha1_git'), '2') self.assertEqual( self.get_context_variable('message'), 'Not found') mock_api.api_revision_history.assert_called_once_with( '1', '2') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_ko_bad_input(self, mock_api): # given mock_api.api_revision_history.side_effect = BadInputExc( 'Input incorrect') # when rv = self.client.get('/browse/revision/321/history/654/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '321') self.assertEqual(self.get_context_variable('sha1_git'), '654') self.assertEqual( self.get_context_variable('message'), 'Input incorrect') mock_api.api_revision_history.assert_called_once_with( '321', '654') @istest def browse_revision_history_ok_same_sha1(self): # when rv = self.client.get('/browse/revision/10/history/10/') # then self.assertEqual(rv.status_code, 302) @patch('swh.web.ui.views.browse.utils') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history(self, mock_api, mock_utils): # given stub_revision = {'id': 'some-rev'} mock_api.api_revision_history.return_value = stub_revision expected_revision = { 'id': 'some-rev-id', 'author': {'name': 'foo', 'email': 'bar'}, 'committer': {'name': 'foo', 'email': 'bar'} } mock_utils.prepare_data_for_view.return_value = expected_revision # when rv = self.client.get('/browse/revision/426/history/789/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '426') self.assertEqual(self.get_context_variable('sha1_git'), '789') self.assertEqual(self.get_context_variable('revision'), expected_revision) mock_api.api_revision_history.assert_called_once_with( '426', '789') mock_utils.prepare_data_for_view.assert_called_once_with(stub_revision) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_ko_not_found(self, mock_api): # given mock_api.api_revision_directory.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/revision/1/directory/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Not found!") mock_api.api_revision_directory.assert_called_once_with( '1', None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_ko_bad_input(self, mock_api): # given mock_api.api_revision_directory.side_effect = BadInputExc('Bad input!') # when rv = self.client.get('/browse/revision/10/directory/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '10') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Bad input!") mock_api.api_revision_directory.assert_called_once_with( '10', None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory(self, mock_api): # given stub_result0 = { 'type': 'dir', 'revision': '100', 'content': [ { 'id': 'some-result', 'type': 'file', 'name': 'blah', }, { 'id': 'some-other-result', 'type': 'dir', 'name': 'foo', } ] } mock_api.api_revision_directory.return_value = stub_result0 stub_result1 = { 'type': 'dir', 'revision': '100', 'content': [ { 'id': 'some-result', 'type': 'file', 'name': 'blah', }, { 'id': 'some-other-result', 'type': 'dir', 'name': 'foo', } ] } # when rv = self.client.get('/browse/revision/100/directory/some/path/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '100') self.assertEqual(self.get_context_variable('revision'), '100') self.assertEqual(self.get_context_variable('path'), 'some/path') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('result'), stub_result1) mock_api.api_revision_directory.assert_called_once_with( '100', 'some/path', with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_directory_ko_not_found(self, mock_api): # given mock_api.api_revision_history_directory.side_effect = NotFoundExc( 'not found') # when rv = self.client.get('/browse/revision/123/history/456/directory/a/b/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') self.assertEqual(self.get_context_variable('path'), 'a/b') self.assertEqual(self.get_context_variable('message'), 'not found') self.assertIsNone(self.get_context_variable('result')) mock_api.api_revision_history_directory.assert_called_once_with( '123', '456', 'a/b', with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_directory_ko_bad_input(self, mock_api): # given mock_api.api_revision_history_directory.side_effect = BadInputExc( 'bad input') # when rv = self.client.get('/browse/revision/123/history/456/directory/a/c/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') self.assertEqual(self.get_context_variable('path'), 'a/c') self.assertEqual(self.get_context_variable('message'), 'bad input') self.assertIsNone(self.get_context_variable('result')) mock_api.api_revision_history_directory.assert_called_once_with( '123', '456', 'a/c', with_data=True) @patch('swh.web.ui.views.browse.service') @istest def browse_revision_history_directory_ok_no_trailing_slash_so_redirect( self, mock_service): # when rv = self.client.get('/browse/revision/1/history/2/directory/path/to') # then self.assertEqual(rv.status_code, 301) @patch('swh.web.ui.views.browse.service') @istest def browse_revision_history_directory_ok_same_sha1_redirects( self, mock_service): # when rv = self.client.get('/browse/revision/1/history/1/directory/path/to') # then self.assertEqual(rv.status_code, 301) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_directory(self, mock_api): # given stub_result0 = { 'type': 'dir', 'revision': '1000', 'content': [{ 'id': 'some-result', 'type': 'file', 'name': 'blah' }] } mock_api.api_revision_history_directory.return_value = stub_result0 stub_result1 = { 'type': 'dir', 'revision': '1000', 'content': [{ 'id': 'some-result', 'type': 'file', 'name': 'blah' }] } # when rv = self.client.get('/browse/revision/100/history/999/directory/' 'path/to/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '100') self.assertEqual(self.get_context_variable('sha1_git'), '999') self.assertEqual(self.get_context_variable('revision'), '1000') self.assertEqual(self.get_context_variable('path'), 'path/to') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('result'), stub_result1) mock_api.api_revision_history_directory.assert_called_once_with( '100', '999', 'path/to', with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_through_origin_ko_bad_input(self, mock_api): # given mock_api.api_revision_history_through_origin.side_effect = BadInputExc( 'Problem input.') # noqa # when rv = self.client.get('/browse/revision/origin/99' '/history/123/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Problem input.') mock_api.api_revision_history_through_origin.assert_called_once_with( 99, 'refs/heads/master', None, '123') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_through_origin_ko_not_found(self, mock_api): # given mock_api.api_revision_history_through_origin.side_effect = NotFoundExc( 'Not found.') # when rv = self.client.get('/browse/revision/origin/999/' 'branch/dev/history/123/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Not found.') mock_api.api_revision_history_through_origin.assert_called_once_with( 999, 'dev', None, '123') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_through_origin_ko_other_error(self, mock_api): # given mock_api.api_revision_history_through_origin.side_effect = ValueError( 'Other Error.') # when rv = self.client.get('/browse/revision/origin/438' '/branch/scratch' '/ts/2016' '/history/789/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Other Error.') mock_api.api_revision_history_through_origin.assert_called_once_with( 438, 'scratch', '2016', '789') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_history_through_origin(self, mock_api): # given stub_rev = { 'id': 'some-id', 'author': {}, 'committer': {} } mock_api.api_revision_history_through_origin.return_value = stub_rev # when rv = self.client.get('/browse/revision/origin/99/history/123/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('revision'), stub_rev) self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision_history_through_origin.assert_called_once_with( 99, 'refs/heads/master', None, '123') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_with_origin_ko_not_found(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = NotFoundExc( 'Not found') # when rv = self.client.get('/browse/revision/origin/1/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Not found') mock_api.api_revision_with_origin.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_with_origin_ko_bad_input(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = BadInputExc( 'Bad Input') # when rv = self.client.get('/browse/revision/origin/1000/branch/dev/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Bad Input') mock_api.api_revision_with_origin.assert_called_once_with( 1000, 'dev', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_with_origin_ko_other(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = ValueError( 'Other') # when rv = self.client.get('/browse/revision/origin/1999' '/branch/scratch/master' '/ts/1990-01-10/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Other') mock_api.api_revision_with_origin.assert_called_once_with( 1999, 'scratch/master', '1990-01-10') @patch('swh.web.ui.views.browse.api') @istest def browse_revision_with_origin(self, mock_api): # given stub_rev = {'id': 'some-id', 'author': {}, 'committer': {}} mock_api.api_revision_with_origin.return_value = stub_rev # when rv = self.client.get('/browse/revision/origin/1/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('revision'), stub_rev) self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision_with_origin.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_through_origin_ko_not_found(self, mock_api): # given mock_api.api_directory_through_revision_origin.side_effect = BadInputExc( # noqa 'this is not the robot you are looking for') # when rv = self.client.get('/browse/revision/origin/2' '/directory/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertIsNone(self.get_context_variable('result')) self.assertEqual(self.get_context_variable('message'), 'this is not the robot you are looking for') mock_api.api_directory_through_revision_origin.assert_called_once_with( # noqa 2, 'refs/heads/master', None, None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_through_origin_ko_bad_input(self, mock_api): # given mock_api.api_directory_through_revision_origin.side_effect = BadInputExc( # noqa 'Bad Robot') # when rv = self.client.get('/browse/revision/origin/2' '/directory/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertIsNone(self.get_context_variable('result')) self.assertEqual(self.get_context_variable('message'), 'Bad Robot') mock_api.api_directory_through_revision_origin.assert_called_once_with( 2, 'refs/heads/master', None, None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_through_origin_ko_other(self, mock_api): # given mock_api.api_directory_through_revision_origin.side_effect = ValueError( # noqa 'Other bad stuff') # when rv = self.client.get('/browse/revision/origin/2' '/directory/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertIsNone(self.get_context_variable('result')) self.assertEqual(self.get_context_variable('message'), 'Other bad stuff') mock_api.api_directory_through_revision_origin.assert_called_once_with( 2, 'refs/heads/master', None, None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_revision_directory_through_origin(self, mock_api): # given stub_res = {'id': 'some-id', 'revision': 'some-rev-id', 'type': 'dir', 'content': 'some-content'} mock_api.api_directory_through_revision_origin.return_value = stub_res # when rv = self.client.get('/browse/revision/origin/2' '/branch/dev' '/ts/2013-20-20 10:02' '/directory/some/file/') self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('result'), stub_res) self.assertIsNone(self.get_context_variable('message')) mock_api.api_directory_through_revision_origin.assert_called_once_with( 2, 'dev', '2013-20-20 10:02', 'some/file', with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_directory_through_revision_with_origin_history_ko_not_found( self, mock_api): mock_api.api_directory_through_revision_with_origin_history.side_effect = NotFoundExc( # noqa 'Not found!') # when rv = self.client.get('/browse/revision/origin/987' '/history/sha1git' '/directory/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertIsNone(self.get_context_variable('result')) self.assertEqual(self.get_context_variable('message'), 'Not found!') self.assertEqual(self.get_context_variable('path'), '.') mock_api.api_directory_through_revision_with_origin_history.assert_called_once_with( # noqa 987, 'refs/heads/master', None, 'sha1git', None, with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_directory_through_revision_with_origin_history_ko_bad_input( self, mock_api): mock_api.api_directory_through_revision_with_origin_history.side_effect = BadInputExc( # noqa 'Bad input! Bleh!') # when rv = self.client.get('/browse/revision/origin/798' '/branch/refs/heads/dev' '/ts/2012-11-11' '/history/1234' '/directory/some/path/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertIsNone(self.get_context_variable('result')) self.assertEqual(self.get_context_variable('message'), 'Bad input! Bleh!') self.assertEqual(self.get_context_variable('path'), 'some/path') mock_api.api_directory_through_revision_with_origin_history.assert_called_once_with( # noqa 798, 'refs/heads/dev', '2012-11-11', '1234', 'some/path', with_data=True) @patch('swh.web.ui.views.browse.api') @istest def browse_directory_through_revision_with_origin_history( self, mock_api): stub_dir = {'type': 'dir', 'content': [], 'revision': 'specific-rev-id'} mock_api.api_directory_through_revision_with_origin_history.return_value = stub_dir # noqa # when rv = self.client.get('/browse/revision/origin/101010' '/ts/1955-11-12' '/history/54628' '/directory/emacs-24.5/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('result'), stub_dir) self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('path'), 'emacs-24.5') mock_api.api_directory_through_revision_with_origin_history.assert_called_once_with( # noqa 101010, 'refs/heads/master', '1955-11-12', '54628', 'emacs-24.5', with_data=True) class EntityView(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.browse.api') @istest def browse_entity_ko_not_found(self, mock_api): # given mock_api.api_entity_by_uuid.side_effect = NotFoundExc('Not found!') # when rv = self.client.get('/browse/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) self.assertEqual(self.get_context_variable('message'), 'Not found!') mock_api.api_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.browse.api') @istest def browse_entity_ko_bad_input(self, mock_api): # given mock_api.api_entity_by_uuid.side_effect = BadInputExc('wrong input!') # when rv = self.client.get('/browse/entity/blah-blah-uuid/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) self.assertEqual(self.get_context_variable('message'), 'wrong input!') mock_api.api_entity_by_uuid.assert_called_once_with( 'blah-blah-uuid') @patch('swh.web.ui.views.browse.api') @istest def browse_entity(self, mock_api): # given stub_entities = [ {'id': '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba'}] mock_api.api_entity_by_uuid.return_value = stub_entities # when rv = self.client.get('/browse/entity/' '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), stub_entities) self.assertIsNone(self.get_context_variable('message')) mock_api.api_entity_by_uuid.assert_called_once_with( '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba') class Lookup(TestCase): @patch('swh.web.ui.views.browse.api') @istest def api_lookup(self, mock_api): # given mock_api.api_content_metadata.return_value = {'id': 'blah'} # given r = browse.api_lookup(mock_api.api_content_metadata, 'sha1:blah') # then self.assertEquals(r, {'id': 'blah'}) mock_api.api_content_metadata.assert_called_once_with('sha1:blah') @patch('swh.web.ui.views.browse.api') @istest def api_lookup_not_found(self, mock_api): # given mock_api.api_content_filetype.side_effect = NotFoundExc # given r = browse.api_lookup(mock_api.api_content_filetype, 'sha1_git:foo') # then self.assertIsNone(r) mock_api.api_content_filetype.assert_called_once_with('sha1_git:foo') @patch('swh.web.ui.views.browse.api') @istest def api_lookup_bad_input(self, mock_api): # given mock_api.api_content_license.side_effect = BadInputExc # given r = browse.api_lookup(mock_api.api_content_license, 'sha1_git:foo') # then self.assertIsNone(r) mock_api.api_content_license.assert_called_once_with('sha1_git:foo') diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index f94ddd4b6..a84e87990 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,1019 +1,1019 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() @app.route('/api/1/origin//visits/') @doc.route('/api/1/origin/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""All instances of visits of the origin pointed by origin_id as POSIX time since epoch (if visit_id is not defined) """) def api_origin_visits(origin_id): """Return a list of origin visit (dict) for that particular origin including date (visit date as posix timestamp), target, target_type, status, ... """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = url_for('api_origin_visit', origin_id=ov['origin'], visit_id=ov['visit']) return ov return _api_lookup( origin_id, service.lookup_origin_visits, error_msg_if_not_found='No origin %s found' % origin_id, enrich_fn=_enrich_origin_visit) @app.route('/api/1/origin//visit//') @doc.route('/api/1/origin/visit/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.arg('visit_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin visit identifier') @doc.raises(exc=doc.excs.notfound, doc='Raised if no visit that match the query is found') @doc.returns(rettype=doc.rettypes.list, retdoc="""The single instance visit visit_id of the origin pointed by origin_id as POSIX time since epoch""") def api_origin_visit(origin_id, visit_id): """Return origin visit (dict) for that particular origin including (but not limited to) date (visit date as posix timestamp), target, target_type, status, ... """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_url'] = url_for('api_origin', origin_id=ov['origin']) if 'occurrences' in ov: ov['occurrences'] = { k: utils.enrich_object(v) for k, v in ov['occurrences'].items() } return ov return _api_lookup( origin_id, service.lookup_origin_visit, 'No visit %s for origin %s found' % (visit_id, origin_id), _enrich_origin_visit, visit_id) @app.route('/api/1/content/symbol/', methods=['POST']) @app.route('/api/1/content/symbol//') @doc.route('/api/1/content/symbol/', tags=['upcoming']) @doc.arg('q', default='hello', argtype=doc.argtypes.str, argdoc="""An expression string to lookup in swh's raw content""") @doc.returns(rettype=doc.rettypes.list, retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol The result is paginated by page of 10 results. The 'Link' header gives the relation to follow for the next and eventually the previous page. """) def api_content_symbol(q=None): """Search symbol in indexed content's data. The result is paginated and and Link header will give the next and previous link to have the next result. """ result = {} last_sha1 = request.args.get('last_sha1', None) per_page = int(request.args.get('per_page', '10')) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): return service.lookup_expression(exp, last_sha1, per_page) symbols = _api_lookup( q, lookup_fn=lookup_exp, error_msg_if_not_found='No indexed raw content match expression \'' '%s\'.' % q, enrich_fn=lambda x: utils.enrich_content(x, top_url=True)) if symbols: l = len(symbols) url = url_for('api_content_symbol', q=q) headers = {} if l == per_page: new_last_sha1 = symbols[-1]['sha1'] headers['link-next'] = utils.to_url( url, (('last_sha1', new_last_sha1), )) if headers: result['headers'] = headers result.update({ 'results': symbols }) return result -@app.route('/api/1/content/search/', methods=['POST']) -@app.route('/api/1/content/search//') -@doc.route('/api/1/content/search/') +@app.route('/api/1/content/known/', methods=['POST']) +@app.route('/api/1/content/known//') +@doc.route('/api/1/content/known/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.returns(rettype=doc.rettypes.dict, retdoc="""A dict with keys: - search_res: a list of dicts corresponding to queried content with key 'found' to True if found, 'False' if not - search_stats: a dict containing number of files searched and percentage of files found """) -def api_search(q=None): +def api_check_content_known(q=None): """Search a content per hash. This may take the form of: - a GET request with many hashes (limited to the size of parameter we can pass in url) a POST request with many hashes, with the - request body containing identifiers (typically filenames) as - keys and corresponding hashes as values. """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(',') for v in hashes: queries.append({'filename': None, 'sha1': v}) # POST: Many hash requests in post form submission elif request.method == 'POST': data = request.form # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if queries: lookup = service.lookup_multiple_hashes(queries) result = [] l = len(queries) for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = l search_stats['pct'] = (nbfound / l) * 100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @app.route('/api/1/origin//') @app.route('/api/1/origin//url//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.arg('origin_type', default='git', argtype=doc.argtypes.str, argdoc="The origin's type (git, svn..)") @doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=doc.argtypes.path, argdoc="The origin's URL.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id=None, origin_type=None, origin_url=None): """Return information about the origin matching the passed criteria. Criteria may be: - An SWH-specific ID, if you already know it - An origin type and its URL, if you do not have the origin's SWH identifier """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' % ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = url_for('api_origin_visits', origin_id=o['id']) return o return origin return _api_lookup( ori_dict, lookup_fn=service.lookup_origin, error_msg_if_not_found=error_msg, enrich_fn=_enrich_origin) @app.route('/api/1/person//') @doc.route('/api/1/person/') @doc.arg('person_id', default=1, argtype=doc.argtypes.int, argdoc="The person's SWH identifier") @doc.raises(exc=doc.excs.notfound, doc='Raised if person_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the person identified by person_id') def api_person(person_id): """Return information about person with identifier person_id. """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release//') @doc.route('/api/1/release/') @doc.arg('sha1_git', default='97d8dcd0c589b1d94a5d26cf0c1e8f2f44b92bfd', argtype=doc.argtypes.sha1_git, argdoc="The release's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if the argument is not a sha1') @doc.raises(exc=doc.excs.notfound, doc='Raised if sha1_git does not correspond to a release in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the release identified by sha1_git') def api_release(sha1_git): """Return information about release with id sha1_git. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_release) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') @doc.route('/api/1/revision/origin/directory/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc='The path to the directory or file to display') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the passed criteria was not found""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the passed criteria""") def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') @doc.route('/api/1/revision/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The queried revision's origin identifier in SWH") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master)""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="The time at which the queried revision should be constrained") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision identified by the given criteria""") def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Display revision information through its identification by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') @doc.route('/api/1/revision/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' % sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') @doc.route('/api/1/revision/raw/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) return app.response_class(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') @doc.route('/api/1/revision/directory/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier.") @doc.arg('dir_path', default='.', argtype=doc.argtypes.path, argdoc='The path from the top level directory') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""") def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). """ return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') @doc.route('/api/1/revision/log/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's identifier queried") @doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc="""(Optional) Navigation breadcrumbs (descendant revisions previously visited). If multiple values, use / as delimiter. """) @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git or prev_sha1s is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""") def api_revision_log(sha1_git, prev_sha1s=None): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git, or the first breadcrumb, if any. The result is paginated. To browse for the following revisions, use the link mentioned in the 'next_revs_url' key. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} revisions = None next_revs_url = None def lookup_revision_log_with_limit(s, limit=limit+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) if len(rev_get) == limit+1: rev_backward = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') @doc.route('/api/1/revision/origin/log/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's SWH origin identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""(Optional) The revision's branch name within the origin specified. Defaults to 'refs/heads/master'.""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""(Optional) A time or timestamp string to parse""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""") def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision targeted by the origin_id provided and optionally a branch name or/and a timestamp. The result is paginated. To browse the following revisions, use the link mentioned in the 'next_revs_url' key. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} next_revs_url = None if ts: ts = utils.parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = _api_lookup(origin_id, lookup_revision_log_by_with_limit, error_msg, utils.enrich_revision, branch_name, ts) if len(rev_get) == limit+1: revisions = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: revisions = rev_get response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/directory//') @app.route('/api/1/directory///') @doc.route('/api/1/directory/') @doc.arg('sha1_git', default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', argtype=doc.argtypes.sha1_git, argdoc="The queried directory's corresponding sha1_git hash") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc="A path relative to the queried directory's top level") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a directory matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata and contents of the release identified by sha1_git""") def api_directory(sha1_git, path=None): """Return information about release with id sha1_git. """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( sha1_git, service.lookup_directory_with_path, error_msg_path, utils.enrich_directory, path) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, service.lookup_directory, error_msg_nopath, utils.enrich_directory) @app.route('/api/1/provenance//') @doc.route('/api/1/provenance/', tags=['hidden']) @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""List of provenance information (dict) for the matched content.""") def api_content_provenance(q): """Return content's provenance information if any. """ def _enrich_revision(provenance): p = provenance.copy() p['revision_url'] = url_for('api_revision', sha1_git=provenance['revision']) p['content_url'] = url_for('api_content_metadata', q='sha1_git:%s' % provenance['content']) p['origin_url'] = url_for('api_origin', origin_id=provenance['origin']) p['origin_visits_url'] = url_for('api_origin_visits', origin_id=provenance['origin']) p['origin_visit_url'] = url_for('api_origin_visit', origin_id=provenance['origin'], visit_id=provenance['visit']) return p return _api_lookup( q, lookup_fn=service.lookup_content_provenance, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=_enrich_revision) @app.route('/api/1/filetype//') @doc.route('/api/1/filetype/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Filetype information (dict) for the matched content.""") def api_content_filetype(q): """Return content's filetype information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_filetype, error_msg_if_not_found='No filetype information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/language//') @doc.route('/api/1/language/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Language information (dict) for the matched content.""") def api_content_language(q): """Return content's language information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_language, error_msg_if_not_found='No language information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/license//') @doc.route('/api/1/license/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""License information (dict) for the matched content.""") def api_content_license(q): """Return content's license information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_license, error_msg_if_not_found='No license information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/ctags//') @doc.route('/api/1/ctags/', tags=['upcoming']) @doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Ctags symbol (dict) for the matched content.""") def api_content_ctags(q): """Return content's ctags symbols if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_ctags, error_msg_if_not_found='No ctags symbol found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/content//raw/') @doc.route('/api/1/content/raw/', tags=['hidden']) @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(q): """Return content's raw data if content is found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return app.response_class(generate(content), headers={'Content-disposition': 'attachment;' 'filename=content_%s_raw' % q}, mimetype='application/octet-stream') @app.route('/api/1/content//') @doc.route('/api/1/content/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""") def api_content_metadata(q): """Return content information if content is found. """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity//') @doc.route('/api/1/entity/', tags=['hidden']) @doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=doc.argtypes.uuid, argdoc="The entity's uuid identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if uuid is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if an entity matching uuid was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(uuid): """Return content information if content is found. """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity) diff --git a/swh/web/ui/views/browse.py b/swh/web/ui/views/browse.py index 35f08d69c..c1ee1cc24 100644 --- a/swh/web/ui/views/browse.py +++ b/swh/web/ui/views/browse.py @@ -1,1032 +1,1032 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from encodings.aliases import aliases from flask import render_template, request, url_for, redirect from swh.core.hashutil import ALGORITHMS from swh.core.utils import grouper from .. import service, utils, apidoc from ..exc import BadInputExc, NotFoundExc from ..main import app from . import api hash_filter_keys = ALGORITHMS def api_lookup(api_fn, query): """Lookup api with api_fn function with parameter query. Example: filetype = api_lookup('api_content_filetype', 'sha1:blah') if filetype: content['mimetype'] = filetype['mimetype'] """ try: return api_fn(query) except (NotFoundExc, BadInputExc): return None @app.route('/origin/search/') def search_origin(): """ Redirect request with GET params for an origin to our fragmented URI scheme """ if request.method == 'GET': data = request.args origin_id = data.get('origin_id') if origin_id: return redirect(url_for('browse_origin', origin_id=origin_id)) args = ['origin_type', 'origin_url'] values = {arg: data.get(arg) for arg in args if data.get(arg)} if 'origin_type' in values and 'origin_url' in values: return redirect(url_for('browse_origin', **values)) @app.route('/directory/search/') def search_directory(): """ Redirect request with GET params for a directory to our fragmented URI scheme """ def url_for_filtered(endpoint, **kwargs): """Make url_for ignore keyword args that have an empty string for value """ filtered = {k: v for k, v in kwargs.items() if kwargs[k]} return url_for(endpoint, **filtered) if request.method == 'GET': data = request.args sha1_git = data.get('sha1_git') if sha1_git: if 'dir_path' in data: # dir_path exists only in requests for a revision's directory return redirect(url_for_filtered( 'browse_revision_directory', sha1_git=sha1_git, dir_path=data.get('dir_path') )) return redirect(url_for_filtered( 'browse_directory', sha1_git=sha1_git, path=data.get('path') )) args = ['origin_id', 'branch_name', 'ts', 'path'] values = {arg: data.get(arg) for arg in args if data.get(arg)} if 'origin_id' in values: return redirect(url_for('browse_revision_directory_through_origin', **values)) @app.route('/revision/search/') def search_revision(): """ Redirect request with GET params for a revision to our fragmented URI scheme """ if request.method == 'GET': data = request.args sha1_git = data.get('sha1_git') if sha1_git: return redirect(url_for('browse_revision', sha1_git=sha1_git)) args = ['origin_id', 'branch_name', 'ts'] values = {arg: data.get(arg) for arg in args if data.get(arg)} if 'origin_id' in values: return redirect(url_for('browse_revision_with_origin', **values)) @app.route('/content/symbol/', methods=['GET']) def search_symbol(): """Search for symbols in contents. Returns: dict representing data to look for in swh storage. """ env = { 'result': None, 'per_page': None, 'message': '', 'linknext': None, 'linkprev': None, } # Read form or get information data = request.args q = data.get('q') per_page = data.get('per_page') env['q'] = q if per_page: env['per_page'] = per_page if q: try: result = api.api_content_symbol(q) if result: headers = result.get('headers') result = utils.prepare_data_for_view(result['results']) env['result'] = result if headers: url = url_for('search_symbol') if 'link-next' in headers: next_last_sha1 = result[-1]['sha1'] if per_page: params = (('q', q), ('last_sha1', next_last_sha1), ('per_page', per_page)) else: params = (('q', q), ('last_sha1', next_last_sha1)) env['linknext'] = utils.to_url(url, params) except BadInputExc as e: env['message'] = str(e) return render_template('symbols.html', **env) @app.route('/content/search/', methods=['GET', 'POST']) def search_content(): """Search for hashes in swh-storage. One form to submit either: - hash query to look up in swh storage - file hashes calculated client-side to be queried in swh storage - both Returns: dict representing data to look for in swh storage. The following keys are returned: - search_stats: {'nbfiles': X, 'pct': Y} the number of total queried files and percentage of files not in storage respectively - responses: array of {'filename': X, 'sha1': Y, 'found': Z} - messages: General messages. TODO: Batch-process with all checksums, not just sha1 """ env = {'search_res': None, 'search_stats': None, 'message': []} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None message = '' # Get with a single hash request if request.method == 'POST': # Post form submission with many hash requests q = None else: data = request.args q = data.get('q') try: - search = api.api_search(q) + search = api.api_check_content_known(q) search_res = search['search_res'] search_stats = search['search_stats'] except BadInputExc as e: message = str(e) env['search_stats'] = search_stats env['search_res'] = search_res env['message'] = message return render_template('search.html', **env) @app.route('/browse/') def browse(): """Render the user-facing browse view """ return render_template('browse.html') @app.route('/api/1/') def browse_api_endpoints(): """Display the list of opened api endpoints. """ routes = apidoc.APIUrls.get_app_endpoints() # Return a list of routes with consistent ordering env = { 'doc_routes': sorted(routes.items()) } return render_template('api-endpoints.html', **env) @app.route('/api/') def browse_api_doc(): """Display the API's documentation. """ return render_template('api.html') @app.route('/browse/content//') def browse_content(q): """Given a hash and a checksum, display the content's meta-data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ env = {'q': q, 'message': None, 'content': None} encoding = request.args.get('encoding', 'utf8') if encoding not in aliases: env['message'] = 'Encoding %s not supported.' \ 'Supported Encodings: %s' % ( encoding, list(aliases.keys())) return render_template('content.html', **env) try: content = api.api_content_metadata(q) filetype = api_lookup(api.api_content_filetype, q) if filetype: content['mimetype'] = filetype.get('mimetype') content['encoding'] = filetype.get('encoding') else: content['mimetype'] = None content['encoding'] = None language = api_lookup(api.api_content_language, q) if language: content['language'] = language.get('lang') else: content['language'] = None licenses = api_lookup(api.api_content_license, q) if licenses: content['licenses'] = ', '.join(licenses.get('licenses', [])) else: content['licenses'] = None content_raw = service.lookup_content_raw(q) if content_raw: content['data'] = content_raw['data'] else: content['data'] = None ctags = api_lookup(api.api_content_ctags, q) if ctags: url = url_for('browse_content', q=q) content['ctags'] = grouper(( '%s' % ( url, ctag['line'], ctag['line']) for ctag in ctags ), 20) else: content['ctags'] = None env['content'] = utils.prepare_data_for_view(content, encoding=encoding) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('content.html', **env) @app.route('/browse/content//raw/') def browse_content_raw(q): """Given a hash and a checksum, display the content's raw data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ return redirect(url_for('api_content_raw', q=q)) def _origin_seen(q, data): """Given an origin, compute a message string with the right information. Args: origin: a dictionary with keys: - origin: a dictionary with type and url keys - occurrence: a dictionary with a validity range Returns: Message as a string """ origin_type = data['origin_type'] origin_url = data['origin_url'] revision = data['revision'] branch = data['branch'] path = data['path'] return """The content with hash %s has been seen on origin with type '%s' at url '%s'. The revision was identified at '%s' on branch '%s'. The file's path referenced was '%s'.""" % (q, origin_type, origin_url, revision, branch, path) # @app.route('/browse/content//origin/') def browse_content_with_origin(q): """Show content information. Args: - q: query string of the form with `algo_hash` in sha1, sha1_git, sha256. This means that several different URLs (at least one per HASH_ALGO) will point to the same content sha: the sha with 'hash' format Returns: The content's information at for a given checksum. """ env = {'q': q} try: origin = api.api_content_checksum_to_origin(q) message = _origin_seen(q, origin) except (NotFoundExc, BadInputExc) as e: message = str(e) env['message'] = message return render_template('content-with-origin.html', **env) @app.route('/browse/directory//') @app.route('/browse/directory///') def browse_directory(sha1_git, path=None): """Show directory information. Args: - sha1_git: the directory's sha1 git identifier. If path is set, the base directory for the relative path to the entry - path: the path to the requested entry, relative to the directory pointed by sha1_git Returns: The content's information at sha1_git, or at sha1_git/path if path is set. """ env = {'sha1_git': sha1_git, 'files': []} try: if path: env['message'] = ('Listing for directory with path %s from %s:' % (path, sha1_git)) dir_or_file = service.lookup_directory_with_path( sha1_git, path) if dir_or_file['type'] == 'file': fsha = 'sha256:%s' % dir_or_file['sha256'] content = api.api_content_metadata(fsha) content_raw = service.lookup_content_raw(fsha) if content_raw: # FIXME: currently assuming utf8 encoding content['data'] = content_raw['data'] env['content'] = utils.prepare_data_for_view( content, encoding='utf-8') return render_template('content.html', **env) else: directory_files = api.api_directory(dir_or_file['target']) env['files'] = utils.prepare_data_for_view(directory_files) else: env['message'] = "Listing for directory %s:" % sha1_git directory_files = api.api_directory(sha1_git) env['files'] = utils.prepare_data_for_view(directory_files) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('directory.html', **env) @app.route('/browse/origin//url//') @app.route('/browse/origin//') def browse_origin(origin_id=None, origin_type=None, origin_url=None): """Browse origin matching given criteria - either origin_id or origin_type and origin_path. Args: - origin_id: origin's swh identifier - origin_type: origin's type - origin_url: origin's URL """ # URLs for the calendar JS plugin env = {'browse_url': None, 'visit_url': None, 'origin': None} try: origin = api.api_origin(origin_id, origin_type, origin_url) env['origin'] = origin env['browse_url'] = url_for('browse_revision_with_origin', origin_id=origin['id']) env['visit_url'] = url_for('api_origin_visits', origin_id=origin['id']) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('origin.html', **env) @app.route('/browse/person//') def browse_person(person_id): """Browse person with id id. """ env = {'person_id': person_id, 'person': None, 'message': None} try: env['person'] = api.api_person(person_id) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('person.html', **env) @app.route('/browse/release//') def browse_release(sha1_git): """Browse release with sha1_git. """ env = {'sha1_git': sha1_git, 'message': None, 'release': None} try: rel = api.api_release(sha1_git) env['release'] = utils.prepare_data_for_view(rel) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('release.html', **env) @app.route('/browse/revision//') @app.route('/browse/revision//prev//') def browse_revision(sha1_git, prev_sha1s=None): """Browse the revision with git SHA1 sha1_git_cur, while optionally keeping the context from which we came as a list of previous (i.e. later) revisions' sha1s. Args: sha1_git: the requested revision's sha1_git. prev_sha1s: an optional string of /-separated sha1s representing our context, ordered by descending revision date. Returns: Information about revision of git SHA1 sha1_git_cur, with relevant URLS pointing to the context augmented with sha1_git_cur. Example: GET /browse/revision/ """ env = {'sha1_git': sha1_git, 'message': None, 'revision': None} try: rev = api.api_revision(sha1_git, prev_sha1s) env['revision'] = utils.prepare_data_for_view(rev) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision//raw/') def browse_revision_raw_message(sha1_git): """Given a sha1_git, display the corresponding revision's raw message. """ return redirect(url_for('api_revision_raw_message', sha1_git=sha1_git)) @app.route('/browse/revision//log/') @app.route('/browse/revision//prev//log/') def browse_revision_log(sha1_git, prev_sha1s=None): """Browse revision with sha1_git's log. If the navigation path through the commit tree is specified, we intersect the earliest revision's log with the revisions the user browsed through - ie the path taken to the specified revision. Args: sha1_git: the current revision's SHA1_git checksum prev_sha1s: optionally, the path through which we want log information """ env = {'sha1_git': sha1_git, 'sha1_url': '/browse/revision/%s/' % sha1_git, 'message': None, 'revisions': []} try: revision_data = api.api_revision_log(sha1_git, prev_sha1s) revisions = revision_data['revisions'] next_revs_url = revision_data['next_revs_url'] env['revisions'] = map(utils.prepare_data_for_view, revisions) env['next_revs_url'] = utils.prepare_data_for_view(next_revs_url) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('revision-log.html', **env) @app.route('/browse/revision' '/origin//log/') @app.route('/browse/revision' '/origin/' '/branch//log/') @app.route('/browse/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/browse/revision' '/origin/' '/ts//log/') def browse_revision_log_by(origin_id, branch_name='refs/heads/master', timestamp=None): """Browse the revision described by origin, branch name and timestamp's log Args: origin_id: the revision's origin branch_name: the revision's branch timestamp: the requested timeframe for the revision Returns: The revision log of the described revision as a list of revisions if it is found. """ env = {'sha1_git': None, 'origin_id': origin_id, 'origin_url': '/browse/origin/%d/' % origin_id, 'branch_name': branch_name, 'timestamp': timestamp, 'message': None, 'revisions': []} try: revisions = api.api_revision_log_by( origin_id, branch_name, timestamp) env['revisions'] = map(utils.prepare_data_for_view, revisions) except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('revision-log.html', **env) @app.route('/browse/revision//prev//') def browse_with_rev_context(sha1_git_cur, sha1s): """Browse the revision with git SHA1 sha1_git_cur, while keeping the context from which we came as a list of previous (i.e. later) revisions' sha1s. Args: sha1_git_cur: the requested revision's sha1_git. sha1s: a string of /-separated sha1s representing our context, ordered by descending revision date. Returns: Information about revision of git SHA1 sha1_git_cur, with relevant URLS pointing to the context augmented with sha1_git_cur. Example: GET /browse/revision/ """ env = {'sha1_git': sha1_git_cur, 'message': None, 'revision': None} try: revision = api.api_revision( sha1_git_cur, sha1s) env['revision'] = utils.prepare_data_for_view(revision) except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision//history//') def browse_revision_history(sha1_git_root, sha1_git): """Display information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. """ env = {'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, 'message': None, 'keys': [], 'revision': None} if sha1_git == sha1_git_root: return redirect(url_for('browse_revision', sha1_git=sha1_git)) try: revision = api.api_revision_history(sha1_git_root, sha1_git) env['revision'] = utils.prepare_data_for_view(revision) except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision//directory/') @app.route('/browse/revision//directory//') def browse_revision_directory(sha1_git, dir_path=None): """Browse directory from revision with sha1_git. """ env = { 'sha1_git': sha1_git, 'path': '.' if not dir_path else dir_path, 'message': None, 'result': None } encoding = request.args.get('encoding', 'utf8') if encoding not in aliases: env['message'] = 'Encoding %s not supported.' \ 'Supported Encodings: %s' % ( encoding, list(aliases.keys())) return render_template('revision-directory.html', **env) try: result = api.api_revision_directory(sha1_git, dir_path, with_data=True) result['content'] = utils.prepare_data_for_view(result['content'], encoding=encoding) env['revision'] = result['revision'] env['result'] = result except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision-directory.html', **env) @app.route('/browse/revision/' '/history/' '/directory/') @app.route('/browse/revision/' '/history/' '/directory//') def browse_revision_history_directory(sha1_git_root, sha1_git, path=None): """Return information about directory pointed to by the revision defined as: revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. path: optional directory pointed to by that revision. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root or the path referenced does not exist """ env = { 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, 'path': '.' if not path else path, 'message': None, 'result': None } encoding = request.args.get('encoding', 'utf8') if encoding not in aliases: env['message'] = 'Encoding %s not supported.' \ 'Supported Encodings: %s' % ( encoding, list(aliases.keys())) return render_template('revision-directory.html', **env) if sha1_git == sha1_git_root: return redirect(url_for('browse_revision_directory', sha1_git=sha1_git, path=path, encoding=encoding), code=301) try: result = api.api_revision_history_directory(sha1_git_root, sha1_git, path, with_data=True) env['revision'] = result['revision'] env['content'] = utils.prepare_data_for_view(result['content'], encoding=encoding) env['result'] = result except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision-directory.html', **env) @app.route('/browse/revision' '/origin/' '/history/' '/directory/') @app.route('/browse/revision' '/origin/' '/history/' '/directory//') @app.route('/browse/revision' '/origin/' '/branch/' '/history/' '/directory/') @app.route('/browse/revision' '/origin/' '/branch/' '/history/' '/directory//') @app.route('/browse/revision' '/origin/' '/ts/' '/history/' '/directory/') @app.route('/browse/revision' '/origin/' '/ts/' '/history/' '/directory//') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory/') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory//') def browse_directory_through_revision_with_origin_history( origin_id, branch_name="refs/heads/master", ts=None, sha1_git=None, path=None): env = { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts, 'sha1_git': sha1_git, 'path': '.' if not path else path, 'message': None, 'result': None } encoding = request.args.get('encoding', 'utf8') if encoding not in aliases: env['message'] = (('Encoding %s not supported.' 'Supported Encodings: %s') % ( encoding, list(aliases.keys()))) return render_template('revision-directory.html', **env) try: result = api.api_directory_through_revision_with_origin_history( origin_id, branch_name, ts, sha1_git, path, with_data=True) env['revision'] = result['revision'] env['content'] = utils.prepare_data_for_view(result['content'], encoding=encoding) env['result'] = result except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision-directory.html', **env) @app.route('/browse/revision' '/origin/') @app.route('/browse/revision' '/origin//') @app.route('/browse/revision' '/origin/' '/branch//') @app.route('/browse/revision' '/origin/' '/branch/' '/ts//') @app.route('/browse/revision' '/origin/' '/ts//') def browse_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Instead of having to specify a (root) revision by SHA1_GIT, users might want to specify a place and a time. In SWH a "place" is an origin; a "time" is a timestamp at which some place has been observed by SWH crawlers. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ env = {'message': None, 'revision': None} try: revision = api.api_revision_with_origin(origin_id, branch_name, ts) env['revision'] = utils.prepare_data_for_view(revision) except (ValueError, NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision' '/origin/' '/history//') @app.route('/browse/revision' '/origin/' '/branch/' '/history//') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/history//') def browse_revision_history_through_origin(origin_id, branch_name='refs/heads/master', ts=None, sha1_git=None): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of the revision root identified by (origin_id, branch_name, ts). Given sha1_git_root such root revision's identifier, in other words, sha1_git is an ancestor of sha1_git_root. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. """ env = {'message': None, 'revision': None} try: revision = api.api_revision_history_through_origin( origin_id, branch_name, ts, sha1_git) env['revision'] = utils.prepare_data_for_view(revision) except (ValueError, BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision' '/origin/' '/directory/') @app.route('/browse/revision' '/origin/' '/directory/') @app.route('/browse/revision' '/origin/' '/branch/' '/directory/') @app.route('/browse/revision' '/origin/' '/branch/' '/directory//') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/directory//') def browse_revision_directory_through_origin(origin_id, branch_name='refs/heads/master', ts=None, path=None): env = {'message': None, 'origin_id': origin_id, 'ts': ts, 'path': '.' if not path else path, 'result': None} encoding = request.args.get('encoding', 'utf8') if encoding not in aliases: env['message'] = 'Encoding %s not supported.' \ 'Supported Encodings: %s' % ( encoding, list(aliases.keys())) return render_template('revision-directory.html', **env) try: result = api.api_directory_through_revision_origin( origin_id, branch_name, ts, path, with_data=True) result['content'] = utils.prepare_data_for_view(result['content'], encoding=encoding) env['revision'] = result['revision'] env['result'] = result except (ValueError, BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision-directory.html', **env) @app.route('/browse/entity/') @app.route('/browse/entity//') def browse_entity(uuid): env = {'entities': [], 'message': None} try: entities = api.api_entity_by_uuid(uuid) env['entities'] = entities except (NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('entity.html', **env)