diff --git a/pytest.ini b/pytest.ini --- a/pytest.ini +++ b/pytest.ini @@ -2,5 +2,3 @@ norecursedirs = docs node_modules .tox DJANGO_SETTINGS_MODULE = swh.web.settings.tests -markers = - origin_id: execute tests using an origin id (deselect with '-m "not origin_id"') diff --git a/swh/web/tests/admin/test_origin_save.py b/swh/web/tests/admin/test_origin_save.py --- a/swh/web/tests/admin/test_origin_save.py +++ b/swh/web/tests/admin/test_origin_save.py @@ -5,9 +5,9 @@ from urllib.parse import unquote +import pytest from django.contrib.auth import get_user_model -from unittest.mock import patch from swh.web.common.models import ( SaveAuthorizedOrigin, SaveUnauthorizedOrigin, SaveOriginRequest @@ -18,7 +18,6 @@ SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_YET_SCHEDULED ) from swh.web.common.utils import reverse -from swh.web.tests.testcase import WebTestCase _user_name = 'swh-web-admin' _user_mail = 'admin@swh-web.org' @@ -28,205 +27,197 @@ _unauthorized_origin_url = 'https://www.softwareheritage.org/' -class OriginSaveAdminTestCase(WebTestCase): - - @classmethod - def setUpTestData(cls): # noqa: N802 - User = get_user_model() # noqa: N806 - user = User.objects.create_user(_user_name, _user_mail, _user_password) - user.is_staff = True - user.save() - SaveAuthorizedOrigin.objects.create(url=_authorized_origin_url) - SaveUnauthorizedOrigin.objects.create(url=_unauthorized_origin_url) +pytestmark = pytest.mark.django_db - def check_not_login(self, url): - login_url = reverse('login', query_params={'next': url}) - response = self.client.post(url) - self.assertEqual(response.status_code, 302) - self.assertEqual(unquote(response.url), login_url) - - def test_add_authorized_origin_url(self): - authorized_url = 'https://scm.adullact.net/anonscm/' - self.assertEqual(can_save_origin(authorized_url), - SAVE_REQUEST_PENDING) - - url = reverse('admin-origin-save-add-authorized-url', - url_args={'origin_url': authorized_url}) - - self.check_not_login(url) - - self.assertEqual(can_save_origin(authorized_url), - SAVE_REQUEST_PENDING) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(url) - self.assertEqual(response.status_code, 200) - self.assertEqual(can_save_origin(authorized_url), - SAVE_REQUEST_ACCEPTED) - - def test_remove_authorized_origin_url(self): - self.assertEqual(can_save_origin(_authorized_origin_url), - SAVE_REQUEST_ACCEPTED) - - url = reverse('admin-origin-save-remove-authorized-url', - url_args={'origin_url': _authorized_origin_url}) - - self.check_not_login(url) - - self.assertEqual(can_save_origin(_authorized_origin_url), - SAVE_REQUEST_ACCEPTED) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(url) - self.assertEqual(response.status_code, 200) - self.assertEqual(can_save_origin(_authorized_origin_url), - SAVE_REQUEST_PENDING) - - def test_add_unauthorized_origin_url(self): - unauthorized_url = 'https://www.yahoo./' - self.assertEqual(can_save_origin(unauthorized_url), - SAVE_REQUEST_PENDING) - - url = reverse('admin-origin-save-add-unauthorized-url', - url_args={'origin_url': unauthorized_url}) - - self.check_not_login(url) - - self.assertEqual(can_save_origin(unauthorized_url), - SAVE_REQUEST_PENDING) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(url) - self.assertEqual(response.status_code, 200) - self.assertEqual(can_save_origin(unauthorized_url), - SAVE_REQUEST_REJECTED) - - def test_remove_unauthorized_origin_url(self): - self.assertEqual(can_save_origin(_unauthorized_origin_url), - SAVE_REQUEST_REJECTED) - - url = reverse('admin-origin-save-remove-unauthorized-url', - url_args={'origin_url': _unauthorized_origin_url}) - - self.check_not_login(url) - - self.assertEqual(can_save_origin(_unauthorized_origin_url), - SAVE_REQUEST_REJECTED) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(url) - self.assertEqual(response.status_code, 200) - self.assertEqual(can_save_origin(_unauthorized_origin_url), - SAVE_REQUEST_PENDING) - - @patch('swh.web.common.origin_save.scheduler') - def test_accept_pending_save_request(self, mock_scheduler): - visit_type = 'git' - origin_url = 'https://v2.pikacode.com/bthate/botlib.git' - save_request_url = reverse('api-1-save-origin', - url_args={'visit_type': visit_type, - 'origin_url': origin_url}) - response = self.client.post(save_request_url, data={}, - content_type='application/x-www-form-urlencoded') # noqa - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['save_request_status'], - SAVE_REQUEST_PENDING) - - accept_request_url = reverse('admin-origin-save-request-accept', - url_args={'visit_type': visit_type, - 'origin_url': origin_url}) - - self.check_not_login(accept_request_url) - - tasks_data = [ - { - 'priority': 'high', - 'policy': 'oneshot', - 'type': 'load-git', - 'arguments': { - 'kwargs': { - 'repo_url': origin_url - }, - 'args': [] + +@pytest.fixture(autouse=True) +def populated_db(): + User = get_user_model() + user = User.objects.create_user(_user_name, _user_mail, _user_password) + user.is_staff = True + user.save() + SaveAuthorizedOrigin.objects.create(url=_authorized_origin_url) + SaveUnauthorizedOrigin.objects.create(url=_unauthorized_origin_url) + + +def check_not_login(client, url): + login_url = reverse('login', query_params={'next': url}) + response = client.post(url) + assert response.status_code == 302 + assert unquote(response.url) == login_url + + +def test_add_authorized_origin_url(client): + authorized_url = 'https://scm.adullact.net/anonscm/' + assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING + + url = reverse('admin-origin-save-add-authorized-url', + url_args={'origin_url': authorized_url}) + + check_not_login(client, url) + + assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING + + client.login(username=_user_name, password=_user_password) + response = client.post(url) + assert response.status_code == 200 + assert can_save_origin(authorized_url) == SAVE_REQUEST_ACCEPTED + + +def test_remove_authorized_origin_url(client): + assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED + + url = reverse('admin-origin-save-remove-authorized-url', + url_args={'origin_url': _authorized_origin_url}) + + check_not_login(client, url) + + assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED + + client.login(username=_user_name, password=_user_password) + response = client.post(url) + assert response.status_code == 200 + assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_PENDING + + +def test_add_unauthorized_origin_url(client): + unauthorized_url = 'https://www.yahoo./' + assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING + + url = reverse('admin-origin-save-add-unauthorized-url', + url_args={'origin_url': unauthorized_url}) + + check_not_login(client, url) + + assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING + + client.login(username=_user_name, password=_user_password) + response = client.post(url) + assert response.status_code == 200 + assert can_save_origin(unauthorized_url) == SAVE_REQUEST_REJECTED + + +def test_remove_unauthorized_origin_url(client): + assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED + + url = reverse('admin-origin-save-remove-unauthorized-url', + url_args={'origin_url': _unauthorized_origin_url}) + + check_not_login(client, url) + + assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED + + client.login(username=_user_name, password=_user_password) + response = client.post(url) + assert response.status_code == 200 + assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_PENDING + + +def test_accept_pending_save_request(client, mocker): + mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler') + visit_type = 'git' + origin_url = 'https://v2.pikacode.com/bthate/botlib.git' + save_request_url = reverse('api-1-save-origin', + url_args={'visit_type': visit_type, + 'origin_url': origin_url}) + response = client.post(save_request_url, data={}, + content_type='application/x-www-form-urlencoded') + assert response.status_code == 200 + assert response.data['save_request_status'] == SAVE_REQUEST_PENDING + + accept_request_url = reverse('admin-origin-save-request-accept', + url_args={'visit_type': visit_type, + 'origin_url': origin_url}) + + check_not_login(client, accept_request_url) + + tasks_data = [ + { + 'priority': 'high', + 'policy': 'oneshot', + 'type': 'load-git', + 'arguments': { + 'kwargs': { + 'repo_url': origin_url }, - 'status': 'next_run_not_scheduled', - 'id': 1, - } - ] - - mock_scheduler.create_tasks.return_value = tasks_data - mock_scheduler.get_tasks.return_value = tasks_data - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(accept_request_url) - self.assertEqual(response.status_code, 200) - - response = self.client.get(save_request_url) - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data[0]['save_request_status'], - SAVE_REQUEST_ACCEPTED) - self.assertEqual(response.data[0]['save_task_status'], - SAVE_TASK_NOT_YET_SCHEDULED) - - @patch('swh.web.common.origin_save.scheduler') - def test_reject_pending_save_request(self, mock_scheduler): - visit_type = 'git' - origin_url = 'https://wikipedia.com' - save_request_url = reverse('api-1-save-origin', - url_args={'visit_type': visit_type, - 'origin_url': origin_url}) - response = self.client.post(save_request_url, data={}, - content_type='application/x-www-form-urlencoded') # noqa - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['save_request_status'], - SAVE_REQUEST_PENDING) - - reject_request_url = reverse('admin-origin-save-request-reject', - url_args={'visit_type': visit_type, - 'origin_url': origin_url}) - - self.check_not_login(reject_request_url) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(reject_request_url) - self.assertEqual(response.status_code, 200) - - tasks_data = [ - { - 'priority': 'high', - 'policy': 'oneshot', - 'type': 'load-git', - 'arguments': { - 'kwargs': { - 'repo_url': origin_url - }, - 'args': [] + 'args': [] + }, + 'status': 'next_run_not_scheduled', + 'id': 1, + } + ] + + mock_scheduler.create_tasks.return_value = tasks_data + mock_scheduler.get_tasks.return_value = tasks_data + + client.login(username=_user_name, password=_user_password) + response = client.post(accept_request_url) + assert response.status_code == 200 + + response = client.get(save_request_url) + assert response.status_code == 200 + assert response.data[0]['save_request_status'] == SAVE_REQUEST_ACCEPTED + assert response.data[0]['save_task_status'] == SAVE_TASK_NOT_YET_SCHEDULED + + +def test_reject_pending_save_request(client, mocker): + mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler') + visit_type = 'git' + origin_url = 'https://wikipedia.com' + save_request_url = reverse('api-1-save-origin', + url_args={'visit_type': visit_type, + 'origin_url': origin_url}) + response = client.post(save_request_url, data={}, + content_type='application/x-www-form-urlencoded') + assert response.status_code == 200 + assert response.data['save_request_status'] == SAVE_REQUEST_PENDING + + reject_request_url = reverse('admin-origin-save-request-reject', + url_args={'visit_type': visit_type, + 'origin_url': origin_url}) + + check_not_login(client, reject_request_url) + + client.login(username=_user_name, password=_user_password) + response = client.post(reject_request_url) + assert response.status_code == 200 + + tasks_data = [ + { + 'priority': 'high', + 'policy': 'oneshot', + 'type': 'load-git', + 'arguments': { + 'kwargs': { + 'repo_url': origin_url }, - 'status': 'next_run_not_scheduled', - 'id': 1, - } - ] - - mock_scheduler.create_tasks.return_value = tasks_data - mock_scheduler.get_tasks.return_value = tasks_data - - response = self.client.get(save_request_url) - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data[0]['save_request_status'], - SAVE_REQUEST_REJECTED) - - def test_remove_save_request(self): - sor = SaveOriginRequest.objects.create(visit_type='git', - origin_url='https://wikipedia.com', # noqa - status=SAVE_REQUEST_PENDING) - self.assertEqual(SaveOriginRequest.objects.count(), 1) - - remove_request_url = reverse('admin-origin-save-request-remove', - url_args={'sor_id': sor.id}) - - self.check_not_login(remove_request_url) - - self.client.login(username=_user_name, password=_user_password) - response = self.client.post(remove_request_url) - self.assertEqual(response.status_code, 200) - self.assertEqual(SaveOriginRequest.objects.count(), 0) + 'args': [] + }, + 'status': 'next_run_not_scheduled', + 'id': 1, + } + ] + + mock_scheduler.create_tasks.return_value = tasks_data + mock_scheduler.get_tasks.return_value = tasks_data + + response = client.get(save_request_url) + assert response.status_code == 200 + assert response.data[0]['save_request_status'] == SAVE_REQUEST_REJECTED + + +def test_remove_save_request(client): + sor = SaveOriginRequest.objects.create(visit_type='git', + origin_url='https://wikipedia.com', # noqa + status=SAVE_REQUEST_PENDING) + assert SaveOriginRequest.objects.count() == 1 + + remove_request_url = reverse('admin-origin-save-request-remove', + url_args={'sor_id': sor.id}) + + check_not_login(client, remove_request_url) + + client.login(username=_user_name, password=_user_password) + response = client.post(remove_request_url) + assert response.status_code == 200 + assert SaveOriginRequest.objects.count() == 0 diff --git a/swh/web/tests/api/test_api_lookup.py b/swh/web/tests/api/test_api_lookup.py --- a/swh/web/tests/api/test_api_lookup.py +++ b/swh/web/tests/api/test_api_lookup.py @@ -1,116 +1,115 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + from swh.web.common.exc import NotFoundExc from swh.web.api.views import utils -from swh.web.tests.testcase import WebTestCase - - -class ApiLookupTestCase(WebTestCase): - - def test_genericapi_lookup_nothing_is_found(self): - # given - def test_generic_lookup_fn(sha1, another_unused_arg): - assert another_unused_arg == 'unused_arg' - assert sha1 == 'sha1' - return None - - # when - with self.assertRaises(NotFoundExc) as cm: - utils.api_lookup( - test_generic_lookup_fn, 'sha1', 'unused_arg', - notfound_msg='This will be raised because None is returned.') - - self.assertIn('This will be raised because None is returned.', - cm.exception.args[0]) - - def test_generic_api_map_are_enriched_and_transformed_to_list(self): - # given - def test_generic_lookup_fn_1(criteria0, param0, param1): - assert criteria0 == 'something' - return map(lambda x: x + 1, [1, 2, 3]) - - # when - actual_result = utils.api_lookup( - test_generic_lookup_fn_1, 'something', 'some param 0', - 'some param 1', - notfound_msg=('This is not the error message you are looking for. ' - 'Move along.'), - enrich_fn=lambda x: x * 2) - - self.assertEqual(actual_result, [4, 6, 8]) - - def test_generic_api_list_are_enriched_too(self): - # given - def test_generic_lookup_fn_2(crit): - assert crit == 'something' - return ['a', 'b', 'c'] - - # when - actual_result = utils.api_lookup( - test_generic_lookup_fn_2, 'something', - notfound_msg=('Not the error message you are looking for, it is. ' - 'Along, you move!'), - enrich_fn=lambda x: ''. join(['=', x, '='])) - - self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) - - def test_generic_api_generator_are_enriched_and_returned_as_list(self): - # given - def test_generic_lookup_fn_3(crit): - assert crit == 'crit' - return (i for i in [4, 5, 6]) - - # when - actual_result = utils.api_lookup( - test_generic_lookup_fn_3, 'crit', - notfound_msg='Move!', - enrich_fn=lambda x: x - 1) - - self.assertEqual(actual_result, [3, 4, 5]) - - def test_generic_api_simple_data_are_enriched_and_returned_too(self): - # given - def test_generic_lookup_fn_4(crit): - assert crit == '123' - return {'a': 10} - - def test_enrich_data(x): - x['a'] = x['a'] * 10 - return x - - # when - actual_result = utils.api_lookup( - test_generic_lookup_fn_4, '123', - notfound_msg='Nothing to do', - enrich_fn=test_enrich_data) - - self.assertEqual(actual_result, {'a': 100}) - - def test_api_lookup_not_found(self): - # when - with self.assertRaises(NotFoundExc) as e: - utils.api_lookup( - lambda x: None, 'something', - notfound_msg='this is the error message raised as it is None') - - self.assertEqual(e.exception.args[0], - 'this is the error message raised as it is None') - - def test_api_lookup_with_result(self): - # when - actual_result = utils.api_lookup( - lambda x: x + '!', 'something', - notfound_msg='this is the error which won\'t be used here') - - self.assertEqual(actual_result, 'something!') - - def test_api_lookup_with_result_as_map(self): - # when - actual_result = utils.api_lookup( - lambda x: map(lambda y: y+1, x), [1, 2, 3], - notfound_msg='this is the error which won\'t be used here') - - self.assertEqual(actual_result, [2, 3, 4]) + + +def test_genericapi_lookup_nothing_is_found(): + + def test_generic_lookup_fn(sha1, another_unused_arg): + assert another_unused_arg == 'unused_arg' + assert sha1 == 'sha1' + return None + + notfound_msg = 'This will be raised because None is returned.' + + with pytest.raises(NotFoundExc) as e: + utils.api_lookup( + test_generic_lookup_fn, 'sha1', 'unused_arg', + notfound_msg=notfound_msg) + + assert e.match(notfound_msg) + + +def test_generic_api_map_are_enriched_and_transformed_to_list(): + + def test_generic_lookup_fn_1(criteria0, param0, param1): + assert criteria0 == 'something' + return map(lambda x: x + 1, [1, 2, 3]) + + actual_result = utils.api_lookup( + test_generic_lookup_fn_1, 'something', 'some param 0', + 'some param 1', + notfound_msg=('This is not the error message you are looking for. ' + 'Move along.'), + enrich_fn=lambda x: x * 2) + + assert actual_result == [4, 6, 8] + + +def test_generic_api_list_are_enriched_too(): + + def test_generic_lookup_fn_2(crit): + assert crit == 'something' + return ['a', 'b', 'c'] + + actual_result = utils.api_lookup( + test_generic_lookup_fn_2, 'something', + notfound_msg=('Not the error message you are looking for, it is. ' + 'Along, you move!'), + enrich_fn=lambda x: ''. join(['=', x, '='])) + + assert actual_result == ['=a=', '=b=', '=c='] + + +def test_generic_api_generator_are_enriched_and_returned_as_list(): + + def test_generic_lookup_fn_3(crit): + assert crit == 'crit' + return (i for i in [4, 5, 6]) + + actual_result = utils.api_lookup( + test_generic_lookup_fn_3, 'crit', + notfound_msg='Move!', + enrich_fn=lambda x: x - 1) + + assert actual_result == [3, 4, 5] + + +def test_generic_api_simple_data_are_enriched_and_returned_too(): + + def test_generic_lookup_fn_4(crit): + assert crit == '123' + return {'a': 10} + + def test_enrich_data(x): + x['a'] = x['a'] * 10 + return x + + actual_result = utils.api_lookup( + test_generic_lookup_fn_4, '123', + notfound_msg='Nothing to do', + enrich_fn=test_enrich_data) + + assert actual_result == {'a': 100} + + +def test_api_lookup_not_found(): + notfound_msg = 'this is the error message raised as it is None' + with pytest.raises(NotFoundExc) as e: + utils.api_lookup( + lambda x: None, 'something', + notfound_msg=notfound_msg) + + assert e.match(notfound_msg) + + +def test_api_lookup_with_result(): + actual_result = utils.api_lookup( + lambda x: x + '!', 'something', + notfound_msg='this is the error which won\'t be used here') + + assert actual_result == 'something!' + + +def test_api_lookup_with_result_as_map(): + actual_result = utils.api_lookup( + lambda x: map(lambda y: y+1, x), [1, 2, 3], + notfound_msg='this is the error which won\'t be used here') + + assert actual_result == [2, 3, 4] diff --git a/swh/web/tests/api/test_apidoc.py b/swh/web/tests/api/test_apidoc.py --- a/swh/web/tests/api/test_apidoc.py +++ b/swh/web/tests/api/test_apidoc.py @@ -1,9 +1,10 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from rest_framework.test import APITestCase +import pytest + from rest_framework.response import Response from swh.storage.exc import StorageDBError, StorageAPIError @@ -11,41 +12,47 @@ from swh.web.api.apidoc import api_doc, _parse_httpdomain_doc from swh.web.api.apiurls import api_route from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc -from swh.web.tests.testcase import WebTestCase +from swh.web.tests.django_asserts import assert_template_used -# flake8: noqa httpdomain_doc = """ .. http:get:: /api/1/revision/(sha1_git)/ Get information about a revision in the archive. - Revisions are identified by **sha1** checksums, compatible with Git commit identifiers. - See :func:`swh.model.identifiers.revision_identifier` in our data model module for details - about how they are computed. + Revisions are identified by **sha1** checksums, compatible with Git commit + identifiers. + See :func:`swh.model.identifiers.revision_identifier` in our data model + module for details about how they are computed. - :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier + :param string sha1_git: hexadecimal representation of the revision + **sha1_git** identifier :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` - :resheader Content-Type: this depends on :http:header:`Accept` header of request + :resheader Content-Type: this depends on :http:header:`Accept` header + of request :>json object author: information about the author of the revision :>json object committer: information about the committer of the revision - :>json string committer_date: ISO representation of the commit date (in UTC) + :>json string committer_date: ISO representation of the commit date + (in UTC) :>json string date: ISO representation of the revision date (in UTC) :>json string directory: the unique identifier that revision points to - :>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]` - to get information about the directory associated to the revision + :>json string directory_url: link to + :http:get:`/api/1/directory/(sha1_git)/[(path)/]` to get information + about the directory associated to the revision :>json string id: the revision unique identifier - :>json boolean merge: whether or not the revision corresponds to a merge commit + :>json boolean merge: whether or not the revision corresponds to a merge + commit :>json string message: the message associated to the revision - :>json array parents: the parents of the revision, i.e. the previous revisions - that head directly to it, each entry of that array contains an unique parent - revision identifier but also a link to :http:get:`/api/1/revision/(sha1_git)/` - to get more information about it + :>json array parents: the parents of the revision, i.e. the previous + revisions that head directly to it, each entry of that array contains + an unique parent revision identifier but also a link to + :http:get:`/api/1/revision/(sha1_git)/` to get more information + about it :>json string type: the type of the revision - **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head` :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided @@ -55,243 +62,252 @@ .. parsed-literal:: - $ curl -i :swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/` + :swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/` """ -class APIDocTestCase(WebTestCase, APITestCase): +exception_http_code = { + BadInputExc: 400, + ForbiddenExc: 403, + NotFoundExc: 404, + Exception: 500, + StorageAPIError: 503, + StorageDBError: 503, +} - exception_http_code = { - BadInputExc: 400, - ForbiddenExc: 403, - NotFoundExc: 404, - Exception: 500, - StorageAPIError: 503, - StorageDBError: 503, - } - def test_apidoc_nodoc_failure(self): - with self.assertRaises(Exception): - @api_doc('/my/nodoc/url/') - def apidoc_nodoc_tester(request, arga=0, argb=0): - return Response(arga + argb) - - @staticmethod - @api_route(r'/some/(?P[0-9]+)/(?P[0-9]+)/', - 'some-doc-route') - @api_doc('/some/doc/route/') - def apidoc_route(request, myarg, myotherarg, akw=0): - """ - Sample doc - """ - return {'result': int(myarg) + int(myotherarg) + akw} - - def test_apidoc_route_doc(self): - # when - rv = self.client.get('/api/1/some/doc/route/') - - # then - self.assertEqual(rv.status_code, 200, rv.data) - self.assertTemplateUsed('api/apidoc.html') - - def test_apidoc_route_fn(self): - - # when - rv = self.client.get('/api/1/some/1/1/') - - # then - self.assertEqual(rv.status_code, 200, rv.data) - - @staticmethod - @api_route(r'/test/error/(?P.+)/', - 'test-error') - @api_doc('/test/error/') - def apidoc_test_error_route(request, exc_name): - """ - Sample doc - """ - for e in APIDocTestCase.exception_http_code.keys(): - if e.__name__ == exc_name: - raise e('Error') - - def test_apidoc_error(self): - for exc, code in self.exception_http_code.items(): - # when - rv = self.client.get('/api/1/test/error/%s/' % exc.__name__) - - # then - self.assertEqual(rv.status_code, code) - - @staticmethod - @api_route(r'/some/full/(?P[0-9]+)/(?P[0-9]+)/', - 'some-complete-doc-route') - @api_doc('/some/complete/doc/route/') - def apidoc_full_stack(request, myarg, myotherarg, akw=0): - """ - Sample doc - """ - return {'result': int(myarg) + int(myotherarg) + akw} - - def test_apidoc_full_stack_doc(self): - # when - rv = self.client.get('/api/1/some/complete/doc/route/') - - # then - self.assertEqual(rv.status_code, 200, rv.data) - self.assertTemplateUsed('api/apidoc.html') - - def test_apidoc_full_stack_fn(self): - # when - rv = self.client.get('/api/1/some/full/1/1/') - - # then - self.assertEqual(rv.status_code, 200, rv.data) - - def test_api_doc_parse_httpdomain(self): - doc_data = { - 'description': '', - 'urls': [], - 'args': [], - 'params': [], - 'resheaders': [], - 'reqheaders': [], - 'return_type': '', - 'returns': [], - 'status_codes': [], - 'examples': [] - } +def test_apidoc_nodoc_failure(): + with pytest.raises(Exception): + @api_doc('/my/nodoc/url/') + def apidoc_nodoc_tester(request, arga=0, argb=0): + return Response(arga + argb) + + +@api_route(r'/some/(?P[0-9]+)/(?P[0-9]+)/', + 'some-doc-route') +@api_doc('/some/doc/route/') +def apidoc_route(request, myarg, myotherarg, akw=0): + """ + Sample doc + """ + return {'result': int(myarg) + int(myotherarg) + akw} + +# remove deprecation warnings related to docutils +@pytest.mark.filterwarnings( + 'ignore:.*U.*mode is deprecated:DeprecationWarning') +def test_apidoc_route_doc(client): + rv = client.get('/api/1/some/doc/route/', HTTP_ACCEPT='text/html') + + assert rv.status_code == 200, rv.content + assert_template_used(rv, 'api/apidoc.html') + + +def test_apidoc_route_fn(api_client): + rv = api_client.get('/api/1/some/1/1/') + + assert rv.status_code == 200, rv.data + + +@api_route(r'/test/error/(?P.+)/', 'test-error') +@api_doc('/test/error/') +def apidoc_test_error_route(request, exc_name): + """ + Sample doc + """ + for e in exception_http_code.keys(): + if e.__name__ == exc_name: + raise e('Error') + + +def test_apidoc_error(api_client): + for exc, code in exception_http_code.items(): + rv = api_client.get('/api/1/test/error/%s/' % exc.__name__) + + assert rv.status_code == code, rv.data + - _parse_httpdomain_doc(httpdomain_doc, doc_data) +@api_route(r'/some/full/(?P[0-9]+)/(?P[0-9]+)/', + 'some-complete-doc-route') +@api_doc('/some/complete/doc/route/') +def apidoc_full_stack(request, myarg, myotherarg, akw=0): + """ + Sample doc + """ + return {'result': int(myarg) + int(myotherarg) + akw} - expected_urls = [{ - 'rule': '/api/1/revision/ **\\(sha1_git\\)** /', - 'methods': ['GET', 'HEAD', 'OPTIONS'] - }] - self.assertIn('urls', doc_data) - self.assertEqual(doc_data['urls'], expected_urls) +# remove deprecation warnings related to docutils +@pytest.mark.filterwarnings( + 'ignore:.*U.*mode is deprecated:DeprecationWarning') +def test_apidoc_full_stack_doc(client): + rv = client.get('/api/1/some/complete/doc/route/', HTTP_ACCEPT='text/html') + assert rv.status_code == 200, rv.content + assert_template_used(rv, 'api/apidoc.html') - expected_description = 'Get information about a revision in the archive. \ -Revisions are identified by **sha1** checksums, compatible with Git commit \ -identifiers. See **swh.model.identifiers.revision_identifier** in our data \ -model module for details about how they are computed.' - self.assertIn('description', doc_data) - self.assertEqual(doc_data['description'], expected_description) - expected_args = [{ - 'name': 'sha1_git', +def test_apidoc_full_stack_fn(api_client): + rv = api_client.get('/api/1/some/full/1/1/') + + assert rv.status_code == 200, rv.data + + +def test_api_doc_parse_httpdomain(): + doc_data = { + 'description': '', + 'urls': [], + 'args': [], + 'params': [], + 'resheaders': [], + 'reqheaders': [], + 'return_type': '', + 'returns': [], + 'status_codes': [], + 'examples': [] + } + + _parse_httpdomain_doc(httpdomain_doc, doc_data) + + expected_urls = [{ + 'rule': '/api/1/revision/ **\\(sha1_git\\)** /', + 'methods': ['GET', 'HEAD'] + }] + + assert 'urls' in doc_data + assert doc_data['urls'] == expected_urls + + expected_description = ('Get information about a revision in the archive. ' + 'Revisions are identified by **sha1** checksums, ' + 'compatible with Git commit identifiers. See ' + '**swh.model.identifiers.revision_identifier** in ' + 'our data model module for details about how they ' + 'are computed.') + + assert 'description' in doc_data + assert doc_data['description'] == expected_description + + expected_args = [{ + 'name': 'sha1_git', + 'type': 'string', + 'doc': ('hexadecimal representation of the revision ' + '**sha1_git** identifier') + }] + + assert 'args' in doc_data + assert doc_data['args'] == expected_args + + expected_params = [] + assert 'params' in doc_data + assert doc_data['params'] == expected_params + + expected_reqheaders = [{ + 'doc': ('the requested response content type, either ' + '``application/json`` or ``application/yaml``'), + 'name': 'Accept' + }] + + assert 'reqheaders' in doc_data + assert doc_data['reqheaders'] == expected_reqheaders + + expected_resheaders = [{ + 'doc': 'this depends on **Accept** header of request', + 'name': 'Content-Type' + }] + + assert 'resheaders' in doc_data + assert doc_data['resheaders'] == expected_resheaders + + expected_statuscodes = [ + { + 'code': '200', + 'doc': 'no error' + }, + { + 'code': '400', + 'doc': 'an invalid **sha1_git** value has been provided' + }, + { + 'code': '404', + 'doc': 'requested revision can not be found in the archive' + } + ] + + assert 'status_codes' in doc_data + assert doc_data['status_codes'] == expected_statuscodes + + expected_return_type = 'object' + + assert 'return_type' in doc_data + assert doc_data['return_type'] in expected_return_type + + expected_returns = [ + { + 'name': 'author', + 'type': 'object', + 'doc': 'information about the author of the revision' + }, + { + 'name': 'committer', + 'type': 'object', + 'doc': 'information about the committer of the revision' + }, + { + 'name': 'committer_date', + 'type': 'string', + 'doc': 'ISO representation of the commit date (in UTC)' + }, + { + 'name': 'date', + 'type': 'string', + 'doc': 'ISO representation of the revision date (in UTC)' + }, + { + 'name': 'directory', 'type': 'string', - 'doc': 'hexadecimal representation of the revision **sha1_git** identifier' - }] - - self.assertIn('args', doc_data) - self.assertEqual(doc_data['args'], expected_args) - - expected_params = [] - self.assertIn('params', doc_data) - self.assertEqual(doc_data['params'], expected_params) - - expected_reqheaders = [{ - 'doc': 'the requested response content type, either ``application/json`` or ``application/yaml``', - 'name': 'Accept' - }] - - self.assertIn('reqheaders', doc_data) - self.assertEqual(doc_data['reqheaders'], expected_reqheaders) - - expected_resheaders = [{ - 'doc': 'this depends on **Accept** header of request', - 'name': 'Content-Type' - }] - - self.assertIn('resheaders', doc_data) - self.assertEqual(doc_data['resheaders'], expected_resheaders) - - expected_statuscodes = [ - { - 'code': '200', - 'doc': 'no error' - }, - { - 'code': '400', - 'doc': 'an invalid **sha1_git** value has been provided' - }, - { - 'code': '404', - 'doc': 'requested revision can not be found in the archive' - } - ] - - self.assertIn('status_codes', doc_data) - self.assertEqual(doc_data['status_codes'], expected_statuscodes) - - expected_return_type = 'object' - - self.assertIn('return_type', doc_data) - self.assertEqual(doc_data['return_type'], expected_return_type) - - expected_returns = [ - { - 'name': 'author', - 'type': 'object', - 'doc': 'information about the author of the revision' - }, - { - 'name': 'committer', - 'type': 'object', - 'doc': 'information about the committer of the revision' - }, - { - 'name': 'committer_date', - 'type': 'string', - 'doc': 'ISO representation of the commit date (in UTC)' - }, - { - 'name': 'date', - 'type': 'string', - 'doc': 'ISO representation of the revision date (in UTC)' - }, - { - 'name': 'directory', - 'type': 'string', - 'doc': 'the unique identifier that revision points to' - }, - { - 'name': 'directory_url', - 'type': 'string', - 'doc': 'link to ``_ to get information about the directory associated to the revision' - }, - { - 'name': 'id', - 'type': 'string', - 'doc': 'the revision unique identifier' - }, - { - 'name': 'merge', - 'type': 'boolean', - 'doc': 'whether or not the revision corresponds to a merge commit' - }, - { - 'name': 'message', - 'type': 'string', - 'doc': 'the message associated to the revision' - }, - { - 'name': 'parents', - 'type': 'array', - 'doc': 'the parents of the revision, i.e. the previous revisions that head directly to it, each entry of that array contains an unique parent revision identifier but also a link to ``_ to get more information about it' - }, - { - 'name': 'type', - 'type': 'string', - 'doc': 'the type of the revision' - } - ] - - self.assertIn('returns', doc_data) - self.assertEqual(doc_data['returns'], expected_returns) - - expected_examples = ['/api/1/revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/'] - - self.assertIn('examples', doc_data) - self.assertEqual(doc_data['examples'], expected_examples) + 'doc': 'the unique identifier that revision points to' + }, + { + 'name': 'directory_url', + 'type': 'string', + 'doc': ('link to ``_ to get information about ' + 'the directory associated to the revision') + }, + { + 'name': 'id', + 'type': 'string', + 'doc': 'the revision unique identifier' + }, + { + 'name': 'merge', + 'type': 'boolean', + 'doc': 'whether or not the revision corresponds to a merge commit' + }, + { + 'name': 'message', + 'type': 'string', + 'doc': 'the message associated to the revision' + }, + { + 'name': 'parents', + 'type': 'array', + 'doc': ('the parents of the revision, i.e. the previous revisions ' + 'that head directly to it, each entry of that array ' + 'contains an unique parent revision identifier but also a ' + 'link to ``_ to get more information ' + 'about it') + }, + { + 'name': 'type', + 'type': 'string', + 'doc': 'the type of the revision' + } + ] + + assert 'returns' in doc_data + assert doc_data['returns'] == expected_returns + + expected_examples = [ + '/api/1/revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/' + ] + + assert 'examples' in doc_data + assert doc_data['examples'] == expected_examples diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py --- a/swh/web/tests/api/test_apiresponse.py +++ b/swh/web/tests/api/test_apiresponse.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,163 +7,146 @@ from rest_framework.test import APIRequestFactory -from unittest.mock import patch - from swh.web.api.apiresponse import ( compute_link_header, transform, make_api_response, filter_by_fields ) -from swh.web.tests.testcase import WebTestCase api_request_factory = APIRequestFactory() -class SWHComputeLinkHeaderTest(WebTestCase): - def test_compute_link_header(self): - rv = { - 'headers': {'link-next': 'foo', 'link-prev': 'bar'}, - 'results': [1, 2, 3] - } - options = {} +def test_compute_link_header(): + rv = { + 'headers': {'link-next': 'foo', 'link-prev': 'bar'}, + 'results': [1, 2, 3] + } + options = {} + + headers = compute_link_header(rv, options) + + assert headers == {'Link': '; rel="next",; rel="previous"'} + + +def test_compute_link_header_nothing_changed(): + rv = {} + options = {} + + headers = compute_link_header(rv, options) + + assert headers == {} + - # when - headers = compute_link_header( - rv, options) +def test_compute_link_header_nothing_changed_2(): + rv = {'headers': {}} + options = {} - self.assertEqual(headers, { - 'Link': '; rel="next",; rel="previous"', - }) + headers = compute_link_header(rv, options) - def test_compute_link_header_nothing_changed(self): - rv = {} - options = {} + assert headers == {} - # when - headers = compute_link_header( - rv, options) - self.assertEqual(headers, {}) +def test_transform_only_return_results_1(): + rv = {'results': {'some-key': 'some-value'}} - def test_compute_link_header_nothing_changed_2(self): - rv = {'headers': {}} - options = {} + assert transform(rv) == {'some-key': 'some-value'} - # when - headers = compute_link_header( - rv, options) - self.assertEqual(headers, {}) +def test_transform_only_return_results_2(): + rv = {'headers': {'something': 'do changes'}, + 'results': {'some-key': 'some-value'}} + assert transform(rv) == {'some-key': 'some-value'} -class SWHTransformProcessorTest(WebTestCase): - def test_transform_only_return_results_1(self): - rv = {'results': {'some-key': 'some-value'}} - self.assertEqual(transform(rv), {'some-key': 'some-value'}) +def test_transform_do_remove_headers(): + rv = {'headers': {'something': 'do changes'}, + 'some-key': 'some-value'} - def test_transform_only_return_results_2(self): - rv = {'headers': {'something': 'do changes'}, - 'results': {'some-key': 'some-value'}} + assert transform(rv) == {'some-key': 'some-value'} - self.assertEqual(transform(rv), {'some-key': 'some-value'}) - def test_transform_do_remove_headers(self): - rv = {'headers': {'something': 'do changes'}, - 'some-key': 'some-value'} +def test_transform_do_nothing(): + rv = {'some-key': 'some-value'} - self.assertEqual(transform(rv), {'some-key': 'some-value'}) + assert transform(rv) == {'some-key': 'some-value'} - def test_transform_do_nothing(self): - rv = {'some-key': 'some-value'} - self.assertEqual(transform(rv), {'some-key': 'some-value'}) +def test_swh_multi_response_mimetype(mocker): + mock_shorten_path = mocker.patch('swh.web.api.apiresponse.shorten_path') + mock_filter = mocker.patch('swh.web.api.apiresponse.filter_by_fields') + mock_json = mocker.patch('swh.web.api.apiresponse.json') + data = { + 'data': [12, 34], + 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' + } -class RendererTestCase(WebTestCase): + mock_filter.return_value = data + mock_shorten_path.return_value = 'my_short_path' - @patch('swh.web.api.apiresponse.json') - @patch('swh.web.api.apiresponse.filter_by_fields') - @patch('swh.web.api.apiresponse.shorten_path') - def test_swh_multi_response_mimetype(self, mock_shorten_path, - mock_filter, mock_json): - # given - data = { - 'data': [12, 34], - 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' - } + accepted_response_formats = {'html': 'text/html', + 'yaml': 'application/yaml', + 'json': 'application/json'} - mock_filter.return_value = data - mock_shorten_path.return_value = 'my_short_path' + for format in accepted_response_formats: - accepted_response_formats = {'html': 'text/html', - 'yaml': 'application/yaml', - 'json': 'application/json'} + request = api_request_factory.get('/api/test/path/') - for format in accepted_response_formats: + mime_type = accepted_response_formats[format] + setattr(request, 'accepted_media_type', mime_type) - request = api_request_factory.get('/api/test/path/') + if mime_type == 'text/html': - mime_type = accepted_response_formats[format] - setattr(request, 'accepted_media_type', mime_type) + expected_data = { + 'response_data': json.dumps(data), + 'request': { + 'path': request.path, + 'method': request.method, + 'absolute_uri': request.build_absolute_uri() + }, + 'headers_data': {}, + 'heading': 'my_short_path', + 'status_code': 200 + } - if mime_type == 'text/html': + mock_json.dumps.return_value = json.dumps(data) + else: + expected_data = data - expected_data = { - 'response_data': json.dumps(data), - 'request': { - 'path': request.path, - 'method': request.method, - 'absolute_uri': request.build_absolute_uri() - }, - 'headers_data': {}, - 'heading': 'my_short_path', - 'status_code': 200 - } + rv = make_api_response(request, data) - mock_json.dumps.return_value = json.dumps(data) - else: - expected_data = data + mock_filter.assert_called_with(request, data) - # when + assert rv.status_code == 200, rv.data + assert rv.data == expected_data + if mime_type == 'text/html': + assert rv.template_name == 'api/apidoc.html' - rv = make_api_response(request, data) - # then - mock_filter.assert_called_with(request, data) - self.assertEqual(rv.data, expected_data) - self.assertEqual(rv.status_code, 200, rv.data) - if mime_type == 'text/html': - self.assertEqual(rv.template_name, 'api/apidoc.html') +def test_swh_filter_renderer_do_nothing(): + input_data = {'a': 'some-data'} - def test_swh_filter_renderer_do_nothing(self): - # given - input_data = {'a': 'some-data'} + request = api_request_factory.get('/api/test/path/', data={}) + setattr(request, 'query_params', request.GET) - request = api_request_factory.get('/api/test/path/', data={}) - setattr(request, 'query_params', request.GET) + actual_data = filter_by_fields(request, input_data) - # when - actual_data = filter_by_fields(request, input_data) + assert actual_data == input_data - # then - self.assertEqual(actual_data, input_data) - @patch('swh.web.api.apiresponse.utils.filter_field_keys') - def test_swh_filter_renderer_do_filter(self, mock_ffk): - # given - mock_ffk.return_value = {'a': 'some-data'} +def test_swh_filter_renderer_do_filter(mocker): + mock_ffk = mocker.patch('swh.web.api.apiresponse.utils.filter_field_keys') + mock_ffk.return_value = {'a': 'some-data'} - request = api_request_factory.get('/api/test/path/', - data={'fields': 'a,c'}) - setattr(request, 'query_params', request.GET) + request = api_request_factory.get('/api/test/path/', + data={'fields': 'a,c'}) + setattr(request, 'query_params', request.GET) - input_data = {'a': 'some-data', - 'b': 'some-other-data'} + input_data = {'a': 'some-data', + 'b': 'some-other-data'} - # when - actual_data = filter_by_fields(request, input_data) + actual_data = filter_by_fields(request, input_data) - # then - self.assertEqual(actual_data, {'a': 'some-data'}) + assert actual_data == {'a': 'some-data'} - mock_ffk.assert_called_once_with(input_data, {'a', 'c'}) + mock_ffk.assert_called_once_with(input_data, {'a', 'c'}) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -3,586 +3,543 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from unittest.mock import patch, call - from swh.web.api import utils -from swh.web.tests.testcase import WebTestCase - - -class UtilsTestCase(WebTestCase): - def setUp(self): - self.maxDiff = None - self.url_map = [dict(rule='/other/', - methods=set(['GET', 'POST', 'HEAD']), - endpoint='foo'), - dict(rule='/some/old/url/', - methods=set(['GET', 'POST']), - endpoint='blablafn'), - dict(rule='/other/old/url/', - methods=set(['GET', 'HEAD']), - endpoint='bar'), - dict(rule='/other', - methods=set([]), - endpoint=None), - dict(rule='/other2', - methods=set([]), - endpoint=None)] - self.sample_content_hashes = { - 'blake2s256': ('791e07fcea240ade6dccd0a9309141673' - 'c31242cae9c237cf3855e151abc78e9'), - 'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62', - 'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', - 'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06' - 'd78a4a71ba1ad064770f0c9') - } - def test_filter_field_keys_dict_unknown_keys(self): - # when - actual_res = utils.filter_field_keys( - {'directory': 1, 'file': 2, 'link': 3}, - {'directory1', 'file2'}) - - # then - self.assertEqual(actual_res, {}) - - def test_filter_field_keys_dict(self): - # when - actual_res = utils.filter_field_keys( - {'directory': 1, 'file': 2, 'link': 3}, - {'directory', 'link'}) - - # then - self.assertEqual(actual_res, {'directory': 1, 'link': 3}) - - def test_filter_field_keys_list_unknown_keys(self): - # when - actual_res = utils.filter_field_keys( - [{'directory': 1, 'file': 2, 'link': 3}, - {'1': 1, '2': 2, 'link': 3}], - {'d'}) - - # then - self.assertEqual(actual_res, [{}, {}]) - - def test_filter_field_keys_map(self): - # when - actual_res = utils.filter_field_keys( - map(lambda x: {'i': x['i']+1, 'j': x['j']}, - [{'i': 1, 'j': None}, - {'i': 2, 'j': None}, - {'i': 3, 'j': None}]), - {'i'}) - - # then - self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}]) - - def test_filter_field_keys_list(self): - # when - actual_res = utils.filter_field_keys( - [{'directory': 1, 'file': 2, 'link': 3}, - {'dir': 1, 'fil': 2, 'lin': 3}], - {'directory', 'dir'}) - - # then - self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) - - def test_filter_field_keys_other(self): - # given - input_set = {1, 2} - - # when - actual_res = utils.filter_field_keys(input_set, {'a', '1'}) - - # then - self.assertEqual(actual_res, input_set) - - def test_person_to_string(self): - self.assertEqual(utils.person_to_string(dict(name='raboof', - email='foo@bar')), - 'raboof ') - - def test_enrich_release_0(self): - # when - actual_release = utils.enrich_release({}) - - # then - self.assertEqual(actual_release, {}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_release_1(self, mock_django_reverse): - # given - - def reverse_test_context(view_name, url_args): - if view_name == 'api-1-content': - id = url_args['q'] - return '/api/1/content/%s/' % id - else: - raise ValueError( - 'This should not happened so fail if it does.') - - mock_django_reverse.side_effect = reverse_test_context - - # when - actual_release = utils.enrich_release({ - 'target': '123', - 'target_type': 'content', - 'author': { - 'id': 100, - 'name': 'author release name', - 'email': 'author@email', - }, - }) - - # then - self.assertEqual(actual_release, { - 'target': '123', - 'target_type': 'content', - 'target_url': '/api/1/content/sha1_git:123/', - 'author': { - 'id': 100, - 'name': 'author release name', - 'email': 'author@email', - }, - }) + +url_map = [ + { + 'rule': '/other/', + 'methods': set(['GET', 'POST', 'HEAD']), + 'endpoint': 'foo' + }, + { + 'rule': '/some/old/url/', + 'methods': set(['GET', 'POST']), + 'endpoint': 'blablafn' + }, + { + 'rule': '/other/old/url/', + 'methods': set(['GET', 'HEAD']), + 'endpoint': 'bar' + }, + { + 'rule': '/other', + 'methods': set([]), + 'endpoint': None + }, + { + 'rule': '/other2', + 'methods': set([]), + 'endpoint': None + } +] + +sample_content_hashes = { + 'blake2s256': ('791e07fcea240ade6dccd0a9309141673' + 'c31242cae9c237cf3855e151abc78e9'), + 'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62', + 'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', + 'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06' + 'd78a4a71ba1ad064770f0c9') +} + + +def test_filter_field_keys_dict_unknown_keys(): + actual_res = utils.filter_field_keys( + {'directory': 1, 'file': 2, 'link': 3}, + {'directory1', 'file2'}) + + assert actual_res == {} + + +def test_filter_field_keys_dict(): + actual_res = utils.filter_field_keys( + {'directory': 1, 'file': 2, 'link': 3}, + {'directory', 'link'}) + + assert actual_res == {'directory': 1, 'link': 3} + + +def test_filter_field_keys_list_unknown_keys(): + actual_res = utils.filter_field_keys( + [{'directory': 1, 'file': 2, 'link': 3}, + {'1': 1, '2': 2, 'link': 3}], {'d'}) + + assert actual_res == [{}, {}] + + +def test_filter_field_keys_map(): + actual_res = utils.filter_field_keys( + map(lambda x: {'i': x['i']+1, 'j': x['j']}, + [{'i': 1, 'j': None}, + {'i': 2, 'j': None}, + {'i': 3, 'j': None}]), {'i'}) + + assert list(actual_res) == [{'i': 2}, {'i': 3}, {'i': 4}] + + +def test_filter_field_keys_list(): + actual_res = utils.filter_field_keys( + [{'directory': 1, 'file': 2, 'link': 3}, + {'dir': 1, 'fil': 2, 'lin': 3}], + {'directory', 'dir'}) + + assert actual_res == [{'directory': 1}, {'dir': 1}] + + +def test_filter_field_keys_other(): + input_set = {1, 2} + + actual_res = utils.filter_field_keys(input_set, {'a', '1'}) + + assert actual_res == input_set + + +def test_person_to_string(): + assert utils.person_to_string({'name': 'raboof', + 'email': 'foo@bar'}) == 'raboof ' + + +def test_enrich_release_0(): + actual_release = utils.enrich_release({}) + + assert actual_release == {} + + +def test_enrich_release_1(mocker): + + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + + def reverse_test_context(view_name, url_args): + if view_name == 'api-1-content': + id = url_args['q'] + return '/api/1/content/%s/' % id + else: + raise ValueError('This should not happened so fail if it does.') + + mock_django_reverse.side_effect = reverse_test_context + + actual_release = utils.enrich_release({ + 'target': '123', + 'target_type': 'content', + 'author': { + 'id': 100, + 'name': 'author release name', + 'email': 'author@email', + }, + }) + + assert actual_release == { + 'target': '123', + 'target_type': 'content', + 'target_url': '/api/1/content/sha1_git:123/', + 'author': { + 'id': 100, + 'name': 'author release name', + 'email': 'author@email', + }, + } + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-content', url_args={'q': 'sha1_git:123'}), + ]) + + +def test_enrich_release_2(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.return_value = '/api/1/dir/23/' + + actual_release = utils.enrich_release({'target': '23', + 'target_type': 'directory'}) + + assert actual_release == { + 'target': '23', + 'target_type': 'directory', + 'target_url': '/api/1/dir/23/' + } + + mock_django_reverse.assert_called_once_with('api-1-directory', + url_args={'sha1_git': '23'}) + + +def test_enrich_release_3(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.return_value = '/api/1/rev/3/' + + actual_release = utils.enrich_release({'target': '3', + 'target_type': 'revision'}) + + assert actual_release == { + 'target': '3', + 'target_type': 'revision', + 'target_url': '/api/1/rev/3/' + } + + mock_django_reverse.assert_called_once_with('api-1-revision', + url_args={'sha1_git': '3'}) + + +def test_enrich_release_4(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.return_value = '/api/1/rev/4/' + + actual_release = utils.enrich_release({'target': '4', + 'target_type': 'release'}) + + assert actual_release == { + 'target': '4', + 'target_type': 'release', + 'target_url': '/api/1/rev/4/' + } + + mock_django_reverse.assert_called_once_with('api-1-release', + url_args={'sha1_git': '4'}) + + +def test_enrich_directory_no_type(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + assert utils.enrich_directory({'id': 'dir-id'}) == {'id': 'dir-id'} + + mock_django_reverse.return_value = '/api/content/sha1_git:123/' + + actual_directory = utils.enrich_directory({ + 'id': 'dir-id', + 'type': 'file', + 'target': '123', + }) + + assert actual_directory == { + 'id': 'dir-id', + 'type': 'file', + 'target': '123', + 'target_url': '/api/content/sha1_git:123/', + } + + mock_django_reverse.assert_called_once_with( + 'api-1-content', url_args={'q': 'sha1_git:123'}) + + +def test_enrich_directory_with_context_and_type_file(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.return_value = '/api/content/sha1_git:123/' + + actual_directory = utils.enrich_directory({ + 'id': 'dir-id', + 'type': 'file', + 'name': 'hy', + 'target': '789', + }, context_url='/api/revision/revsha1/directory/prefix/path/') + + assert actual_directory == { + 'id': 'dir-id', + 'type': 'file', + 'name': 'hy', + 'target': '789', + 'target_url': '/api/content/sha1_git:123/', + 'file_url': '/api/revision/revsha1/directory' + '/prefix/path/hy/' + } + + mock_django_reverse.assert_called_once_with( + 'api-1-content', url_args={'q': 'sha1_git:789'}) + + +def test_enrich_directory_with_context_and_type_dir(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.return_value = '/api/directory/456/' + + actual_directory = utils.enrich_directory({ + 'id': 'dir-id', + 'type': 'dir', + 'name': 'emacs-42', + 'target_type': 'file', + 'target': '456', + }, context_url='/api/revision/origin/2/directory/some/prefix/path/') + + assert actual_directory == { + 'id': 'dir-id', + 'type': 'dir', + 'target_type': 'file', + 'name': 'emacs-42', + 'target': '456', + 'target_url': '/api/directory/456/', + 'dir_url': '/api/revision/origin/2/directory' + '/some/prefix/path/emacs-42/' + } + + mock_django_reverse.assert_called_once_with('api-1-directory', + url_args={'sha1_git': '456'}) + + +def test_enrich_content_without_hashes(): + assert utils.enrich_content({'id': '123'}) == {'id': '123'} + + +def test_enrich_content_with_hashes(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + for algo, hash in sample_content_hashes.items(): + + query_string = '%s:%s' % (algo, hash) + + mock_django_reverse.side_effect = [ + '/api/content/%s/raw/' % query_string, + '/api/filetype/%s/' % query_string, + '/api/language/%s/' % query_string, + '/api/license/%s/' % query_string + ] + + enriched_content = utils.enrich_content({algo: hash}, + query_string=query_string) + + assert enriched_content == { + algo: hash, + 'data_url': '/api/content/%s/raw/' % query_string, + 'filetype_url': '/api/filetype/%s/' % query_string, + 'language_url': '/api/language/%s/' % query_string, + 'license_url': '/api/license/%s/' % query_string, + } mock_django_reverse.assert_has_calls([ - call('api-1-content', url_args={'q': 'sha1_git:123'}), + mocker.call('api-1-content-raw', url_args={'q': query_string}), + mocker.call('api-1-content-filetype', + url_args={'q': query_string}), + mocker.call('api-1-content-language', + url_args={'q': query_string}), + mocker.call('api-1-content-license', + url_args={'q': query_string}), ]) - @patch('swh.web.api.utils.reverse') - def test_enrich_release_2(self, mock_django_reverse): - # given - mock_django_reverse.return_value = '/api/1/dir/23/' - - # when - actual_release = utils.enrich_release({'target': '23', - 'target_type': 'directory'}) - - # then - self.assertEqual(actual_release, { - 'target': '23', - 'target_type': 'directory', - 'target_url': '/api/1/dir/23/' - }) - - mock_django_reverse.assert_called_once_with('api-1-directory', - url_args={'sha1_git': '23'}) # noqa - - @patch('swh.web.api.utils.reverse') - def test_enrich_release_3(self, mock_django_reverse): - # given - mock_django_reverse.return_value = '/api/1/rev/3/' - - # when - actual_release = utils.enrich_release({'target': '3', - 'target_type': 'revision'}) - - # then - self.assertEqual(actual_release, { - 'target': '3', - 'target_type': 'revision', - 'target_url': '/api/1/rev/3/' - }) - - mock_django_reverse.assert_called_once_with('api-1-revision', - url_args={'sha1_git': '3'}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_release_4(self, mock_django_reverse): - # given - mock_django_reverse.return_value = '/api/1/rev/4/' - - # when - actual_release = utils.enrich_release({'target': '4', - 'target_type': 'release'}) - - # then - self.assertEqual(actual_release, { - 'target': '4', - 'target_type': 'release', - 'target_url': '/api/1/rev/4/' - }) - - mock_django_reverse.assert_called_once_with('api-1-release', - url_args={'sha1_git': '4'}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_directory_no_type(self, mock_django_reverse): - # when/then - self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), - {'id': 'dir-id'}) - - # given - mock_django_reverse.return_value = '/api/content/sha1_git:123/' - - # when - actual_directory = utils.enrich_directory({ - 'id': 'dir-id', - 'type': 'file', - 'target': '123', - }) - - # then - self.assertEqual(actual_directory, { - 'id': 'dir-id', - 'type': 'file', - 'target': '123', - 'target_url': '/api/content/sha1_git:123/', - }) - - mock_django_reverse.assert_called_once_with( - 'api-1-content', url_args={'q': 'sha1_git:123'}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_directory_with_context_and_type_file( - self, mock_django_reverse, - ): - # given - mock_django_reverse.return_value = '/api/content/sha1_git:123/' - - # when - actual_directory = utils.enrich_directory({ - 'id': 'dir-id', - 'type': 'file', - 'name': 'hy', - 'target': '789', - }, context_url='/api/revision/revsha1/directory/prefix/path/') - - # then - self.assertEqual(actual_directory, { - 'id': 'dir-id', - 'type': 'file', - 'name': 'hy', - 'target': '789', - 'target_url': '/api/content/sha1_git:123/', - 'file_url': '/api/revision/revsha1/directory' - '/prefix/path/hy/' - }) - - mock_django_reverse.assert_called_once_with( - 'api-1-content', url_args={'q': 'sha1_git:789'}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_directory_with_context_and_type_dir( - self, mock_django_reverse, - ): - # given - mock_django_reverse.return_value = '/api/directory/456/' - - # when - actual_directory = utils.enrich_directory({ - 'id': 'dir-id', - 'type': 'dir', - 'name': 'emacs-42', - 'target_type': 'file', - 'target': '456', - }, context_url='/api/revision/origin/2/directory/some/prefix/path/') - - # then - self.assertEqual(actual_directory, { - 'id': 'dir-id', - 'type': 'dir', - 'target_type': 'file', - 'name': 'emacs-42', - 'target': '456', - 'target_url': '/api/directory/456/', - 'dir_url': '/api/revision/origin/2/directory' - '/some/prefix/path/emacs-42/' - }) - - mock_django_reverse.assert_called_once_with('api-1-directory', - url_args={'sha1_git': '456'}) # noqa - - def test_enrich_content_without_hashes(self): - # when/then - self.assertEqual(utils.enrich_content({'id': '123'}), - {'id': '123'}) - - @patch('swh.web.api.utils.reverse') - def test_enrich_content_with_hashes(self, mock_django_reverse): - - for algo, hash in self.sample_content_hashes.items(): - - query_string = '%s:%s' % (algo, hash) - - # given - mock_django_reverse.side_effect = [ - '/api/content/%s/raw/' % query_string, - '/api/filetype/%s/' % query_string, - '/api/language/%s/' % query_string, - '/api/license/%s/' % query_string - ] - - # when - enriched_content = utils.enrich_content( - { - algo: hash, - }, - query_string=query_string - ) - - # then - self.assertEqual( - enriched_content, - { - algo: hash, - 'data_url': '/api/content/%s/raw/' % query_string, - 'filetype_url': '/api/filetype/%s/' % query_string, - 'language_url': '/api/language/%s/' % query_string, - 'license_url': '/api/license/%s/' % query_string, - } - ) - - mock_django_reverse.assert_has_calls([ - call('api-1-content-raw', url_args={'q': query_string}), - call('api-1-content-filetype', url_args={'q': query_string}), - call('api-1-content-language', url_args={'q': query_string}), - call('api-1-content-license', url_args={'q': query_string}), - ]) - - mock_django_reverse.reset() - - @patch('swh.web.api.utils.reverse') - def test_enrich_content_with_hashes_and_top_level_url(self, - mock_django_reverse): - - for algo, hash in self.sample_content_hashes.items(): - - query_string = '%s:%s' % (algo, hash) - - # given - mock_django_reverse.side_effect = [ - '/api/content/%s/' % query_string, - '/api/content/%s/raw/' % query_string, - '/api/filetype/%s/' % query_string, - '/api/language/%s/' % query_string, - '/api/license/%s/' % query_string, - ] - - # when - enriched_content = utils.enrich_content( - { - algo: hash - }, - top_url=True, - query_string=query_string - ) - - # then - self.assertEqual( - enriched_content, - { - algo: hash, - 'content_url': '/api/content/%s/' % query_string, - 'data_url': '/api/content/%s/raw/' % query_string, - 'filetype_url': '/api/filetype/%s/' % query_string, - 'language_url': '/api/language/%s/' % query_string, - 'license_url': '/api/license/%s/' % query_string, - } - ) - - mock_django_reverse.assert_has_calls([ - call('api-1-content', url_args={'q': query_string}), - call('api-1-content-raw', url_args={'q': query_string}), - call('api-1-content-filetype', url_args={'q': query_string}), - call('api-1-content-language', url_args={'q': query_string}), - call('api-1-content-license', url_args={'q': query_string}), - ]) - - mock_django_reverse.reset() - - def _reverse_context_test(self, view_name, url_args): - if view_name == 'api-1-revision': - return '/api/revision/%s/' % url_args['sha1_git'] - elif view_name == 'api-1-revision-context': - return '/api/revision/%s/prev/%s/' % (url_args['sha1_git'], url_args['context']) # noqa - elif view_name == 'api-1-revision-log': - if 'prev_sha1s' in url_args: - return '/api/revision/%s/prev/%s/log/' % (url_args['sha1_git'], url_args['prev_sha1s']) # noqa - else: - return '/api/revision/%s/log/' % url_args['sha1_git'] - - @patch('swh.web.api.utils.reverse') - def test_enrich_revision_without_children_or_parent( - self, mock_django_reverse, - ): - # given - def reverse_test(view_name, url_args): - if view_name == 'api-1-revision': - return '/api/revision/' + url_args['sha1_git'] + '/' - elif view_name == 'api-1-revision-log': - return '/api/revision/' + url_args['sha1_git'] + '/log/' - elif view_name == 'api-1-directory': - return '/api/directory/' + url_args['sha1_git'] + '/' - - mock_django_reverse.side_effect = reverse_test - - # when - actual_revision = utils.enrich_revision({ - 'id': 'rev-id', - 'directory': '123', - 'author': {'id': '1'}, - 'committer': {'id': '2'}, - }) - - expected_revision = { - 'id': 'rev-id', - 'directory': '123', - 'url': '/api/revision/rev-id/', - 'history_url': '/api/revision/rev-id/log/', - 'directory_url': '/api/directory/123/', - 'author': {'id': '1'}, - 'committer': {'id': '2'}, - } + mock_django_reverse.reset() - # then - self.assertEqual(actual_revision, expected_revision) - - mock_django_reverse.assert_has_calls( - [call('api-1-revision', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), - call('api-1-directory', url_args={'sha1_git': '123'})]) - - @patch('swh.web.api.utils.reverse') - def test_enrich_revision_with_children_and_parent_no_dir( - self, mock_django_reverse, - ): - # given - mock_django_reverse.side_effect = self._reverse_context_test - - # when - actual_revision = utils.enrich_revision({ - 'id': 'rev-id', - 'parents': ['123'], - 'children': ['456'], - }) - - expected_revision = { - 'id': 'rev-id', - 'url': '/api/revision/rev-id/', - 'history_url': '/api/revision/rev-id/log/', - 'parents': [{'id': '123', 'url': '/api/revision/123/'}], - 'children': ['456'], - 'children_urls': ['/api/revision/456/'], - } - # then - self.assertEqual(actual_revision, expected_revision) - - mock_django_reverse.assert_has_calls( - [call('api-1-revision', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision', url_args={'sha1_git': '123'}), - call('api-1-revision', url_args={'sha1_git': '456'})]) - - @patch('swh.web.api.utils.reverse') - def test_enrich_revision_no_context(self, mock_django_reverse): - # given - mock_django_reverse.side_effect = self._reverse_context_test - - # when - actual_revision = utils.enrich_revision({ - 'id': 'rev-id', - 'parents': ['123'], - 'children': ['456'], - }) - - expected_revision = { - 'id': 'rev-id', - 'url': '/api/revision/rev-id/', - 'history_url': '/api/revision/rev-id/log/', - 'parents': [{'id': '123', 'url': '/api/revision/123/'}], - 'children': ['456'], - 'children_urls': ['/api/revision/456/'] +def test_enrich_content_with_hashes_and_top_level_url(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + for algo, hash in sample_content_hashes.items(): + + query_string = '%s:%s' % (algo, hash) + + mock_django_reverse.side_effect = [ + '/api/content/%s/' % query_string, + '/api/content/%s/raw/' % query_string, + '/api/filetype/%s/' % query_string, + '/api/language/%s/' % query_string, + '/api/license/%s/' % query_string, + ] + + enriched_content = utils.enrich_content({algo: hash}, top_url=True, + query_string=query_string) + + assert enriched_content == { + algo: hash, + 'content_url': '/api/content/%s/' % query_string, + 'data_url': '/api/content/%s/raw/' % query_string, + 'filetype_url': '/api/filetype/%s/' % query_string, + 'language_url': '/api/language/%s/' % query_string, + 'license_url': '/api/license/%s/' % query_string, } - # then - self.assertEqual(actual_revision, expected_revision) + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-content', url_args={'q': query_string}), + mocker.call('api-1-content-raw', url_args={'q': query_string}), + mocker.call('api-1-content-filetype', + url_args={'q': query_string}), + mocker.call('api-1-content-language', + url_args={'q': query_string}), + mocker.call('api-1-content-license', url_args={'q': query_string}), + ]) + + mock_django_reverse.reset() - mock_django_reverse.assert_has_calls( - [call('api-1-revision', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision', url_args={'sha1_git': '123'}), - call('api-1-revision', url_args={'sha1_git': '456'})]) - def _reverse_rev_message_test(self, view_name, url_args): - if view_name == 'api-1-revision': - return '/api/revision/%s/' % url_args['sha1_git'] - elif view_name == 'api-1-revision-log': - if 'prev_sha1s' in url_args and url_args['prev_sha1s'] is not None: - return '/api/revision/%s/prev/%s/log/' % (url_args['sha1_git'], url_args['prev_sha1s']) # noqa - else: - return '/api/revision/%s/log/' % url_args['sha1_git'] - elif view_name == 'api-1-revision-raw-message': - return '/api/revision/' + url_args['sha1_git'] + '/raw/' +def _reverse_context_test(view_name, url_args): + if view_name == 'api-1-revision': + return '/api/revision/%s/' % url_args['sha1_git'] + elif view_name == 'api-1-revision-context': + return ('/api/revision/%s/prev/%s/' % + (url_args['sha1_git'], url_args['context'])) + elif view_name == 'api-1-revision-log': + if 'prev_sha1s' in url_args: + return ('/api/revision/%s/prev/%s/log/' % + (url_args['sha1_git'], url_args['prev_sha1s'])) else: - return '/api/revision/%s/prev/%s/' % (url_args['sha1_git'], url_args['context']) # noqa - - @patch('swh.web.api.utils.reverse') - def test_enrich_revision_with_no_message(self, mock_django_reverse): - # given - mock_django_reverse.side_effect = self._reverse_rev_message_test - - # when - expected_revision = { - 'id': 'rev-id', - 'url': '/api/revision/rev-id/', - 'history_url': '/api/revision/rev-id/log/', - 'message': None, - 'parents': [{'id': '123', 'url': '/api/revision/123/'}], - 'children': ['456'], - 'children_urls': ['/api/revision/456/'], - } + return '/api/revision/%s/log/' % url_args['sha1_git'] - actual_revision = utils.enrich_revision({ - 'id': 'rev-id', - 'message': None, - 'parents': ['123'], - 'children': ['456'], - }) - - # then - self.assertEqual(actual_revision, expected_revision) - - mock_django_reverse.assert_has_calls( - [call('api-1-revision', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision', url_args={'sha1_git': '123'}), - call('api-1-revision', url_args={'sha1_git': '456'})] - ) - - @patch('swh.web.api.utils.reverse') - def test_enrich_revision_with_invalid_message(self, mock_django_reverse): - # given - mock_django_reverse.side_effect = self._reverse_rev_message_test - - # when - actual_revision = utils.enrich_revision({ - 'id': 'rev-id', - 'message': None, - 'message_decoding_failed': True, - 'parents': ['123'], - 'children': ['456'], - }) - - expected_revision = { - 'id': 'rev-id', - 'url': '/api/revision/rev-id/', - 'history_url': '/api/revision/rev-id/log/', - 'message': None, - 'message_decoding_failed': True, - 'message_url': '/api/revision/rev-id/raw/', - 'parents': [{'id': '123', 'url': '/api/revision/123/'}], - 'children': ['456'], - 'children_urls': ['/api/revision/456/'], - } - # then - self.assertEqual(actual_revision, expected_revision) +def test_enrich_revision_without_children_or_parent(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') - mock_django_reverse.assert_has_calls( - [call('api-1-revision', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), - call('api-1-revision', url_args={'sha1_git': '123'}), - call('api-1-revision', url_args={'sha1_git': '456'}), - call('api-1-revision-raw-message', url_args={'sha1_git': 'rev-id'})]) # noqa + def reverse_test(view_name, url_args): + if view_name == 'api-1-revision': + return '/api/revision/' + url_args['sha1_git'] + '/' + elif view_name == 'api-1-revision-log': + return '/api/revision/' + url_args['sha1_git'] + '/log/' + elif view_name == 'api-1-directory': + return '/api/directory/' + url_args['sha1_git'] + '/' + + mock_django_reverse.side_effect = reverse_test + + actual_revision = utils.enrich_revision({ + 'id': 'rev-id', + 'directory': '123', + 'author': {'id': '1'}, + 'committer': {'id': '2'}, + }) + + expected_revision = { + 'id': 'rev-id', + 'directory': '123', + 'url': '/api/revision/rev-id/', + 'history_url': '/api/revision/rev-id/log/', + 'directory_url': '/api/directory/123/', + 'author': {'id': '1'}, + 'committer': {'id': '2'}, + } + + assert actual_revision == expected_revision + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-revision', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-directory', url_args={'sha1_git': '123'}) + ]) + + +def test_enrich_revision_with_children_and_parent_no_dir(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.side_effect = _reverse_context_test + + actual_revision = utils.enrich_revision({ + 'id': 'rev-id', + 'parents': ['123'], + 'children': ['456'], + }) + + expected_revision = { + 'id': 'rev-id', + 'url': '/api/revision/rev-id/', + 'history_url': '/api/revision/rev-id/log/', + 'parents': [{'id': '123', 'url': '/api/revision/123/'}], + 'children': ['456'], + 'children_urls': ['/api/revision/456/'], + } + + assert actual_revision == expected_revision + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-revision', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision', url_args={'sha1_git': '123'}), + mocker.call('api-1-revision', url_args={'sha1_git': '456'}) + ]) + + +def test_enrich_revision_no_context(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.side_effect = _reverse_context_test + + actual_revision = utils.enrich_revision({ + 'id': 'rev-id', + 'parents': ['123'], + 'children': ['456'], + }) + + expected_revision = { + 'id': 'rev-id', + 'url': '/api/revision/rev-id/', + 'history_url': '/api/revision/rev-id/log/', + 'parents': [{'id': '123', 'url': '/api/revision/123/'}], + 'children': ['456'], + 'children_urls': ['/api/revision/456/'] + } + + assert actual_revision == expected_revision + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-revision', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision', url_args={'sha1_git': '123'}), + mocker.call('api-1-revision', url_args={'sha1_git': '456'}) + ]) + + +def _reverse_rev_message_test(view_name, url_args): + if view_name == 'api-1-revision': + return '/api/revision/%s/' % url_args['sha1_git'] + elif view_name == 'api-1-revision-log': + if 'prev_sha1s' in url_args and url_args['prev_sha1s'] is not None: + return ('/api/revision/%s/prev/%s/log/' % + (url_args['sha1_git'], url_args['prev_sha1s'])) + else: + return '/api/revision/%s/log/' % url_args['sha1_git'] + elif view_name == 'api-1-revision-raw-message': + return '/api/revision/' + url_args['sha1_git'] + '/raw/' + else: + return ('/api/revision/%s/prev/%s/' % + (url_args['sha1_git'], url_args['context'])) + + +def test_enrich_revision_with_no_message(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.side_effect = _reverse_rev_message_test + + expected_revision = { + 'id': 'rev-id', + 'url': '/api/revision/rev-id/', + 'history_url': '/api/revision/rev-id/log/', + 'message': None, + 'parents': [{'id': '123', 'url': '/api/revision/123/'}], + 'children': ['456'], + 'children_urls': ['/api/revision/456/'], + } + + actual_revision = utils.enrich_revision({ + 'id': 'rev-id', + 'message': None, + 'parents': ['123'], + 'children': ['456'], + }) + + assert actual_revision == expected_revision + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-revision', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision', url_args={'sha1_git': '123'}), + mocker.call('api-1-revision', url_args={'sha1_git': '456'}) + ]) + + +def test_enrich_revision_with_invalid_message(mocker): + mock_django_reverse = mocker.patch('swh.web.api.utils.reverse') + mock_django_reverse.side_effect = _reverse_rev_message_test + + actual_revision = utils.enrich_revision({ + 'id': 'rev-id', + 'message': None, + 'message_decoding_failed': True, + 'parents': ['123'], + 'children': ['456'], + }) + + expected_revision = { + 'id': 'rev-id', + 'url': '/api/revision/rev-id/', + 'history_url': '/api/revision/rev-id/log/', + 'message': None, + 'message_decoding_failed': True, + 'message_url': '/api/revision/rev-id/raw/', + 'parents': [{'id': '123', 'url': '/api/revision/123/'}], + 'children': ['456'], + 'children_urls': ['/api/revision/456/'], + } + + assert actual_revision == expected_revision + + mock_django_reverse.assert_has_calls([ + mocker.call('api-1-revision', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision-log', url_args={'sha1_git': 'rev-id'}), + mocker.call('api-1-revision', url_args={'sha1_git': '123'}), + mocker.call('api-1-revision', url_args={'sha1_git': '456'}), + mocker.call('api-1-revision-raw-message', + url_args={'sha1_git': 'rev-id'}) + ]) diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -6,386 +6,375 @@ import pytest from hypothesis import given -from rest_framework.test import APITestCase from swh.web.common.utils import reverse from swh.web.tests.data import random_content -from swh.web.tests.strategies import ( - content, contents_with_ctags -) -from swh.web.tests.testcase import ( - WebTestCase, ctags_json_missing, fossology_missing -) - - -class ContentApiTestCase(WebTestCase, APITestCase): - - @given(content()) - def test_api_content_filetype(self, content): - - self.content_add_mimetype(content['sha1']) - url = reverse('api-1-content-filetype', - url_args={'q': 'sha1_git:%s' % content['sha1_git']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - content_url = reverse('api-1-content', - url_args={'q': 'sha1:%s' % content['sha1']}) - expected_data = self.content_get_mimetype(content['sha1']) - expected_data['content_url'] = content_url - self.assertEqual(rv.data, expected_data) - - def test_api_content_filetype_sha_not_found(self): - unknown_content_ = random_content() - - url = reverse('api-1-content-filetype', - url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No filetype information found for content ' - 'sha1:%s.' % unknown_content_['sha1'] - }) - - @pytest.mark.xfail # Language indexer is disabled - @given(content()) - def test_api_content_language(self, content): - - self.content_add_language(content['sha1']) - url = reverse('api-1-content-language', - url_args={'q': 'sha1_git:%s' % content['sha1_git']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - content_url = reverse('api-1-content', - url_args={'q': 'sha1:%s' % content['sha1']}) - expected_data = self.content_get_language(content['sha1']) - expected_data['content_url'] = content_url - self.assertEqual(rv.data, expected_data) - - def test_api_content_language_sha_not_found(self): - unknown_content_ = random_content() - - url = reverse('api-1-content-language', - url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No language information found for content ' - 'sha1:%s.' % unknown_content_['sha1'] - }) - - @pytest.mark.xfail # Language indexer is disabled - @pytest.mark.skipif(ctags_json_missing, - reason="requires ctags with json output support") - @given(contents_with_ctags()) - def test_api_content_symbol(self, contents_with_ctags): - - expected_data = {} - for content_sha1 in contents_with_ctags['sha1s']: - self.content_add_ctags(content_sha1) - for ctag in self.content_get_ctags(content_sha1): - if ctag['name'] == contents_with_ctags['symbol_name']: - expected_data[content_sha1] = ctag - break - url = reverse('api-1-content-symbol', - url_args={'q': contents_with_ctags['symbol_name']}, - query_params={'per_page': 100}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - for entry in rv.data: - content_sha1 = entry['sha1'] - expected_entry = expected_data[content_sha1] - for key, view_name in (('content_url', 'api-1-content'), - ('data_url', 'api-1-content-raw'), - ('license_url', 'api-1-content-license'), - ('language_url', 'api-1-content-language'), - ('filetype_url', 'api-1-content-filetype')): - expected_entry[key] = reverse(view_name, - url_args={'q': 'sha1:%s' % - content_sha1}) - expected_entry['sha1'] = content_sha1 - del expected_entry['id'] - self.assertEqual(entry, expected_entry) - self.assertFalse('Link' in rv) - - url = reverse('api-1-content-symbol', - url_args={'q': contents_with_ctags['symbol_name']}, - query_params={'per_page': 2}) - rv = self.client.get(url) - - next_url = reverse('api-1-content-symbol', - url_args={'q': contents_with_ctags['symbol_name']}, - query_params={'last_sha1': rv.data[1]['sha1'], - 'per_page': 2}) - self.assertEqual(rv['Link'], '<%s>; rel="next"' % next_url) - - def test_api_content_symbol_not_found(self): - - url = reverse('api-1-content-symbol', url_args={'q': 'bar'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No indexed raw content match expression \'bar\'.' - }) - self.assertFalse('Link' in rv) - - @pytest.mark.skipif(ctags_json_missing, - reason="requires ctags with json output support") - @given(content()) - def test_api_content_ctags(self, content): - - self.content_add_ctags(content['sha1']) - url = reverse('api-1-content-ctags', - url_args={'q': 'sha1_git:%s' % content['sha1_git']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - content_url = reverse('api-1-content', - url_args={'q': 'sha1:%s' % content['sha1']}) - expected_data = list(self.content_get_ctags(content['sha1'])) - for e in expected_data: - e['content_url'] = content_url - self.assertEqual(rv.data, expected_data) - - @pytest.mark.skipif(fossology_missing, - reason="requires fossology-nomossa installed") - @given(content()) - def test_api_content_license(self, content): - - self.content_add_license(content['sha1']) - url = reverse('api-1-content-license', - url_args={'q': 'sha1_git:%s' % content['sha1_git']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - content_url = reverse('api-1-content', - url_args={'q': 'sha1:%s' % content['sha1']}) - expected_data = self.content_get_license(content['sha1']) - expected_data['content_url'] = content_url - self.assertEqual(rv.data, expected_data) - - def test_api_content_license_sha_not_found(self): - unknown_content_ = random_content() - - url = reverse('api-1-content-license', - url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No license information found for content ' - 'sha1:%s.' % unknown_content_['sha1'] - }) - - @given(content()) - def test_api_content_metadata(self, content): - - url = reverse('api-1-content', {'q': 'sha1:%s' % content['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - expected_data = self.content_get_metadata(content['sha1']) - for key, view_name in (('data_url', 'api-1-content-raw'), +from swh.web.tests.strategies import content, contents_with_ctags +from swh.web.tests.conftest import ctags_json_missing, fossology_missing + + +@given(content()) +def test_api_content_filetype(api_client, indexer_data, content): + indexer_data.content_add_mimetype(content['sha1']) + url = reverse('api-1-content-filetype', + url_args={'q': 'sha1_git:%s' % content['sha1_git']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + content_url = reverse('api-1-content', + url_args={'q': 'sha1:%s' % content['sha1']}) + expected_data = indexer_data.content_get_mimetype(content['sha1']) + expected_data['content_url'] = content_url + assert rv.data == expected_data + + +def test_api_content_filetype_sha_not_found(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content-filetype', + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'No filetype information found for content ' + 'sha1:%s.' % unknown_content_['sha1'] + } + + +@pytest.mark.skip # Language indexer is disabled +@given(content()) +def test_api_content_language(api_client, indexer_data, content): + indexer_data.content_add_language(content['sha1']) + url = reverse('api-1-content-language', + url_args={'q': 'sha1_git:%s' % content['sha1_git']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + content_url = reverse('api-1-content', + url_args={'q': 'sha1:%s' % content['sha1']}) + expected_data = indexer_data.content_get_language(content['sha1']) + expected_data['content_url'] = content_url + assert rv.data == expected_data + + +def test_api_content_language_sha_not_found(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content-language', + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'No language information found for content ' + 'sha1:%s.' % unknown_content_['sha1'] + } + + +@pytest.mark.skip # Language indexer is disabled +@pytest.mark.skipif(ctags_json_missing, + reason="requires ctags with json output support") +@given(contents_with_ctags()) +def test_api_content_symbol(api_client, indexer_data, contents_with_ctags): + expected_data = {} + for content_sha1 in contents_with_ctags['sha1s']: + indexer_data.content_add_ctags(content_sha1) + for ctag in indexer_data.content_get_ctags(content_sha1): + if ctag['name'] == contents_with_ctags['symbol_name']: + expected_data[content_sha1] = ctag + break + url = reverse('api-1-content-symbol', + url_args={'q': contents_with_ctags['symbol_name']}, + query_params={'per_page': 100}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + for entry in rv.data: + content_sha1 = entry['sha1'] + expected_entry = expected_data[content_sha1] + for key, view_name in (('content_url', 'api-1-content'), + ('data_url', 'api-1-content-raw'), ('license_url', 'api-1-content-license'), ('language_url', 'api-1-content-language'), ('filetype_url', 'api-1-content-filetype')): - expected_data[key] = reverse(view_name, - url_args={'q': 'sha1:%s' % - content['sha1']}) - self.assertEqual(rv.data, expected_data) - - def test_api_content_not_found_as_json(self): - unknown_content_ = random_content() - - url = reverse('api-1-content', - url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Content with sha1 checksum equals to %s not found!' - % unknown_content_['sha1'] - }) - - def test_api_content_not_found_as_yaml(self): - unknown_content_ = random_content() - - url = reverse('api-1-content', - url_args={'q': 'sha256:%s' % unknown_content_['sha256']}) - rv = self.client.get(url, HTTP_ACCEPT='application/yaml') - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertTrue('application/yaml' in rv['Content-Type']) - - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Content with sha256 checksum equals to %s not found!' % - unknown_content_['sha256'] - }) - - def test_api_content_raw_ko_not_found(self): - unknown_content_ = random_content() - - url = reverse('api-1-content-raw', - url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Content with sha1 checksum equals to %s not found!' % - unknown_content_['sha1'] - }) - - @given(content()) - def test_api_content_raw_text(self, content): - - url = reverse('api-1-content-raw', - url_args={'q': 'sha1:%s' % content['sha1']}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/octet-stream') - self.assertEqual( - rv['Content-disposition'], - 'attachment; filename=content_sha1_%s_raw' % content['sha1']) - self.assertEqual( - rv['Content-Type'], 'application/octet-stream') - expected_data = self.content_get(content['sha1']) - self.assertEqual(rv.content, expected_data['data']) - - @given(content()) - def test_api_content_raw_text_with_filename(self, content): - - url = reverse('api-1-content-raw', - url_args={'q': 'sha1:%s' % content['sha1']}, - query_params={'filename': 'filename.txt'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/octet-stream') - self.assertEqual( - rv['Content-disposition'], - 'attachment; filename=filename.txt') - self.assertEqual( - rv['Content-Type'], 'application/octet-stream') - expected_data = self.content_get(content['sha1']) - self.assertEqual(rv.content, expected_data['data']) - - @given(content()) - def test_api_check_content_known(self, content): - - url = reverse('api-1-content-known', - url_args={'q': content['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - self.assertEqual(rv.data, { - 'search_res': [ - { - 'found': True, - 'sha1': content['sha1'] - } - ], - 'search_stats': {'nbfiles': 1, 'pct': 100.0} - }) - - @given(content()) - def test_api_check_content_known_as_yaml(self, content): - - url = reverse('api-1-content-known', - url_args={'q': content['sha1']}) - rv = self.client.get(url, HTTP_ACCEPT='application/yaml') - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/yaml') - - self.assertEqual(rv.data, { - 'search_res': [ - { - 'found': True, - 'sha1': content['sha1'] - } - ], - 'search_stats': {'nbfiles': 1, 'pct': 100.0} - }) - - @given(content()) - def test_api_check_content_known_post_as_yaml(self, content): - - url = reverse('api-1-content-known') - rv = self.client.post( - url, - data={ - 'q': content['sha1'] - }, - HTTP_ACCEPT='application/yaml' - ) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertTrue('application/yaml' in rv['Content-Type']) - self.assertEqual(rv.data, { - 'search_res': [ - { - 'found': True, - 'sha1': content['sha1'] - } - ], - 'search_stats': {'nbfiles': 1, 'pct': 100.0} - }) - - def test_api_check_content_known_not_found(self): - unknown_content_ = random_content() - - url = reverse('api-1-content-known', - url_args={'q': unknown_content_['sha1']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'search_res': [ - { - 'found': False, - 'sha1': unknown_content_['sha1'] - } - ], - 'search_stats': {'nbfiles': 1, 'pct': 0.0} - }) - - @given(content()) - def test_api_content_uppercase(self, content): - url = reverse('api-1-content-uppercase-checksum', - url_args={'q': content['sha1'].upper()}) - - resp = self.client.get(url) - self.assertEqual(resp.status_code, 302) - - redirect_url = reverse('api-1-content', - url_args={'q': content['sha1']}) - - self.assertEqual(resp['location'], redirect_url) + expected_entry[key] = reverse( + view_name, url_args={'q': 'sha1:%s' % content_sha1}) + expected_entry['sha1'] = content_sha1 + del expected_entry['id'] + assert entry == expected_entry + assert 'Link' not in rv + + url = reverse('api-1-content-symbol', + url_args={'q': contents_with_ctags['symbol_name']}, + query_params={'per_page': 2}) + rv = api_client.get(url) + + next_url = reverse('api-1-content-symbol', + url_args={'q': contents_with_ctags['symbol_name']}, + query_params={'last_sha1': rv.data[1]['sha1'], + 'per_page': 2}) + assert rv['Link'] == '<%s>; rel="next"' % next_url + + +def test_api_content_symbol_not_found(api_client): + url = reverse('api-1-content-symbol', url_args={'q': 'bar'}) + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'No indexed raw content match expression \'bar\'.' + } + assert 'Link' not in rv + + +@pytest.mark.skipif(ctags_json_missing, + reason="requires ctags with json output support") +@given(content()) +def test_api_content_ctags(api_client, indexer_data, content): + indexer_data.content_add_ctags(content['sha1']) + url = reverse('api-1-content-ctags', + url_args={'q': 'sha1_git:%s' % content['sha1_git']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + content_url = reverse('api-1-content', + url_args={'q': 'sha1:%s' % content['sha1']}) + expected_data = list(indexer_data.content_get_ctags(content['sha1'])) + for e in expected_data: + e['content_url'] = content_url + assert rv.data == expected_data + + +@pytest.mark.skipif(fossology_missing, + reason="requires fossology-nomossa installed") +@given(content()) +def test_api_content_license(api_client, indexer_data, content): + indexer_data.content_add_license(content['sha1']) + url = reverse('api-1-content-license', + url_args={'q': 'sha1_git:%s' % content['sha1_git']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + content_url = reverse('api-1-content', + url_args={'q': 'sha1:%s' % content['sha1']}) + expected_data = indexer_data.content_get_license(content['sha1']) + expected_data['content_url'] = content_url + assert rv.data == expected_data + + +def test_api_content_license_sha_not_found(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content-license', + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'No license information found for content ' + 'sha1:%s.' % unknown_content_['sha1'] + } + + +@given(content()) +def test_api_content_metadata(api_client, archive_data, content): + url = reverse('api-1-content', {'q': 'sha1:%s' % content['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + expected_data = archive_data.content_get_metadata(content['sha1']) + for key, view_name in (('data_url', 'api-1-content-raw'), + ('license_url', 'api-1-content-license'), + ('language_url', 'api-1-content-language'), + ('filetype_url', 'api-1-content-filetype')): + expected_data[key] = reverse( + view_name, url_args={'q': 'sha1:%s' % content['sha1']}) + assert rv.data == expected_data + + +def test_api_content_not_found_as_json(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content', + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) + rv = api_client.get(url) + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Content with sha1 checksum equals to %s not found!' + % unknown_content_['sha1'] + } + + +def test_api_content_not_found_as_yaml(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content', + url_args={'q': 'sha256:%s' % unknown_content_['sha256']}) + rv = api_client.get(url, HTTP_ACCEPT='application/yaml') + + assert rv.status_code == 404, rv.data + assert 'application/yaml' in rv['Content-Type'] + + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Content with sha256 checksum equals to %s not found!' % + unknown_content_['sha256'] + } + + +def test_api_content_raw_ko_not_found(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content-raw', + url_args={'q': 'sha1:%s' % unknown_content_['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Content with sha1 checksum equals to %s not found!' % + unknown_content_['sha1'] + } + + +@given(content()) +def test_api_content_raw_text(api_client, archive_data, content): + url = reverse('api-1-content-raw', + url_args={'q': 'sha1:%s' % content['sha1']}) + + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/octet-stream' + assert rv['Content-disposition'] == \ + 'attachment; filename=content_sha1_%s_raw' % content['sha1'] + assert rv['Content-Type'] == 'application/octet-stream' + expected_data = archive_data.content_get(content['sha1']) + assert rv.content == expected_data['data'] + + +@given(content()) +def test_api_content_raw_text_with_filename(api_client, archive_data, content): + url = reverse('api-1-content-raw', + url_args={'q': 'sha1:%s' % content['sha1']}, + query_params={'filename': 'filename.txt'}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/octet-stream' + assert rv['Content-disposition'] == \ + 'attachment; filename=filename.txt' + assert rv['Content-Type'] == 'application/octet-stream' + expected_data = archive_data.content_get(content['sha1']) + assert rv.content == expected_data['data'] + + +@given(content()) +def test_api_check_content_known(api_client, content): + url = reverse('api-1-content-known', + url_args={'q': content['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + assert rv.data == { + 'search_res': [ + { + 'found': True, + 'sha1': content['sha1'] + } + ], + 'search_stats': {'nbfiles': 1, 'pct': 100.0} + } + + +@given(content()) +def test_api_check_content_known_as_yaml(api_client, content): + url = reverse('api-1-content-known', + url_args={'q': content['sha1']}) + rv = api_client.get(url, HTTP_ACCEPT='application/yaml') + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/yaml' + + assert rv.data == { + 'search_res': [ + { + 'found': True, + 'sha1': content['sha1'] + } + ], + 'search_stats': {'nbfiles': 1, 'pct': 100.0} + } + + +@given(content()) +def test_api_check_content_known_post_as_yaml(api_client, content): + url = reverse('api-1-content-known') + rv = api_client.post(url, data={'q': content['sha1']}, + HTTP_ACCEPT='application/yaml') + + assert rv.status_code == 200, rv.data + assert 'application/yaml' in rv['Content-Type'] + assert rv.data == { + 'search_res': [ + { + 'found': True, + 'sha1': content['sha1'] + } + ], + 'search_stats': {'nbfiles': 1, 'pct': 100.0} + } + + +def test_api_check_content_known_not_found(api_client): + unknown_content_ = random_content() + + url = reverse('api-1-content-known', + url_args={'q': unknown_content_['sha1']}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'search_res': [ + { + 'found': False, + 'sha1': unknown_content_['sha1'] + } + ], + 'search_stats': {'nbfiles': 1, 'pct': 0.0} + } + + +@given(content()) +def test_api_content_uppercase(api_client, content): + url = reverse('api-1-content-uppercase-checksum', + url_args={'q': content['sha1'].upper()}) + + rv = api_client.get(url) + assert rv.status_code == 302, rv.data + + redirect_url = reverse('api-1-content', + url_args={'q': content['sha1']}) + + assert rv['location'] == redirect_url diff --git a/swh/web/tests/api/views/test_directory.py b/swh/web/tests/api/views/test_directory.py --- a/swh/web/tests/api/views/test_directory.py +++ b/swh/web/tests/api/views/test_directory.py @@ -6,101 +6,98 @@ import random from hypothesis import given -from rest_framework.test import APITestCase from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import directory -from swh.web.tests.testcase import WebTestCase -class DirectoryApiTestCase(WebTestCase, APITestCase): +@given(directory()) +def test_api_directory(api_client, archive_data, directory): - @given(directory()) - def test_api_directory(self, directory): + url = reverse('api-1-directory', url_args={'sha1_git': directory}) + rv = api_client.get(url) - url = reverse('api-1-directory', url_args={'sha1_git': directory}) - rv = self.client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') + expected_data = list(map(_enrich_dir_data, + archive_data.directory_ls(directory))) - expected_data = list(map(self._enrich_dir_data, - self.directory_ls(directory))) + assert rv.data == expected_data - self.assertEqual(rv.data, expected_data) - def test_api_directory_not_found(self): - unknown_directory_ = random_sha1() +def test_api_directory_not_found(api_client): + unknown_directory_ = random_sha1() - url = reverse('api-1-directory', - url_args={'sha1_git': unknown_directory_}) - rv = self.client.get(url) + url = reverse('api-1-directory', + url_args={'sha1_git': unknown_directory_}) + rv = api_client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Directory with sha1_git %s not found' - % unknown_directory_}) + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Directory with sha1_git %s not found' % unknown_directory_ + } - @given(directory()) - def test_api_directory_with_path_found(self, directory): - directory_content = self.directory_ls(directory) - path = random.choice(directory_content) +@given(directory()) +def test_api_directory_with_path_found(api_client, archive_data, directory): - url = reverse('api-1-directory', - url_args={'sha1_git': directory, - 'path': path['name']}) - rv = self.client.get(url) + directory_content = archive_data.directory_ls(directory) + path = random.choice(directory_content) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, self._enrich_dir_data(path)) + url = reverse('api-1-directory', + url_args={'sha1_git': directory, 'path': path['name']}) + rv = api_client.get(url) - @given(directory()) - def test_api_directory_with_path_not_found(self, directory): + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == _enrich_dir_data(path) - path = 'some/path/to/nonexistent/dir/' - url = reverse('api-1-directory', - url_args={'sha1_git': directory, - 'path': path}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': ('Directory entry with path %s from %s not found' - % (path, directory))}) +@given(directory()) +def test_api_directory_with_path_not_found(api_client, directory): - @given(directory()) - def test_api_directory_uppercase(self, directory): - url = reverse('api-1-directory-uppercase-checksum', - url_args={'sha1_git': directory.upper()}) + path = 'some/path/to/nonexistent/dir/' + url = reverse('api-1-directory', + url_args={'sha1_git': directory, 'path': path}) + rv = api_client.get(url) - resp = self.client.get(url) - self.assertEqual(resp.status_code, 302) + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': ('Directory entry with path %s from %s not found' % + (path, directory)) + } - redirect_url = reverse('api-1-directory', - url_args={'sha1_git': directory}) - self.assertEqual(resp['location'], redirect_url) +@given(directory()) +def test_api_directory_uppercase(api_client, directory): + url = reverse('api-1-directory-uppercase-checksum', + url_args={'sha1_git': directory.upper()}) - @classmethod - def _enrich_dir_data(cls, dir_data): - if dir_data['type'] == 'file': - dir_data['target_url'] = \ - reverse('api-1-content', - url_args={'q': 'sha1_git:%s' % dir_data['target']}) - elif dir_data['type'] == 'dir': - dir_data['target_url'] = \ - reverse('api-1-directory', - url_args={'sha1_git': dir_data['target']}) - elif dir_data['type'] == 'rev': - dir_data['target_url'] = \ - reverse('api-1-revision', - url_args={'sha1_git': dir_data['target']}) + resp = api_client.get(url) + assert resp.status_code == 302 - return dir_data + redirect_url = reverse('api-1-directory', url_args={'sha1_git': directory}) + + assert resp['location'] == redirect_url + + +def _enrich_dir_data(dir_data): + if dir_data['type'] == 'file': + dir_data['target_url'] = reverse( + 'api-1-content', + url_args={'q': 'sha1_git:%s' % dir_data['target']}) + elif dir_data['type'] == 'dir': + dir_data['target_url'] = reverse( + 'api-1-directory', + url_args={'sha1_git': dir_data['target']}) + elif dir_data['type'] == 'rev': + dir_data['target_url'] = reverse( + 'api-1-revision', + url_args={'sha1_git': dir_data['target']}) + return dir_data diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -4,7 +4,6 @@ # See top-level LICENSE file for more information from hypothesis import given -from rest_framework.test import APITestCase from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT @@ -16,76 +15,75 @@ unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot ) -from swh.web.tests.testcase import WebTestCase - - -class SwhIdsApiTestCase(WebTestCase, APITestCase): - - @given(origin(), content(), directory(), release(), revision(), snapshot()) - def test_swh_id_resolve_success(self, origin, content, directory, - release, revision, snapshot): - - for obj_type_short, obj_type, obj_id in ( - ('cnt', CONTENT, content['sha1_git']), - ('dir', DIRECTORY, directory), - ('rel', RELEASE, release), - ('rev', REVISION, revision), - ('snp', SNAPSHOT, snapshot)): - - swh_id = 'swh:1:%s:%s;origin=%s' % (obj_type_short, obj_id, - origin['url']) - url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) - - resp = self.client.get(url) - - if obj_type == CONTENT: - url_args = {'query_string': 'sha1_git:%s' % obj_id} - elif obj_type == SNAPSHOT: - url_args = {'snapshot_id': obj_id} - else: - url_args = {'sha1_git': obj_id} - - browse_rev_url = reverse('browse-%s' % obj_type, - url_args=url_args, - query_params={'origin': origin['url']}) - - expected_result = { - 'browse_url': browse_rev_url, - 'metadata': {'origin': origin['url']}, - 'namespace': 'swh', - 'object_id': obj_id, - 'object_type': obj_type, - 'scheme_version': 1 - } - - self.assertEqual(resp.status_code, 200, resp.data) - self.assertEqual(resp.data, expected_result) - - def test_swh_id_resolve_invalid(self): - rev_id_invalid = '96db9023b8_foo_50d6c108e9a3' - swh_id = 'swh:1:rev:%s' % rev_id_invalid + + +@given(origin(), content(), directory(), release(), revision(), snapshot()) +def test_swh_id_resolve_success(api_client, origin, content, directory, + release, revision, snapshot): + + for obj_type_short, obj_type, obj_id in ( + ('cnt', CONTENT, content['sha1_git']), + ('dir', DIRECTORY, directory), + ('rel', RELEASE, release), + ('rev', REVISION, revision), + ('snp', SNAPSHOT, snapshot)): + + swh_id = 'swh:1:%s:%s;origin=%s' % (obj_type_short, obj_id, + origin['url']) url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) - resp = self.client.get(url) + resp = api_client.get(url) + + if obj_type == CONTENT: + url_args = {'query_string': 'sha1_git:%s' % obj_id} + elif obj_type == SNAPSHOT: + url_args = {'snapshot_id': obj_id} + else: + url_args = {'sha1_git': obj_id} + + browse_rev_url = reverse('browse-%s' % obj_type, + url_args=url_args, + query_params={'origin': origin['url']}) + + expected_result = { + 'browse_url': browse_rev_url, + 'metadata': {'origin': origin['url']}, + 'namespace': 'swh', + 'object_id': obj_id, + 'object_type': obj_type, + 'scheme_version': 1 + } - self.assertEqual(resp.status_code, 400, resp.data) + assert resp.status_code == 200, resp.data + assert resp.data == expected_result - @given(unknown_content(), unknown_directory(), unknown_release(), - unknown_revision(), unknown_snapshot()) - def test_swh_id_resolve_not_found(self, unknown_content, unknown_directory, - unknown_release, unknown_revision, - unknown_snapshot): - for obj_type_short, obj_id in (('cnt', unknown_content['sha1_git']), - ('dir', unknown_directory), - ('rel', unknown_release), - ('rev', unknown_revision), - ('snp', unknown_snapshot)): +def test_swh_id_resolve_invalid(api_client): + rev_id_invalid = '96db9023b8_foo_50d6c108e9a3' + swh_id = 'swh:1:rev:%s' % rev_id_invalid + url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) - swh_id = 'swh:1:%s:%s' % (obj_type_short, obj_id) + resp = api_client.get(url) - url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) + assert resp.status_code == 400, resp.data + + +@given(unknown_content(), unknown_directory(), unknown_release(), + unknown_revision(), unknown_snapshot()) +def test_swh_id_resolve_not_found(api_client, unknown_content, + unknown_directory, unknown_release, + unknown_revision, unknown_snapshot): + + for obj_type_short, obj_id in (('cnt', unknown_content['sha1_git']), + ('dir', unknown_directory), + ('rel', unknown_release), + ('rev', unknown_revision), + ('snp', unknown_snapshot)): + + swh_id = 'swh:1:%s:%s' % (obj_type_short, obj_id) + + url = reverse('api-1-resolve-swh-pid', url_args={'swh_id': swh_id}) - resp = self.client.get(url) + resp = api_client.get(url) - self.assertEqual(resp.status_code, 404, resp.data) + assert resp.status_code == 404, resp.data diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -3,12 +3,8 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from unittest.mock import patch - -from hypothesis import given, strategies -import pytest +from hypothesis import given from requests.utils import parse_header_links -from rest_framework.test import APITestCase from swh.storage.exc import StorageDBError, StorageAPIError @@ -18,295 +14,202 @@ from swh.web.tests.strategies import ( origin, new_origin, visit_dates, new_snapshots ) -from swh.web.tests.testcase import WebTestCase -from swh.web.tests.data import get_tests_data - - -class OriginApiTestCase(WebTestCase, APITestCase): - def _scroll_results(self, url): - """Iterates through pages of results, and returns them all.""" - results = [] - - while True: - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - results.extend(rv.data) - - if 'Link' in rv: - for link in parse_header_links(rv['Link']): - if link['rel'] == 'next': - # Found link to next page of results - url = link['url'] - break - else: - # No link with 'rel=next' + + +def _scroll_results(api_client, url): + """Iterates through pages of results, and returns them all.""" + results = [] + + while True: + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + results.extend(rv.data) + + if 'Link' in rv: + for link in parse_header_links(rv['Link']): + if link['rel'] == 'next': + # Found link to next page of results + url = link['url'] break else: - # No Link header + # No link with 'rel=next' break + else: + # No Link header + break - return results + return results - @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_lookup_origin_visits_raise_error( - self, mock_get_origin_visits, - ): - err_msg = 'voluntary error to check the bad request middleware.' +def test_api_lookup_origin_visits_raise_error(api_client, mocker): + mock_get_origin_visits = mocker.patch( + 'swh.web.api.views.origin.get_origin_visits') + err_msg = 'voluntary error to check the bad request middleware.' - mock_get_origin_visits.side_effect = BadInputExc(err_msg) - - url = reverse( - 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 400, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'BadInputExc', - 'reason': err_msg}) - - @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_lookup_origin_visits_raise_swh_storage_error_db( - self, mock_get_origin_visits): - - err_msg = 'Storage exploded! Will be back online shortly!' - - mock_get_origin_visits.side_effect = StorageDBError(err_msg) + mock_get_origin_visits.side_effect = BadInputExc(err_msg) - url = reverse( - 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) - rv = self.client.get(url) + url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) + rv = api_client.get(url) - self.assertEqual(rv.status_code, 503, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'StorageDBError', - 'reason': - 'An unexpected error occurred in the backend: %s' % err_msg}) - - @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_lookup_origin_visits_raise_swh_storage_error_api( - self, mock_get_origin_visits): - - err_msg = 'Storage API dropped dead! Will resurrect asap!' - - mock_get_origin_visits.side_effect = StorageAPIError(err_msg) - - url = reverse( - 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 503, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'StorageAPIError', - 'reason': - 'An unexpected error occurred in the api backend: %s' % err_msg - }) - - @given(new_origin(), visit_dates(3), new_snapshots(3)) - def test_api_lookup_origin_visits(self, new_origin, visit_dates, - new_snapshots): - - self.storage.origin_add_one(new_origin) - for i, visit_date in enumerate(visit_dates): - origin_visit = self.storage.origin_visit_add( - new_origin['url'], visit_date, type='git') - self.storage.snapshot_add([new_snapshots[i]]) - self.storage.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) - - all_visits = list(reversed(get_origin_visits(new_origin))) - - for last_visit, expected_visits in ( - (None, all_visits[:2]), - (all_visits[1]['visit'], all_visits[2:4])): - - url = reverse('api-1-origin-visits', - url_args={'origin_url': new_origin['url']}, - query_params={'per_page': 2, - 'last_visit': last_visit}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - for expected_visit in expected_visits: - origin_visit_url = reverse( - 'api-1-origin-visit', - url_args={'origin_url': new_origin['url'], - 'visit_id': expected_visit['visit']}) - snapshot_url = reverse( - 'api-1-snapshot', - url_args={'snapshot_id': expected_visit['snapshot']}) - expected_visit['origin'] = new_origin['url'] - expected_visit['origin_visit_url'] = origin_visit_url - expected_visit['snapshot_url'] = snapshot_url - - self.assertEqual(rv.data, expected_visits) - - @given(new_origin(), visit_dates(3), new_snapshots(3)) - def test_api_lookup_origin_visits_by_id(self, new_origin, visit_dates, - new_snapshots): - - self.storage.origin_add_one(new_origin) - for i, visit_date in enumerate(visit_dates): - origin_visit = self.storage.origin_visit_add( - new_origin['url'], visit_date, type='git') - self.storage.snapshot_add([new_snapshots[i]]) - self.storage.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) - - all_visits = list(reversed(get_origin_visits(new_origin))) - - for last_visit, expected_visits in ( - (None, all_visits[:2]), - (all_visits[1]['visit'], all_visits[2:4])): - - url = reverse('api-1-origin-visits', - url_args={'origin_url': new_origin['url']}, - query_params={'per_page': 2, - 'last_visit': last_visit}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - for expected_visit in expected_visits: - origin_visit_url = reverse( - 'api-1-origin-visit', - url_args={'origin_url': new_origin['url'], - 'visit_id': expected_visit['visit']}) - snapshot_url = reverse( - 'api-1-snapshot', - url_args={'snapshot_id': expected_visit['snapshot']}) - expected_visit['origin'] = new_origin['url'] - expected_visit['origin_visit_url'] = origin_visit_url - expected_visit['snapshot_url'] = snapshot_url - - self.assertEqual(rv.data, expected_visits) - - @given(new_origin(), visit_dates(3), new_snapshots(3)) - def test_api_lookup_origin_visit(self, new_origin, visit_dates, - new_snapshots): - - self.storage.origin_add_one(new_origin) - for i, visit_date in enumerate(visit_dates): - origin_visit = self.storage.origin_visit_add( - new_origin['url'], visit_date, type='git') - visit_id = origin_visit['visit'] - self.storage.snapshot_add([new_snapshots[i]]) - self.storage.origin_visit_update( - new_origin['url'], origin_visit['visit'], - snapshot=new_snapshots[i]['id']) - url = reverse('api-1-origin-visit', - url_args={'origin_url': new_origin['url'], - 'visit_id': visit_id}) - - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - expected_visit = self.origin_visit_get_by( - new_origin['url'], visit_id) - - origin_url = reverse('api-1-origin', - url_args={'origin_url': new_origin['url']}) - snapshot_url = reverse( - 'api-1-snapshot', - url_args={'snapshot_id': expected_visit['snapshot']}) + assert rv.status_code == 400, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'BadInputExc', + 'reason': err_msg + } - expected_visit['origin'] = new_origin['url'] - expected_visit['origin_url'] = origin_url - expected_visit['snapshot_url'] = snapshot_url - self.assertEqual(rv.data, expected_visit) +def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, + mocker): + mock_get_origin_visits = mocker.patch( + 'swh.web.api.views.origin.get_origin_visits') + err_msg = 'Storage exploded! Will be back online shortly!' - @given(new_origin()) - def test_api_lookup_origin_visit_latest_no_visit(self, new_origin): + mock_get_origin_visits.side_effect = StorageDBError(err_msg) - self.storage.origin_add_one(new_origin) + url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'}) + rv = api_client.get(url) - url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}) + assert rv.status_code == 503, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'StorageDBError', + 'reason': + 'An unexpected error occurred in the backend: %s' % err_msg + } - rv = self.client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'No visit for origin %s found' % new_origin['url'] - }) - @given(new_origin(), visit_dates(2), new_snapshots(1)) - def test_api_lookup_origin_visit_latest( - self, new_origin, visit_dates, new_snapshots): +def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, + mocker): + mock_get_origin_visits = mocker.patch( + 'swh.web.api.views.origin.get_origin_visits') + err_msg = 'Storage API dropped dead! Will resurrect asap!' - self.storage.origin_add_one(new_origin) + mock_get_origin_visits.side_effect = StorageAPIError(err_msg) - visit_dates.sort() - visit_ids = [] - for i, visit_date in enumerate(visit_dates): - origin_visit = self.storage.origin_visit_add( - new_origin['url'], visit_date, type='git') - visit_ids.append(origin_visit['visit']) + url = reverse( + 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) + rv = api_client.get(url) - self.storage.snapshot_add([new_snapshots[0]]) - self.storage.origin_visit_update( - new_origin['url'], visit_ids[0], - snapshot=new_snapshots[0]['id']) + assert rv.status_code == 503, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'StorageAPIError', + 'reason': + 'An unexpected error occurred in the api backend: %s' % err_msg + } - url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') +@given(new_origin(), visit_dates(3), new_snapshots(3)) +def test_api_lookup_origin_visits(api_client, archive_data, new_origin, + visit_dates, new_snapshots): + archive_data.origin_add_one(new_origin) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + new_origin['url'], visit_date, type='git') + archive_data.snapshot_add([new_snapshots[i]]) + archive_data.origin_visit_update( + new_origin['url'], origin_visit['visit'], + snapshot=new_snapshots[i]['id']) - expected_visit = self.origin_visit_get_by( - new_origin['url'], visit_ids[1]) + all_visits = list(reversed(get_origin_visits(new_origin))) - origin_url = reverse('api-1-origin', - url_args={'origin_url': new_origin['url']}) + for last_visit, expected_visits in ( + (None, all_visits[:2]), + (all_visits[1]['visit'], all_visits[2:4])): + + url = reverse('api-1-origin-visits', + url_args={'origin_url': new_origin['url']}, + query_params={'per_page': 2, + 'last_visit': last_visit}) + + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + for expected_visit in expected_visits: + origin_visit_url = reverse( + 'api-1-origin-visit', + url_args={'origin_url': new_origin['url'], + 'visit_id': expected_visit['visit']}) + snapshot_url = reverse( + 'api-1-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + expected_visit['origin'] = new_origin['url'] + expected_visit['origin_visit_url'] = origin_visit_url + expected_visit['snapshot_url'] = snapshot_url + + assert rv.data == expected_visits - expected_visit['origin'] = new_origin['url'] - expected_visit['origin_url'] = origin_url - expected_visit['snapshot_url'] = None - self.assertEqual(rv.data, expected_visit) +@given(new_origin(), visit_dates(3), new_snapshots(3)) +def test_api_lookup_origin_visits_by_id(api_client, archive_data, new_origin, + visit_dates, new_snapshots): + archive_data.origin_add_one(new_origin) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + new_origin['url'], visit_date, type='git') + archive_data.snapshot_add([new_snapshots[i]]) + archive_data.origin_visit_update( + new_origin['url'], origin_visit['visit'], + snapshot=new_snapshots[i]['id']) - @given(new_origin(), visit_dates(2), new_snapshots(1)) - def test_api_lookup_origin_visit_latest_with_snapshot( - self, new_origin, visit_dates, new_snapshots): - self.storage.origin_add_one(new_origin) - visit_dates.sort() - visit_ids = [] - for i, visit_date in enumerate(visit_dates): - origin_visit = self.storage.origin_visit_add( - new_origin['url'], visit_date, type='git') - visit_ids.append(origin_visit['visit']) + all_visits = list(reversed(get_origin_visits(new_origin))) - self.storage.snapshot_add([new_snapshots[0]]) - self.storage.origin_visit_update( - new_origin['url'], visit_ids[0], - snapshot=new_snapshots[0]['id']) + for last_visit, expected_visits in ( + (None, all_visits[:2]), + (all_visits[1]['visit'], all_visits[2:4])): - url = reverse('api-1-origin-visit-latest', - url_args={'origin_url': new_origin['url']}) - url += '?require_snapshot=true' + url = reverse('api-1-origin-visits', + url_args={'origin_url': new_origin['url']}, + query_params={'per_page': 2, + 'last_visit': last_visit}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') + rv = api_client.get(url) - expected_visit = self.origin_visit_get_by( - new_origin['url'], visit_ids[0]) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + for expected_visit in expected_visits: + origin_visit_url = reverse( + 'api-1-origin-visit', + url_args={'origin_url': new_origin['url'], + 'visit_id': expected_visit['visit']}) + snapshot_url = reverse( + 'api-1-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + expected_visit['origin'] = new_origin['url'] + expected_visit['origin_visit_url'] = origin_visit_url + expected_visit['snapshot_url'] = snapshot_url + + assert rv.data == expected_visits + + +@given(new_origin(), visit_dates(3), new_snapshots(3)) +def test_api_lookup_origin_visit(api_client, archive_data, new_origin, + visit_dates, new_snapshots): + archive_data.origin_add_one(new_origin) + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + new_origin['url'], visit_date, type='git') + visit_id = origin_visit['visit'] + archive_data.snapshot_add([new_snapshots[i]]) + archive_data.origin_visit_update( + new_origin['url'], origin_visit['visit'], + snapshot=new_snapshots[i]['id']) + url = reverse('api-1-origin-visit', + url_args={'origin_url': new_origin['url'], + 'visit_id': visit_id}) + + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + expected_visit = archive_data.origin_visit_get_by( + new_origin['url'], visit_id) origin_url = reverse('api-1-origin', url_args={'origin_url': new_origin['url']}) @@ -318,331 +221,276 @@ expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url - self.assertEqual(rv.data, expected_visit) + assert rv.data == expected_visit - @given(origin()) - def test_api_lookup_origin_visit_not_found(self, origin): - all_visits = list(reversed(get_origin_visits(origin))) +@given(new_origin()) +def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, + new_origin): + archive_data.origin_add_one(new_origin) - max_visit_id = max([v['visit'] for v in all_visits]) + url = reverse('api-1-origin-visit-latest', + url_args={'origin_url': new_origin['url']}) - url = reverse('api-1-origin-visit', - url_args={'origin_url': origin['url'], - 'visit_id': max_visit_id + 1}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Origin %s or its visit with id %s not found!' % - (origin['url'], max_visit_id+1) - }) - - @pytest.mark.origin_id - def test_api_origins(self): - origins = get_tests_data()['origins'] - origin_urls = {origin['url'] for origin in origins} - - # Get only one - url = reverse('api-1-origins', - query_params={'origin_count': 1}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1) - self.assertLess({origin['url'] for origin in rv.data}, origin_urls) - - # Get all - url = reverse('api-1-origins', - query_params={'origin_count': len(origins)}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), len(origins)) - self.assertEqual({origin['url'] for origin in rv.data}, origin_urls) - - # Get "all + 10" - url = reverse('api-1-origins', - query_params={'origin_count': len(origins)+10}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), len(origins)) - self.assertEqual({origin['url'] for origin in rv.data}, origin_urls) - - @pytest.mark.origin_id - @given(strategies.integers(min_value=1)) - def test_api_origins_scroll(self, origin_count): - origins = get_tests_data()['origins'] - origin_urls = {origin['url'] for origin in origins} - - url = reverse('api-1-origins', - query_params={'origin_count': origin_count}) - - results = self._scroll_results(url) - - self.assertEqual(len(results), len(origins)) - self.assertEqual({origin['url'] for origin in results}, origin_urls) - - @given(origin()) - def test_api_origin_by_url(self, origin): - - url = reverse('api-1-origin', - url_args={'origin_url': origin['url']}) - rv = self.client.get(url) - - expected_origin = self.origin_get(origin) - - origin_visits_url = reverse('api-1-origin-visits', - url_args={'origin_url': origin['url']}) - - expected_origin['origin_visits_url'] = origin_visits_url - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_origin) - - @given(new_origin()) - def test_api_origin_not_found(self, new_origin): - - url = reverse('api-1-origin', - url_args={'origin_url': new_origin['url']}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Origin with url %s not found!' % new_origin['url'] - }) - - @pytest.mark.origin_id - def test_api_origin_search(self): - expected_origins = { - 'https://github.com/wcoder/highlightjs-line-numbers.js', - 'https://github.com/memononen/libtess2', - } + rv = api_client.get(url) + assert rv.status_code == 404, rv.data + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'No visit for origin %s found' % new_origin['url'] + } - # Search for 'github.com', get only one - url = reverse('api-1-origin-search', - url_args={'url_pattern': 'github.com'}, - query_params={'limit': 1}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1) - self.assertLess({origin['url'] for origin in rv.data}, - expected_origins) - - # Search for 'github.com', get all - url = reverse('api-1-origin-search', - url_args={'url_pattern': 'github.com'}, - query_params={'limit': 2}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual({origin['url'] for origin in rv.data}, - expected_origins) - - # Search for 'github.com', get more than available - url = reverse('api-1-origin-search', - url_args={'url_pattern': 'github.com'}, - query_params={'limit': 10}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual({origin['url'] for origin in rv.data}, - expected_origins) - - @pytest.mark.origin_id - def test_api_origin_search_regexp(self): - expected_origins = { - 'https://github.com/memononen/libtess2', - 'repo_with_submodules' - } - url = reverse('api-1-origin-search', - url_args={'url_pattern': '(repo|libtess)'}, - query_params={'limit': 10, - 'regexp': True}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual({origin['url'] for origin in rv.data}, - expected_origins) - - @pytest.mark.origin_id - @given(strategies.integers(min_value=1)) - def test_api_origin_search_scroll(self, limit): - expected_origins = { - 'https://github.com/wcoder/highlightjs-line-numbers.js', - 'https://github.com/memononen/libtess2', +@given(new_origin(), visit_dates(2), new_snapshots(1)) +def test_api_lookup_origin_visit_latest(api_client, archive_data, new_origin, + visit_dates, new_snapshots): + archive_data.origin_add_one(new_origin) + visit_dates.sort() + visit_ids = [] + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + new_origin['url'], visit_date, type='git') + visit_ids.append(origin_visit['visit']) + + archive_data.snapshot_add([new_snapshots[0]]) + archive_data.origin_visit_update( + new_origin['url'], visit_ids[0], + snapshot=new_snapshots[0]['id']) + + url = reverse('api-1-origin-visit-latest', + url_args={'origin_url': new_origin['url']}) + + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + expected_visit = archive_data.origin_visit_get_by( + new_origin['url'], visit_ids[1]) + + origin_url = reverse('api-1-origin', + url_args={'origin_url': new_origin['url']}) + + expected_visit['origin'] = new_origin['url'] + expected_visit['origin_url'] = origin_url + expected_visit['snapshot_url'] = None + + assert rv.data == expected_visit + + +@given(new_origin(), visit_dates(2), new_snapshots(1)) +def test_api_lookup_origin_visit_latest_with_snapshot(api_client, archive_data, + new_origin, visit_dates, + new_snapshots): + archive_data.origin_add_one(new_origin) + visit_dates.sort() + visit_ids = [] + for i, visit_date in enumerate(visit_dates): + origin_visit = archive_data.origin_visit_add( + new_origin['url'], visit_date, type='git') + visit_ids.append(origin_visit['visit']) + + archive_data.snapshot_add([new_snapshots[0]]) + archive_data.origin_visit_update( + new_origin['url'], visit_ids[0], + snapshot=new_snapshots[0]['id']) + + url = reverse('api-1-origin-visit-latest', + url_args={'origin_url': new_origin['url']}) + url += '?require_snapshot=true' + + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + expected_visit = archive_data.origin_visit_get_by( + new_origin['url'], visit_ids[0]) + + origin_url = reverse('api-1-origin', + url_args={'origin_url': new_origin['url']}) + snapshot_url = reverse( + 'api-1-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + + expected_visit['origin'] = new_origin['url'] + expected_visit['origin_url'] = origin_url + expected_visit['snapshot_url'] = snapshot_url + + assert rv.data == expected_visit + + +@given(origin()) +def test_api_lookup_origin_visit_not_found(api_client, origin): + + all_visits = list(reversed(get_origin_visits(origin))) + + max_visit_id = max([v['visit'] for v in all_visits]) + + url = reverse('api-1-origin-visit', + url_args={'origin_url': origin['url'], + 'visit_id': max_visit_id + 1}) + + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Origin %s or its visit with id %s not found!' % + (origin['url'], max_visit_id+1) + } + + +def test_api_origin_search_limit(api_client, archive_data): + archive_data.origin_add([ + {'url': 'http://foobar/{}'.format(i)} + for i in range(2000) + ]) + + url = reverse('api-1-origin-search', + url_args={'url_pattern': 'foobar'}, + query_params={'limit': 1050}) + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert len(rv.data) == 1000 + + +@given(origin()) +def test_api_origin_metadata_search(api_client, mocker, origin): + mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') + oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext + oimsft.side_effect = lambda conjunction, limit: [{ + 'from_revision': ( + b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' + b'\xf2U\xfa\x05B8'), + 'metadata': {'author': 'Jane Doe'}, + 'id': origin['url'], + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1' + } + }] + + url = reverse('api-1-origin-metadata-search', + query_params={'fulltext': 'Jane Doe'}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.content + assert rv['Content-Type'] == 'application/json' + expected_data = [{ + 'url': origin['url'], + 'metadata': { + 'metadata': {'author': 'Jane Doe'}, + 'from_revision': ( + '7026b7c1a2af56521e951c01ed20f255fa054238'), + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1', + } } + }] + + assert rv.data == expected_data + oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) + + +@given(origin()) +def test_api_origin_metadata_search_limit(api_client, mocker, origin): + mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') + oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext + + oimsft.side_effect = lambda conjunction, limit: [{ + 'from_revision': ( + b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' + b'\xf2U\xfa\x05B8'), + 'metadata': {'author': 'Jane Doe'}, + 'id': origin['url'], + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1' + } + }] + + url = reverse('api-1-origin-metadata-search', + query_params={'fulltext': 'Jane Doe'}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.content + assert rv['Content-Type'] == 'application/json' + assert len(rv.data) == 1 + oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70) + + url = reverse('api-1-origin-metadata-search', + query_params={'fulltext': 'Jane Doe', + 'limit': 10}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.content + assert rv['Content-Type'] == 'application/json' + assert len(rv.data) == 1 + oimsft.assert_called_with(conjunction=['Jane Doe'], limit=10) + + url = reverse('api-1-origin-metadata-search', + query_params={'fulltext': 'Jane Doe', + 'limit': 987}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.content + assert rv['Content-Type'] == 'application/json' + assert len(rv.data) == 1 + oimsft.assert_called_with(conjunction=['Jane Doe'], limit=100) + + +@given(origin()) +def test_api_origin_intrinsic_metadata(api_client, mocker, origin): + mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') + oimg = mock_idx_storage.origin_intrinsic_metadata_get + oimg.side_effect = lambda origin_urls: [{ + 'from_revision': ( + b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' + b'\xf2U\xfa\x05B8'), + 'metadata': {'author': 'Jane Doe'}, + 'id': origin['url'], + 'tool': { + 'configuration': { + 'context': ['NpmMapping', 'CodemetaMapping'], + 'type': 'local' + }, + 'id': 3, + 'name': 'swh-metadata-detector', + 'version': '0.0.1' + } + }] + + url = reverse('api-origin-intrinsic-metadata', + url_args={'origin_url': origin['url']}) + rv = api_client.get(url) + + oimg.assert_called_once_with([origin['url']]) + assert rv.status_code == 200, rv.content + assert rv['Content-Type'] == 'application/json' + expected_data = {'author': 'Jane Doe'} + assert rv.data == expected_data + + +def test_api_origin_metadata_search_invalid(api_client, mocker): + mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage') + url = reverse('api-1-origin-metadata-search') + rv = api_client.get(url) - url = reverse('api-1-origin-search', - url_args={'url_pattern': 'github.com'}, - query_params={'limit': limit}) - - results = self._scroll_results(url) - - self.assertEqual({origin['url'] for origin in results}, - expected_origins) - - def test_api_origin_search_limit(self): - self.storage.origin_add([ - {'url': 'http://foobar/{}'.format(i)} - for i in range(2000) - ]) - - url = reverse('api-1-origin-search', - url_args={'url_pattern': 'foobar'}, - query_params={'limit': 1050}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1000) - - @given(origin()) - def test_api_origin_metadata_search(self, origin): - with patch('swh.web.common.service.idx_storage') as mock_idx_storage: - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .side_effect = lambda conjunction, limit: [{ - 'from_revision': ( - b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' - b'\xf2U\xfa\x05B8'), - 'metadata': {'author': 'Jane Doe'}, - 'id': origin['url'], - 'tool': { - 'configuration': { - 'context': ['NpmMapping', 'CodemetaMapping'], - 'type': 'local' - }, - 'id': 3, - 'name': 'swh-metadata-detector', - 'version': '0.0.1' - } - }] - - url = reverse('api-1-origin-metadata-search', - query_params={'fulltext': 'Jane Doe'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.content) - self.assertEqual(rv['Content-Type'], 'application/json') - expected_data = [{ - 'url': origin['url'], - 'metadata': { - 'metadata': {'author': 'Jane Doe'}, - 'from_revision': ( - '7026b7c1a2af56521e951c01ed20f255fa054238'), - 'tool': { - 'configuration': { - 'context': ['NpmMapping', 'CodemetaMapping'], - 'type': 'local' - }, - 'id': 3, - 'name': 'swh-metadata-detector', - 'version': '0.0.1', - } - } - }] - self.assertEqual(rv.data, expected_data) - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .assert_called_with(conjunction=['Jane Doe'], limit=70) - - @given(origin()) - def test_api_origin_metadata_search_limit(self, origin): - with patch('swh.web.common.service.idx_storage') as mock_idx_storage: - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .side_effect = lambda conjunction, limit: [{ - 'from_revision': ( - b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' - b'\xf2U\xfa\x05B8'), - 'metadata': {'author': 'Jane Doe'}, - 'id': origin['url'], - 'tool': { - 'configuration': { - 'context': ['NpmMapping', 'CodemetaMapping'], - 'type': 'local' - }, - 'id': 3, - 'name': 'swh-metadata-detector', - 'version': '0.0.1' - } - }] - - url = reverse('api-1-origin-metadata-search', - query_params={'fulltext': 'Jane Doe'}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.content) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1) - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .assert_called_with(conjunction=['Jane Doe'], limit=70) - - url = reverse('api-1-origin-metadata-search', - query_params={'fulltext': 'Jane Doe', - 'limit': 10}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.content) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1) - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .assert_called_with(conjunction=['Jane Doe'], limit=10) - - url = reverse('api-1-origin-metadata-search', - query_params={'fulltext': 'Jane Doe', - 'limit': 987}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.content) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(len(rv.data), 1) - mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ - .assert_called_with(conjunction=['Jane Doe'], limit=100) - - @given(origin()) - def test_api_origin_intrinsic_metadata(self, origin): - with patch('swh.web.common.service.idx_storage') as mock_idx_storage: - mock_idx_storage.origin_intrinsic_metadata_get \ - .side_effect = lambda origin_urls: [{ - 'from_revision': ( - b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' - b'\xf2U\xfa\x05B8'), - 'metadata': {'author': 'Jane Doe'}, - 'id': origin['url'], - 'tool': { - 'configuration': { - 'context': ['NpmMapping', 'CodemetaMapping'], - 'type': 'local' - }, - 'id': 3, - 'name': 'swh-metadata-detector', - 'version': '0.0.1' - } - }] - - url = reverse('api-origin-intrinsic-metadata', - url_args={'origin_url': origin['url']}) - rv = self.client.get(url) - - mock_idx_storage.origin_intrinsic_metadata_get \ - .assert_called_once_with([origin['url']]) - self.assertEqual(rv.status_code, 200, rv.content) - self.assertEqual(rv['Content-Type'], 'application/json') - expected_data = {'author': 'Jane Doe'} - self.assertEqual(rv.data, expected_data) - - @patch('swh.web.common.service.idx_storage') - def test_api_origin_metadata_search_invalid(self, mock_idx_storage): - - url = reverse('api-1-origin-metadata-search') - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 400, rv.content) - mock_idx_storage.assert_not_called() + assert rv.status_code == 400, rv.content + mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/api/views/test_origin_save.py b/swh/web/tests/api/views/test_origin_save.py --- a/swh/web/tests/api/views/test_origin_save.py +++ b/swh/web/tests/api/views/test_origin_save.py @@ -3,12 +3,11 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + from datetime import datetime, timedelta from django.utils import timezone -from rest_framework.test import APITestCase -from unittest.mock import patch - from swh.web.common.utils import reverse from swh.web.common.models import ( SaveUnauthorizedOrigin, SaveOriginRequest, @@ -19,243 +18,234 @@ SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_FAILED, SAVE_TASK_SUCCEED ) -from swh.web.tests.testcase import WebTestCase - - -class SaveApiTestCase(WebTestCase, APITestCase): - - @classmethod - def setUpTestData(cls): # noqa: N802 - SaveUnauthorizedOrigin.objects.create( - url='https://github.com/user/illegal_repo') - SaveUnauthorizedOrigin.objects.create( - url='https://gitlab.com/user_to_exclude') - - def test_invalid_visit_type(self): - url = reverse('api-1-save-origin', - url_args={'visit_type': 'foo', - 'origin_url': 'https://github.com/torvalds/linux'}) # noqa - - response = self.client.post(url) - self.assertEqual(response.status_code, 400) - - def test_invalid_origin_url(self): - url = reverse('api-1-save-origin', - url_args={'visit_type': 'git', - 'origin_url': 'bar'}) - - response = self.client.post(url) - self.assertEqual(response.status_code, 400) - - def check_created_save_request_status(self, mock_scheduler, origin_url, - scheduler_task_status, - expected_request_status, - expected_task_status=None, - visit_date=None): - - if not scheduler_task_status: - mock_scheduler.get_tasks.return_value = [] - else: - mock_scheduler.get_tasks.return_value = \ - [{ - 'priority': 'high', - 'policy': 'oneshot', - 'type': 'load-git', - 'arguments': { - 'kwargs': { - 'repo_url': origin_url - }, - 'args': [] - }, - 'status': scheduler_task_status, - 'id': 1, - }] - - mock_scheduler.create_tasks.return_value = \ - [{ - 'priority': 'high', - 'policy': 'oneshot', - 'type': 'load-git', - 'arguments': { - 'kwargs': { - 'repo_url': origin_url - }, - 'args': [] - }, - 'status': 'next_run_not_scheduled', - 'id': 1, - }] - - url = reverse('api-1-save-origin', - url_args={'visit_type': 'git', - 'origin_url': origin_url}) - - with patch('swh.web.common.origin_save._get_visit_info_for_save_request') as mock_visit_date: # noqa - mock_visit_date.return_value = (visit_date, None) - response = self.client.post(url) - - if expected_request_status != SAVE_REQUEST_REJECTED: - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['save_request_status'], - expected_request_status) - self.assertEqual(response.data['save_task_status'], - expected_task_status) - - else: - self.assertEqual(response.status_code, 403) - - def check_save_request_status(self, mock_scheduler, origin_url, - expected_request_status, - expected_task_status, - scheduler_task_status='next_run_not_scheduled', # noqa - visit_date=None): - - mock_scheduler.get_tasks.return_value = \ - [{ - 'priority': 'high', - 'policy': 'oneshot', - 'type': 'load-git', - 'arguments': { - 'kwargs': { - 'repo_url': origin_url - }, - 'args': [] + +pytestmark = pytest.mark.django_db + + +@pytest.fixture(autouse=True) +def populated_db(): + SaveUnauthorizedOrigin.objects.create( + url='https://github.com/user/illegal_repo') + SaveUnauthorizedOrigin.objects.create( + url='https://gitlab.com/user_to_exclude') + + +def test_invalid_visit_type(api_client): + url = reverse('api-1-save-origin', + url_args={'visit_type': 'foo', + 'origin_url': 'https://github.com/torvalds/linux'}) + + response = api_client.post(url) + assert response.status_code == 400 + + +def test_invalid_origin_url(api_client): + url = reverse('api-1-save-origin', + url_args={'visit_type': 'git', + 'origin_url': 'bar'}) + + response = api_client.post(url) + assert response.status_code == 400 + + +def check_created_save_request_status(api_client, mocker, origin_url, + scheduler_task_status, + expected_request_status, + expected_task_status=None, + visit_date=None): + + mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler') + if not scheduler_task_status: + mock_scheduler.get_tasks.return_value = [] + else: + mock_scheduler.get_tasks.return_value = [{ + 'priority': 'high', + 'policy': 'oneshot', + 'type': 'load-git', + 'arguments': { + 'kwargs': { + 'repo_url': origin_url }, - 'status': scheduler_task_status, - 'id': 1, - }] - - url = reverse('api-1-save-origin', - url_args={'visit_type': 'git', - 'origin_url': origin_url}) - - with patch('swh.web.common.origin_save._get_visit_info_for_save_request') as mock_visit_date: # noqa - mock_visit_date.return_value = (visit_date, None) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - save_request_data = response.data[0] - - self.assertEqual(save_request_data['save_request_status'], - expected_request_status) - self.assertEqual(save_request_data['save_task_status'], - expected_task_status) - - # Check that save task status is still available when - # the scheduler task has been archived - mock_scheduler.get_tasks.return_value = [] - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - save_request_data = response.data[0] - self.assertEqual(save_request_data['save_task_status'], - expected_task_status) - - @patch('swh.web.common.origin_save.scheduler') - def test_save_request_rejected(self, mock_scheduler): - origin_url = 'https://github.com/user/illegal_repo' - self.check_created_save_request_status(mock_scheduler, origin_url, - None, SAVE_REQUEST_REJECTED) - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_REJECTED, - SAVE_TASK_NOT_CREATED) - - @patch('swh.web.common.origin_save.scheduler') - def test_save_request_pending(self, mock_scheduler): - origin_url = 'https://unkwownforge.com/user/repo' - self.check_created_save_request_status(mock_scheduler, origin_url, - None, SAVE_REQUEST_PENDING, - SAVE_TASK_NOT_CREATED) - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_PENDING, - SAVE_TASK_NOT_CREATED) - - @patch('swh.web.common.origin_save.scheduler') - def test_save_request_succeed(self, mock_scheduler): - origin_url = 'https://github.com/Kitware/CMake' - self.check_created_save_request_status(mock_scheduler, origin_url, - None, SAVE_REQUEST_ACCEPTED, - SAVE_TASK_NOT_YET_SCHEDULED) - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_SCHEDULED, - scheduler_task_status='next_run_scheduled') # noqa - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_SUCCEED, - scheduler_task_status='completed', - visit_date=None) # noqa - visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_SUCCEED, - scheduler_task_status='completed', - visit_date=visit_date) # noqa - - @patch('swh.web.common.origin_save.scheduler') - def test_save_request_failed(self, mock_scheduler): - origin_url = 'https://gitlab.com/inkscape/inkscape' - self.check_created_save_request_status(mock_scheduler, origin_url, - None, SAVE_REQUEST_ACCEPTED, - SAVE_TASK_NOT_YET_SCHEDULED) - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_SCHEDULED, - scheduler_task_status='next_run_scheduled') # noqa - self.check_save_request_status(mock_scheduler, origin_url, - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_FAILED, - scheduler_task_status='disabled') # noqa - - @patch('swh.web.common.origin_save.scheduler') - def test_create_save_request_only_when_needed(self, mock_scheduler): - origin_url = 'https://github.com/webpack/webpack' - SaveOriginRequest.objects.create(visit_type='git', - origin_url=origin_url, - status=SAVE_REQUEST_ACCEPTED, # noqa - loading_task_id=56) - - self.check_created_save_request_status(mock_scheduler, origin_url, - 'next_run_not_scheduled', - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_NOT_YET_SCHEDULED) - sors = list(SaveOriginRequest.objects.filter(visit_type='git', - origin_url=origin_url)) - self.assertEqual(len(sors), 1) - - self.check_created_save_request_status(mock_scheduler, origin_url, - 'next_run_scheduled', - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_SCHEDULED) - sors = list(SaveOriginRequest.objects.filter(visit_type='git', - origin_url=origin_url)) - self.assertEqual(len(sors), 1) - - visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) - self.check_created_save_request_status(mock_scheduler, origin_url, - 'completed', - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_NOT_YET_SCHEDULED, - visit_date=visit_date) - sors = list(SaveOriginRequest.objects.filter(visit_type='git', - origin_url=origin_url)) - self.assertEqual(len(sors), 2) - - self.check_created_save_request_status(mock_scheduler, origin_url, - 'disabled', - SAVE_REQUEST_ACCEPTED, - SAVE_TASK_NOT_YET_SCHEDULED) - sors = list(SaveOriginRequest.objects.filter(visit_type='git', - origin_url=origin_url)) - self.assertEqual(len(sors), 3) - - def test_get_save_requests_unknown_origin(self): - unknown_origin_url = 'https://gitlab.com/foo/bar' - url = reverse('api-1-save-origin', - url_args={'visit_type': 'git', - 'origin_url': unknown_origin_url}) - response = self.client.get(url) - self.assertEqual(response.status_code, 404) - self.assertEqual(response.data, { - 'exception': 'NotFoundExc', - 'reason': ('No save requests found for visit of type ' - 'git on origin with url %s.') % unknown_origin_url - }) + 'args': [] + }, + 'status': scheduler_task_status, + 'id': 1, + }] + + mock_scheduler.create_tasks.return_value = [{ + 'priority': 'high', + 'policy': 'oneshot', + 'type': 'load-git', + 'arguments': { + 'kwargs': { + 'repo_url': origin_url + }, + 'args': [] + }, + 'status': 'next_run_not_scheduled', + 'id': 1, + }] + + url = reverse('api-1-save-origin', + url_args={'visit_type': 'git', + 'origin_url': origin_url}) + + mock_visit_date = mocker.patch(('swh.web.common.origin_save.' + '_get_visit_info_for_save_request')) + mock_visit_date.return_value = (visit_date, None) + response = api_client.post(url) + + if expected_request_status != SAVE_REQUEST_REJECTED: + assert response.status_code == 200, response.data + assert (response.data['save_request_status'] == + expected_request_status) + assert response.data['save_task_status'] == expected_task_status + else: + assert response.status_code == 403, response.data + + +def check_save_request_status(api_client, mocker, origin_url, + expected_request_status, + expected_task_status, + scheduler_task_status='next_run_not_scheduled', + visit_date=None): + mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler') + mock_scheduler.get_tasks.return_value = [{ + 'priority': 'high', + 'policy': 'oneshot', + 'type': 'load-git', + 'arguments': { + 'kwargs': { + 'repo_url': origin_url + }, + 'args': [] + }, + 'status': scheduler_task_status, + 'id': 1, + }] + + url = reverse('api-1-save-origin', + url_args={'visit_type': 'git', + 'origin_url': origin_url}) + + mock_visit_date = mocker.patch(('swh.web.common.origin_save.' + '_get_visit_info_for_save_request')) + mock_visit_date.return_value = (visit_date, None) + response = api_client.get(url) + assert response.status_code == 200, response.data + save_request_data = response.data[0] + + assert (save_request_data['save_request_status'] == + expected_request_status) + assert save_request_data['save_task_status'] == expected_task_status + + # Check that save task status is still available when + # the scheduler task has been archived + mock_scheduler.get_tasks.return_value = [] + response = api_client.get(url) + assert response.status_code == 200 + save_request_data = response.data[0] + assert save_request_data['save_task_status'] == expected_task_status + + +def test_save_request_rejected(api_client, mocker): + origin_url = 'https://github.com/user/illegal_repo' + check_created_save_request_status(api_client, mocker, origin_url, + None, SAVE_REQUEST_REJECTED) + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_CREATED) + + +def test_save_request_pending(api_client, mocker): + origin_url = 'https://unkwownforge.com/user/repo' + check_created_save_request_status(api_client, mocker, + origin_url, None, SAVE_REQUEST_PENDING, + SAVE_TASK_NOT_CREATED) + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_PENDING, SAVE_TASK_NOT_CREATED) + + +def test_save_request_succeed(api_client, mocker): + origin_url = 'https://github.com/Kitware/CMake' + check_created_save_request_status(api_client, mocker, origin_url, + None, SAVE_REQUEST_ACCEPTED, + SAVE_TASK_NOT_YET_SCHEDULED) + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, + scheduler_task_status='next_run_scheduled') + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEED, + scheduler_task_status='completed', + visit_date=None) + visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEED, + scheduler_task_status='completed', + visit_date=visit_date) + + +def test_save_request_failed(api_client, mocker): + origin_url = 'https://gitlab.com/inkscape/inkscape' + check_created_save_request_status(api_client, mocker, origin_url, + None, SAVE_REQUEST_ACCEPTED, + SAVE_TASK_NOT_YET_SCHEDULED) + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, + scheduler_task_status='next_run_scheduled') + check_save_request_status(api_client, mocker, origin_url, + SAVE_REQUEST_ACCEPTED, SAVE_TASK_FAILED, + scheduler_task_status='disabled') + + +def test_create_save_request_only_when_needed(api_client, mocker): + origin_url = 'https://github.com/webpack/webpack' + SaveOriginRequest.objects.create(visit_type='git', origin_url=origin_url, + status=SAVE_REQUEST_ACCEPTED, + loading_task_id=56) + + check_created_save_request_status(api_client, mocker, origin_url, + 'next_run_not_scheduled', + SAVE_REQUEST_ACCEPTED, + SAVE_TASK_NOT_YET_SCHEDULED) + + sors = list(SaveOriginRequest.objects.filter(visit_type='git', + origin_url=origin_url)) + assert len(sors) == 1 + + check_created_save_request_status(api_client, mocker, origin_url, + 'next_run_scheduled', + SAVE_REQUEST_ACCEPTED, + SAVE_TASK_SCHEDULED) + sors = list(SaveOriginRequest.objects.filter(visit_type='git', + origin_url=origin_url)) + assert len(sors) == 1 + + visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) + check_created_save_request_status(api_client, mocker, origin_url, + 'completed', SAVE_REQUEST_ACCEPTED, + SAVE_TASK_NOT_YET_SCHEDULED, + visit_date=visit_date) + sors = list(SaveOriginRequest.objects.filter(visit_type='git', + origin_url=origin_url)) + assert len(sors) == 2 + + check_created_save_request_status(api_client, mocker, origin_url, + 'disabled', SAVE_REQUEST_ACCEPTED, + SAVE_TASK_NOT_YET_SCHEDULED) + sors = list(SaveOriginRequest.objects.filter(visit_type='git', + origin_url=origin_url)) + assert len(sors) == 3 + + +def test_get_save_requests_unknown_origin(api_client): + unknown_origin_url = 'https://gitlab.com/foo/bar' + url = reverse('api-1-save-origin', + url_args={'visit_type': 'git', + 'origin_url': unknown_origin_url}) + response = api_client.get(url) + assert response.status_code == 404 + assert response.data == { + 'exception': 'NotFoundExc', + 'reason': ('No save requests found for visit of type ' + 'git on origin with url %s.') % unknown_origin_url + } diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/tests/api/views/test_release.py --- a/swh/web/tests/api/views/test_release.py +++ b/swh/web/tests/api/views/test_release.py @@ -5,7 +5,6 @@ from datetime import datetime from hypothesis import given -from rest_framework.test import APITestCase from swh.model.hashutil import hash_to_bytes from swh.web.common.utils import reverse @@ -13,104 +12,103 @@ from swh.web.tests.strategies import ( release, sha1, content, directory ) -from swh.web.tests.testcase import WebTestCase -class ReleaseApiTestCase(WebTestCase, APITestCase): +@given(release()) +def test_api_release(api_client, archive_data, release): + url = reverse('api-1-release', url_args={'sha1_git': release}) + + rv = api_client.get(url) + + expected_release = archive_data.release_get(release) + target_revision = expected_release['target'] + target_url = reverse('api-1-revision', + url_args={'sha1_git': target_revision}) + expected_release['target_url'] = target_url + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == expected_release + + +@given(sha1(), sha1(), sha1(), content(), directory(), release()) +def test_api_release_target_type_not_a_revision(api_client, archive_data, + new_rel1, new_rel2, + new_rel3, content, + directory, release): + for new_rel_id, target_type, target in ( + (new_rel1, 'content', content), + (new_rel2, 'directory', directory), + (new_rel3, 'release', release)): + + if target_type == 'content': + target = target['sha1_git'] + + sample_release = { + 'author': { + 'email': b'author@company.org', + 'fullname': b'author ', + 'name': b'author' + }, + 'date': { + 'timestamp': int(datetime.now().timestamp()), + 'offset': 0, + 'negative_utc': False, + }, + 'id': hash_to_bytes(new_rel_id), + 'message': b'sample release message', + 'name': b'sample release', + 'synthetic': False, + 'target': hash_to_bytes(target), + 'target_type': target_type + } + + archive_data.release_add([sample_release]) + + url = reverse('api-1-release', url_args={'sha1_git': new_rel_id}) + + rv = api_client.get(url) + + expected_release = archive_data.release_get(new_rel_id) + + if target_type == 'content': + url_args = {'q': 'sha1_git:%s' % target} + else: + url_args = {'sha1_git': target} + + target_url = reverse('api-1-%s' % target_type, + url_args=url_args) + expected_release['target_url'] = target_url - @given(release()) - def test_api_release(self, release): + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == expected_release - url = reverse('api-1-release', url_args={'sha1_git': release}) - rv = self.client.get(url) +def test_api_release_not_found(api_client): + unknown_release_ = random_sha1() - expected_release = self.release_get(release) - target_revision = expected_release['target'] - target_url = reverse('api-1-revision', - url_args={'sha1_git': target_revision}) - expected_release['target_url'] = target_url + url = reverse('api-1-release', url_args={'sha1_git': unknown_release_}) + + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Release with sha1_git %s not found.' % unknown_release_ + } + + +@given(release()) +def test_api_release_uppercase(api_client, release): + url = reverse('api-1-release-uppercase-checksum', + url_args={'sha1_git': release.upper()}) + + resp = api_client.get(url) + assert resp.status_code == 302 + + redirect_url = reverse('api-1-release-uppercase-checksum', + url_args={'sha1_git': release}) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_release) - - @given(sha1(), sha1(), sha1(), content(), directory(), release()) - def test_api_release_target_type_not_a_revision(self, new_rel1, new_rel2, - new_rel3, content, - directory, release): - - for new_rel_id, target_type, target in ( - (new_rel1, 'content', content), - (new_rel2, 'directory', directory), - (new_rel3, 'release', release)): - - if target_type == 'content': - target = target['sha1_git'] - - sample_release = { - 'author': { - 'email': b'author@company.org', - 'fullname': b'author ', - 'name': b'author' - }, - 'date': { - 'timestamp': int(datetime.now().timestamp()), - 'offset': 0, - 'negative_utc': False, - }, - 'id': hash_to_bytes(new_rel_id), - 'message': b'sample release message', - 'name': b'sample release', - 'synthetic': False, - 'target': hash_to_bytes(target), - 'target_type': target_type - } - - self.storage.release_add([sample_release]) - - url = reverse('api-1-release', url_args={'sha1_git': new_rel_id}) - - rv = self.client.get(url) - - expected_release = self.release_get(new_rel_id) - - if target_type == 'content': - url_args = {'q': 'sha1_git:%s' % target} - else: - url_args = {'sha1_git': target} - - target_url = reverse('api-1-%s' % target_type, - url_args=url_args) - expected_release['target_url'] = target_url - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_release) - - def test_api_release_not_found(self): - unknown_release_ = random_sha1() - - url = reverse('api-1-release', url_args={'sha1_git': unknown_release_}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Release with sha1_git %s not found.' % unknown_release_ - }) - - @given(release()) - def test_api_release_uppercase(self, release): - url = reverse('api-1-release-uppercase-checksum', - url_args={'sha1_git': release.upper()}) - - resp = self.client.get(url) - self.assertEqual(resp.status_code, 302) - - redirect_url = reverse('api-1-release-uppercase-checksum', - url_args={'sha1_git': release}) - - self.assertEqual(resp['location'], redirect_url) + assert resp['location'] == redirect_url diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -4,266 +4,268 @@ # See top-level LICENSE file for more information from hypothesis import given -from rest_framework.test import APITestCase -from unittest.mock import patch from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import revision -from swh.web.tests.testcase import WebTestCase -class RevisionApiTestCase(WebTestCase, APITestCase): +@given(revision()) +def test_api_revision(api_client, archive_data, revision): + url = reverse('api-1-revision', url_args={'sha1_git': revision}) + rv = api_client.get(url) - @given(revision()) - def test_api_revision(self, revision): + expected_revision = archive_data.revision_get(revision) - url = reverse('api-1-revision', url_args={'sha1_git': revision}) - rv = self.client.get(url) + _enrich_revision(expected_revision) - expected_revision = self.revision_get(revision) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == expected_revision - self._enrich_revision(expected_revision) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_revision) +def test_api_revision_not_found(api_client): + unknown_revision_ = random_sha1() - def test_api_revision_not_found(self): - unknown_revision_ = random_sha1() + url = reverse('api-1-revision', + url_args={'sha1_git': unknown_revision_}) + rv = api_client.get(url) - url = reverse('api-1-revision', - url_args={'sha1_git': unknown_revision_}) - rv = self.client.get(url) + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_ + } - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision_}) - @given(revision()) - def test_api_revision_raw_ok(self, revision): +@given(revision()) +def test_api_revision_raw_ok(api_client, archive_data, revision): + url = reverse('api-1-revision-raw-message', + url_args={'sha1_git': revision}) + rv = api_client.get(url) - url = reverse('api-1-revision-raw-message', - url_args={'sha1_git': revision}) - rv = self.client.get(url) + expected_message = archive_data.revision_get(revision)['message'] - expected_message = self.revision_get(revision)['message'] + assert rv.status_code == 200 + assert rv['Content-Type'] == 'application/octet-stream' + assert rv.content == expected_message.encode() - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/octet-stream') - self.assertEqual(rv.content, expected_message.encode()) - def test_api_revision_raw_ko_no_rev(self): - unknown_revision_ = random_sha1() +def test_api_revision_raw_ko_no_rev(api_client): + unknown_revision_ = random_sha1() - url = reverse('api-1-revision-raw-message', - url_args={'sha1_git': unknown_revision_}) - rv = self.client.get(url) + url = reverse('api-1-revision-raw-message', + url_args={'sha1_git': unknown_revision_}) + rv = api_client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision_}) + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_ + } - @given(revision()) - def test_api_revision_log(self, revision): - per_page = 10 +@given(revision()) +def test_api_revision_log(api_client, archive_data, revision): + per_page = 10 - url = reverse('api-1-revision-log', url_args={'sha1_git': revision}, - query_params={'per_page': per_page}) + url = reverse('api-1-revision-log', url_args={'sha1_git': revision}, + query_params={'per_page': per_page}) - rv = self.client.get(url) + rv = api_client.get(url) - expected_log = self.revision_log(revision, limit=per_page+1) - expected_log = list(map(self._enrich_revision, expected_log)) - - has_next = len(expected_log) > per_page - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, - expected_log[:-1] if has_next else expected_log) - - if has_next: - self.assertIn('Link', rv) - next_log_url = reverse( - 'api-1-revision-log', - url_args={'sha1_git': expected_log[-1]['id']}, - query_params={'per_page': per_page}) - self.assertIn(next_log_url, rv['Link']) - - def test_api_revision_log_not_found(self): - unknown_revision_ = random_sha1() - - url = reverse('api-1-revision-log', - url_args={'sha1_git': unknown_revision_}) - - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Revision with sha1_git %s not found.' % - unknown_revision_}) - self.assertFalse(rv.has_header('Link')) - - @given(revision()) - def test_api_revision_log_context(self, revision): - - revisions = self.revision_log(revision, limit=4) - - prev_rev = revisions[0]['id'] - rev = revisions[-1]['id'] - - per_page = 10 - - url = reverse('api-1-revision-log', - url_args={'sha1_git': rev, - 'prev_sha1s': prev_rev}, - query_params={'per_page': per_page}) - - rv = self.client.get(url) - - expected_log = self.revision_log(rev, limit=per_page) - prev_revision = self.revision_get(prev_rev) - expected_log.insert(0, prev_revision) - expected_log = list(map(self._enrich_revision, expected_log)) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_log) - - @patch('swh.web.api.views.revision._revision_directory_by') - def test_api_revision_directory_ko_not_found(self, mock_rev_dir): - # given - mock_rev_dir.side_effect = NotFoundExc('Not found') - - # then - rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/') - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'NotFoundExc', - 'reason': 'Not found'}) - - mock_rev_dir.assert_called_once_with( - {'sha1_git': '999'}, - 'some/path/to/dir', - '/api/1/revision/999/directory/some/path/to/dir/', - with_data=False) - - @patch('swh.web.api.views.revision._revision_directory_by') - def test_api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): - stub_dir = { - 'type': 'dir', - 'revision': '999', - 'content': [ - { - 'sha1_git': '789', - 'type': 'file', - 'target': '101', - 'target_url': '/api/1/content/sha1_git:101/', - 'name': 'somefile', - 'file_url': '/api/1/revision/999/directory/some/path/' - 'somefile/' - }, - { - 'sha1_git': '123', - 'type': 'dir', - 'target': '456', - 'target_url': '/api/1/directory/456/', - 'name': 'to-subdir', - 'dir_url': '/api/1/revision/999/directory/some/path/' - 'to-subdir/', - }] - } + expected_log = archive_data.revision_log(revision, limit=per_page+1) + expected_log = list(map(_enrich_revision, expected_log)) + + has_next = len(expected_log) > per_page + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == (expected_log[:-1] if has_next else expected_log) + + if has_next: + assert 'Link' in rv + next_log_url = reverse( + 'api-1-revision-log', + url_args={'sha1_git': expected_log[-1]['id']}, + query_params={'per_page': per_page}) + assert next_log_url in rv['Link'] + + +def test_api_revision_log_not_found(api_client): + unknown_revision_ = random_sha1() + + url = reverse('api-1-revision-log', + url_args={'sha1_git': unknown_revision_}) + + rv = api_client.get(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_ + } + assert not rv.has_header('Link') + + +@given(revision()) +def test_api_revision_log_context(api_client, archive_data, revision): + revisions = archive_data.revision_log(revision, limit=4) - # given - mock_rev_dir.return_value = stub_dir + prev_rev = revisions[0]['id'] + rev = revisions[-1]['id'] - # then - rv = self.client.get('/api/1/revision/999/directory/some/path/') + per_page = 10 - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, stub_dir) + url = reverse('api-1-revision-log', + url_args={'sha1_git': rev, + 'prev_sha1s': prev_rev}, + query_params={'per_page': per_page}) - mock_rev_dir.assert_called_once_with( - {'sha1_git': '999'}, - 'some/path', - '/api/1/revision/999/directory/some/path/', - with_data=False) + rv = api_client.get(url) - @patch('swh.web.api.views.revision._revision_directory_by') - def test_api_revision_directory_ok_returns_content(self, mock_rev_dir): - stub_content = { - 'type': 'file', - 'revision': '999', - 'content': { + expected_log = archive_data.revision_log(rev, limit=per_page) + prev_revision = archive_data.revision_get(prev_rev) + expected_log.insert(0, prev_revision) + expected_log = list(map(_enrich_revision, expected_log)) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == expected_log + + +def test_api_revision_directory_ko_not_found(api_client, mocker): + mock_rev_dir = mocker.patch( + 'swh.web.api.views.revision._revision_directory_by') + mock_rev_dir.side_effect = NotFoundExc('Not found') + + rv = api_client.get('/api/1/revision/999/directory/some/path/to/dir/') + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'NotFoundExc', + 'reason': 'Not found' + } + + mock_rev_dir.assert_called_once_with( + {'sha1_git': '999'}, + 'some/path/to/dir', + '/api/1/revision/999/directory/some/path/to/dir/', + with_data=False + ) + + +def test_api_revision_directory_ok_returns_dir_entries(api_client, mocker): + mock_rev_dir = mocker.patch( + 'swh.web.api.views.revision._revision_directory_by') + stub_dir = { + 'type': 'dir', + 'revision': '999', + 'content': [ + { 'sha1_git': '789', - 'sha1': '101', - 'data_url': '/api/1/content/101/raw/', + 'type': 'file', + 'target': '101', + 'target_url': '/api/1/content/sha1_git:101/', + 'name': 'somefile', + 'file_url': '/api/1/revision/999/directory/some/path/' + 'somefile/' + }, + { + 'sha1_git': '123', + 'type': 'dir', + 'target': '456', + 'target_url': '/api/1/directory/456/', + 'name': 'to-subdir', + 'dir_url': '/api/1/revision/999/directory/some/path/' + 'to-subdir/', } + ] + } + + mock_rev_dir.return_value = stub_dir + + rv = api_client.get('/api/1/revision/999/directory/some/path/') + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == stub_dir + + mock_rev_dir.assert_called_once_with( + {'sha1_git': '999'}, + 'some/path', + '/api/1/revision/999/directory/some/path/', + with_data=False + ) + + +def test_api_revision_directory_ok_returns_content(api_client, mocker): + mock_rev_dir = mocker.patch( + 'swh.web.api.views.revision._revision_directory_by') + stub_content = { + 'type': 'file', + 'revision': '999', + 'content': { + 'sha1_git': '789', + 'sha1': '101', + 'data_url': '/api/1/content/101/raw/', } + } + + mock_rev_dir.return_value = stub_content + + url = '/api/1/revision/666/directory/some/other/path/' + rv = api_client.get(url) - # given - mock_rev_dir.return_value = stub_content + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == stub_content - # then - url = '/api/1/revision/666/directory/some/other/path/' - rv = self.client.get(url) + mock_rev_dir.assert_called_once_with( + {'sha1_git': '666'}, 'some/other/path', url, with_data=False) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, stub_content) - mock_rev_dir.assert_called_once_with( - {'sha1_git': '666'}, 'some/other/path', url, with_data=False) +@given(revision()) +def test_api_revision_uppercase(api_client, revision): + url = reverse('api-1-revision-uppercase-checksum', + url_args={'sha1_git': revision.upper()}) - def _enrich_revision(self, revision): - directory_url = reverse( - 'api-1-directory', - url_args={'sha1_git': revision['directory']}) + resp = api_client.get(url) + assert resp.status_code == 302 - history_url = reverse('api-1-revision-log', - url_args={'sha1_git': revision['id']}) + redirect_url = reverse('api-1-revision', + url_args={'sha1_git': revision}) - parents_id_url = [] - for p in revision['parents']: - parents_id_url.append({ - 'id': p, - 'url': reverse('api-1-revision', url_args={'sha1_git': p}) - }) + assert resp['location'] == redirect_url - revision_url = reverse('api-1-revision', - url_args={'sha1_git': revision['id']}) - revision['directory_url'] = directory_url - revision['history_url'] = history_url - revision['url'] = revision_url - revision['parents'] = parents_id_url +def _enrich_revision(revision): + directory_url = reverse( + 'api-1-directory', + url_args={'sha1_git': revision['directory']}) - return revision + history_url = reverse('api-1-revision-log', + url_args={'sha1_git': revision['id']}) - @given(revision()) - def test_api_revision_uppercase(self, revision): - url = reverse('api-1-revision-uppercase-checksum', - url_args={'sha1_git': revision.upper()}) + parents_id_url = [] + for p in revision['parents']: + parents_id_url.append({ + 'id': p, + 'url': reverse('api-1-revision', url_args={'sha1_git': p}) + }) - resp = self.client.get(url) - self.assertEqual(resp.status_code, 302) + revision_url = reverse('api-1-revision', + url_args={'sha1_git': revision['id']}) - redirect_url = reverse('api-1-revision', - url_args={'sha1_git': revision}) + revision['directory_url'] = directory_url + revision['history_url'] = history_url + revision['url'] = revision_url + revision['parents'] = parents_id_url - self.assertEqual(resp['location'], redirect_url) + return revision diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py --- a/swh/web/tests/api/views/test_snapshot.py +++ b/swh/web/tests/api/views/test_snapshot.py @@ -6,7 +6,6 @@ import random from hypothesis import given -from rest_framework.test import APITestCase from swh.model.hashutil import hash_to_hex from swh.web.common.utils import reverse @@ -14,177 +13,182 @@ from swh.web.tests.strategies import ( snapshot, new_snapshot ) -from swh.web.tests.testcase import WebTestCase -class SnapshotApiTestCase(WebTestCase, APITestCase): +@given(snapshot()) +def test_api_snapshot(api_client, archive_data, snapshot): - @given(snapshot()) - def test_api_snapshot(self, snapshot): + url = reverse('api-1-snapshot', + url_args={'snapshot_id': snapshot}) + rv = api_client.get(url) - url = reverse('api-1-snapshot', - url_args={'snapshot_id': snapshot}) - rv = self.client.get(url) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - expected_data = self.snapshot_get(snapshot) - expected_data = self._enrich_snapshot(expected_data) - self.assertEqual(rv.data, expected_data) - - @given(snapshot()) - def test_api_snapshot_paginated(self, snapshot): - - branches_offset = 0 - branches_count = 2 - - snapshot_branches = [] - - for k, v in sorted(self.snapshot_get(snapshot)['branches'].items()): - snapshot_branches.append({ - 'name': k, - 'target_type': v['target_type'], - 'target': v['target'] - }) - - whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None} - - while branches_offset < len(snapshot_branches): - branches_from = snapshot_branches[branches_offset]['name'] - url = reverse('api-1-snapshot', - url_args={'snapshot_id': snapshot}, - query_params={'branches_from': branches_from, - 'branches_count': branches_count}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - expected_data = self.snapshot_get_branches(snapshot, branches_from, - branches_count) - - expected_data = self._enrich_snapshot(expected_data) - - branches_offset += branches_count - if branches_offset < len(snapshot_branches): - next_branch = snapshot_branches[branches_offset]['name'] - expected_data['next_branch'] = next_branch - else: - expected_data['next_branch'] = None - - self.assertEqual(rv.data, expected_data) - whole_snapshot['branches'].update(expected_data['branches']) - - if branches_offset < len(snapshot_branches): - next_url = reverse( - 'api-1-snapshot', - url_args={'snapshot_id': snapshot}, - query_params={'branches_from': next_branch, - 'branches_count': branches_count}) - self.assertEqual(rv['Link'], '<%s>; rel="next"' % next_url) - else: - self.assertFalse(rv.has_header('Link')) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + expected_data = archive_data.snapshot_get(snapshot) + expected_data = _enrich_snapshot(archive_data, expected_data) + assert rv.data == expected_data - url = reverse('api-1-snapshot', - url_args={'snapshot_id': snapshot}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, whole_snapshot) +@given(snapshot()) +def test_api_snapshot_paginated(api_client, archive_data, snapshot): - @given(snapshot()) - def test_api_snapshot_filtered(self, snapshot): + branches_offset = 0 + branches_count = 2 - snapshot_branches = [] + snapshot_branches = [] - for k, v in sorted(self.snapshot_get(snapshot)['branches'].items()): - snapshot_branches.append({ - 'name': k, - 'target_type': v['target_type'], - 'target': v['target'] - }) + for k, v in sorted( + archive_data.snapshot_get(snapshot)['branches'].items()): + snapshot_branches.append({ + 'name': k, + 'target_type': v['target_type'], + 'target': v['target'] + }) - target_type = random.choice(snapshot_branches)['target_type'] + whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None} + while branches_offset < len(snapshot_branches): + branches_from = snapshot_branches[branches_offset]['name'] url = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}, - query_params={'target_types': target_type}) - rv = self.client.get(url) - - expected_data = self.snapshot_get_branches( - snapshot, target_types=target_type) - expected_data = self._enrich_snapshot(expected_data) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_data) + query_params={'branches_from': branches_from, + 'branches_count': branches_count}) + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + expected_data = archive_data.snapshot_get_branches( + snapshot, branches_from, branches_count) + + expected_data = _enrich_snapshot(archive_data, expected_data) - def test_api_snapshot_errors(self): - unknown_snapshot_ = random_sha1() - - url = reverse('api-1-snapshot', - url_args={'snapshot_id': '63ce369'}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 400, rv.data) - - url = reverse('api-1-snapshot', - url_args={'snapshot_id': unknown_snapshot_}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 404, rv.data) - - def _enrich_snapshot(self, snapshot): - def _get_branch_url(target_type, target): - url = None - if target_type == 'revision': - url = reverse('api-1-revision', url_args={'sha1_git': target}) - if target_type == 'release': - url = reverse('api-1-release', url_args={'sha1_git': target}) - return url - - for branch in snapshot['branches'].keys(): - target = snapshot['branches'][branch]['target'] - target_type = snapshot['branches'][branch]['target_type'] - snapshot['branches'][branch]['target_url'] = \ - _get_branch_url(target_type, target) - for branch in snapshot['branches'].keys(): - target = snapshot['branches'][branch]['target'] - target_type = snapshot['branches'][branch]['target_type'] - if target_type == 'alias': - if target in snapshot['branches']: - snapshot['branches'][branch]['target_url'] = \ - snapshot['branches'][target]['target_url'] - else: - snp = self.snapshot_get_branches(snapshot['id'], - branches_from=target, - branches_count=1) - alias_target = snp['branches'][target]['target'] - alias_target_type = snp['branches'][target]['target_type'] - snapshot['branches'][branch]['target_url'] = \ - _get_branch_url(alias_target_type, alias_target) - - return snapshot - - @given(snapshot()) - def test_api_snapshot_uppercase(self, snapshot): - url = reverse('api-1-snapshot-uppercase-checksum', - url_args={'snapshot_id': snapshot.upper()}) - - resp = self.client.get(url) - self.assertEqual(resp.status_code, 302) - - redirect_url = reverse('api-1-snapshot-uppercase-checksum', - url_args={'snapshot_id': snapshot}) - - self.assertEqual(resp['location'], redirect_url) - - @given(new_snapshot(min_size=4)) - def test_api_snapshot_null_branch(self, new_snapshot): - snp_dict = new_snapshot.to_dict() - snp_id = hash_to_hex(snp_dict['id']) - for branch in snp_dict['branches'].keys(): - snp_dict['branches'][branch] = None - break - self.storage.snapshot_add([snp_dict]) - url = reverse('api-1-snapshot', - url_args={'snapshot_id': snp_id}) - rv = self.client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) + branches_offset += branches_count + if branches_offset < len(snapshot_branches): + next_branch = snapshot_branches[branches_offset]['name'] + expected_data['next_branch'] = next_branch + else: + expected_data['next_branch'] = None + + assert rv.data == expected_data + whole_snapshot['branches'].update(expected_data['branches']) + + if branches_offset < len(snapshot_branches): + next_url = reverse( + 'api-1-snapshot', + url_args={'snapshot_id': snapshot}, + query_params={'branches_from': next_branch, + 'branches_count': branches_count}) + assert rv['Link'] == '<%s>; rel="next"' % next_url + else: + assert not rv.has_header('Link') + + url = reverse('api-1-snapshot', + url_args={'snapshot_id': snapshot}) + rv = api_client.get(url) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == whole_snapshot + + +@given(snapshot()) +def test_api_snapshot_filtered(api_client, archive_data, snapshot): + + snapshot_branches = [] + + for k, v in sorted( + archive_data.snapshot_get(snapshot)['branches'].items()): + snapshot_branches.append({ + 'name': k, + 'target_type': v['target_type'], + 'target': v['target'] + }) + + target_type = random.choice(snapshot_branches)['target_type'] + + url = reverse('api-1-snapshot', + url_args={'snapshot_id': snapshot}, + query_params={'target_types': target_type}) + rv = api_client.get(url) + + expected_data = archive_data.snapshot_get_branches( + snapshot, target_types=target_type) + expected_data = _enrich_snapshot(archive_data, expected_data) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == expected_data + + +def test_api_snapshot_errors(api_client): + unknown_snapshot_ = random_sha1() + + url = reverse('api-1-snapshot', + url_args={'snapshot_id': '63ce369'}) + rv = api_client.get(url) + assert rv.status_code == 400, rv.data + + url = reverse('api-1-snapshot', + url_args={'snapshot_id': unknown_snapshot_}) + rv = api_client.get(url) + assert rv.status_code == 404, rv.data + + +@given(snapshot()) +def test_api_snapshot_uppercase(api_client, snapshot): + url = reverse('api-1-snapshot-uppercase-checksum', + url_args={'snapshot_id': snapshot.upper()}) + + resp = api_client.get(url) + assert resp.status_code == 302 + + redirect_url = reverse('api-1-snapshot-uppercase-checksum', + url_args={'snapshot_id': snapshot}) + + assert resp['location'] == redirect_url + + +@given(new_snapshot(min_size=4)) +def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot): + snp_dict = new_snapshot.to_dict() + snp_id = hash_to_hex(snp_dict['id']) + for branch in snp_dict['branches'].keys(): + snp_dict['branches'][branch] = None + break + archive_data.snapshot_add([snp_dict]) + url = reverse('api-1-snapshot', + url_args={'snapshot_id': snp_id}) + rv = api_client.get(url) + assert rv.status_code == 200, rv.data + + +def _enrich_snapshot(archive_data, snapshot): + def _get_branch_url(target_type, target): + url = None + if target_type == 'revision': + url = reverse('api-1-revision', url_args={'sha1_git': target}) + if target_type == 'release': + url = reverse('api-1-release', url_args={'sha1_git': target}) + return url + + for branch in snapshot['branches'].keys(): + target = snapshot['branches'][branch]['target'] + target_type = snapshot['branches'][branch]['target_type'] + snapshot['branches'][branch]['target_url'] = \ + _get_branch_url(target_type, target) + for branch in snapshot['branches'].keys(): + target = snapshot['branches'][branch]['target'] + target_type = snapshot['branches'][branch]['target_type'] + if target_type == 'alias': + if target in snapshot['branches']: + snapshot['branches'][branch]['target_url'] = \ + snapshot['branches'][target]['target_url'] + else: + snp = archive_data.snapshot_get_branches(snapshot['id'], + branches_from=target, + branches_count=1) + alias_target = snp['branches'][target]['target'] + alias_target_type = snp['branches'][target]['target_type'] + snapshot['branches'][branch]['target_url'] = \ + _get_branch_url(alias_target_type, alias_target) + + return snapshot diff --git a/swh/web/tests/api/views/test_stat.py b/swh/web/tests/api/views/test_stat.py --- a/swh/web/tests/api/views/test_stat.py +++ b/swh/web/tests/api/views/test_stat.py @@ -3,74 +3,68 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from rest_framework.test import APITestCase -from unittest.mock import patch - from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse -from swh.web.tests.testcase import WebTestCase - -class StatApiTestCase(WebTestCase, APITestCase): - @patch('swh.web.api.views.stat.service') - def test_api_1_stat_counters_raise_error(self, mock_service): - mock_service.stat_counters.side_effect = BadInputExc( - 'voluntary error to check the bad request middleware.') +def test_api_1_stat_counters_raise_error(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.stat.service') + mock_service.stat_counters.side_effect = BadInputExc( + 'voluntary error to check the bad request middleware.') - url = reverse('api-1-stat-counters') - rv = self.client.get(url) + url = reverse('api-1-stat-counters') + rv = api_client.get(url) - self.assertEqual(rv.status_code, 400, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'BadInputExc', - 'reason': 'voluntary error to check the bad request middleware.'}) + assert rv.status_code == 400, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'BadInputExc', + 'reason': 'voluntary error to check the bad request middleware.'} - @patch('swh.web.api.views.stat.service') - def test_api_1_stat_counters_raise_from_db(self, mock_service): - mock_service.stat_counters.side_effect = StorageDBError( - 'Storage exploded! Will be back online shortly!') +def test_api_1_stat_counters_raise_from_db(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.stat.service') + mock_service.stat_counters.side_effect = StorageDBError( + 'Storage exploded! Will be back online shortly!') - url = reverse('api-1-stat-counters') - rv = self.client.get(url) + url = reverse('api-1-stat-counters') + rv = api_client.get(url) - self.assertEqual(rv.status_code, 503, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'StorageDBError', - 'reason': - 'An unexpected error occurred in the backend: ' - 'Storage exploded! Will be back online shortly!'}) + assert rv.status_code == 503, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'StorageDBError', + 'reason': + 'An unexpected error occurred in the backend: ' + 'Storage exploded! Will be back online shortly!'} - @patch('swh.web.api.views.stat.service') - def test_api_1_stat_counters_raise_from_api(self, mock_service): - mock_service.stat_counters.side_effect = StorageAPIError( - 'Storage API dropped dead! Will resurrect from its ashes asap!' - ) +def test_api_1_stat_counters_raise_from_api(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.stat.service') + mock_service.stat_counters.side_effect = StorageAPIError( + 'Storage API dropped dead! Will resurrect from its ashes asap!' + ) - url = reverse('api-1-stat-counters') - rv = self.client.get(url) + url = reverse('api-1-stat-counters') + rv = api_client.get(url) - self.assertEqual(rv.status_code, 503, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, { - 'exception': 'StorageAPIError', - 'reason': - 'An unexpected error occurred in the api backend: ' - 'Storage API dropped dead! Will resurrect from its ashes asap!' - }) + assert rv.status_code == 503, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == { + 'exception': 'StorageAPIError', + 'reason': + 'An unexpected error occurred in the api backend: ' + 'Storage API dropped dead! Will resurrect from its ashes asap!' + } - def test_api_1_stat_counters(self): - url = reverse('api-1-stat-counters') +def test_api_1_stat_counters(api_client, archive_data): + url = reverse('api-1-stat-counters') - rv = self.client.get(url) + rv = api_client.get(url) - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, self.storage.stat_counters()) + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data == archive_data.stat_counters() diff --git a/swh/web/tests/api/views/test_vault.py b/swh/web/tests/api/views/test_vault.py --- a/swh/web/tests/api/views/test_vault.py +++ b/swh/web/tests/api/views/test_vault.py @@ -3,120 +3,116 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from rest_framework.test import APITestCase -from unittest.mock import patch - from swh.model import hashutil -from swh.web.tests.testcase import WebTestCase - TEST_OBJ_ID = 'd4905454cc154b492bd6afed48694ae3c579345e' OBJECT_TYPES = {'directory': ('directory', None), 'revision_gitfast': ('revision', 'gitfast')} -class VaultApiTestCase(WebTestCase, APITestCase): - @patch('swh.web.api.views.vault.service') - def test_api_vault_cook(self, mock_service): - stub_cook = { - 'fetch_url': ('http://127.0.0.1:5004/api/1/vault/directory/{}/raw/' - .format(TEST_OBJ_ID)), - 'obj_id': TEST_OBJ_ID, - 'obj_type': 'test_type', - 'progress_message': None, - 'status': 'done', - 'task_uuid': 'de75c902-5ee5-4739-996e-448376a93eff', - } - stub_fetch = b'content' - - mock_service.vault_cook.return_value = stub_cook - mock_service.vault_fetch.return_value = stub_fetch - - for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): - url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) - if obj_type_format: - url += '{}/'.format(obj_type_format) - rv = self.client.post(url, {'email': 'test@test.mail'}) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - self.assertEqual(rv.data, stub_cook) - mock_service.vault_cook.assert_called_with( - obj_type, - hashutil.hash_to_bytes(TEST_OBJ_ID), - 'test@test.mail') - - rv = self.client.get(url + 'raw/') - - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/gzip') - self.assertEqual(rv.content, stub_fetch) - mock_service.vault_fetch.assert_called_with( - obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) - - @patch('swh.web.api.views.vault.service') - def test_api_vault_cook_uppercase_hash(self, mock_service): - stub_cook = { - 'fetch_url': ('http://127.0.0.1:5004/api/1/vault/directory/{}/raw/' - .format(TEST_OBJ_ID.upper())), - 'obj_id': TEST_OBJ_ID.upper(), - 'obj_type': 'test_type', - 'progress_message': None, - 'status': 'done', - 'task_uuid': 'de75c902-5ee5-4739-996e-448376a93eff', - } - stub_fetch = b'content' - - mock_service.vault_cook.return_value = stub_cook - mock_service.vault_fetch.return_value = stub_fetch - - for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): - url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) - if obj_type_format: - url += '{}/'.format(obj_type_format) - rv = self.client.post(url, {'email': 'test@test.mail'}) - - self.assertEqual(rv.status_code, 200, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - self.assertEqual(rv.data, stub_cook) - mock_service.vault_cook.assert_called_with( - obj_type, - hashutil.hash_to_bytes(TEST_OBJ_ID), - 'test@test.mail') - - rv = self.client.get(url + 'raw/') - - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/gzip') - self.assertEqual(rv.content, stub_fetch) - mock_service.vault_fetch.assert_called_with( - obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) - - @patch('swh.web.api.views.vault.service') - def test_api_vault_cook_notfound(self, mock_service): - mock_service.vault_cook.return_value = None - mock_service.vault_fetch.return_value = None - - for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): - url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) - if obj_type_format: - url += '{}/'.format(obj_type_format) - rv = self.client.post(url) - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - - self.assertEqual(rv.data['exception'], 'NotFoundExc') - mock_service.vault_cook.assert_called_with( - obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID), None) - - rv = self.client.get(url + 'raw/') - - self.assertEqual(rv.status_code, 404, rv.data) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data['exception'], 'NotFoundExc') - mock_service.vault_fetch.assert_called_with( - obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) +def test_api_vault_cook(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.vault.service') + stub_cook = { + 'fetch_url': ('http://127.0.0.1:5004/api/1/vault/directory/{}/raw/' + .format(TEST_OBJ_ID)), + 'obj_id': TEST_OBJ_ID, + 'obj_type': 'test_type', + 'progress_message': None, + 'status': 'done', + 'task_uuid': 'de75c902-5ee5-4739-996e-448376a93eff', + } + stub_fetch = b'content' + + mock_service.vault_cook.return_value = stub_cook + mock_service.vault_fetch.return_value = stub_fetch + + for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): + url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) + if obj_type_format: + url += '{}/'.format(obj_type_format) + rv = api_client.post(url, {'email': 'test@test.mail'}) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + assert rv.data == stub_cook + mock_service.vault_cook.assert_called_with( + obj_type, + hashutil.hash_to_bytes(TEST_OBJ_ID), + 'test@test.mail') + + rv = api_client.get(url + 'raw/') + + assert rv.status_code == 200 + assert rv['Content-Type'] == 'application/gzip' + assert rv.content == stub_fetch + mock_service.vault_fetch.assert_called_with( + obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) + + +def test_api_vault_cook_uppercase_hash(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.vault.service') + stub_cook = { + 'fetch_url': ('http://127.0.0.1:5004/api/1/vault/directory/{}/raw/' + .format(TEST_OBJ_ID.upper())), + 'obj_id': TEST_OBJ_ID.upper(), + 'obj_type': 'test_type', + 'progress_message': None, + 'status': 'done', + 'task_uuid': 'de75c902-5ee5-4739-996e-448376a93eff', + } + stub_fetch = b'content' + + mock_service.vault_cook.return_value = stub_cook + mock_service.vault_fetch.return_value = stub_fetch + + for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): + url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) + if obj_type_format: + url += '{}/'.format(obj_type_format) + rv = api_client.post(url, {'email': 'test@test.mail'}) + + assert rv.status_code == 200, rv.data + assert rv['Content-Type'] == 'application/json' + + assert rv.data == stub_cook + mock_service.vault_cook.assert_called_with( + obj_type, + hashutil.hash_to_bytes(TEST_OBJ_ID), + 'test@test.mail') + + rv = api_client.get(url + 'raw/') + + assert rv.status_code == 200 + assert rv['Content-Type'] == 'application/gzip' + assert rv.content == stub_fetch + mock_service.vault_fetch.assert_called_with( + obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) + + +def test_api_vault_cook_notfound(api_client, mocker): + mock_service = mocker.patch('swh.web.api.views.vault.service') + mock_service.vault_cook.return_value = None + mock_service.vault_fetch.return_value = None + + for obj_type, (obj_type_name, obj_type_format) in OBJECT_TYPES.items(): + url = '/api/1/vault/{}/{}/'.format(obj_type_name, TEST_OBJ_ID) + if obj_type_format: + url += '{}/'.format(obj_type_format) + rv = api_client.post(url) + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + + assert rv.data['exception'] == 'NotFoundExc' + mock_service.vault_cook.assert_called_with( + obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID), None) + + rv = api_client.get(url + 'raw/') + + assert rv.status_code == 404, rv.data + assert rv['Content-Type'] == 'application/json' + assert rv.data['exception'] == 'NotFoundExc' + mock_service.vault_fetch.assert_called_with( + obj_type, hashutil.hash_to_bytes(TEST_OBJ_ID)) diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_utils.py --- a/swh/web/tests/browse/test_utils.py +++ b/swh/web/tests/browse/test_utils.py @@ -8,134 +8,121 @@ from swh.web.browse import utils from swh.web.common.utils import reverse, format_utc_iso_date from swh.web.tests.strategies import origin_with_multiple_visits -from swh.web.tests.testcase import WebTestCase - - -class SwhBrowseUtilsTestCase(WebTestCase): - - def test_get_mimetype_and_encoding_for_content(self): - text = b'Hello world!' - self.assertEqual(utils.get_mimetype_and_encoding_for_content(text), - ('text/plain', 'us-ascii')) - - @given(origin_with_multiple_visits()) - def test_get_origin_visit_snapshot_simple(self, origin): - - visits = self.origin_visit_get(origin['url']) - - for visit in visits: - - snapshot = self.snapshot_get(visit['snapshot']) - branches = [] - releases = [] - - def _process_branch_data(branch, branch_data): - if branch_data['target_type'] == 'revision': - rev_data = self.revision_get(branch_data['target']) - branches.append({ - 'name': branch, - 'revision': branch_data['target'], - 'directory': rev_data['directory'], - 'date': format_utc_iso_date(rev_data['date']), - 'message': rev_data['message'] - }) - elif branch_data['target_type'] == 'release': - rel_data = self.release_get(branch_data['target']) - rev_data = self.revision_get(rel_data['target']) - releases.append({ - 'name': rel_data['name'], - 'branch_name': branch, - 'date': format_utc_iso_date(rel_data['date']), - 'id': rel_data['id'], - 'message': rel_data['message'], - 'target_type': rel_data['target_type'], - 'target': rel_data['target'], - 'directory': rev_data['directory'] - }) - - for branch in sorted(snapshot['branches'].keys()): - branch_data = snapshot['branches'][branch] - if branch_data['target_type'] == 'alias': - target_data = snapshot['branches'][branch_data['target']] - _process_branch_data(branch, target_data) - else: - _process_branch_data(branch, branch_data) - - assert branches and releases, 'Incomplete test data.' - - origin_visit_branches = utils.get_origin_visit_snapshot( - origin, visit_id=visit['visit']) - - self.assertEqual(origin_visit_branches, (branches, releases)) - - def test_gen_link(self): - self.assertEqual( - utils.gen_link('https://www.softwareheritage.org/', 'swh'), + + +def test_get_mimetype_and_encoding_for_content(): + text = b'Hello world!' + assert (utils.get_mimetype_and_encoding_for_content(text) == + ('text/plain', 'us-ascii')) + + +@given(origin_with_multiple_visits()) +def test_get_origin_visit_snapshot_simple(archive_data, origin): + visits = archive_data.origin_visit_get(origin['url']) + + for visit in visits: + + snapshot = archive_data.snapshot_get(visit['snapshot']) + branches = [] + releases = [] + + def _process_branch_data(branch, branch_data): + if branch_data['target_type'] == 'revision': + rev_data = archive_data.revision_get(branch_data['target']) + branches.append({ + 'name': branch, + 'revision': branch_data['target'], + 'directory': rev_data['directory'], + 'date': format_utc_iso_date(rev_data['date']), + 'message': rev_data['message'] + }) + elif branch_data['target_type'] == 'release': + rel_data = archive_data.release_get(branch_data['target']) + rev_data = archive_data.revision_get(rel_data['target']) + releases.append({ + 'name': rel_data['name'], + 'branch_name': branch, + 'date': format_utc_iso_date(rel_data['date']), + 'id': rel_data['id'], + 'message': rel_data['message'], + 'target_type': rel_data['target_type'], + 'target': rel_data['target'], + 'directory': rev_data['directory'] + }) + + for branch in sorted(snapshot['branches'].keys()): + branch_data = snapshot['branches'][branch] + if branch_data['target_type'] == 'alias': + target_data = snapshot['branches'][branch_data['target']] + _process_branch_data(branch, target_data) + else: + _process_branch_data(branch, branch_data) + + assert branches and releases, 'Incomplete test data.' + + origin_visit_branches = utils.get_origin_visit_snapshot( + origin, visit_id=visit['visit']) + + assert origin_visit_branches == (branches, releases) + + +def test_gen_link(): + assert (utils.gen_link('https://www.softwareheritage.org/', 'swh') == 'swh') - def test_gen_revision_link(self): - revision_id = '28a0bc4120d38a394499382ba21d6965a67a3703' - revision_url = reverse('browse-revision', - url_args={'sha1_git': revision_id}) - - self.assertEqual(utils.gen_revision_link(revision_id, link_text=None, - link_attrs=None), - '%s' % (revision_url, revision_id)) - self.assertEqual( - utils.gen_revision_link(revision_id, shorten_id=True, - link_attrs=None), + +def test_gen_revision_link(): + revision_id = '28a0bc4120d38a394499382ba21d6965a67a3703' + revision_url = reverse('browse-revision', + url_args={'sha1_git': revision_id}) + + assert (utils.gen_revision_link(revision_id, link_text=None, + link_attrs=None) == + '%s' % (revision_url, revision_id)) + assert (utils.gen_revision_link(revision_id, shorten_id=True, + link_attrs=None) == '%s' % (revision_url, revision_id[:7])) - def test_gen_person_mail_link(self): - person_full = { - 'name': 'John Doe', - 'email': 'john.doe@swh.org', - 'fullname': 'John Doe ' - } - self.assertEqual( - utils.gen_person_mail_link(person_full), +def test_gen_person_mail_link(): + person_full = { + 'name': 'John Doe', + 'email': 'john.doe@swh.org', + 'fullname': 'John Doe ' + } + + assert (utils.gen_person_mail_link(person_full) == '%s' % (person_full['email'], - person_full['name']) - ) + person_full['name'])) - link_text = 'Mail' - self.assertEqual( - utils.gen_person_mail_link(person_full, link_text=link_text), + link_text = 'Mail' + assert (utils.gen_person_mail_link(person_full, link_text=link_text) == '%s' % (person_full['email'], - link_text) - ) + link_text)) - person_partial_email = { - 'name': None, - 'email': None, - 'fullname': 'john.doe@swh.org' - } + person_partial_email = { + 'name': None, + 'email': None, + 'fullname': 'john.doe@swh.org' + } - self.assertEqual( - utils.gen_person_mail_link(person_partial_email), + assert (utils.gen_person_mail_link(person_partial_email) == '%s' % (person_partial_email['fullname'], - person_partial_email['fullname']) - ) - - person_partial = { - 'name': None, - 'email': None, - 'fullname': 'John Doe ' - } - - self.assertEqual( - utils.gen_person_mail_link(person_partial), - person_partial['fullname'] - ) - - person_none = { - 'name': None, - 'email': None, - 'fullname': None - } - - self.assertEqual( - utils.gen_person_mail_link(person_none), - 'None' - ) + person_partial_email['fullname'])) + + person_partial = { + 'name': None, + 'email': None, + 'fullname': 'John Doe ' + } + + assert (utils.gen_person_mail_link(person_partial) == + person_partial['fullname']) + + person_none = { + 'name': None, + 'email': None, + 'fullname': None + } + + assert utils.gen_person_mail_link(person_none) == 'None' diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py --- a/swh/web/tests/browse/views/test_content.py +++ b/swh/web/tests/browse/views/test_content.py @@ -3,8 +3,6 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from unittest.mock import patch - from django.utils.html import escape from hypothesis import given @@ -16,354 +14,356 @@ from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse, get_swh_persistent_id from swh.web.common.utils import gen_path_info +from swh.web.tests.django_asserts import ( + assert_contains, assert_not_contains, assert_template_used +) from swh.web.tests.strategies import ( content, content_text_non_utf8, content_text_no_highlight, content_image_type, content_text, invalid_sha1, unknown_content ) -from swh.web.tests.testcase import WebTestCase - -class SwhBrowseContentTest(WebTestCase): - @given(content_text()) - def test_content_view_text(self, content): +@given(content_text()) +def test_content_view_text(client, archive_data, content): + sha1_git = content['sha1_git'] - sha1_git = content['sha1_git'] + url = reverse('browse-content', + url_args={'query_string': content['sha1']}, + query_params={'path': content['path']}) - url = reverse('browse-content', - url_args={'query_string': content['sha1']}, - query_params={'path': content['path']}) + url_raw = reverse('browse-content-raw', + url_args={'query_string': content['sha1']}) - url_raw = reverse('browse-content-raw', - url_args={'query_string': content['sha1']}) + resp = client.get(url) - resp = self.client.get(url) + content_display = _process_content_for_display(archive_data, content) + mimetype = content_display['mimetype'] - content_display = self._process_content_for_display(content) - mimetype = content_display['mimetype'] + assert resp.status_code == 200 + assert_template_used('browse/content.html') - self.assertEqual(resp.status_code, 200) - self.assertTemplateUsed('browse/content.html') + if mimetype.startswith('text/'): + assert_contains(resp, '' % + content_display['language']) + assert_contains(resp, escape(content_display['content_data'])) + assert_contains(resp, url_raw) - if mimetype.startswith('text/'): - self.assertContains(resp, '' % - content_display['language']) - self.assertContains(resp, escape(content_display['content_data'])) - self.assertContains(resp, url_raw) + swh_cnt_id = get_swh_persistent_id('content', sha1_git) + swh_cnt_id_url = reverse('browse-swh-id', + url_args={'swh_id': swh_cnt_id}) + assert_contains(resp, swh_cnt_id) + assert_contains(resp, swh_cnt_id_url) - swh_cnt_id = get_swh_persistent_id('content', sha1_git) - swh_cnt_id_url = reverse('browse-swh-id', - url_args={'swh_id': swh_cnt_id}) - self.assertContains(resp, swh_cnt_id) - self.assertContains(resp, swh_cnt_id_url) - @given(content_text_no_highlight()) - def test_content_view_text_no_highlight(self, content): +@given(content_text_no_highlight()) +def test_content_view_text_no_highlight(client, archive_data, content): + sha1_git = content['sha1_git'] - sha1_git = content['sha1_git'] + url = reverse('browse-content', + url_args={'query_string': content['sha1']}) - url = reverse('browse-content', + url_raw = reverse('browse-content-raw', url_args={'query_string': content['sha1']}) - url_raw = reverse('browse-content-raw', - url_args={'query_string': content['sha1']}) + resp = client.get(url) - resp = self.client.get(url) + content_display = _process_content_for_display(archive_data, content) - content_display = self._process_content_for_display(content) + assert resp.status_code == 200 + assert_template_used('browse/content.html') - self.assertEqual(resp.status_code, 200) - self.assertTemplateUsed('browse/content.html') + assert_contains(resp, '') + assert_contains(resp, escape(content_display['content_data'])) + assert_contains(resp, url_raw) - self.assertContains(resp, '') - self.assertContains(resp, escape(content_display['content_data'])) # noqa - self.assertContains(resp, url_raw) + swh_cnt_id = get_swh_persistent_id('content', sha1_git) + swh_cnt_id_url = reverse('browse-swh-id', + url_args={'swh_id': swh_cnt_id}) - swh_cnt_id = get_swh_persistent_id('content', sha1_git) - swh_cnt_id_url = reverse('browse-swh-id', - url_args={'swh_id': swh_cnt_id}) + assert_contains(resp, swh_cnt_id) + assert_contains(resp, swh_cnt_id_url) - self.assertContains(resp, swh_cnt_id) - self.assertContains(resp, swh_cnt_id_url) - @given(content_text_non_utf8()) - def test_content_view_no_utf8_text(self, content): +@given(content_text_non_utf8()) +def test_content_view_no_utf8_text(client, archive_data, content): + sha1_git = content['sha1_git'] - sha1_git = content['sha1_git'] + url = reverse('browse-content', + url_args={'query_string': content['sha1']}) - url = reverse('browse-content', - url_args={'query_string': content['sha1']}) + resp = client.get(url) - resp = self.client.get(url) + content_display = _process_content_for_display(archive_data, content) - content_display = self._process_content_for_display(content) + assert resp.status_code == 200 + assert_template_used('browse/content.html') + swh_cnt_id = get_swh_persistent_id('content', sha1_git) + swh_cnt_id_url = reverse('browse-swh-id', + url_args={'swh_id': swh_cnt_id}) + assert_contains(resp, swh_cnt_id_url) + assert_contains(resp, escape(content_display['content_data'])) - self.assertEqual(resp.status_code, 200) - self.assertTemplateUsed('browse/content.html') - swh_cnt_id = get_swh_persistent_id('content', sha1_git) - swh_cnt_id_url = reverse('browse-swh-id', - url_args={'swh_id': swh_cnt_id}) - self.assertContains(resp, swh_cnt_id_url) - self.assertContains(resp, escape(content_display['content_data'])) - @given(content_image_type()) - def test_content_view_image(self, content): +@given(content_image_type()) +def test_content_view_image(client, archive_data, content): + url = reverse('browse-content', + url_args={'query_string': content['sha1']}) - url = reverse('browse-content', + url_raw = reverse('browse-content-raw', url_args={'query_string': content['sha1']}) - url_raw = reverse('browse-content-raw', - url_args={'query_string': content['sha1']}) - - resp = self.client.get(url) + resp = client.get(url) - content_display = self._process_content_for_display(content) - mimetype = content_display['mimetype'] - content_data = content_display['content_data'] + content_display = _process_content_for_display(archive_data, content) + mimetype = content_display['mimetype'] + content_data = content_display['content_data'] - self.assertEqual(resp.status_code, 200) - self.assertTemplateUsed('browse/content.html') - self.assertContains(resp, '' - % (mimetype, content_data)) - self.assertContains(resp, url_raw) + assert resp.status_code == 200 + assert_template_used('browse/content.html') + assert_contains(resp, '' + % (mimetype, content_data)) + assert_contains(resp, url_raw) - @given(content_text()) - def test_content_view_text_with_path(self, content): - path = content['path'] +@given(content_text()) +def test_content_view_text_with_path(client, archive_data, content): + path = content['path'] - url = reverse('browse-content', - url_args={'query_string': content['sha1']}, - query_params={'path': path}) + url = reverse('browse-content', + url_args={'query_string': content['sha1']}, + query_params={'path': path}) - resp = self.client.get(url) - self.assertEqual(resp.status_code, 200) - self.assertTemplateUsed('browse/content.html') + resp = client.get(url) + assert resp.status_code == 200 + assert_template_used('browse/content.html') - self.assertContains(resp, '