Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F11023701
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
53 KB
Subscribers
None
View Options
diff --git a/swh/web/api/apiresponse.py b/swh/web/api/apiresponse.py
index 1d47b513..551346bb 100644
--- a/swh/web/api/apiresponse.py
+++ b/swh/web/api/apiresponse.py
@@ -1,192 +1,193 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import traceback
from django.utils.html import escape
from rest_framework.response import Response
from swh.storage.exc import StorageDBError, StorageAPIError
from swh.web.api import utils
from swh.web.common.exc import NotFoundExc, ForbiddenExc, BadInputExc
from swh.web.common.utils import shorten_path, gen_path_info
from swh.web.config import get_config
-def compute_link_header(rv, options):
+def compute_link_header(request, rv, options):
"""Add Link header in returned value results.
Args:
+ request: a DRF Request object
rv (dict): dictionary with keys:
- headers: potential headers with 'link-next' and 'link-prev'
keys
- results: containing the result to return
options (dict): the initial dict to update with result if any
Returns:
dict: dictionary with optional keys 'link-next' and 'link-prev'
"""
link_headers = []
if 'headers' not in rv:
return {}
rv_headers = rv['headers']
if 'link-next' in rv_headers:
link_headers.append('<%s>; rel="next"' % (
- rv_headers['link-next']))
+ request.build_absolute_uri(rv_headers['link-next'])))
if 'link-prev' in rv_headers:
link_headers.append('<%s>; rel="previous"' % (
- rv_headers['link-prev']))
+ request.build_absolute_uri(rv_headers['link-prev'])))
if link_headers:
link_header_str = ','.join(link_headers)
headers = options.get('headers', {})
headers.update({
'Link': link_header_str
})
return headers
return {}
def filter_by_fields(request, data):
"""Extract a request parameter 'fields' if it exists to permit the filtering on
the data dict's keys.
If such field is not provided, returns the data as is.
"""
fields = request.query_params.get('fields')
if fields:
fields = set(fields.split(','))
data = utils.filter_field_keys(data, fields)
return data
def transform(rv):
"""Transform an eventual returned value with multiple layer of
information with only what's necessary.
If the returned value rv contains the 'results' key, this is the
associated value which is returned.
Otherwise, return the initial dict without the potential 'headers'
key.
"""
if 'results' in rv:
return rv['results']
if 'headers' in rv:
rv.pop('headers')
return rv
def make_api_response(request, data, doc_data={}, options={}):
"""Generates an API response based on the requested mimetype.
Args:
request: a DRF Request object
data: raw data to return in the API response
doc_data: documentation data for HTML response
options: optional data that can be used to generate the response
Returns:
        a DRF Response object
"""
if data:
- options['headers'] = compute_link_header(data, options)
+ options['headers'] = compute_link_header(request, data, options)
data = transform(data)
data = filter_by_fields(request, data)
doc_env = doc_data
headers = {}
if 'headers' in options:
doc_env['headers_data'] = options['headers']
headers = options['headers']
# get request status code
doc_env['status_code'] = options.get('status', 200)
response_args = {'status': doc_env['status_code'],
'headers': headers,
'content_type': request.accepted_media_type}
# when requesting HTML, typically when browsing the API through its
# documented views, we need to enrich the input data with documentation
# related ones and inform DRF that we request HTML template rendering
if request.accepted_media_type == 'text/html':
if data:
data = json.dumps(data, sort_keys=True,
indent=4,
separators=(',', ': '))
doc_env['response_data'] = data
doc_env['request'] = {
'path': request.path,
'method': request.method,
'absolute_uri': request.build_absolute_uri(),
}
doc_env['heading'] = shorten_path(str(request.path))
if 'route' in doc_env:
doc_env['endpoint_path'] = gen_path_info(doc_env['route'])
response_args['data'] = doc_env
response_args['template_name'] = 'api/apidoc.html'
    # otherwise simply return the raw data and let DRF pick
