diff --git a/swh/web/api/apiresponse.py b/swh/web/api/apiresponse.py index 1d47b513..551346bb 100644 --- a/swh/web/api/apiresponse.py +++ b/swh/web/api/apiresponse.py @@ -1,192 +1,193 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import traceback from django.utils.html import escape from rest_framework.response import Response from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.api import utils from swh.web.common.exc import NotFoundExc, ForbiddenExc, BadInputExc from swh.web.common.utils import shorten_path, gen_path_info from swh.web.config import get_config -def compute_link_header(rv, options): +def compute_link_header(request, rv, options): """Add Link header in returned value results. Args: + request: a DRF Request object rv (dict): dictionary with keys: - headers: potential headers with 'link-next' and 'link-prev' keys - results: containing the result to return options (dict): the initial dict to update with result if any Returns: dict: dictionary with optional keys 'link-next' and 'link-prev' """ link_headers = [] if 'headers' not in rv: return {} rv_headers = rv['headers'] if 'link-next' in rv_headers: link_headers.append('<%s>; rel="next"' % ( - rv_headers['link-next'])) + request.build_absolute_uri(rv_headers['link-next']))) if 'link-prev' in rv_headers: link_headers.append('<%s>; rel="previous"' % ( - rv_headers['link-prev'])) + request.build_absolute_uri(rv_headers['link-prev']))) if link_headers: link_header_str = ','.join(link_headers) headers = options.get('headers', {}) headers.update({ 'Link': link_header_str }) return headers return {} def filter_by_fields(request, data): """Extract a request parameter 'fields' if it exists to permit the filtering on the data dict's keys. 
If such field is not provided, returns the data as is. """ fields = request.query_params.get('fields') if fields: fields = set(fields.split(',')) data = utils.filter_field_keys(data, fields) return data def transform(rv): """Transform an eventual returned value with multiple layer of information with only what's necessary. If the returned value rv contains the 'results' key, this is the associated value which is returned. Otherwise, return the initial dict without the potential 'headers' key. """ if 'results' in rv: return rv['results'] if 'headers' in rv: rv.pop('headers') return rv def make_api_response(request, data, doc_data={}, options={}): """Generates an API response based on the requested mimetype. Args: request: a DRF Request object data: raw data to return in the API response doc_data: documentation data for HTML response options: optional data that can be used to generate the response Returns: a DRF Response a object """ if data: - options['headers'] = compute_link_header(data, options) + options['headers'] = compute_link_header(request, data, options) data = transform(data) data = filter_by_fields(request, data) doc_env = doc_data headers = {} if 'headers' in options: doc_env['headers_data'] = options['headers'] headers = options['headers'] # get request status code doc_env['status_code'] = options.get('status', 200) response_args = {'status': doc_env['status_code'], 'headers': headers, 'content_type': request.accepted_media_type} # when requesting HTML, typically when browsing the API through its # documented views, we need to enrich the input data with documentation # related ones and inform DRF that we request HTML template rendering if request.accepted_media_type == 'text/html': if data: data = json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')) doc_env['response_data'] = data doc_env['request'] = { 'path': request.path, 'method': request.method, 'absolute_uri': request.build_absolute_uri(), } doc_env['heading'] = 
shorten_path(str(request.path)) if 'route' in doc_env: doc_env['endpoint_path'] = gen_path_info(doc_env['route']) response_args['data'] = doc_env response_args['template_name'] = 'api/apidoc.html' # otherwise simply return the raw data and let DRF picks # the correct renderer (JSON or YAML) else: response_args['data'] = data return Response(**response_args) def error_response(request, error, doc_data): """Private function to create a custom error response. Args: request: a DRF Request object error: the exception that caused the error doc_data: documentation data for HTML response """ error_code = 500 if isinstance(error, BadInputExc): error_code = 400 elif isinstance(error, NotFoundExc): error_code = 404 elif isinstance(error, ForbiddenExc): error_code = 403 elif isinstance(error, StorageDBError): error_code = 503 elif isinstance(error, StorageAPIError): error_code = 503 error_opts = {'status': error_code} error_data = { 'exception': error.__class__.__name__, 'reason': str(error), } if request.accepted_media_type == 'text/html': error_data['reason'] = escape(error_data['reason']) if get_config()['debug']: error_data['traceback'] = traceback.format_exc() return make_api_response(request, error_data, doc_data, options=error_opts) diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py index 8c4cfe39..6ec7ecd0 100644 --- a/swh/web/tests/api/test_apiresponse.py +++ b/swh/web/tests/api/test_apiresponse.py @@ -1,152 +1,159 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json -from rest_framework.test import APIRequestFactory - from swh.web.api.apiresponse import ( compute_link_header, transform, make_api_response, filter_by_fields ) -api_request_factory = APIRequestFactory() - -def test_compute_link_header(): +def 
test_compute_link_header(api_request_factory): + next_link = '/api/endpoint/next' + prev_link = '/api/endpoint/prev' rv = { - 'headers': {'link-next': 'foo', 'link-prev': 'bar'}, + 'headers': {'link-next': next_link, 'link-prev': prev_link}, 'results': [1, 2, 3] } options = {} - headers = compute_link_header(rv, options) + request = api_request_factory.get('/api/endpoint/') + + headers = compute_link_header(request, rv, options) - assert headers == {'Link': '; rel="next",; rel="previous"'} + assert headers == { + 'Link': (f'<{request.build_absolute_uri(next_link)}>; rel="next",' + f'<{request.build_absolute_uri(prev_link)}>; rel="previous"') + } -def test_compute_link_header_nothing_changed(): +def test_compute_link_header_nothing_changed(api_request_factory): rv = {} options = {} - headers = compute_link_header(rv, options) + request = api_request_factory.get('/api/test/path/') + + headers = compute_link_header(request, rv, options) assert headers == {} -def test_compute_link_header_nothing_changed_2(): +def test_compute_link_header_nothing_changed_2(api_request_factory): rv = {'headers': {}} options = {} - headers = compute_link_header(rv, options) + request = api_request_factory.get('/api/test/path/') + + headers = compute_link_header(request, rv, options) assert headers == {} def test_transform_only_return_results_1(): rv = {'results': {'some-key': 'some-value'}} assert transform(rv) == {'some-key': 'some-value'} def test_transform_only_return_results_2(): rv = {'headers': {'something': 'do changes'}, 'results': {'some-key': 'some-value'}} assert transform(rv) == {'some-key': 'some-value'} def test_transform_do_remove_headers(): rv = {'headers': {'something': 'do changes'}, 'some-key': 'some-value'} assert transform(rv) == {'some-key': 'some-value'} def test_transform_do_nothing(): rv = {'some-key': 'some-value'} assert transform(rv) == {'some-key': 'some-value'} -def test_swh_multi_response_mimetype(mocker): +def test_swh_multi_response_mimetype(mocker, 
api_request_factory): mock_shorten_path = mocker.patch('swh.web.api.apiresponse.shorten_path') mock_filter = mocker.patch('swh.web.api.apiresponse.filter_by_fields') mock_json = mocker.patch('swh.web.api.apiresponse.json') data = { 'data': [12, 34], 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' } mock_filter.return_value = data mock_shorten_path.return_value = 'my_short_path' accepted_response_formats = {'html': 'text/html', 'yaml': 'application/yaml', 'json': 'application/json'} for format in accepted_response_formats: request = api_request_factory.get('/api/test/path/') mime_type = accepted_response_formats[format] setattr(request, 'accepted_media_type', mime_type) if mime_type == 'text/html': expected_data = { 'response_data': json.dumps(data), 'request': { 'path': request.path, 'method': request.method, 'absolute_uri': request.build_absolute_uri() }, 'headers_data': {}, 'heading': 'my_short_path', 'status_code': 200 } mock_json.dumps.return_value = json.dumps(data) else: expected_data = data rv = make_api_response(request, data) mock_filter.assert_called_with(request, data) assert rv.status_code == 200, rv.data assert rv.data == expected_data if mime_type == 'text/html': assert rv.template_name == 'api/apidoc.html' -def test_swh_filter_renderer_do_nothing(): +def test_swh_filter_renderer_do_nothing(api_request_factory): input_data = {'a': 'some-data'} request = api_request_factory.get('/api/test/path/', data={}) setattr(request, 'query_params', request.GET) actual_data = filter_by_fields(request, input_data) assert actual_data == input_data -def test_swh_filter_renderer_do_filter(mocker): +def test_swh_filter_renderer_do_filter(mocker, api_request_factory): mock_ffk = mocker.patch('swh.web.api.apiresponse.utils.filter_field_keys') mock_ffk.return_value = {'a': 'some-data'} request = api_request_factory.get('/api/test/path/', data={'fields': 'a,c'}) setattr(request, 'query_params', request.GET) input_data = {'a': 'some-data', 'b': 'some-other-data'} 
actual_data = filter_by_fields(request, input_data) assert actual_data == {'a': 'some-data'} mock_ffk.assert_called_once_with(input_data, {'a', 'c'}) diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py index 2a23db96..0c490ee8 100644 --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -1,380 +1,381 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from hypothesis import given from swh.web.common.utils import reverse from swh.web.tests.data import random_content from swh.web.tests.strategies import content, contents_with_ctags from swh.web.tests.conftest import ctags_json_missing, fossology_missing @given(content()) def test_api_content_filetype(api_client, indexer_data, content): indexer_data.content_add_mimetype(content['sha1']) url = reverse('api-1-content-filetype', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' content_url = reverse('api-1-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = indexer_data.content_get_mimetype(content['sha1']) expected_data['content_url'] = content_url assert rv.data == expected_data def test_api_content_filetype_sha_not_found(api_client): unknown_content_ = random_content() url = reverse('api-1-content-filetype', url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No filetype information found for content ' 'sha1:%s.' 
% unknown_content_['sha1'] } @pytest.mark.skip # Language indexer is disabled @given(content()) def test_api_content_language(api_client, indexer_data, content): indexer_data.content_add_language(content['sha1']) url = reverse('api-1-content-language', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' content_url = reverse('api-1-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = indexer_data.content_get_language(content['sha1']) expected_data['content_url'] = content_url assert rv.data == expected_data def test_api_content_language_sha_not_found(api_client): unknown_content_ = random_content() url = reverse('api-1-content-language', url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No language information found for content ' 'sha1:%s.' 
% unknown_content_['sha1'] } @pytest.mark.skip # Language indexer is disabled @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_api_content_symbol(api_client, indexer_data, contents_with_ctags): expected_data = {} for content_sha1 in contents_with_ctags['sha1s']: indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if ctag['name'] == contents_with_ctags['symbol_name']: expected_data[content_sha1] = ctag break url = reverse('api-1-content-symbol', url_args={'q': contents_with_ctags['symbol_name']}, query_params={'per_page': 100}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' for entry in rv.data: content_sha1 = entry['sha1'] expected_entry = expected_data[content_sha1] for key, view_name in (('content_url', 'api-1-content'), ('data_url', 'api-1-content-raw'), ('license_url', 'api-1-content-license'), ('language_url', 'api-1-content-language'), ('filetype_url', 'api-1-content-filetype')): expected_entry[key] = reverse( view_name, url_args={'q': 'sha1:%s' % content_sha1}) expected_entry['sha1'] = content_sha1 del expected_entry['id'] assert entry == expected_entry assert 'Link' not in rv url = reverse('api-1-content-symbol', url_args={'q': contents_with_ctags['symbol_name']}, query_params={'per_page': 2}) rv = api_client.get(url) - next_url = reverse('api-1-content-symbol', - url_args={'q': contents_with_ctags['symbol_name']}, - query_params={'last_sha1': rv.data[1]['sha1'], - 'per_page': 2}) + next_url = rv.wsgi_request.build_absolute_uri( + reverse('api-1-content-symbol', + url_args={'q': contents_with_ctags['symbol_name']}, + query_params={'last_sha1': rv.data[1]['sha1'], + 'per_page': 2})) assert rv['Link'] == '<%s>; rel="next"' % next_url def test_api_content_symbol_not_found(api_client): url = reverse('api-1-content-symbol', url_args={'q': 'bar'}) rv = api_client.get(url) 
assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No indexed raw content match expression \'bar\'.' } assert 'Link' not in rv @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(content()) def test_api_content_ctags(api_client, indexer_data, content): indexer_data.content_add_ctags(content['sha1']) url = reverse('api-1-content-ctags', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' content_url = reverse('api-1-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = list(indexer_data.content_get_ctags(content['sha1'])) for e in expected_data: e['content_url'] = content_url assert rv.data == expected_data @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_api_content_license(api_client, indexer_data, content): indexer_data.content_add_license(content['sha1']) url = reverse('api-1-content-license', url_args={'q': 'sha1_git:%s' % content['sha1_git']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' content_url = reverse('api-1-content', url_args={'q': 'sha1:%s' % content['sha1']}) expected_data = indexer_data.content_get_license(content['sha1']) expected_data['content_url'] = content_url assert rv.data == expected_data def test_api_content_license_sha_not_found(api_client): unknown_content_ = random_content() url = reverse('api-1-content-license', url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'No license information found for content ' 'sha1:%s.' 
% unknown_content_['sha1'] } @given(content()) def test_api_content_metadata(api_client, archive_data, content): url = reverse('api-1-content', {'q': 'sha1:%s' % content['sha1']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' expected_data = archive_data.content_get_metadata(content['sha1']) for key, view_name in (('data_url', 'api-1-content-raw'), ('license_url', 'api-1-content-license'), ('language_url', 'api-1-content-language'), ('filetype_url', 'api-1-content-filetype')): expected_data[key] = reverse( view_name, url_args={'q': 'sha1:%s' % content['sha1']}) assert rv.data == expected_data def test_api_content_not_found_as_json(api_client): unknown_content_ = random_content() url = reverse('api-1-content', url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Content with sha1 checksum equals to %s not found!' % unknown_content_['sha1'] } def test_api_content_not_found_as_yaml(api_client): unknown_content_ = random_content() url = reverse('api-1-content', url_args={'q': 'sha256:%s' % unknown_content_['sha256']}) rv = api_client.get(url, HTTP_ACCEPT='application/yaml') assert rv.status_code == 404, rv.data assert 'application/yaml' in rv['Content-Type'] assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Content with sha256 checksum equals to %s not found!' % unknown_content_['sha256'] } def test_api_content_raw_ko_not_found(api_client): unknown_content_ = random_content() url = reverse('api-1-content-raw', url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Content with sha1 checksum equals to %s not found!' 
% unknown_content_['sha1'] } @given(content()) def test_api_content_raw_text(api_client, archive_data, content): url = reverse('api-1-content-raw', url_args={'q': 'sha1:%s' % content['sha1']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/octet-stream' assert rv['Content-disposition'] == \ 'attachment; filename=content_sha1_%s_raw' % content['sha1'] assert rv['Content-Type'] == 'application/octet-stream' expected_data = archive_data.content_get(content['sha1']) assert rv.content == expected_data['data'] @given(content()) def test_api_content_raw_text_with_filename(api_client, archive_data, content): url = reverse('api-1-content-raw', url_args={'q': 'sha1:%s' % content['sha1']}, query_params={'filename': 'filename.txt'}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/octet-stream' assert rv['Content-disposition'] == \ 'attachment; filename=filename.txt' assert rv['Content-Type'] == 'application/octet-stream' expected_data = archive_data.content_get(content['sha1']) assert rv.content == expected_data['data'] @given(content()) def test_api_check_content_known(api_client, content): url = reverse('api-1-content-known', url_args={'q': content['sha1']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} } @given(content()) def test_api_check_content_known_as_yaml(api_client, content): url = reverse('api-1-content-known', url_args={'q': content['sha1']}) rv = api_client.get(url, HTTP_ACCEPT='application/yaml') assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/yaml' assert rv.data == { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} } @given(content()) def 
test_api_check_content_known_post_as_yaml(api_client, content): url = reverse('api-1-content-known') rv = api_client.post(url, data={'q': content['sha1']}, HTTP_ACCEPT='application/yaml') assert rv.status_code == 200, rv.data assert 'application/yaml' in rv['Content-Type'] assert rv.data == { 'search_res': [ { 'found': True, 'sha1': content['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 100.0} } def test_api_check_content_known_not_found(api_client): unknown_content_ = random_content() url = reverse('api-1-content-known', url_args={'q': unknown_content_['sha1']}) rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'search_res': [ { 'found': False, 'sha1': unknown_content_['sha1'] } ], 'search_stats': {'nbfiles': 1, 'pct': 0.0} } @given(content()) def test_api_content_uppercase(api_client, content): url = reverse('api-1-content-uppercase-checksum', url_args={'q': content['sha1'].upper()}) rv = api_client.get(url) assert rv.status_code == 302, rv.data redirect_url = reverse('api-1-content', url_args={'q': content['sha1']}) assert rv['location'] == redirect_url diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index 5558031b..f8c84a2d 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,271 +1,271 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import revision @given(revision()) def test_api_revision(api_client, archive_data, revision): url = reverse('api-1-revision', url_args={'sha1_git': 
revision}) rv = api_client.get(url) expected_revision = archive_data.revision_get(revision) _enrich_revision(expected_revision) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_revision def test_api_revision_not_found(api_client): unknown_revision_ = random_sha1() url = reverse('api-1-revision', url_args={'sha1_git': unknown_revision_}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_ } @given(revision()) def test_api_revision_raw_ok(api_client, archive_data, revision): url = reverse('api-1-revision-raw-message', url_args={'sha1_git': revision}) rv = api_client.get(url) expected_message = archive_data.revision_get(revision)['message'] assert rv.status_code == 200 assert rv['Content-Type'] == 'application/octet-stream' assert rv.content == expected_message.encode() def test_api_revision_raw_ko_no_rev(api_client): unknown_revision_ = random_sha1() url = reverse('api-1-revision-raw-message', url_args={'sha1_git': unknown_revision_}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' 
% unknown_revision_ } @given(revision()) def test_api_revision_log(api_client, archive_data, revision): per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) rv = api_client.get(url) expected_log = archive_data.revision_log(revision, limit=per_page+1) expected_log = list(map(_enrich_revision, expected_log)) has_next = len(expected_log) > per_page assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == (expected_log[:-1] if has_next else expected_log) if has_next: assert 'Link' in rv - next_log_url = reverse( - 'api-1-revision-log', - url_args={'sha1_git': expected_log[-1]['id']}, - query_params={'per_page': per_page}) + next_log_url = rv.wsgi_request.build_absolute_uri( + reverse('api-1-revision-log', + url_args={'sha1_git': expected_log[-1]['id']}, + query_params={'per_page': per_page})) assert next_log_url in rv['Link'] def test_api_revision_log_not_found(api_client): unknown_revision_ = random_sha1() url = reverse('api-1-revision-log', url_args={'sha1_git': unknown_revision_}) rv = api_client.get(url) assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' 
% unknown_revision_ } assert not rv.has_header('Link') @given(revision()) def test_api_revision_log_context(api_client, archive_data, revision): revisions = archive_data.revision_log(revision, limit=4) prev_rev = revisions[0]['id'] rev = revisions[-1]['id'] per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': rev, 'prev_sha1s': prev_rev}, query_params={'per_page': per_page}) rv = api_client.get(url) expected_log = archive_data.revision_log(rev, limit=per_page) prev_revision = archive_data.revision_get(prev_rev) expected_log.insert(0, prev_revision) expected_log = list(map(_enrich_revision, expected_log)) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == expected_log def test_api_revision_directory_ko_not_found(api_client, mocker): mock_rev_dir = mocker.patch( 'swh.web.api.views.revision._revision_directory_by') mock_rev_dir.side_effect = NotFoundExc('Not found') rv = api_client.get('/api/1/revision/999/directory/some/path/to/dir/') assert rv.status_code == 404, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == { 'exception': 'NotFoundExc', 'reason': 'Not found' } mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False ) def test_api_revision_directory_ok_returns_dir_entries(api_client, mocker): mock_rev_dir = mocker.patch( 'swh.web.api.views.revision._revision_directory_by') stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', } ] } mock_rev_dir.return_value = stub_dir rv = 
api_client.get('/api/1/revision/999/directory/some/path/') assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == stub_dir mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False ) def test_api_revision_directory_ok_returns_content(api_client, mocker): mock_rev_dir = mocker.patch( 'swh.web.api.views.revision._revision_directory_by') stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } mock_rev_dir.return_value = stub_content url = '/api/1/revision/666/directory/some/other/path/' rv = api_client.get(url) assert rv.status_code == 200, rv.data assert rv['Content-Type'] == 'application/json' assert rv.data == stub_content mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) @given(revision()) def test_api_revision_uppercase(api_client, revision): url = reverse('api-1-revision-uppercase-checksum', url_args={'sha1_git': revision.upper()}) resp = api_client.get(url) assert resp.status_code == 302 redirect_url = reverse('api-1-revision', url_args={'sha1_git': revision}) assert resp['location'] == redirect_url def _enrich_revision(revision): directory_url = reverse( 'api-1-directory', url_args={'sha1_git': revision['directory']}) history_url = reverse('api-1-revision-log', url_args={'sha1_git': revision['id']}) parents_id_url = [] for p in revision['parents']: parents_id_url.append({ 'id': p, 'url': reverse('api-1-revision', url_args={'sha1_git': p}) }) revision_url = reverse('api-1-revision', url_args={'sha1_git': revision['id']}) revision['directory_url'] = directory_url revision['history_url'] = history_url revision['url'] = revision_url revision['parents'] = parents_id_url return revision diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py index 6fd03b81..bb6ccf05 
100644
--- a/swh/web/tests/api/views/test_snapshot.py
+++ b/swh/web/tests/api/views/test_snapshot.py
@@ -1,194 +1,194 @@
 # Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 
 from hypothesis import given
 
 from swh.model.hashutil import hash_to_hex
 from swh.web.common.utils import reverse
 from swh.web.tests.data import random_sha1
 from swh.web.tests.strategies import (
     snapshot, new_snapshot
 )
 
 
 @given(snapshot())
 def test_api_snapshot(api_client, archive_data, snapshot):
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': snapshot})
 
     rv = api_client.get(url)
 
     assert rv.status_code == 200, rv.data
     assert rv['Content-Type'] == 'application/json'
     expected_data = archive_data.snapshot_get(snapshot)
     expected_data = _enrich_snapshot(archive_data, expected_data)
     assert rv.data == expected_data
 
 
 @given(snapshot())
 def test_api_snapshot_paginated(api_client, archive_data, snapshot):
     branches_offset = 0
     branches_count = 2
 
     snapshot_branches = []
 
     for k, v in sorted(
             archive_data.snapshot_get(snapshot)['branches'].items()):
         snapshot_branches.append({
             'name': k,
             'target_type': v['target_type'],
             'target': v['target']
         })
 
     whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None}
 
     while branches_offset < len(snapshot_branches):
         branches_from = snapshot_branches[branches_offset]['name']
         url = reverse('api-1-snapshot',
                       url_args={'snapshot_id': snapshot},
                       query_params={'branches_from': branches_from,
                                     'branches_count': branches_count})
 
         rv = api_client.get(url)
 
         assert rv.status_code == 200, rv.data
         assert rv['Content-Type'] == 'application/json'
 
         expected_data = archive_data.snapshot_get_branches(
             snapshot, branches_from, branches_count)
 
         expected_data = _enrich_snapshot(archive_data, expected_data)
 
         branches_offset += branches_count
 
         if branches_offset < len(snapshot_branches):
             next_branch = snapshot_branches[branches_offset]['name']
             expected_data['next_branch'] = next_branch
         else:
             expected_data['next_branch'] = None
 
         assert rv.data == expected_data
 
         whole_snapshot['branches'].update(expected_data['branches'])
 
         if branches_offset < len(snapshot_branches):
-            next_url = reverse(
-                'api-1-snapshot',
-                url_args={'snapshot_id': snapshot},
-                query_params={'branches_from': next_branch,
-                              'branches_count': branches_count})
+            next_url = rv.wsgi_request.build_absolute_uri(
+                reverse('api-1-snapshot',
+                        url_args={'snapshot_id': snapshot},
+                        query_params={'branches_from': next_branch,
+                                      'branches_count': branches_count}))
 
             assert rv['Link'] == '<%s>; rel="next"' % next_url
         else:
             assert not rv.has_header('Link')
 
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': snapshot})
 
     rv = api_client.get(url)
 
     assert rv.status_code == 200, rv.data
     assert rv['Content-Type'] == 'application/json'
     assert rv.data == whole_snapshot
 
 
 @given(snapshot())
 def test_api_snapshot_filtered(api_client, archive_data, snapshot):
     snapshot_branches = []
 
     for k, v in sorted(
             archive_data.snapshot_get(snapshot)['branches'].items()):
         snapshot_branches.append({
             'name': k,
             'target_type': v['target_type'],
             'target': v['target']
         })
 
     target_type = random.choice(snapshot_branches)['target_type']
 
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': snapshot},
                   query_params={'target_types': target_type})
 
     rv = api_client.get(url)
 
     expected_data = archive_data.snapshot_get_branches(
         snapshot, target_types=target_type)
     expected_data = _enrich_snapshot(archive_data, expected_data)
 
     assert rv.status_code == 200, rv.data
     assert rv['Content-Type'] == 'application/json'
     assert rv.data == expected_data
 
 
 def test_api_snapshot_errors(api_client):
     unknown_snapshot_ = random_sha1()
 
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': '63ce369'})
     rv = api_client.get(url)
     assert rv.status_code == 400, rv.data
 
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': unknown_snapshot_})
     rv = api_client.get(url)
     assert rv.status_code == 404, rv.data
 
 
 @given(snapshot())
 def test_api_snapshot_uppercase(api_client, snapshot):
     url = reverse('api-1-snapshot-uppercase-checksum',
                   url_args={'snapshot_id': snapshot.upper()})
 
     resp = api_client.get(url)
     assert resp.status_code == 302
 
     redirect_url = reverse('api-1-snapshot-uppercase-checksum',
                            url_args={'snapshot_id': snapshot})
 
     assert resp['location'] == redirect_url
 
 
 @given(new_snapshot(min_size=4))
 def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot):
     snp_dict = new_snapshot.to_dict()
     snp_id = hash_to_hex(snp_dict['id'])
     for branch in snp_dict['branches'].keys():
         snp_dict['branches'][branch] = None
         break
     archive_data.snapshot_add([snp_dict])
     url = reverse('api-1-snapshot',
                   url_args={'snapshot_id': snp_id})
     rv = api_client.get(url)
     assert rv.status_code == 200, rv.data
 
 
 def _enrich_snapshot(archive_data, snapshot):
     def _get_branch_url(target_type, target):
         url = None
         if target_type == 'revision':
             url = reverse('api-1-revision',
                           url_args={'sha1_git': target})
         if target_type == 'release':
             url = reverse('api-1-release',
                           url_args={'sha1_git': target})
         return url
 
     for branch in snapshot['branches'].keys():
         target = snapshot['branches'][branch]['target']
         target_type = snapshot['branches'][branch]['target_type']
         snapshot['branches'][branch]['target_url'] = \
             _get_branch_url(target_type, target)
 
     for branch in snapshot['branches'].keys():
         target = snapshot['branches'][branch]['target']
         target_type = snapshot['branches'][branch]['target_type']
         if target_type == 'alias':
             if target in snapshot['branches']:
                 snapshot['branches'][branch]['target_url'] = \
                     snapshot['branches'][target]['target_url']
             else:
                 snp = archive_data.snapshot_get_branches(
                     snapshot['id'], branches_from=target,
                     branches_count=1)
                 alias_target = snp['branches'][target]['target']
                 alias_target_type = snp['branches'][target]['target_type']
                 snapshot['branches'][branch]['target_url'] = \
                     _get_branch_url(alias_target_type, alias_target)
 
     return snapshot
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
index 1139d237..28d621b5 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -1,290 +1,296 @@
 # Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import inspect
 import json
 import os
 import shutil
 
 from subprocess import run, PIPE
 
 import pytest
 
 from django.core.cache import cache
 from hypothesis import settings, HealthCheck
-from rest_framework.test import APIClient
+from rest_framework.test import APIClient, APIRequestFactory
 
 from swh.model.hashutil import ALGORITHMS, hash_to_bytes
 from swh.web.common import converters
 from swh.web.tests.data import get_tests_data, override_storages
 
 # Used to skip some tests
 ctags_json_missing = (
     shutil.which('ctags') is None or
     b'+json' not in run(['ctags', '--version'], stdout=PIPE).stdout
 )
 
 fossology_missing = shutil.which('nomossa') is None
 
 # Register some hypothesis profiles
 settings.register_profile('default', settings())
 
 settings.register_profile(
     'swh-web',
     settings(deadline=None,
              suppress_health_check=[HealthCheck.too_slow,
                                     HealthCheck.filter_too_much]))
 
 settings.register_profile(
     'swh-web-fast',
     settings(deadline=None, max_examples=1,
              suppress_health_check=[HealthCheck.too_slow,
                                     HealthCheck.filter_too_much]))
 
 
 def pytest_configure(config):
     # Small hack in order to be able to run the unit tests
     # without static assets generated by webpack.
     # Those assets are not really needed for the Python tests
     # but the django templates will fail to load due to missing
     # generated file webpack-stats.json describing the js and css
     # files to include.
     # So generate a dummy webpack-stats.json file to overcome
     # that issue.
     test_dir = os.path.dirname(__file__)
     static_dir = os.path.join(test_dir, '../static')
     webpack_stats = os.path.join(static_dir, 'webpack-stats.json')
     if os.path.exists(webpack_stats):
         return
     bundles_dir = os.path.join(test_dir, '../assets/src/bundles')
     _, dirs, _ = next(os.walk(bundles_dir))
     mock_webpack_stats = {
         'status': 'done',
         'publicPath': '/static',
         'chunks': {}
     }
     for bundle in dirs:
         asset = 'js/%s.js' % bundle
         mock_webpack_stats['chunks'][bundle] = [{
             'name': asset,
             'publicPath': '/static/%s' % asset,
             'path': os.path.join(static_dir, asset)
         }]
 
     with open(webpack_stats, 'w') as outfile:
         json.dump(mock_webpack_stats, outfile)
 
 
 # Clear Django cache before each test
 @pytest.fixture(autouse=True)
 def django_cache_cleared():
     cache.clear()
 
 
 # Fixture to get test client from Django REST Framework
 @pytest.fixture(scope='module')
 def api_client():
     return APIClient()
 
 
+# Fixture to get API request factory from Django REST Framework
+@pytest.fixture(scope='module')
+def api_request_factory():
+    return APIRequestFactory()
+
+
 # Initialize tests data
 @pytest.fixture(autouse=True)
 def tests_data():
     data = get_tests_data(reset=True)
     # Update swh-web configuration to use the in-memory storages
     # instantiated in the tests.data module
     override_storages(data['storage'], data['idx_storage'])
     return data
 
 
 # Fixture to manipulate data from a sample archive used in the tests
 @pytest.fixture
 def archive_data(tests_data):
     return _ArchiveData(tests_data)
 
 
 # Fixture to manipulate indexer data from a sample archive used in the tests
 @pytest.fixture
 def indexer_data(tests_data):
     return _IndexerData(tests_data)
 
 
 # Custom data directory for requests_mock
 @pytest.fixture
 def datadir():
     return os.path.join(os.path.abspath(os.path.dirname(__file__)),
                         'resources')
 
 
 class _ArchiveData:
     """
     Helper class to manage data from a sample test archive.
 
     It is initialized with a reference to an in-memory storage
     containing raw tests data.
 
     It is basically a proxy to Storage interface but it overrides some
     methods to retrieve those tests data in a json serializable format
     in order to ease tests implementation.
     """
 
     def __init__(self, tests_data):
         self.storage = tests_data['storage']
 
         def _call_storage_method(method):
             def call_storage_method(*args, **kwargs):
                 return method(*args, **kwargs)
             return call_storage_method
 
         # Forward calls to non overridden Storage methods to wrapped
         # storage instance
         for method_name, method in inspect.getmembers(
                 self.storage, predicate=inspect.ismethod):
             if (not hasattr(self, method_name) and
                     not method_name.startswith('_')):
                 setattr(self, method_name,
                         _call_storage_method(method))
 
     def content_find(self, content):
         cnt_ids_bytes = {algo_hash: hash_to_bytes(content[algo_hash])
                          for algo_hash in ALGORITHMS
                          if content.get(algo_hash)}
         cnt = self.storage.content_find(cnt_ids_bytes)
         return converters.from_content(cnt[0]) if cnt else cnt
 
     def content_get_metadata(self, cnt_id):
         cnt_id_bytes = hash_to_bytes(cnt_id)
         metadata = next(self.storage.content_get_metadata([cnt_id_bytes]))
         return converters.from_swh(metadata,
                                    hashess={'sha1', 'sha1_git', 'sha256',
                                             'blake2s256'})
 
     def content_get(self, cnt_id):
         cnt_id_bytes = hash_to_bytes(cnt_id)
         cnt = next(self.storage.content_get([cnt_id_bytes]))
         return converters.from_content(cnt)
 
     def directory_get(self, dir_id):
         return {
             'id': dir_id,
             'content': self.directory_ls(dir_id)
         }
 
     def directory_ls(self, dir_id):
         cnt_id_bytes = hash_to_bytes(dir_id)
         dir_content = map(converters.from_directory_entry,
                           self.storage.directory_ls(cnt_id_bytes))
         return list(dir_content)
 
     def release_get(self, rel_id):
         rel_id_bytes = hash_to_bytes(rel_id)
         rel_data = next(self.storage.release_get([rel_id_bytes]))
         return converters.from_release(rel_data)
 
     def revision_get(self, rev_id):
         rev_id_bytes = hash_to_bytes(rev_id)
         rev_data = next(self.storage.revision_get([rev_id_bytes]))
         return converters.from_revision(rev_data)
 
     def revision_log(self, rev_id, limit=None):
         rev_id_bytes = hash_to_bytes(rev_id)
         return list(map(converters.from_revision,
                         self.storage.revision_log([rev_id_bytes],
                                                   limit=limit)))
 
     def snapshot_get_latest(self, origin_url):
         snp = self.storage.snapshot_get_latest(origin_url)
         return converters.from_snapshot(snp)
 
     def origin_get(self, origin_info):
         origin = self.storage.origin_get(origin_info)
         return converters.from_origin(origin)
 
     def origin_visit_get(self, origin_url):
         visits = self.storage.origin_visit_get(origin_url)
         return list(map(converters.from_origin_visit, visits))
 
     def origin_visit_get_by(self, origin_url, visit_id):
         visit = self.storage.origin_visit_get_by(origin_url, visit_id)
         return converters.from_origin_visit(visit)
 
     def snapshot_get(self, snapshot_id):
         snp = self.storage.snapshot_get(hash_to_bytes(snapshot_id))
         return converters.from_snapshot(snp)
 
     def snapshot_get_branches(self, snapshot_id, branches_from='',
                               branches_count=1000, target_types=None):
         snp = self.storage.snapshot_get_branches(
             hash_to_bytes(snapshot_id), branches_from.encode(),
             branches_count, target_types)
         return converters.from_snapshot(snp)
 
     def snapshot_get_head(self, snapshot):
         if snapshot['branches']['HEAD']['target_type'] == 'alias':
             target = snapshot['branches']['HEAD']['target']
             head = snapshot['branches'][target]['target']
         else:
             head = snapshot['branches']['HEAD']['target']
         return head
 
 
 class _IndexerData:
     """
     Helper class to manage indexer tests data
 
     It is initialized with a reference to an in-memory indexer
     storage containing raw tests data.
 
     It also defines class methods to retrieve those tests data in
     a json serializable format in order to ease tests implementation.
     """
 
     def __init__(self, tests_data):
         self.idx_storage = tests_data['idx_storage']
         self.mimetype_indexer = tests_data['mimetype_indexer']
         self.license_indexer = tests_data['license_indexer']
         self.ctags_indexer = tests_data['ctags_indexer']
 
     def content_add_mimetype(self, cnt_id):
         self.mimetype_indexer.run([hash_to_bytes(cnt_id)],
                                   'update-dups')
 
     def content_get_mimetype(self, cnt_id):
         mimetype = next(self.idx_storage.content_mimetype_get(
                         [hash_to_bytes(cnt_id)]))
         return converters.from_filetype(mimetype)
 
     def content_add_language(self, cnt_id):
         raise NotImplementedError('Language indexer is disabled.')
         self.language_indexer.run([hash_to_bytes(cnt_id)],
                                   'update-dups')
 
     def content_get_language(self, cnt_id):
         lang = next(self.idx_storage.content_language_get(
                     [hash_to_bytes(cnt_id)]))
         return converters.from_swh(lang, hashess={'id'})
 
     def content_add_license(self, cnt_id):
         self.license_indexer.run([hash_to_bytes(cnt_id)],
                                  'update-dups')
 
     def content_get_license(self, cnt_id):
         cnt_id_bytes = hash_to_bytes(cnt_id)
         lic = next(self.idx_storage.content_fossology_license_get(
                    [cnt_id_bytes]))
         return converters.from_swh({'id': cnt_id_bytes,
                                     'facts': lic[cnt_id_bytes]},
                                    hashess={'id'})
 
     def content_add_ctags(self, cnt_id):
         self.ctags_indexer.run([hash_to_bytes(cnt_id)],
                                'update-dups')
 
     def content_get_ctags(self, cnt_id):
         cnt_id_bytes = hash_to_bytes(cnt_id)
         ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
         for ctag in ctags:
             yield converters.from_swh(ctag, hashess={'id'})