# the correct renderer (JSON or YAML)
else:
response_args['data'] = data
return Response(**response_args)
def error_response(request, error, doc_data):
"""Private function to create a custom error response.
Args:
request: a DRF Request object
error: the exception that caused the error
doc_data: documentation data for HTML response
"""
error_code = 500
if isinstance(error, BadInputExc):
error_code = 400
elif isinstance(error, NotFoundExc):
error_code = 404
elif isinstance(error, ForbiddenExc):
error_code = 403
elif isinstance(error, StorageDBError):
error_code = 503
elif isinstance(error, StorageAPIError):
error_code = 503
error_opts = {'status': error_code}
error_data = {
'exception': error.__class__.__name__,
'reason': str(error),
}
if request.accepted_media_type == 'text/html':
error_data['reason'] = escape(error_data['reason'])
if get_config()['debug']:
error_data['traceback'] = traceback.format_exc()
return make_api_response(request, error_data, doc_data,
options=error_opts)
diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py
index 8c4cfe39..6ec7ecd0 100644
--- a/swh/web/tests/api/test_apiresponse.py
+++ b/swh/web/tests/api/test_apiresponse.py
@@ -1,152 +1,159 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
-from rest_framework.test import APIRequestFactory
-
from swh.web.api.apiresponse import (
compute_link_header, transform, make_api_response,
filter_by_fields
)
-api_request_factory = APIRequestFactory()
-
-def test_compute_link_header():
+def test_compute_link_header(api_request_factory):
+ next_link = '/api/endpoint/next'
+ prev_link = '/api/endpoint/prev'
rv = {
- 'headers': {'link-next': 'foo', 'link-prev': 'bar'},
+ 'headers': {'link-next': next_link, 'link-prev': prev_link},
'results': [1, 2, 3]
}
options = {}
- headers = compute_link_header(rv, options)
+ request = api_request_factory.get('/api/endpoint/')
+
+ headers = compute_link_header(request, rv, options)
- assert headers == {'Link': '<foo>; rel="next",<bar>; rel="previous"'}
+ assert headers == {
+ 'Link': (f'<{request.build_absolute_uri(next_link)}>; rel="next",'
+ f'<{request.build_absolute_uri(prev_link)}>; rel="previous"')
+ }
-def test_compute_link_header_nothing_changed():
+def test_compute_link_header_nothing_changed(api_request_factory):
rv = {}
options = {}
- headers = compute_link_header(rv, options)
+ request = api_request_factory.get('/api/test/path/')
+
+ headers = compute_link_header(request, rv, options)
assert headers == {}
-def test_compute_link_header_nothing_changed_2():
+def test_compute_link_header_nothing_changed_2(api_request_factory):
rv = {'headers': {}}
options = {}
- headers = compute_link_header(rv, options)
+ request = api_request_factory.get('/api/test/path/')
+
+ headers = compute_link_header(request, rv, options)
assert headers == {}
def test_transform_only_return_results_1():
rv = {'results': {'some-key': 'some-value'}}
assert transform(rv) == {'some-key': 'some-value'}
def test_transform_only_return_results_2():
rv = {'headers': {'something': 'do changes'},
'results': {'some-key': 'some-value'}}
assert transform(rv) == {'some-key': 'some-value'}
def test_transform_do_remove_headers():
rv = {'headers': {'something': 'do changes'},
'some-key': 'some-value'}
assert transform(rv) == {'some-key': 'some-value'}
def test_transform_do_nothing():
rv = {'some-key': 'some-value'}
assert transform(rv) == {'some-key': 'some-value'}
-def test_swh_multi_response_mimetype(mocker):
+def test_swh_multi_response_mimetype(mocker, api_request_factory):
mock_shorten_path = mocker.patch('swh.web.api.apiresponse.shorten_path')
mock_filter = mocker.patch('swh.web.api.apiresponse.filter_by_fields')
mock_json = mocker.patch('swh.web.api.apiresponse.json')
data = {
'data': [12, 34],
'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
}
mock_filter.return_value = data
mock_shorten_path.return_value = 'my_short_path'
accepted_response_formats = {'html': 'text/html',
'yaml': 'application/yaml',
'json': 'application/json'}
for format in accepted_response_formats:
request = api_request_factory.get('/api/test/path/')
mime_type = accepted_response_formats[format]
setattr(request, 'accepted_media_type', mime_type)
if mime_type == 'text/html':
expected_data = {
'response_data': json.dumps(data),
'request': {
'path': request.path,
'method': request.method,
'absolute_uri': request.build_absolute_uri()
},
'headers_data': {},
'heading': 'my_short_path',
'status_code': 200
}
mock_json.dumps.return_value = json.dumps(data)
else:
expected_data = data
rv = make_api_response(request, data)
mock_filter.assert_called_with(request, data)
assert rv.status_code == 200, rv.data
assert rv.data == expected_data
if mime_type == 'text/html':
assert rv.template_name == 'api/apidoc.html'
-def test_swh_filter_renderer_do_nothing():
+def test_swh_filter_renderer_do_nothing(api_request_factory):
input_data = {'a': 'some-data'}
request = api_request_factory.get('/api/test/path/', data={})
setattr(request, 'query_params', request.GET)
actual_data = filter_by_fields(request, input_data)
assert actual_data == input_data
-def test_swh_filter_renderer_do_filter(mocker):
+def test_swh_filter_renderer_do_filter(mocker, api_request_factory):
mock_ffk = mocker.patch('swh.web.api.apiresponse.utils.filter_field_keys')
mock_ffk.return_value = {'a': 'some-data'}
request = api_request_factory.get('/api/test/path/',
data={'fields': 'a,c'})
setattr(request, 'query_params', request.GET)
input_data = {'a': 'some-data',
'b': 'some-other-data'}
actual_data = filter_by_fields(request, input_data)
assert actual_data == {'a': 'some-data'}
mock_ffk.assert_called_once_with(input_data, {'a', 'c'})
diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py
index 2a23db96..0c490ee8 100644
--- a/swh/web/tests/api/views/test_content.py
+++ b/swh/web/tests/api/views/test_content.py
@@ -1,380 +1,381 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from hypothesis import given
from swh.web.common.utils import reverse
from swh.web.tests.data import random_content
from swh.web.tests.strategies import content, contents_with_ctags
from swh.web.tests.conftest import ctags_json_missing, fossology_missing
@given(content())
def test_api_content_filetype(api_client, indexer_data, content):
indexer_data.content_add_mimetype(content['sha1'])
url = reverse('api-1-content-filetype',
url_args={'q': 'sha1_git:%s' % content['sha1_git']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
content_url = reverse('api-1-content',
url_args={'q': 'sha1:%s' % content['sha1']})
expected_data = indexer_data.content_get_mimetype(content['sha1'])
expected_data['content_url'] = content_url
assert rv.data == expected_data
def test_api_content_filetype_sha_not_found(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content-filetype',
url_args={'q': 'sha1:%s' % unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'No filetype information found for content '
'sha1:%s.' % unknown_content_['sha1']
}
@pytest.mark.skip # Language indexer is disabled
@given(content())
def test_api_content_language(api_client, indexer_data, content):
indexer_data.content_add_language(content['sha1'])
url = reverse('api-1-content-language',
url_args={'q': 'sha1_git:%s' % content['sha1_git']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
content_url = reverse('api-1-content',
url_args={'q': 'sha1:%s' % content['sha1']})
expected_data = indexer_data.content_get_language(content['sha1'])
expected_data['content_url'] = content_url
assert rv.data == expected_data
def test_api_content_language_sha_not_found(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content-language',
url_args={'q': 'sha1:%s' % unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'No language information found for content '
'sha1:%s.' % unknown_content_['sha1']
}
@pytest.mark.skip # Language indexer is disabled
@pytest.mark.skipif(ctags_json_missing,
reason="requires ctags with json output support")
@given(contents_with_ctags())
def test_api_content_symbol(api_client, indexer_data, contents_with_ctags):
expected_data = {}
for content_sha1 in contents_with_ctags['sha1s']:
indexer_data.content_add_ctags(content_sha1)
for ctag in indexer_data.content_get_ctags(content_sha1):
if ctag['name'] == contents_with_ctags['symbol_name']:
expected_data[content_sha1] = ctag
break
url = reverse('api-1-content-symbol',
url_args={'q': contents_with_ctags['symbol_name']},
query_params={'per_page': 100})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
for entry in rv.data:
content_sha1 = entry['sha1']
expected_entry = expected_data[content_sha1]
for key, view_name in (('content_url', 'api-1-content'),
('data_url', 'api-1-content-raw'),
('license_url', 'api-1-content-license'),
('language_url', 'api-1-content-language'),
('filetype_url', 'api-1-content-filetype')):
expected_entry[key] = reverse(
view_name, url_args={'q': 'sha1:%s' % content_sha1})
expected_entry['sha1'] = content_sha1
del expected_entry['id']
assert entry == expected_entry
assert 'Link' not in rv
url = reverse('api-1-content-symbol',
url_args={'q': contents_with_ctags['symbol_name']},
query_params={'per_page': 2})
rv = api_client.get(url)
- next_url = reverse('api-1-content-symbol',
- url_args={'q': contents_with_ctags['symbol_name']},
- query_params={'last_sha1': rv.data[1]['sha1'],
- 'per_page': 2})
+ next_url = rv.wsgi_request.build_absolute_uri(
+ reverse('api-1-content-symbol',
+ url_args={'q': contents_with_ctags['symbol_name']},
+ query_params={'last_sha1': rv.data[1]['sha1'],
+ 'per_page': 2}))
assert rv['Link'] == '<%s>; rel="next"' % next_url
def test_api_content_symbol_not_found(api_client):
url = reverse('api-1-content-symbol', url_args={'q': 'bar'})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'No indexed raw content match expression \'bar\'.'
}
assert 'Link' not in rv
@pytest.mark.skipif(ctags_json_missing,
reason="requires ctags with json output support")
@given(content())
def test_api_content_ctags(api_client, indexer_data, content):
indexer_data.content_add_ctags(content['sha1'])
url = reverse('api-1-content-ctags',
url_args={'q': 'sha1_git:%s' % content['sha1_git']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
content_url = reverse('api-1-content',
url_args={'q': 'sha1:%s' % content['sha1']})
expected_data = list(indexer_data.content_get_ctags(content['sha1']))
for e in expected_data:
e['content_url'] = content_url
assert rv.data == expected_data
@pytest.mark.skipif(fossology_missing,
reason="requires fossology-nomossa installed")
@given(content())
def test_api_content_license(api_client, indexer_data, content):
indexer_data.content_add_license(content['sha1'])
url = reverse('api-1-content-license',
url_args={'q': 'sha1_git:%s' % content['sha1_git']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
content_url = reverse('api-1-content',
url_args={'q': 'sha1:%s' % content['sha1']})
expected_data = indexer_data.content_get_license(content['sha1'])
expected_data['content_url'] = content_url
assert rv.data == expected_data
def test_api_content_license_sha_not_found(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content-license',
url_args={'q': 'sha1:%s' % unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'No license information found for content '
'sha1:%s.' % unknown_content_['sha1']
}
@given(content())
def test_api_content_metadata(api_client, archive_data, content):
url = reverse('api-1-content', {'q': 'sha1:%s' % content['sha1']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_data = archive_data.content_get_metadata(content['sha1'])
for key, view_name in (('data_url', 'api-1-content-raw'),
('license_url', 'api-1-content-license'),
('language_url', 'api-1-content-language'),
('filetype_url', 'api-1-content-filetype')):
expected_data[key] = reverse(
view_name, url_args={'q': 'sha1:%s' % content['sha1']})
assert rv.data == expected_data
def test_api_content_not_found_as_json(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content',
url_args={'q': 'sha1:%s' % unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Content with sha1 checksum equals to %s not found!'
% unknown_content_['sha1']
}
def test_api_content_not_found_as_yaml(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content',
url_args={'q': 'sha256:%s' % unknown_content_['sha256']})
rv = api_client.get(url, HTTP_ACCEPT='application/yaml')
assert rv.status_code == 404, rv.data
assert 'application/yaml' in rv['Content-Type']
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Content with sha256 checksum equals to %s not found!' %
unknown_content_['sha256']
}
def test_api_content_raw_ko_not_found(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content-raw',
url_args={'q': 'sha1:%s' % unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Content with sha1 checksum equals to %s not found!' %
unknown_content_['sha1']
}
@given(content())
def test_api_content_raw_text(api_client, archive_data, content):
url = reverse('api-1-content-raw',
url_args={'q': 'sha1:%s' % content['sha1']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/octet-stream'
assert rv['Content-disposition'] == \
'attachment; filename=content_sha1_%s_raw' % content['sha1']
assert rv['Content-Type'] == 'application/octet-stream'
expected_data = archive_data.content_get(content['sha1'])
assert rv.content == expected_data['data']
@given(content())
def test_api_content_raw_text_with_filename(api_client, archive_data, content):
url = reverse('api-1-content-raw',
url_args={'q': 'sha1:%s' % content['sha1']},
query_params={'filename': 'filename.txt'})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/octet-stream'
assert rv['Content-disposition'] == \
'attachment; filename=filename.txt'
assert rv['Content-Type'] == 'application/octet-stream'
expected_data = archive_data.content_get(content['sha1'])
assert rv.content == expected_data['data']
@given(content())
def test_api_check_content_known(api_client, content):
url = reverse('api-1-content-known',
url_args={'q': content['sha1']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'search_res': [
{
'found': True,
'sha1': content['sha1']
}
],
'search_stats': {'nbfiles': 1, 'pct': 100.0}
}
@given(content())
def test_api_check_content_known_as_yaml(api_client, content):
url = reverse('api-1-content-known',
url_args={'q': content['sha1']})
rv = api_client.get(url, HTTP_ACCEPT='application/yaml')
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/yaml'
assert rv.data == {
'search_res': [
{
'found': True,
'sha1': content['sha1']
}
],
'search_stats': {'nbfiles': 1, 'pct': 100.0}
}
@given(content())
def test_api_check_content_known_post_as_yaml(api_client, content):
url = reverse('api-1-content-known')
rv = api_client.post(url, data={'q': content['sha1']},
HTTP_ACCEPT='application/yaml')
assert rv.status_code == 200, rv.data
assert 'application/yaml' in rv['Content-Type']
assert rv.data == {
'search_res': [
{
'found': True,
'sha1': content['sha1']
}
],
'search_stats': {'nbfiles': 1, 'pct': 100.0}
}
def test_api_check_content_known_not_found(api_client):
unknown_content_ = random_content()
url = reverse('api-1-content-known',
url_args={'q': unknown_content_['sha1']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'search_res': [
{
'found': False,
'sha1': unknown_content_['sha1']
}
],
'search_stats': {'nbfiles': 1, 'pct': 0.0}
}
@given(content())
def test_api_content_uppercase(api_client, content):
url = reverse('api-1-content-uppercase-checksum',
url_args={'q': content['sha1'].upper()})
rv = api_client.get(url)
assert rv.status_code == 302, rv.data
redirect_url = reverse('api-1-content',
url_args={'q': content['sha1']})
assert rv['location'] == redirect_url
diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py
index 5558031b..f8c84a2d 100644
--- a/swh/web/tests/api/views/test_revision.py
+++ b/swh/web/tests/api/views/test_revision.py
@@ -1,271 +1,271 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import given
from swh.web.common.exc import NotFoundExc
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import revision
@given(revision())
def test_api_revision(api_client, archive_data, revision):
url = reverse('api-1-revision', url_args={'sha1_git': revision})
rv = api_client.get(url)
expected_revision = archive_data.revision_get(revision)
_enrich_revision(expected_revision)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == expected_revision
def test_api_revision_not_found(api_client):
unknown_revision_ = random_sha1()
url = reverse('api-1-revision',
url_args={'sha1_git': unknown_revision_})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Revision with sha1_git %s not found.' % unknown_revision_
}
@given(revision())
def test_api_revision_raw_ok(api_client, archive_data, revision):
url = reverse('api-1-revision-raw-message',
url_args={'sha1_git': revision})
rv = api_client.get(url)
expected_message = archive_data.revision_get(revision)['message']
assert rv.status_code == 200
assert rv['Content-Type'] == 'application/octet-stream'
assert rv.content == expected_message.encode()
def test_api_revision_raw_ko_no_rev(api_client):
unknown_revision_ = random_sha1()
url = reverse('api-1-revision-raw-message',
url_args={'sha1_git': unknown_revision_})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Revision with sha1_git %s not found.' % unknown_revision_
}
@given(revision())
def test_api_revision_log(api_client, archive_data, revision):
per_page = 10
url = reverse('api-1-revision-log', url_args={'sha1_git': revision},
query_params={'per_page': per_page})
rv = api_client.get(url)
expected_log = archive_data.revision_log(revision, limit=per_page+1)
expected_log = list(map(_enrich_revision, expected_log))
has_next = len(expected_log) > per_page
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == (expected_log[:-1] if has_next else expected_log)
if has_next:
assert 'Link' in rv
- next_log_url = reverse(
- 'api-1-revision-log',
- url_args={'sha1_git': expected_log[-1]['id']},
- query_params={'per_page': per_page})
+ next_log_url = rv.wsgi_request.build_absolute_uri(
+ reverse('api-1-revision-log',
+ url_args={'sha1_git': expected_log[-1]['id']},
+ query_params={'per_page': per_page}))
assert next_log_url in rv['Link']
def test_api_revision_log_not_found(api_client):
unknown_revision_ = random_sha1()
url = reverse('api-1-revision-log',
url_args={'sha1_git': unknown_revision_})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Revision with sha1_git %s not found.' % unknown_revision_
}
assert not rv.has_header('Link')
@given(revision())
def test_api_revision_log_context(api_client, archive_data, revision):
revisions = archive_data.revision_log(revision, limit=4)
prev_rev = revisions[0]['id']
rev = revisions[-1]['id']
per_page = 10
url = reverse('api-1-revision-log',
url_args={'sha1_git': rev,
'prev_sha1s': prev_rev},
query_params={'per_page': per_page})
rv = api_client.get(url)
expected_log = archive_data.revision_log(rev, limit=per_page)
prev_revision = archive_data.revision_get(prev_rev)
expected_log.insert(0, prev_revision)
expected_log = list(map(_enrich_revision, expected_log))
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == expected_log
def test_api_revision_directory_ko_not_found(api_client, mocker):
mock_rev_dir = mocker.patch(
'swh.web.api.views.revision._revision_directory_by')
mock_rev_dir.side_effect = NotFoundExc('Not found')
rv = api_client.get('/api/1/revision/999/directory/some/path/to/dir/')
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Not found'
}
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path/to/dir',
'/api/1/revision/999/directory/some/path/to/dir/',
with_data=False
)
def test_api_revision_directory_ok_returns_dir_entries(api_client, mocker):
mock_rev_dir = mocker.patch(
'swh.web.api.views.revision._revision_directory_by')
stub_dir = {
'type': 'dir',
'revision': '999',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/'
'somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}
]
}
mock_rev_dir.return_value = stub_dir
rv = api_client.get('/api/1/revision/999/directory/some/path/')
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == stub_dir
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path',
'/api/1/revision/999/directory/some/path/',
with_data=False
)
def test_api_revision_directory_ok_returns_content(api_client, mocker):
mock_rev_dir = mocker.patch(
'swh.web.api.views.revision._revision_directory_by')
stub_content = {
'type': 'file',
'revision': '999',
'content': {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
}
mock_rev_dir.return_value = stub_content
url = '/api/1/revision/666/directory/some/other/path/'
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == stub_content
mock_rev_dir.assert_called_once_with(
{'sha1_git': '666'}, 'some/other/path', url, with_data=False)
@given(revision())
def test_api_revision_uppercase(api_client, revision):
url = reverse('api-1-revision-uppercase-checksum',
url_args={'sha1_git': revision.upper()})
resp = api_client.get(url)
assert resp.status_code == 302
redirect_url = reverse('api-1-revision',
url_args={'sha1_git': revision})
assert resp['location'] == redirect_url
def _enrich_revision(revision):
directory_url = reverse(
'api-1-directory',
url_args={'sha1_git': revision['directory']})
history_url = reverse('api-1-revision-log',
url_args={'sha1_git': revision['id']})
parents_id_url = []
for p in revision['parents']:
parents_id_url.append({
'id': p,
'url': reverse('api-1-revision', url_args={'sha1_git': p})
})
revision_url = reverse('api-1-revision',
url_args={'sha1_git': revision['id']})
revision['directory_url'] = directory_url
revision['history_url'] = history_url
revision['url'] = revision_url
revision['parents'] = parents_id_url
return revision
diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py
index 6fd03b81..bb6ccf05 100644
--- a/swh/web/tests/api/views/test_snapshot.py
+++ b/swh/web/tests/api/views/test_snapshot.py
@@ -1,194 +1,194 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
from hypothesis import given
from swh.model.hashutil import hash_to_hex
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import (
snapshot, new_snapshot
)
@given(snapshot())
def test_api_snapshot(api_client, archive_data, snapshot):
url = reverse('api-1-snapshot',
url_args={'snapshot_id': snapshot})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_data = archive_data.snapshot_get(snapshot)
expected_data = _enrich_snapshot(archive_data, expected_data)
assert rv.data == expected_data
@given(snapshot())
def test_api_snapshot_paginated(api_client, archive_data, snapshot):
branches_offset = 0
branches_count = 2
snapshot_branches = []
for k, v in sorted(
archive_data.snapshot_get(snapshot)['branches'].items()):
snapshot_branches.append({
'name': k,
'target_type': v['target_type'],
'target': v['target']
})
whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None}
while branches_offset < len(snapshot_branches):
branches_from = snapshot_branches[branches_offset]['name']
url = reverse('api-1-snapshot',
url_args={'snapshot_id': snapshot},
query_params={'branches_from': branches_from,
'branches_count': branches_count})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_data = archive_data.snapshot_get_branches(
snapshot, branches_from, branches_count)
expected_data = _enrich_snapshot(archive_data, expected_data)
branches_offset += branches_count
if branches_offset < len(snapshot_branches):
next_branch = snapshot_branches[branches_offset]['name']
expected_data['next_branch'] = next_branch
else:
expected_data['next_branch'] = None
assert rv.data == expected_data
whole_snapshot['branches'].update(expected_data['branches'])
if branches_offset < len(snapshot_branches):
- next_url = reverse(
- 'api-1-snapshot',
- url_args={'snapshot_id': snapshot},
- query_params={'branches_from': next_branch,
- 'branches_count': branches_count})
+ next_url = rv.wsgi_request.build_absolute_uri(
+ reverse('api-1-snapshot',
+ url_args={'snapshot_id': snapshot},
+ query_params={'branches_from': next_branch,
+ 'branches_count': branches_count}))
assert rv['Link'] == '<%s>; rel="next"' % next_url
else:
assert not rv.has_header('Link')
url = reverse('api-1-snapshot',
url_args={'snapshot_id': snapshot})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == whole_snapshot
@given(snapshot())
def test_api_snapshot_filtered(api_client, archive_data, snapshot):
    """Filtering a snapshot by target type must return exactly the
    branches whose targets have that type."""
    branches = [
        {'name': name,
         'target_type': info['target_type'],
         'target': info['target']}
        for name, info in sorted(
            archive_data.snapshot_get(snapshot)['branches'].items())
    ]
    # Pick the target type of a random branch as the filter value.
    target_type = random.choice(branches)['target_type']
    url = reverse('api-1-snapshot',
                  url_args={'snapshot_id': snapshot},
                  query_params={'target_types': target_type})
    rv = api_client.get(url)
    expected_data = _enrich_snapshot(
        archive_data,
        archive_data.snapshot_get_branches(snapshot,
                                           target_types=target_type))
    assert rv.status_code == 200, rv.data
    assert rv['Content-Type'] == 'application/json'
    assert rv.data == expected_data
def test_api_snapshot_errors(api_client):
    """A malformed snapshot id yields a 400, an unknown one a 404."""
    # Malformed checksum -> bad request.
    rv = api_client.get(reverse('api-1-snapshot',
                                url_args={'snapshot_id': '63ce369'}))
    assert rv.status_code == 400, rv.data
    # Well-formed but unknown checksum -> not found.
    rv = api_client.get(reverse('api-1-snapshot',
                                url_args={'snapshot_id': random_sha1()}))
    assert rv.status_code == 404, rv.data
@given(snapshot())
def test_api_snapshot_uppercase(api_client, snapshot):
    """An uppercase snapshot id must redirect to its lowercase form."""
    upper_url = reverse('api-1-snapshot-uppercase-checksum',
                        url_args={'snapshot_id': snapshot.upper()})
    rv = api_client.get(upper_url)
    assert rv.status_code == 302
    lower_url = reverse('api-1-snapshot-uppercase-checksum',
                        url_args={'snapshot_id': snapshot})
    assert rv['location'] == lower_url
@given(new_snapshot(min_size=4))
def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot):
    """A snapshot containing a null (dangling) branch is still served
    with a 200 response."""
    snp_dict = new_snapshot.to_dict()
    snp_id = hash_to_hex(snp_dict['id'])
    # Nullify the first branch to simulate a dangling reference.
    first_branch = next(iter(snp_dict['branches']))
    snp_dict['branches'][first_branch] = None
    archive_data.snapshot_add([snp_dict])
    rv = api_client.get(reverse('api-1-snapshot',
                                url_args={'snapshot_id': snp_id}))
    assert rv.status_code == 200, rv.data
def _enrich_snapshot(archive_data, snapshot):
    """Add a 'target_url' entry to every branch of *snapshot*, resolving
    alias branches (possibly through an extra storage lookup) to the URL
    of their revision/release target. Mutates and returns *snapshot*."""
    def _branch_url(target_type, target):
        # Only revision and release targets have an API endpoint;
        # any other target type maps to None.
        if target_type == 'revision':
            return reverse('api-1-revision', url_args={'sha1_git': target})
        if target_type == 'release':
            return reverse('api-1-release', url_args={'sha1_git': target})
        return None

    branches = snapshot['branches']
    # First pass: direct targets (aliases get None here, fixed below).
    for branch_data in branches.values():
        branch_data['target_url'] = _branch_url(branch_data['target_type'],
                                                branch_data['target'])
    # Second pass: resolve alias branches to their target's URL.
    for branch_data in branches.values():
        if branch_data['target_type'] != 'alias':
            continue
        target = branch_data['target']
        if target in branches:
            branch_data['target_url'] = branches[target]['target_url']
        else:
            # Aliased branch not in this (possibly paginated) view:
            # fetch it from the storage.
            snp = archive_data.snapshot_get_branches(snapshot['id'],
                                                     branches_from=target,
                                                     branches_count=1)
            resolved = snp['branches'][target]
            branch_data['target_url'] = _branch_url(resolved['target_type'],
                                                    resolved['target'])
    return snapshot
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
index 1139d237..28d621b5 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -1,290 +1,296 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import inspect
import json
import os
import shutil
from subprocess import run, PIPE

import pytest

from django.core.cache import cache
from hypothesis import settings, HealthCheck
from rest_framework.test import APIClient, APIRequestFactory

from swh.model.hashutil import ALGORITHMS, hash_to_bytes

from swh.web.common import converters
from swh.web.tests.data import get_tests_data, override_storages
# Used to skip some tests

# ctags-based tests need a ctags binary built with JSON output support.
ctags_json_missing = (
    shutil.which('ctags') is None or
    b'+json' not in run(['ctags', '--version'], stdout=PIPE).stdout
)

# License-based tests need the nomossa binary from fossology.
fossology_missing = shutil.which('nomossa') is None

# Register some hypothesis profiles
settings.register_profile('default', settings())

# Profile for regular test runs: no deadline, relaxed health checks.
settings.register_profile(
    'swh-web',
    settings(deadline=None,
             suppress_health_check=[HealthCheck.too_slow,
                                    HealthCheck.filter_too_much]))

# Profile for quick runs: a single example per test.
settings.register_profile(
    'swh-web-fast',
    settings(deadline=None, max_examples=1,
             suppress_health_check=[HealthCheck.too_slow,
                                    HealthCheck.filter_too_much]))
def pytest_configure(config):
    """Generate a dummy webpack-stats.json file if none exists.

    The Python unit tests do not need the static assets generated by
    webpack, but the django templates fail to load without the
    webpack-stats.json file describing the js and css bundles to
    include. So fake one listing every bundle directory.
    """
    test_dir = os.path.dirname(__file__)
    static_dir = os.path.join(test_dir, '../static')
    webpack_stats = os.path.join(static_dir, 'webpack-stats.json')
    if os.path.exists(webpack_stats):
        # A real (or previously faked) stats file exists: nothing to do.
        return
    bundles_dir = os.path.join(test_dir, '../assets/src/bundles')
    _, bundles, _ = next(os.walk(bundles_dir))
    chunks = {}
    for bundle in bundles:
        asset = 'js/%s.js' % bundle
        chunks[bundle] = [{
            'name': asset,
            'publicPath': '/static/%s' % asset,
            'path': os.path.join(static_dir, asset)
        }]
    with open(webpack_stats, 'w') as outfile:
        json.dump({'status': 'done',
                   'publicPath': '/static',
                   'chunks': chunks}, outfile)
# Clear Django cache before each test
@pytest.fixture(autouse=True)
def django_cache_cleared():
    """Autouse fixture clearing the Django cache before every test,
    avoiding cross-test cache leakage."""
    cache.clear()
# Fixture to get test client from Django REST Framework
@pytest.fixture(scope='module')
def api_client():
    """Return a DRF APIClient, shared across a test module."""
    return APIClient()


# Fixture to get API request factory from Django REST Framework
# NOTE(review): this fixture was carried as unresolved unified-diff '+'
# lines; resolved here to the patched version.
@pytest.fixture(scope='module')
def api_request_factory():
    """Return a DRF APIRequestFactory, shared across a test module."""
    return APIRequestFactory()
# Initialize tests data
@pytest.fixture(autouse=True)
def tests_data():
    """Autouse fixture (re)initializing the sample archive data and
    plugging its in-memory storages into the swh-web configuration."""
    data = get_tests_data(reset=True)
    # Update swh-web configuration to use the in-memory storages
    # instantiated in the tests.data module
    override_storages(data['storage'], data['idx_storage'])
    return data
# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture
def archive_data(tests_data):
    """Proxy to the sample archive storage (see _ArchiveData)."""
    return _ArchiveData(tests_data)


# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture
def indexer_data(tests_data):
    """Proxy to the sample archive indexer storage (see _IndexerData)."""
    return _IndexerData(tests_data)


# Custom data directory for requests_mock
@pytest.fixture
def datadir():
    """Return the absolute path to the tests resources directory."""
    return os.path.join(os.path.abspath(os.path.dirname(__file__)),
                        'resources')
class _ArchiveData:
    """
    Helper class to manage data from a sample test archive.

    It is initialized with a reference to an in-memory storage
    containing raw tests data.

    It is basically a proxy to Storage interface but it overrides some methods
    to retrieve those tests data in a json serializable format in order to ease
    tests implementation.
    """
    def __init__(self, tests_data):
        self.storage = tests_data['storage']

        def _call_storage_method(method):
            # Closure binding `method` so each forwarded attribute keeps
            # its own target method.
            def call_storage_method(*args, **kwargs):
                return method(*args, **kwargs)
            return call_storage_method

        # Forward calls to non overridden Storage methods to wrapped
        # storage instance
        for method_name, method in inspect.getmembers(
                self.storage, predicate=inspect.ismethod):
            if (not hasattr(self, method_name) and
                    not method_name.startswith('_')):
                setattr(self, method_name, _call_storage_method(method))

    def content_find(self, content):
        # Look up a content by any of its known hash values.
        cnt_ids_bytes = {algo_hash: hash_to_bytes(content[algo_hash])
                         for algo_hash in ALGORITHMS
                         if content.get(algo_hash)}
        cnt = self.storage.content_find(cnt_ids_bytes)
        return converters.from_content(cnt[0]) if cnt else cnt

    def content_get_metadata(self, cnt_id):
        # Return the hash metadata of a content, hex-encoded.
        cnt_id_bytes = hash_to_bytes(cnt_id)
        metadata = next(self.storage.content_get_metadata([cnt_id_bytes]))
        return converters.from_swh(metadata,
                                   hashess={'sha1', 'sha1_git', 'sha256',
                                            'blake2s256'})

    def content_get(self, cnt_id):
        # Return a content (including raw data) in serializable form.
        cnt_id_bytes = hash_to_bytes(cnt_id)
        cnt = next(self.storage.content_get([cnt_id_bytes]))
        return converters.from_content(cnt)

    def directory_get(self, dir_id):
        # Return a directory as its id plus its listed entries.
        return {
            'id': dir_id,
            'content': self.directory_ls(dir_id)
        }

    def directory_ls(self, dir_id):
        # List the entries of a directory in serializable form.
        cnt_id_bytes = hash_to_bytes(dir_id)
        dir_content = map(converters.from_directory_entry,
                          self.storage.directory_ls(cnt_id_bytes))
        return list(dir_content)

    def release_get(self, rel_id):
        # Return a release in serializable form.
        rel_id_bytes = hash_to_bytes(rel_id)
        rel_data = next(self.storage.release_get([rel_id_bytes]))
        return converters.from_release(rel_data)

    def revision_get(self, rev_id):
        # Return a revision in serializable form.
        rev_id_bytes = hash_to_bytes(rev_id)
        rev_data = next(self.storage.revision_get([rev_id_bytes]))
        return converters.from_revision(rev_data)

    def revision_log(self, rev_id, limit=None):
        # Return the revision log starting at rev_id, optionally limited.
        rev_id_bytes = hash_to_bytes(rev_id)
        return list(map(converters.from_revision,
                        self.storage.revision_log([rev_id_bytes],
                                                  limit=limit)))

    def snapshot_get_latest(self, origin_url):
        # Return the latest snapshot of an origin in serializable form.
        snp = self.storage.snapshot_get_latest(origin_url)
        return converters.from_snapshot(snp)

    def origin_get(self, origin_info):
        # Return an origin in serializable form.
        origin = self.storage.origin_get(origin_info)
        return converters.from_origin(origin)

    def origin_visit_get(self, origin_url):
        # Return all visits of an origin in serializable form.
        visits = self.storage.origin_visit_get(origin_url)
        return list(map(converters.from_origin_visit, visits))

    def origin_visit_get_by(self, origin_url, visit_id):
        # Return a single origin visit in serializable form.
        visit = self.storage.origin_visit_get_by(origin_url, visit_id)
        return converters.from_origin_visit(visit)

    def snapshot_get(self, snapshot_id):
        # Return a whole snapshot in serializable form.
        snp = self.storage.snapshot_get(hash_to_bytes(snapshot_id))
        return converters.from_snapshot(snp)

    def snapshot_get_branches(self, snapshot_id, branches_from='',
                              branches_count=1000, target_types=None):
        # Return a (possibly filtered/paginated) slice of snapshot branches.
        snp = self.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id), branches_from.encode(),
            branches_count, target_types)
        return converters.from_snapshot(snp)

    def snapshot_get_head(self, snapshot):
        # Return the target of the HEAD branch, resolving a one-level alias.
        if snapshot['branches']['HEAD']['target_type'] == 'alias':
            target = snapshot['branches']['HEAD']['target']
            head = snapshot['branches'][target]['target']
        else:
            head = snapshot['branches']['HEAD']['target']
        return head
class _IndexerData:
    """
    Helper class to manage indexer tests data.

    It is initialized with a reference to an in-memory indexer storage
    containing raw tests data.

    It also defines methods to retrieve those tests data in
    a json serializable format in order to ease tests implementation.
    """

    def __init__(self, tests_data):
        self.idx_storage = tests_data['idx_storage']
        self.mimetype_indexer = tests_data['mimetype_indexer']
        self.license_indexer = tests_data['license_indexer']
        self.ctags_indexer = tests_data['ctags_indexer']

    def content_add_mimetype(self, cnt_id):
        """Index the mimetype of a content."""
        self.mimetype_indexer.run([hash_to_bytes(cnt_id)],
                                  'update-dups')

    def content_get_mimetype(self, cnt_id):
        """Return the indexed mimetype of a content."""
        mimetype = next(self.idx_storage.content_mimetype_get(
                        [hash_to_bytes(cnt_id)]))
        return converters.from_filetype(mimetype)

    def content_add_language(self, cnt_id):
        """Language indexing is disabled; always raises NotImplementedError.

        Bug fix: the original body contained unreachable statements after
        the raise, referencing a `language_indexer` attribute that is
        never set in __init__; they have been removed.
        """
        raise NotImplementedError('Language indexer is disabled.')

    def content_get_language(self, cnt_id):
        """Return the indexed language of a content."""
        lang = next(self.idx_storage.content_language_get(
                    [hash_to_bytes(cnt_id)]))
        return converters.from_swh(lang, hashess={'id'})

    def content_add_license(self, cnt_id):
        """Index the license of a content."""
        self.license_indexer.run([hash_to_bytes(cnt_id)],
                                 'update-dups')

    def content_get_license(self, cnt_id):
        """Return the indexed license facts of a content."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        lic = next(self.idx_storage.content_fossology_license_get(
                   [cnt_id_bytes]))
        return converters.from_swh({'id': cnt_id_bytes,
                                    'facts': lic[cnt_id_bytes]},
                                   hashess={'id'})

    def content_add_ctags(self, cnt_id):
        """Index the ctags symbols of a content."""
        self.ctags_indexer.run([hash_to_bytes(cnt_id)],
                               'update-dups')

    def content_get_ctags(self, cnt_id):
        """Yield the indexed ctags symbols of a content."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
        for ctag in ctags:
            yield converters.from_swh(ctag, hashess={'id'})
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Sep 18, 4:58 PM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3264762
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment