diff --git a/swh/web/common/exc.py b/swh/web/common/exc.py index a68d6d18..71bdd08f 100644 --- a/swh/web/common/exc.py +++ b/swh/web/common/exc.py @@ -1,145 +1,148 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import traceback import sentry_sdk from django.shortcuts import render from django.utils.html import escape from django.utils.safestring import mark_safe from swh.web.config import get_config class BadInputExc(ValueError): """Wrong request to the api. Example: Asking a content with the wrong identifier format. """ pass class NotFoundExc(Exception): """Good request to the api but no result were found. Example: Asking a content with the right identifier format but that content does not exist. """ pass class ForbiddenExc(Exception): """Good request to the api, forbidden result to return due to enforce policy. Example: Asking for a raw content which exists but whose mimetype is not text. """ pass class LargePayloadExc(Exception): """The input size is too large. Example: Asking to resolve 10000 SWHIDs when the limit is 1000. """ pass http_status_code_message = { 400: "Bad Request", 401: "Unauthorized", 403: "Access Denied", 404: "Resource not found", 413: "Payload Too Large", 500: "Internal Server Error", 501: "Not Implemented", 502: "Bad Gateway", 503: "Service unavailable", } def _generate_error_page(request, error_code, error_description): return render( request, "error.html", { "error_code": error_code, "error_message": http_status_code_message[error_code], "error_description": mark_safe(error_description), }, status=error_code, ) def swh_handle400(request, exception=None): """ Custom Django HTTP error 400 handler for swh-web. """ error_description = ( "The server cannot process the request to %s due to " "something that is perceived to be a client error." % escape(request.META["PATH_INFO"]) ) return _generate_error_page(request, 400, error_description) def swh_handle403(request, exception=None): """ Custom Django HTTP error 403 handler for swh-web. """ error_description = "The resource %s requires an authentication." % escape( request.META["PATH_INFO"] ) return _generate_error_page(request, 403, error_description) def swh_handle404(request, exception=None): """ Custom Django HTTP error 404 handler for swh-web. """ error_description = "The resource %s could not be found on the server." % escape( request.META["PATH_INFO"] ) return _generate_error_page(request, 404, error_description) def swh_handle500(request): """ Custom Django HTTP error 500 handler for swh-web. """ error_description = ( "An unexpected condition was encountered when " "requesting resource %s." % escape(request.META["PATH_INFO"]) ) return _generate_error_page(request, 500, error_description) def handle_view_exception(request, exc): """ Function used to generate an error page when an exception was raised inside a swh-web browse view. """ sentry_sdk.capture_exception(exc) error_code = 500 error_description = "%s: %s" % (type(exc).__name__, str(exc)) if get_config()["debug"]: error_description = traceback.format_exc() if isinstance(exc, BadInputExc): error_code = 400 if isinstance(exc, ForbiddenExc): error_code = 403 if isinstance(exc, NotFoundExc): error_code = 404 - return _generate_error_page(request, error_code, error_description) + resp = _generate_error_page(request, error_code, error_description) + if get_config()["debug"]: + resp.traceback = error_description + return resp diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py index ae254178..b4e50124 100644 --- a/swh/web/settings/tests.py +++ b/swh/web/settings/tests.py @@ -1,108 +1,109 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django tests settings for swh-web. """ import os import sys from swh.web.config import get_config scope1_limiter_rate = 3 scope1_limiter_rate_post = 1 scope2_limiter_rate = 5 scope2_limiter_rate_post = 2 scope3_limiter_rate = 1 scope3_limiter_rate_post = 1 save_origin_rate_post = 10 swh_web_config = get_config() swh_web_config.update( { - "debug": False, + # disable django debug mode when running cypress tests + "debug": "pytest" in sys.argv[0] or "PYTEST_XDIST_WORKER" in os.environ, "secret_key": "test", "history_counters_url": "", "throttling": { "cache_uri": None, "scopes": { "swh_api": { "limiter_rate": {"default": "60/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_search": { "limiter_rate": {"default": "100/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_visit_latest": { "limiter_rate": {"default": "6000/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_vault_cooking": { "limiter_rate": {"default": "120/h", "GET": "60/m"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_save_origin": { "limiter_rate": { "default": "120/h", "POST": "%s/h" % save_origin_rate_post, } }, "scope1": { "limiter_rate": { "default": "%s/min" % scope1_limiter_rate, "POST": "%s/min" % scope1_limiter_rate_post, } }, "scope2": { "limiter_rate": { "default": "%s/min" % scope2_limiter_rate, "POST": "%s/min" % scope2_limiter_rate_post, } }, "scope3": { "limiter_rate": { "default": "%s/min" % scope3_limiter_rate, "POST": "%s/min" % scope3_limiter_rate_post, }, "exempted_networks": ["127.0.0.0/8"], }, }, }, "keycloak": { "server_url": "http://localhost:8080/auth", "realm_name": "SoftwareHeritage", }, } ) from .common import * # noqa from .common import ALLOWED_HOSTS, LOGGING # noqa, isort: skip DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config["test_db"], } } # when not running unit tests, make the webapp fetch data from memory storages if "pytest" not in sys.argv[0] and "PYTEST_XDIST_WORKER" not in os.environ: swh_web_config.update({"debug": True, "e2e_tests_mode": True}) from swh.web.tests.data import get_tests_data, override_storages test_data = get_tests_data() override_storages( test_data["storage"], test_data["idx_storage"], test_data["search"] ) else: ALLOWED_HOSTS += ["testserver"] # Silent DEBUG output when running unit tests LOGGING["handlers"]["console"]["level"] = "INFO" # type: ignore diff --git a/swh/web/tests/admin/test_origin_save.py b/swh/web/tests/admin/test_origin_save.py index 3c3b4480..ad9e3d00 100644 --- a/swh/web/tests/admin/test_origin_save.py +++ b/swh/web/tests/admin/test_origin_save.py @@ -1,231 +1,219 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import unquote import pytest from django.contrib.auth import get_user_model from swh.web.common.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_YET_SCHEDULED, SaveAuthorizedOrigin, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.common.origin_save import can_save_origin from swh.web.common.utils import reverse +from swh.web.tests.utils import check_http_get_response, check_http_post_response _user_name = "swh-web-admin" _user_mail = "admin@swh-web.org" _user_password = "..34~pounds~BEAUTY~march~63.." _authorized_origin_url = "https://scm.ourproject.org/anonscm/" _unauthorized_origin_url = "https://www.softwareheritage.org/" pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): User = get_user_model() user = User.objects.create_user(_user_name, _user_mail, _user_password) user.is_staff = True user.save() SaveAuthorizedOrigin.objects.create(url=_authorized_origin_url) SaveUnauthorizedOrigin.objects.create(url=_unauthorized_origin_url) def check_not_login(client, url): login_url = reverse("login", query_params={"next": url}) - response = client.post(url) - assert response.status_code == 302 - assert unquote(response.url) == login_url + + resp = check_http_post_response(client, url, status_code=302) + assert unquote(resp.url) == login_url def test_add_authorized_origin_url(client): authorized_url = "https://scm.adullact.net/anonscm/" assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-authorized-url", url_args={"origin_url": authorized_url} ) check_not_login(client, url) assert can_save_origin(authorized_url) == SAVE_REQUEST_PENDING client.login(username=_user_name, password=_user_password) - response = client.post(url) - assert response.status_code == 200 + + check_http_post_response(client, url, status_code=200) assert can_save_origin(authorized_url) == SAVE_REQUEST_ACCEPTED def test_remove_authorized_origin_url(client): assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED url = reverse( "admin-origin-save-remove-authorized-url", url_args={"origin_url": _authorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_ACCEPTED client.login(username=_user_name, password=_user_password) - response = client.post(url) - assert response.status_code == 200 + check_http_post_response(client, url, status_code=200) assert can_save_origin(_authorized_origin_url) == SAVE_REQUEST_PENDING def test_add_unauthorized_origin_url(client): unauthorized_url = "https://www.yahoo./" assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING url = reverse( "admin-origin-save-add-unauthorized-url", url_args={"origin_url": unauthorized_url}, ) check_not_login(client, url) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_PENDING client.login(username=_user_name, password=_user_password) - response = client.post(url) - assert response.status_code == 200 + check_http_post_response(client, url, status_code=200) assert can_save_origin(unauthorized_url) == SAVE_REQUEST_REJECTED def test_remove_unauthorized_origin_url(client): assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED url = reverse( "admin-origin-save-remove-unauthorized-url", url_args={"origin_url": _unauthorized_origin_url}, ) check_not_login(client, url) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_REJECTED client.login(username=_user_name, password=_user_password) - response = client.post(url) - assert response.status_code == 200 + check_http_post_response(client, url, status_code=200) assert can_save_origin(_unauthorized_origin_url) == SAVE_REQUEST_PENDING def test_accept_pending_save_request(client, mocker): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") visit_type = "git" origin_url = "https://v2.pikacode.com/bthate/botlib.git" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) - response = client.post( - save_request_url, data={}, content_type="application/x-www-form-urlencoded" - ) - assert response.status_code == 200 + response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING accept_request_url = reverse( "admin-origin-save-request-accept", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, accept_request_url) tasks_data = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": "next_run_not_scheduled", "id": 1, } ] mock_scheduler.create_tasks.return_value = tasks_data mock_scheduler.get_tasks.return_value = tasks_data client.login(username=_user_name, password=_user_password) - response = client.post(accept_request_url) - assert response.status_code == 200 + response = check_http_post_response(client, accept_request_url, status_code=200) - response = client.get(save_request_url) - assert response.status_code == 200 + response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_ACCEPTED assert response.data[0]["save_task_status"] == SAVE_TASK_NOT_YET_SCHEDULED def test_reject_pending_save_request(client, mocker): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") visit_type = "git" origin_url = "https://wikipedia.com" save_request_url = reverse( "api-1-save-origin", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) - response = client.post( - save_request_url, data={}, content_type="application/x-www-form-urlencoded" - ) - assert response.status_code == 200 + + response = check_http_post_response(client, save_request_url, status_code=200) assert response.data["save_request_status"] == SAVE_REQUEST_PENDING reject_request_url = reverse( "admin-origin-save-request-reject", url_args={"visit_type": visit_type, "origin_url": origin_url}, ) check_not_login(client, reject_request_url) client.login(username=_user_name, password=_user_password) - response = client.post(reject_request_url) - assert response.status_code == 200 + response = check_http_post_response(client, reject_request_url, status_code=200) tasks_data = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": "next_run_not_scheduled", "id": 1, } ] mock_scheduler.create_tasks.return_value = tasks_data mock_scheduler.get_tasks.return_value = tasks_data - response = client.get(save_request_url) - assert response.status_code == 200 + response = check_http_get_response(client, save_request_url, status_code=200) assert response.data[0]["save_request_status"] == SAVE_REQUEST_REJECTED def test_remove_save_request(client): sor = SaveOriginRequest.objects.create( visit_type="git", origin_url="https://wikipedia.com", status=SAVE_REQUEST_PENDING, ) assert SaveOriginRequest.objects.count() == 1 remove_request_url = reverse( "admin-origin-save-request-remove", url_args={"sor_id": sor.id} ) check_not_login(client, remove_request_url) client.login(username=_user_name, password=_user_password) - response = client.post(remove_request_url) - assert response.status_code == 200 + check_http_post_response(client, remove_request_url, status_code=200) assert SaveOriginRequest.objects.count() == 0 diff --git a/swh/web/tests/api/test_apidoc.py b/swh/web/tests/api/test_apidoc.py index cce65553..c2e5d554 100644 --- a/swh/web/tests/api/test_apidoc.py +++ b/swh/web/tests/api/test_apidoc.py @@ -1,494 +1,488 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import textwrap import pytest from rest_framework.response import Response from swh.storage.exc import StorageAPIError, StorageDBError from swh.web.api.apidoc import _parse_httpdomain_doc, api_doc from swh.web.api.apiurls import api_route from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc from swh.web.common.utils import prettify_html, reverse -from swh.web.tests.django_asserts import assert_template_used +from swh.web.tests.utils import check_api_get_responses, check_html_get_response _httpdomain_doc = """ .. http:get:: /api/1/revision/(sha1_git)/ Get information about a revision in the archive. Revisions are identified by **sha1** checksums, compatible with Git commit identifiers. See :func:`swh.model.identifiers.revision_identifier` in our data model module for details about how they are computed. :param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request :json object author: information about the author of the revision :>json object committer: information about the committer of the revision :>json string committer_date: ISO representation of the commit date (in UTC) :>json string date: ISO representation of the revision date (in UTC) :>json string directory: the unique identifier that revision points to :>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]` to get information about the directory associated to the revision :>json string id: the revision unique identifier :>json boolean merge: whether or not the revision corresponds to a merge commit :>json string message: the message associated to the revision :>json array parents: the parents of the revision, i.e. the previous revisions that head directly to it, each entry of that array contains an unique parent revision identifier but also a link to :http:get:`/api/1/revision/(sha1_git)/` to get more information about it :>json string type: the type of the revision :statuscode 200: no error :statuscode 400: an invalid **sha1_git** value has been provided :statuscode 404: requested revision can not be found in the archive **Request:** .. parsed-literal:: :swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/` """ _exception_http_code = { BadInputExc: 400, ForbiddenExc: 403, NotFoundExc: 404, Exception: 500, StorageAPIError: 503, StorageDBError: 503, } def test_apidoc_nodoc_failure(): with pytest.raises(Exception): @api_doc("/my/nodoc/url/") def apidoc_nodoc_tester(request, arga=0, argb=0): return Response(arga + argb) @api_route(r"/some/(?P[0-9]+)/(?P[0-9]+)/", "api-1-some-doc-route") @api_doc("/some/doc/route/") def apidoc_route(request, myarg, myotherarg, akw=0): """ Sample doc """ return {"result": int(myarg) + int(myotherarg) + akw} def test_apidoc_route_doc(client): url = reverse("api-1-some-doc-route-doc") - rv = client.get(url, HTTP_ACCEPT="text/html") - - assert rv.status_code == 200, rv.content - assert_template_used(rv, "api/apidoc.html") + check_html_get_response( + client, url, status_code=200, template_used="api/apidoc.html" + ) def test_apidoc_route_fn(api_client): url = reverse("api-1-some-doc-route", url_args={"myarg": 1, "myotherarg": 1}) - rv = api_client.get(url) - assert rv.status_code == 200, rv.data + check_api_get_responses(api_client, url, status_code=200) @api_route(r"/test/error/(?P.+)/", "api-1-test-error") @api_doc("/test/error/") def apidoc_test_error_route(request, exc_name): """ Sample doc """ for e in _exception_http_code.keys(): if e.__name__ == exc_name: raise e("Error") def test_apidoc_error(api_client): for exc, code in _exception_http_code.items(): url = reverse("api-1-test-error", url_args={"exc_name": exc.__name__}) - rv = api_client.get(url) - - assert rv.status_code == code, rv.data + check_api_get_responses(api_client, url, status_code=code) @api_route( r"/some/full/(?P[0-9]+)/(?P[0-9]+)/", "api-1-some-complete-doc-route", ) @api_doc("/some/complete/doc/route/") def apidoc_full_stack(request, myarg, myotherarg, akw=0): """ Sample doc """ return {"result": int(myarg) + int(myotherarg) + akw} def test_apidoc_full_stack_doc(client): url = reverse("api-1-some-complete-doc-route-doc") - rv = client.get(url, HTTP_ACCEPT="text/html") - assert rv.status_code == 200, rv.content - assert_template_used(rv, "api/apidoc.html") + check_html_get_response( + client, url, status_code=200, template_used="api/apidoc.html" + ) def test_apidoc_full_stack_fn(api_client): url = reverse( "api-1-some-complete-doc-route", url_args={"myarg": 1, "myotherarg": 1} ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data + check_api_get_responses(api_client, url, status_code=200) @api_route(r"/test/post/only/", "api-1-test-post-only", methods=["POST"]) @api_doc("/test/post/only/") def apidoc_test_post_only(request, exc_name): """ Sample doc """ return {"result": "some data"} def test_apidoc_post_only(client): # a dedicated view accepting GET requests should have # been created to display the HTML documentation url = reverse("api-1-test-post-only-doc") - rv = client.get(url, HTTP_ACCEPT="text/html") - assert rv.status_code == 200, rv.content - assert_template_used(rv, "api/apidoc.html") + check_html_get_response( + client, url, status_code=200, template_used="api/apidoc.html" + ) def test_api_doc_parse_httpdomain(): doc_data = { "description": "", "urls": [], "args": [], "params": [], "resheaders": [], "reqheaders": [], "input_type": "", "inputs": [], "return_type": "", "returns": [], "status_codes": [], "examples": [], } _parse_httpdomain_doc(_httpdomain_doc, doc_data) expected_urls = [ { "rule": "/api/1/revision/ **\\(sha1_git\\)** /", "methods": ["GET", "HEAD", "OPTIONS"], } ] assert "urls" in doc_data assert doc_data["urls"] == expected_urls expected_description = ( "Get information about a revision in the archive. " "Revisions are identified by **sha1** checksums, " "compatible with Git commit identifiers. See " "**swh.model.identifiers.revision_identifier** in " "our data model module for details about how they " "are computed." ) assert "description" in doc_data assert doc_data["description"] == expected_description expected_args = [ { "name": "sha1_git", "type": "string", "doc": ( "hexadecimal representation of the revision " "**sha1_git** identifier" ), } ] assert "args" in doc_data assert doc_data["args"] == expected_args expected_params = [] assert "params" in doc_data assert doc_data["params"] == expected_params expected_reqheaders = [ { "doc": ( "the requested response content type, either " "``application/json`` (default) or ``application/yaml``" ), "name": "Accept", } ] assert "reqheaders" in doc_data assert doc_data["reqheaders"] == expected_reqheaders expected_resheaders = [ {"doc": "this depends on **Accept** header of request", "name": "Content-Type"} ] assert "resheaders" in doc_data assert doc_data["resheaders"] == expected_resheaders expected_statuscodes = [ {"code": "200", "doc": "no error"}, {"code": "400", "doc": "an invalid **sha1_git** value has been provided"}, {"code": "404", "doc": "requested revision can not be found in the archive"}, ] assert "status_codes" in doc_data assert doc_data["status_codes"] == expected_statuscodes expected_input_type = "object" assert "input_type" in doc_data assert doc_data["input_type"] == expected_input_type expected_inputs = [ {"name": "n", "type": "int", "doc": "sample input integer"}, {"name": "s", "type": "string", "doc": "sample input string"}, {"name": "a", "type": "array", "doc": "sample input array"}, ] assert "inputs" in doc_data assert doc_data["inputs"] == expected_inputs expected_return_type = "object" assert "return_type" in doc_data assert doc_data["return_type"] == expected_return_type expected_returns = [ { "name": "author", "type": "object", "doc": "information about the author of the revision", }, { "name": "committer", "type": "object", "doc": "information about the committer of the revision", }, { "name": "committer_date", "type": "string", "doc": "ISO representation of the commit date (in UTC)", }, { "name": "date", "type": "string", "doc": "ISO representation of the revision date (in UTC)", }, { "name": "directory", "type": "string", "doc": "the unique identifier that revision points to", }, { "name": "directory_url", "type": "string", "doc": ( "link to `/api/1/directory/ `_ " "to get information about the directory associated to " "the revision" ), }, {"name": "id", "type": "string", "doc": "the revision unique identifier"}, { "name": "merge", "type": "boolean", "doc": "whether or not the revision corresponds to a merge commit", }, { "name": "message", "type": "string", "doc": "the message associated to the revision", }, { "name": "parents", "type": "array", "doc": ( "the parents of the revision, i.e. the previous revisions " "that head directly to it, each entry of that array " "contains an unique parent revision identifier but also a " "link to `/api/1/revision/ `_ " "to get more information about it" ), }, {"name": "type", "type": "string", "doc": "the type of the revision"}, ] assert "returns" in doc_data assert doc_data["returns"] == expected_returns expected_examples = ["/api/1/revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/"] assert "examples" in doc_data assert doc_data["examples"] == expected_examples @api_route(r"/post/endpoint/", "api-1-post-endpoint", methods=["POST"]) @api_doc("/post/endpoint/") def apidoc_test_post_endpoint(request): """ .. http:post:: /api/1/post/endpoint/ Endpoint documentation :json object : an object whose keys are input SWHIDs and values objects with the following keys: * **known (bool)**: whether the object was found """ pass def test_apidoc_input_output_doc(client): url = reverse("api-1-post-endpoint-doc") - rv = client.get(url, HTTP_ACCEPT="text/html") - assert rv.status_code == 200, rv.content - assert_template_used(rv, "api/apidoc.html") + rv = check_html_get_response( + client, url, status_code=200, template_used="api/apidoc.html" + ) input_html_doc = textwrap.indent( ( '
\n' '
\n' " array\n" "
\n" '
\n' "

\n" " Input array of SWHIDs\n" "

\n" "
\n" "
\n" ), " " * 7, ) output_html_doc = textwrap.indent( ( '
\n' '
\n' " object\n" "
\n" '
\n' "

\n" " an object containing the following keys:\n" "

\n" '
\n' "
\n" "
    \n" "
  • \n" "

    \n" " \n" " <swhid> (object)\n" " \n" " : an object whose keys are input SWHIDs" " and values objects with the following keys:\n" "

    \n" "
    \n" '
      \n' "
    • \n" "

      \n" " \n" " known (bool)\n" " \n" " : whether the object was found\n" "

      \n" "
    • \n" "
    \n" "
    \n" "
  • \n" "
\n" "
\n" "
\n" "
\n" "
\n" ), " " * 7, ) html = prettify_html(rv.content) assert input_html_doc in html assert output_html_doc in html @api_route(r"/endpoint/links/in/doc/", "api-1-endpoint-links-in-doc") @api_doc("/endpoint/links/in/doc/") def apidoc_test_endpoint_with_links_in_doc(request): """ .. http:get:: /api/1/post/endpoint/ Endpoint documentation with links to :http:get:`/api/1/content/[(hash_type):](hash)/`, :http:get:`/api/1/directory/(sha1_git)/[(path)/]` and `archive `_. """ pass def test_apidoc_with_links(client): url = reverse("api-1-endpoint-links-in-doc") - rv = client.get(url, HTTP_ACCEPT="text/html") - assert rv.status_code == 200, rv.content - assert_template_used(rv, "api/apidoc.html") + rv = check_html_get_response( + client, url, status_code=200, template_used="api/apidoc.html" + ) html = prettify_html(rv.content) first_link = textwrap.indent( ( '\n' " /api/1/content/\n" "" ), " " * 9, ) second_link = textwrap.indent( ( '\n' " /api/1/directory/\n" "" ), " " * 9, ) third_link = textwrap.indent( ( '\n' " archive\n" "" ), " " * 9, ) assert first_link in html assert second_link in html assert third_link in html diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py index 5f84c6cb..fc7def3c 100644 --- a/swh/web/tests/api/test_apiresponse.py +++ b/swh/web/tests/api/test_apiresponse.py @@ -1,140 +1,142 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from swh.web.api.apiresponse import ( compute_link_header, filter_by_fields, make_api_response, transform, ) from swh.web.common.utils import reverse from swh.web.tests.django_asserts import assert_contains def test_compute_link_header(): next_link = "/api/endpoint/next" prev_link = "/api/endpoint/prev" rv = { "headers": {"link-next": next_link, "link-prev": prev_link}, "results": [1, 2, 3], } options = {} headers = compute_link_header(rv, options) assert headers == { "Link": (f'<{next_link}>; rel="next",' f'<{prev_link}>; rel="previous"') } def test_compute_link_header_nothing_changed(): rv = {} options = {} headers = compute_link_header(rv, options) assert headers == {} def test_compute_link_header_nothing_changed_2(): rv = {"headers": {}} options = {} headers = compute_link_header(rv, options) assert headers == {} def test_transform_only_return_results_1(): rv = {"results": {"some-key": "some-value"}} assert transform(rv) == {"some-key": "some-value"} def test_transform_only_return_results_2(): rv = {"headers": {"something": "do changes"}, "results": {"some-key": "some-value"}} assert transform(rv) == {"some-key": "some-value"} def test_transform_do_remove_headers(): rv = {"headers": {"something": "do changes"}, "some-key": "some-value"} assert transform(rv) == {"some-key": "some-value"} def test_transform_do_nothing(): rv = {"some-key": "some-value"} assert transform(rv) == {"some-key": "some-value"} def test_swh_multi_response_mimetype(mocker, api_request_factory): mock_shorten_path = mocker.patch("swh.web.api.apiresponse.shorten_path") mock_filter = mocker.patch("swh.web.api.apiresponse.filter_by_fields") mock_json = mocker.patch("swh.web.api.apiresponse.json") data = {"data": [12, 34], "id": "adc83b19e793491b1c6ea0fd8b46cd9f32e592fc"} mock_filter.return_value = data mock_shorten_path.return_value = "my_short_path" mock_json.dumps.return_value = json.dumps(data) accepted_response_formats = { "html": "text/html", "yaml": "application/yaml", "json": "application/json", } for resp_format in accepted_response_formats: request = api_request_factory.get("/api/test/path/") content_type = accepted_response_formats[resp_format] setattr(request, "accepted_media_type", content_type) rv = make_api_response(request, data) mock_filter.assert_called_with(request, data) if resp_format != "html": assert rv.status_code == 200, rv.data assert rv.data == data else: assert rv.status_code == 200, rv.content assert_contains(rv, json.dumps(data)) def test_swh_filter_renderer_do_nothing(api_request_factory): input_data = {"a": "some-data"} request = api_request_factory.get("/api/test/path/", data={}) setattr(request, "query_params", request.GET) actual_data = filter_by_fields(request, input_data) assert actual_data == input_data def test_swh_filter_renderer_do_filter(mocker, api_request_factory): mock_ffk = mocker.patch("swh.web.api.apiresponse.utils.filter_field_keys") mock_ffk.return_value = {"a": "some-data"} request = api_request_factory.get("/api/test/path/", data={"fields": "a,c"}) setattr(request, "query_params", request.GET) input_data = {"a": "some-data", "b": "some-other-data"} actual_data = filter_by_fields(request, input_data) assert actual_data == {"a": "some-data"} mock_ffk.assert_called_once_with(input_data, {"a", "c"}) def test_error_response_handler(mocker, api_client): mock_archive = mocker.patch("swh.web.api.views.stat.archive") mock_archive.stat_counters.side_effect = Exception("Something went wrong") url = reverse("api-1-stat-counters") resp = api_client.get(url) assert resp.status_code == 500 + assert "traceback" in resp.data + assert "Traceback" in resp.data["traceback"] diff --git a/swh/web/tests/api/views/__init__.py b/swh/web/tests/api/views/__init__.py index f2f97daa..e69de29b 100644 --- a/swh/web/tests/api/views/__init__.py +++ b/swh/web/tests/api/views/__init__.py @@ -1,71 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Any, Dict, Optional - -from rest_framework.response import Response -from rest_framework.test import APIClient - - -def check_api_get_responses( - api_client: APIClient, url: str, status_code: int -) -> Response: - """Helper function to check Web API responses to GET requests - for all accepted content types. - - Args: - api_client: DRF test client - url: Web API URL to check responses - status_code: expected HTTP status code - - Returns: - The Web API JSON response - """ - # check API Web UI - html_content_type = "text/html" - resp = api_client.get(url, HTTP_ACCEPT=html_content_type) - assert resp.status_code == status_code, resp.content - assert resp["Content-Type"].startswith(html_content_type) - - # check YAML response - yaml_content_type = "application/yaml" - resp = api_client.get(url, HTTP_ACCEPT=yaml_content_type) - assert resp.status_code == status_code, resp.data - assert resp["Content-Type"] == yaml_content_type - - # check JSON response - resp = api_client.get(url) - assert resp.status_code == status_code, resp.data - assert resp["Content-Type"] == "application/json" - - return resp - - -def check_api_post_responses( - api_client: APIClient, url: str, data: Optional[Dict[str, Any]], status_code: int -) -> Response: - """Helper function to check Web API responses to POST requests - for all accepted content types. - - Args: - api_client: DRF test client - url: Web API URL to check responses - status_code: expected HTTP status code - - Returns: - The Web API JSON response - """ - # check YAML response - yaml_content_type = "application/yaml" - resp = api_client.post(url, data=data, format="json", HTTP_ACCEPT=yaml_content_type) - assert resp.status_code == status_code, resp.data - assert resp["Content-Type"] == yaml_content_type - - # check JSON response - resp = api_client.post(url, data=data, format="json") - assert resp.status_code == status_code, resp.data - assert resp["Content-Type"] == "application/json" - - return resp diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py index bcc919e3..00ac81b7 100644 --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -1,328 +1,323 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given import pytest from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.conftest import ctags_json_missing, fossology_missing from swh.web.tests.data import random_content from swh.web.tests.strategies import content, contents_with_ctags +from swh.web.tests.utils import ( + check_api_get_responses, + check_api_post_responses, + check_http_get_response, +) @given(content()) def test_api_content_filetype(api_client, indexer_data, content): indexer_data.content_add_mimetype(content["sha1"]) url = reverse( "api-1-content-filetype", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_mimetype(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_filetype_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-filetype", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No filetype information found for content " "sha1:%s." % unknown_content_["sha1"], } @pytest.mark.skip # Language indexer is disabled @given(content()) def test_api_content_language(api_client, indexer_data, content): indexer_data.content_add_language(content["sha1"]) url = reverse( "api-1-content-language", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_language(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_language_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-language", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No language information found for content " "sha1:%s." % unknown_content_["sha1"], } @pytest.mark.skip # Language indexer is disabled @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(contents_with_ctags()) def test_api_content_symbol(api_client, indexer_data, contents_with_ctags): expected_data = {} for content_sha1 in contents_with_ctags["sha1s"]: indexer_data.content_add_ctags(content_sha1) for ctag in indexer_data.content_get_ctags(content_sha1): if ctag["name"] == contents_with_ctags["symbol_name"]: expected_data[content_sha1] = ctag break url = reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"per_page": 100}, ) rv = check_api_get_responses(api_client, url, status_code=200) for entry in rv.data: content_sha1 = entry["sha1"] expected_entry = expected_data[content_sha1] for key, view_name in ( ("content_url", "api-1-content"), ("data_url", "api-1-content-raw"), ("license_url", "api-1-content-license"), ("language_url", "api-1-content-language"), ("filetype_url", "api-1-content-filetype"), ): expected_entry[key] = reverse( view_name, url_args={"q": "sha1:%s" % content_sha1}, request=rv.wsgi_request, ) expected_entry["sha1"] = content_sha1 del expected_entry["id"] assert entry == expected_entry assert "Link" not in rv url = reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"per_page": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) next_url = ( reverse( "api-1-content-symbol", url_args={"q": contents_with_ctags["symbol_name"]}, query_params={"last_sha1": rv.data[1]["sha1"], "per_page": 2}, request=rv.wsgi_request, ), ) assert rv["Link"] == '<%s>; rel="next"' % next_url def test_api_content_symbol_not_found(api_client): url = reverse("api-1-content-symbol", url_args={"q": "bar"}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No indexed raw content match expression 'bar'.", } assert "Link" not in rv @pytest.mark.skipif( ctags_json_missing, reason="requires ctags with json output support" ) @given(content()) def test_api_content_ctags(api_client, indexer_data, content): indexer_data.content_add_ctags(content["sha1"]) url = reverse( "api-1-content-ctags", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = list(indexer_data.content_get_ctags(content["sha1"])) for e in expected_data: e["content_url"] = content_url assert rv.data == expected_data @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_api_content_license(api_client, indexer_data, content): indexer_data.content_add_license(content["sha1"]) url = reverse( "api-1-content-license", url_args={"q": "sha1_git:%s" % content["sha1_git"]} ) rv = check_api_get_responses(api_client, url, status_code=200) content_url = reverse( "api-1-content", url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) expected_data = indexer_data.content_get_license(content["sha1"]) expected_data["content_url"] = content_url assert rv.data == expected_data def test_api_content_license_sha_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-license", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No license information found for content " "sha1:%s." % unknown_content_["sha1"], } @given(content()) def test_api_content_metadata(api_client, archive_data, content): url = reverse("api-1-content", {"q": "sha1:%s" % content["sha1"]}) rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.content_get(content["sha1"]) for key, view_name in ( ("data_url", "api-1-content-raw"), ("license_url", "api-1-content-license"), ("language_url", "api-1-content-language"), ("filetype_url", "api-1-content-filetype"), ): expected_data[key] = reverse( view_name, url_args={"q": "sha1:%s" % content["sha1"]}, request=rv.wsgi_request, ) assert rv.data == expected_data def test_api_content_not_found(api_client): unknown_content_ = random_content() url = reverse("api-1-content", url_args={"q": "sha1:%s" % unknown_content_["sha1"]}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Content with sha1 checksum equals to %s not found!" % unknown_content_["sha1"], } def test_api_content_raw_ko_not_found(api_client): unknown_content_ = random_content() url = reverse( "api-1-content-raw", url_args={"q": "sha1:%s" % unknown_content_["sha1"]} ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Content with sha1 checksum equals to %s not found!" % unknown_content_["sha1"], } @given(content()) def test_api_content_raw_text(api_client, archive_data, content): url = reverse("api-1-content-raw", url_args={"q": "sha1:%s" % content["sha1"]}) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data + rv = check_http_get_response(api_client, url, status_code=200) assert rv["Content-Type"] == "application/octet-stream" assert ( rv["Content-disposition"] == "attachment; filename=content_sha1_%s_raw" % content["sha1"] ) - assert rv["Content-Type"] == "application/octet-stream" expected_data = archive_data.content_get_data(content["sha1"]) assert rv.content == expected_data["data"] @given(content()) def test_api_content_raw_text_with_filename(api_client, archive_data, content): url = reverse( "api-1-content-raw", url_args={"q": "sha1:%s" % content["sha1"]}, query_params={"filename": "filename.txt"}, ) - rv = api_client.get(url) - - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/octet-stream" + rv = check_http_get_response(api_client, url, status_code=200) assert rv["Content-disposition"] == "attachment; filename=filename.txt" assert rv["Content-Type"] == "application/octet-stream" expected_data = archive_data.content_get_data(content["sha1"]) assert rv.content == expected_data["data"] @given(content()) def test_api_check_content_known(api_client, content): url = reverse("api-1-content-known", url_args={"q": content["sha1"]}) rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == { "search_res": [{"found": True, "sha1": content["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 100.0}, } @given(content()) def test_api_check_content_known_post(api_client, content): url = reverse("api-1-content-known") rv = check_api_post_responses( api_client, url, data={"q": content["sha1"]}, status_code=200 ) assert rv.data == { "search_res": [{"found": True, "sha1": content["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 100.0}, } def test_api_check_content_known_not_found(api_client): unknown_content_ = random_content() url = reverse("api-1-content-known", url_args={"q": unknown_content_["sha1"]}) rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == { "search_res": [{"found": False, "sha1": unknown_content_["sha1"]}], "search_stats": {"nbfiles": 1, "pct": 0.0}, } @given(content()) def test_api_content_uppercase(api_client, content): url = reverse( "api-1-content-uppercase-checksum", url_args={"q": content["sha1"].upper()} ) - rv = api_client.get(url) - assert rv.status_code == 302, rv.data - + rv = check_http_get_response(api_client, url, status_code=302) redirect_url = reverse("api-1-content", url_args={"q": content["sha1"]}) - assert rv["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_directory.py b/swh/web/tests/api/views/test_directory.py index cec21a5c..4540052d 100644 --- a/swh/web/tests/api/views/test_directory.py +++ b/swh/web/tests/api/views/test_directory.py @@ -1,80 +1,79 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.web.api.utils import enrich_directory from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import directory +from swh.web.tests.utils import check_api_get_responses, check_http_get_response @given(directory()) def test_api_directory(api_client, archive_data, directory): url = reverse("api-1-directory", url_args={"sha1_git": directory}) rv = check_api_get_responses(api_client, url, status_code=200) dir_content = list(archive_data.directory_ls(directory)) expected_data = list( map(enrich_directory, dir_content, [rv.wsgi_request] * len(dir_content)) ) assert rv.data == expected_data def test_api_directory_not_found(api_client): unknown_directory_ = random_sha1() url = reverse("api-1-directory", url_args={"sha1_git": unknown_directory_}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Directory with sha1_git %s not found" % unknown_directory_, } @given(directory()) def test_api_directory_with_path_found(api_client, archive_data, directory): directory_content = archive_data.directory_ls(directory) path = random.choice(directory_content) url = reverse( "api-1-directory", url_args={"sha1_git": directory, "path": path["name"]} ) rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == enrich_directory(path, rv.wsgi_request) @given(directory()) def test_api_directory_with_path_not_found(api_client, directory): path = "some/path/to/nonexistent/dir/" url = reverse("api-1-directory", url_args={"sha1_git": directory, "path": path}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": ( "Directory entry with path %s from %s not found" % (path, directory) ), } @given(directory()) def test_api_directory_uppercase(api_client, directory): url = reverse( "api-1-directory-uppercase-checksum", url_args={"sha1_git": directory.upper()} ) - resp = api_client.get(url) - assert resp.status_code == 302 + resp = check_http_get_response(api_client, url, status_code=302) redirect_url = reverse("api-1-directory", url_args={"sha1_git": directory}) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py index 29b7e4b7..c71d3d4b 100644 --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -1,245 +1,232 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import json import textwrap from hypothesis import given from swh.model.identifiers import ORIGIN, SNAPSHOT, swhid from swh.web.api.views.graph import API_GRAPH_PERM from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.auth.keycloak_mock import mock_keycloak from swh.web.tests.auth.sample_data import oidc_profile from swh.web.tests.strategies import origin +from swh.web.tests.utils import check_http_get_response def test_graph_endpoint_needs_authentication(api_client): url = reverse("api-1-graph", url_args={"graph_query": "stats"}) - resp = api_client.get(url) - assert resp.status_code == 401 + check_http_get_response(api_client, url, status_code=401) def _authenticate_graph_user(api_client, mocker): mock_keycloak(mocker, user_permissions=[API_GRAPH_PERM]) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") def test_graph_endpoint_needs_permission(api_client, mocker, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") mock_keycloak(mocker, user_permissions=[]) - resp = api_client.get(url) - assert resp.status_code == 403 + check_http_get_response(api_client, url, status_code=403) _authenticate_graph_user(api_client, mocker) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) - resp = api_client.get(url) - assert resp.status_code == 200 + check_http_get_response(api_client, url, status_code=200) def test_graph_text_plain_response(api_client, mocker, requests_mock): _authenticate_graph_user(api_client, mocker) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" response_text = textwrap.dedent( """\ swh:1:cnt:1d3dace0a825b0535c37c53ed669ef817e9c1b47 swh:1:cnt:6d5b280f4e33589ae967a7912a587dd5cb8dedaa swh:1:cnt:91bef238bf01356a550d416d14bb464c576ac6f4 swh:1:cnt:58a8b925a463b87d49639fda282b8f836546e396 swh:1:cnt:fd32ee0a87e16ccc853dfbeb7018674f9ce008c0 swh:1:cnt:ab7c39871872589a4fc9e249ebc927fb1042c90d swh:1:cnt:93073c02bf3869845977527de16af4d54765838d swh:1:cnt:4251f795b52c54c447a97c9fe904d8b1f993b1e0 swh:1:cnt:c6e7055424332006d07876ffeba684e7e284b383 swh:1:cnt:8459d8867dc3b15ef7ae9683e21cccc9ab2ec887 swh:1:cnt:5f9981d52202815aa947f85b9dfa191b66f51138 swh:1:cnt:00a685ec51bcdf398c15d588ecdedb611dbbab4b swh:1:cnt:e1cf1ea335106a0197a2f92f7804046425a7d3eb swh:1:cnt:07069b38087f88ec192d2c9aff75a502476fd17d swh:1:cnt:f045ee845c7f14d903a2c035b2691a7c400c01f0 """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": "text/plain"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) - resp = api_client.get(url) - - assert resp.status_code == 200 - assert resp.content_type == "text/plain" + resp = check_http_get_response( + api_client, url, status_code=200, content_type="text/plain" + ) assert resp.content == response_text.encode() _response_json = { "counts": {"nodes": 17075708289, "edges": 196236587976}, "ratios": { "compression": 0.16, "bits_per_node": 58.828, "bits_per_edge": 5.119, "avg_locality": 2184278529.729, }, "indegree": {"min": 0, "max": 263180117, "avg": 11.4921492364925}, "outdegree": {"min": 0, "max": 1033207, "avg": 11.4921492364925}, } def test_graph_json_response(api_client, mocker, requests_mock): _authenticate_graph_user(api_client, mocker) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) - resp = api_client.get(url) - - assert resp.status_code == 200 + resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() def test_graph_ndjson_response(api_client, mocker, requests_mock): _authenticate_graph_user(api_client, mocker) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" response_ndjson = textwrap.dedent( """\ ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"] """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_ndjson, headers={"Content-Type": "application/x-ndjson"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) - resp = api_client.get(url) - - assert resp.status_code == 200 + resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/x-ndjson" assert resp.content == response_ndjson.encode() @given(origin()) def test_graph_response_resolve_origins( archive_data, api_client, mocker, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) origin_sha1 = hasher.hexdigest() origin_swhid = str(swhid(ORIGIN, origin_sha1)) snapshot = archive_data.snapshot_get_latest(origin["url"])["id"] snapshot_swhid = str(swhid(SNAPSHOT, snapshot)) _authenticate_graph_user(api_client, mocker) for graph_query, response_text, content_type in ( ( f"visit/nodes/{snapshot_swhid}", f"{snapshot_swhid}\n{origin_swhid}\n", "text/plain", ), ( f"visit/edges/{snapshot_swhid}", f"{snapshot_swhid} {origin_swhid}\n", "text/plain", ), ( f"visit/paths/{snapshot_swhid}", f'["{snapshot_swhid}", "{origin_swhid}"]\n', "application/x-ndjson", ), ): # set two lines response to check resolved origins cache response_text = response_text + response_text requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": content_type}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward"}, ) - resp = api_client.get(url) - - assert resp.status_code == 200 + resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == content_type assert resp.content == response_text.encode() url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward", "resolve_origins": "true"}, ) - resp = api_client.get(url) - - assert resp.status_code == 200 + resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == content_type assert ( resp.content == response_text.replace(origin_swhid, origin["url"]).encode() ) def test_graph_response_resolve_origins_nothing_to_do( api_client, mocker, requests_mock ): _authenticate_graph_user(api_client, mocker) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"resolve_origins": "true"}, ) - resp = api_client.get(url) - - assert resp.status_code == 200 + resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py index e0a8f86f..070d4dfe 100644 --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -1,166 +1,166 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( content, directory, origin, release, revision, snapshot, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ) +from swh.web.tests.utils import check_api_get_responses, check_api_post_responses @given(origin(), content(), directory(), release(), revision(), snapshot()) def test_swhid_resolve_success( api_client, client, origin, content, directory, release, revision, snapshot ): for obj_type, obj_id in ( (CONTENT, content["sha1_git"]), (DIRECTORY, directory), (RELEASE, release), (REVISION, revision), (SNAPSHOT, snapshot), ): swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]}) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) resp = check_api_get_responses(api_client, url, status_code=200) if obj_type == CONTENT: url_args = {"query_string": "sha1_git:%s" % obj_id} elif obj_type == SNAPSHOT: url_args = {"snapshot_id": obj_id} else: url_args = {"sha1_git": obj_id} browse_rev_url = reverse( "browse-%s" % obj_type, url_args=url_args, query_params={"origin_url": origin["url"]}, request=resp.wsgi_request, ) expected_result = { "browse_url": browse_rev_url, "metadata": {"origin": origin["url"]}, "namespace": "swh", "object_id": obj_id, "object_type": obj_type, "scheme_version": 1, } assert resp.data == expected_result def test_swhid_resolve_invalid(api_client): rev_id_invalid = "96db9023b8_foo_50d6c108e9a3" swhid = "swh:1:rev:%s" % rev_id_invalid url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) check_api_get_responses(api_client, url, status_code=400) @given( unknown_content(), unknown_directory(), unknown_release(), unknown_revision(), unknown_snapshot(), ) def test_swhid_resolve_not_found( api_client, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): for obj_type, obj_id in ( (CONTENT, unknown_content["sha1_git"]), (DIRECTORY, unknown_directory), (RELEASE, unknown_release), (REVISION, unknown_revision), (SNAPSHOT, unknown_snapshot), ): swhid = gen_swhid(obj_type, obj_id) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) check_api_get_responses(api_client, url, status_code=404) def test_swh_origin_id_not_resolvable(api_client): ori_swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb" url = reverse("api-1-resolve-swhid", url_args={"swhid": ori_swhid}) check_api_get_responses(api_client, url, status_code=400) @given(content(), directory()) def test_api_known_swhid_some_present(api_client, content, directory): content_ = gen_swhid(CONTENT, content["sha1_git"]) directory_ = gen_swhid(DIRECTORY, directory) unknown_revision_ = gen_swhid(REVISION, random_sha1()) unknown_release_ = gen_swhid(RELEASE, random_sha1()) unknown_snapshot_ = gen_swhid(SNAPSHOT, random_sha1()) input_swhids = [ content_, directory_, unknown_revision_, unknown_release_, unknown_snapshot_, ] url = reverse("api-1-known") resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200) assert resp.data == { content_: {"known": True}, directory_: {"known": True}, unknown_revision_: {"known": False}, unknown_release_: {"known": False}, unknown_snapshot_: {"known": False}, } def test_api_known_invalid_swhid(api_client): invalid_swhid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"] invalid_swhid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"] url = reverse("api-1-known") check_api_post_responses(api_client, url, data=invalid_swhid_sha1, status_code=400) check_api_post_responses(api_client, url, data=invalid_swhid_type, status_code=400) def test_api_known_raises_large_payload_error(api_client): random_swhid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb" limit = 10000 err_msg = "The maximum number of SWHIDs this endpoint can receive is 1000" swhids = [random_swhid for i in range(limit)] url = reverse("api-1-known") resp = check_api_post_responses(api_client, url, data=swhids, status_code=413) assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg} diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 55e906ab..572e35cd 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,682 +1,680 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from hypothesis import given import pytest from requests.utils import parse_header_links from swh.model.model import Origin, OriginVisit, OriginVisitStatus from swh.storage.exc import StorageAPIError, StorageDBError from swh.storage.utils import now from swh.web.api.utils import enrich_origin, enrich_origin_visit from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.strategies import new_origin, new_snapshots, origin, visit_dates +from swh.web.tests.utils import check_api_get_responses def _scroll_results(api_client, url): """Iterates through pages of results, and returns them all.""" results = [] while True: - rv = api_client.get(url) - assert rv.status_code == 200, rv.data - assert rv["Content-Type"] == "application/json" + rv = check_api_get_responses(api_client, url, status_code=200) results.extend(rv.data) if "Link" in rv: for link in parse_header_links(rv["Link"]): if link["rel"] == "next": # Found link to next page of results url = link["url"] break else: # No link with 'rel=next' break else: # No Link header break return results def test_api_lookup_origin_visits_raise_error(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "voluntary error to check the bad request middleware." mock_get_origin_visits.side_effect = BadInputExc(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == {"exception": "BadInputExc", "reason": err_msg} def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage exploded! Will be back online shortly!" mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageDBError", "reason": "An unexpected error occurred in the backend: %s" % err_msg, } def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker): mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits") err_msg = "Storage API dropped dead! Will resurrect asap!" mock_get_origin_visits.side_effect = StorageAPIError(err_msg) url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"}) rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageAPIError", "reason": "An unexpected error occurred in the api backend: %s" % err_msg, } @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=now(), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]["visit"], all_visits[2:]), ): url = reverse( "api-1-origin-visits", url_args={"origin_url": new_origin.url}, query_params={"per_page": 2, "last_visit": last_visit}, ) rv = check_api_get_responses(api_client, url, status_code=200) for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request, ) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visits_by_id( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=now(), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) all_visits = list(reversed(get_origin_visits(new_origin.to_dict()))) for last_visit, expected_visits in ( (None, all_visits[:2]), (all_visits[1]["visit"], all_visits[2:4]), ): url = reverse( "api-1-origin-visits", url_args={"origin_url": new_origin.url}, query_params={"per_page": 2, "last_visit": last_visit}, ) rv = check_api_get_responses(api_client, url, status_code=200) for i in range(len(expected_visits)): expected_visits[i] = enrich_origin_visit( expected_visits[i], with_origin_link=False, with_origin_visit_link=True, request=rv.wsgi_request, ) assert rv.data == expected_visits @given(new_origin(), visit_dates(3), new_snapshots(3)) def test_api_lookup_origin_visit( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_id = origin_visit.visit archive_data.snapshot_add([new_snapshots[i]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=origin_visit.visit, date=visit_date + timedelta(minutes=5), status="full", snapshot=new_snapshots[i].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "api-1-origin-visit", url_args={"origin_url": new_origin.url, "visit_id": visit_id}, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(new_origin()) def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin): archive_data.origin_add([new_origin]) url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "No visit for origin %s found" % new_origin.url, } @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit_ids[0], date=now(), status="full", snapshot=new_snapshots[0].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}) rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_ids[1]) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(new_origin(), visit_dates(2), new_snapshots(1)) def test_api_lookup_origin_visit_latest_with_snapshot( api_client, archive_data, new_origin, visit_dates, new_snapshots ): archive_data.origin_add([new_origin]) visit_dates.sort() visit_ids = [] for i, visit_date in enumerate(visit_dates): origin_visit = archive_data.origin_visit_add( [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)] )[0] visit_ids.append(origin_visit.visit) archive_data.snapshot_add([new_snapshots[0]]) # Add snapshot to the latest visit visit_id = visit_ids[-1] visit_status = OriginVisitStatus( origin=new_origin.url, visit=visit_id, date=now(), status="full", snapshot=new_snapshots[0].id, ) archive_data.origin_visit_status_add([visit_status]) url = reverse( "api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}, query_params={"require_snapshot": True}, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_visit = archive_data.origin_visit_status_get_latest( new_origin.url, type="git", require_snapshot=True ) expected_visit = enrich_origin_visit( expected_visit, with_origin_link=True, with_origin_visit_link=False, request=rv.wsgi_request, ) assert rv.data == expected_visit @given(origin()) def test_api_lookup_origin_visit_not_found(api_client, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v["visit"] for v in all_visits]) url = reverse( "api-1-origin-visit", url_args={"origin_url": origin["url"], "visit_id": max_visit_id + 1}, ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin %s or its visit with id %s not found!" % (origin["url"], max_visit_id + 1), } def test_api_origins_wrong_input(api_client, archive_data): """Should fail with 400 if the input is deprecated. """ # fail if wrong input url = reverse("api-1-origins", query_params={"origin_from": 1}) rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": "Please use the Link header to browse through result", } def test_api_origins(api_client, archive_data): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} # Get only one url = reverse("api-1-origins", query_params={"origin_count": 1}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= origin_urls # Get all url = reverse("api-1-origins", query_params={"origin_count": len(origins)}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls # Get "all + 10" url = reverse("api-1-origins", query_params={"origin_count": len(origins) + 10}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == len(origins) assert {origin["url"] for origin in rv.data} == origin_urls @pytest.mark.parametrize("origin_count", [1, 2, 10, 100]) def test_api_origins_scroll(api_client, archive_data, origin_count): page_result = archive_data.origin_list(limit=10000) origins = page_result.results origin_urls = {origin.url for origin in origins} url = reverse("api-1-origins", query_params={"origin_count": origin_count}) results = _scroll_results(api_client, url) assert len(results) == len(origins) assert {origin["url"] for origin in results} == origin_urls @given(origin()) def test_api_origin_by_url(api_client, archive_data, origin): origin_url = origin["url"] url = reverse("api-1-origin", url_args={"origin_url": origin_url}) rv = check_api_get_responses(api_client, url, status_code=200) expected_origin = archive_data.origin_get([origin_url])[0] expected_origin = enrich_origin(expected_origin, rv.wsgi_request) assert rv.data == expected_origin @given(new_origin()) def test_api_origin_not_found(api_client, new_origin): url = reverse("api-1-origin", url_args={"origin_url": new_origin.url}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Origin with url %s not found!" % new_origin.url, } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } # Search for 'github.com', get only one url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 1}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} <= expected_origins # Search for 'github.com', get all url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins # Search for 'github.com', get more than available url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": 10}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_words(api_client, mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github com"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "com github"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert {origin["url"] for origin in rv.data} == expected_origins url = reverse( "api-1-origin-search", url_args={"url_pattern": "memononen libtess2"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } url = reverse( "api-1-origin-search", url_args={"url_pattern": "libtess2 memononen"}, query_params={"limit": 2}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 assert {origin["url"] for origin in rv.data} == { "https://github.com/memononen/libtess2" } @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) @pytest.mark.parametrize("limit", [1, 2, 3, 10]) def test_api_origin_search_scroll(api_client, archive_data, mocker, limit, backend): if backend != "swh-search": # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) expected_origins = { "https://github.com/wcoder/highlightjs-line-numbers.js", "https://github.com/memononen/libtess2", } url = reverse( "api-1-origin-search", url_args={"url_pattern": "github.com"}, query_params={"limit": limit}, ) results = _scroll_results(api_client, url) assert {origin["url"] for origin in results} == expected_origins @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_api_origin_search_limit(api_client, archive_data, tests_data, mocker, backend): if backend == "swh-search": tests_data["search"].origin_update( [{"url": "http://foobar/{}".format(i)} for i in range(2000)] ) else: # equivalent to not configuring search in the config mocker.patch("swh.web.common.archive.search", None) archive_data.origin_add( [Origin(url="http://foobar/{}".format(i)) for i in range(2000)] ) url = reverse( "api-1-origin-search", url_args={"url_pattern": "foobar"}, query_params={"limit": 1050}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1000 @given(origin()) def test_api_origin_metadata_search(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse("api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe"}) rv = check_api_get_responses(api_client, url, status_code=200) expected_data = [ { "url": origin["url"], "metadata": { "metadata": {"author": "Jane Doe"}, "from_revision": ("7026b7c1a2af56521e951c01ed20f255fa054238"), "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, }, } ] assert rv.data == expected_data oimsft.assert_called_with(conjunction=["Jane Doe"], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext oimsft.side_effect = lambda conjunction, limit: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse("api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe"}) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=70) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe", "limit": 10}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=10) url = reverse( "api-1-origin-metadata-search", query_params={"fulltext": "Jane Doe", "limit": 987}, ) rv = check_api_get_responses(api_client, url, status_code=200) assert len(rv.data) == 1 oimsft.assert_called_with(conjunction=["Jane Doe"], limit=100) @given(origin()) def test_api_origin_intrinsic_metadata(api_client, mocker, origin): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") oimg = mock_idx_storage.origin_intrinsic_metadata_get oimg.side_effect = lambda origin_urls: [ { "from_revision": ( b"p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed " b"\xf2U\xfa\x05B8" ), "metadata": {"author": "Jane Doe"}, "id": origin["url"], "tool": { "configuration": { "context": ["NpmMapping", "CodemetaMapping"], "type": "local", }, "id": 3, "name": "swh-metadata-detector", "version": "0.0.1", }, } ] url = reverse( "api-origin-intrinsic-metadata", url_args={"origin_url": origin["url"]} ) rv = check_api_get_responses(api_client, url, status_code=200) oimg.assert_called_with([origin["url"]]) expected_data = {"author": "Jane Doe"} assert rv.data == expected_data def test_api_origin_metadata_search_invalid(api_client, mocker): mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage") url = reverse("api-1-origin-metadata-search") check_api_get_responses(api_client, url, status_code=400) mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/api/views/test_origin_save.py b/swh/web/tests/api/views/test_origin_save.py index 76318099..88ba55da 100644 --- a/swh/web/tests/api/views/test_origin_save.py +++ b/swh/web/tests/api/views/test_origin_save.py @@ -1,321 +1,321 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import pytest from django.utils import timezone from swh.web.common.models import ( SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_PENDING, SAVE_REQUEST_REJECTED, SAVE_TASK_FAILED, SAVE_TASK_NOT_CREATED, SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, SAVE_TASK_SUCCEEDED, SaveOriginRequest, SaveUnauthorizedOrigin, ) from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses +from swh.web.tests.utils import check_api_get_responses, check_api_post_responses pytestmark = pytest.mark.django_db @pytest.fixture(autouse=True) def populated_db(): SaveUnauthorizedOrigin.objects.create(url="https://github.com/user/illegal_repo") SaveUnauthorizedOrigin.objects.create(url="https://gitlab.com/user_to_exclude") def test_invalid_visit_type(api_client): url = reverse( "api-1-save-origin", url_args={ "visit_type": "foo", "origin_url": "https://github.com/torvalds/linux", }, ) check_api_get_responses(api_client, url, status_code=400) def test_invalid_origin_url(api_client): url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": "bar"} ) check_api_get_responses(api_client, url, status_code=400) def check_created_save_request_status( api_client, mocker, origin_url, scheduler_task_status, expected_request_status, expected_task_status=None, visit_date=None, ): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") if not scheduler_task_status: mock_scheduler.get_tasks.return_value = [] else: mock_scheduler.get_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": scheduler_task_status, "id": 1, } ] mock_scheduler.create_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": "next_run_not_scheduled", "id": 1, } ] url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save." "_get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, None) if expected_request_status != SAVE_REQUEST_REJECTED: response = check_api_post_responses(api_client, url, data=None, status_code=200) assert response.data["save_request_status"] == expected_request_status assert response.data["save_task_status"] == expected_task_status else: check_api_post_responses(api_client, url, data=None, status_code=403) def check_save_request_status( api_client, mocker, origin_url, expected_request_status, expected_task_status, scheduler_task_status="next_run_not_scheduled", visit_date=None, ): mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") mock_scheduler.get_tasks.return_value = [ { "priority": "high", "policy": "oneshot", "type": "load-git", "arguments": {"kwargs": {"repo_url": origin_url}, "args": []}, "status": scheduler_task_status, "id": 1, } ] url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": origin_url} ) mock_visit_date = mocker.patch( ("swh.web.common.origin_save." "_get_visit_info_for_save_request") ) mock_visit_date.return_value = (visit_date, None) response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_request_status"] == expected_request_status assert save_request_data["save_task_status"] == expected_task_status # Check that save task status is still available when # the scheduler task has been archived mock_scheduler.get_tasks.return_value = [] response = check_api_get_responses(api_client, url, status_code=200) save_request_data = response.data[0] assert save_request_data["save_task_status"] == expected_task_status def test_save_request_rejected(api_client, mocker): origin_url = "https://github.com/user/illegal_repo" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_REJECTED ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_REJECTED, SAVE_TASK_NOT_CREATED ) def test_save_request_pending(api_client, mocker): origin_url = "https://unkwownforge.com/user/repo" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_PENDING, SAVE_TASK_NOT_CREATED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_PENDING, SAVE_TASK_NOT_CREATED ) def test_save_request_succeed(api_client, mocker): origin_url = "https://github.com/Kitware/CMake" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEEDED, scheduler_task_status="completed", visit_date=None, ) visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEEDED, scheduler_task_status="completed", visit_date=visit_date, ) def test_save_request_failed(api_client, mocker): origin_url = "https://gitlab.com/inkscape/inkscape" check_created_save_request_status( api_client, mocker, origin_url, None, SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, scheduler_task_status="next_run_scheduled", ) check_save_request_status( api_client, mocker, origin_url, SAVE_REQUEST_ACCEPTED, SAVE_TASK_FAILED, scheduler_task_status="disabled", ) def test_create_save_request_only_when_needed(api_client, mocker): origin_url = "https://github.com/webpack/webpack" SaveOriginRequest.objects.create( visit_type="git", origin_url=origin_url, status=SAVE_REQUEST_ACCEPTED, loading_task_id=56, ) check_created_save_request_status( api_client, mocker, origin_url, "next_run_not_scheduled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 check_created_save_request_status( api_client, mocker, origin_url, "next_run_scheduled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 1 visit_date = datetime.now(tz=timezone.utc) + timedelta(hours=1) check_created_save_request_status( api_client, mocker, origin_url, "completed", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, visit_date=visit_date, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) # check_api_post_responses sends two POST requests to check YAML and JSON response assert len(sors) == 3 check_created_save_request_status( api_client, mocker, origin_url, "disabled", SAVE_REQUEST_ACCEPTED, SAVE_TASK_NOT_YET_SCHEDULED, ) sors = list( SaveOriginRequest.objects.filter(visit_type="git", origin_url=origin_url) ) assert len(sors) == 5 def test_get_save_requests_unknown_origin(api_client): unknown_origin_url = "https://gitlab.com/foo/bar" url = reverse( "api-1-save-origin", url_args={"visit_type": "git", "origin_url": unknown_origin_url}, ) response = check_api_get_responses(api_client, url, status_code=404) assert response.data == { "exception": "NotFoundExc", "reason": ( "No save requests found for visit of type " "git on origin with url %s." ) % unknown_origin_url, } diff --git a/swh/web/tests/api/views/test_ping.py b/swh/web/tests/api/views/test_ping.py index 89a035d4..2e2b64eb 100644 --- a/swh/web/tests/api/views/test_ping.py +++ b/swh/web/tests/api/views/test_ping.py @@ -1,13 +1,13 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses +from swh.web.tests.utils import check_api_get_responses def test_api_1_ping(api_client): url = reverse("api-1-ping") rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == "pong" diff --git a/swh/web/tests/api/views/test_release.py b/swh/web/tests/api/views/test_release.py index 54f676d6..c449c641 100644 --- a/swh/web/tests/api/views/test_release.py +++ b/swh/web/tests/api/views/test_release.py @@ -1,123 +1,122 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import ( ObjectType, Person, Release, Timestamp, TimestampWithTimezone, ) from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import content, directory, release +from swh.web.tests.utils import check_api_get_responses, check_http_get_response @given(release()) def test_api_release(api_client, archive_data, release): url = reverse("api-1-release", url_args={"sha1_git": release}) rv = check_api_get_responses(api_client, url, status_code=200) expected_release = archive_data.release_get(release) target_revision = expected_release["target"] target_url = reverse( "api-1-revision", url_args={"sha1_git": target_revision}, request=rv.wsgi_request, ) expected_release["target_url"] = target_url assert rv.data == expected_release @given(content(), directory(), release()) def test_api_release_target_type_not_a_revision( api_client, archive_data, content, directory, release ): for target_type, target in ( (ObjectType.CONTENT, content), (ObjectType.DIRECTORY, directory), (ObjectType.RELEASE, release), ): if target_type == ObjectType.CONTENT: target = target["sha1_git"] sample_release = Release( author=Person( email=b"author@company.org", fullname=b"author ", name=b"author", ), date=TimestampWithTimezone( timestamp=Timestamp( seconds=int(datetime.now().timestamp()), microseconds=0 ), offset=0, negative_utc=False, ), message=b"sample release message", name=b"sample release", synthetic=False, target=hash_to_bytes(target), target_type=target_type, ) archive_data.release_add([sample_release]) new_release_id = hash_to_hex(sample_release.id) url = reverse("api-1-release", url_args={"sha1_git": new_release_id}) rv = check_api_get_responses(api_client, url, status_code=200) expected_release = archive_data.release_get(new_release_id) if target_type == ObjectType.CONTENT: url_args = {"q": "sha1_git:%s" % target} else: url_args = {"sha1_git": target} target_url = reverse( "api-1-%s" % target_type.value, url_args=url_args, request=rv.wsgi_request ) expected_release["target_url"] = target_url assert rv.data == expected_release def test_api_release_not_found(api_client): unknown_release_ = random_sha1() url = reverse("api-1-release", url_args={"sha1_git": unknown_release_}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Release with sha1_git %s not found." % unknown_release_, } @given(release()) def test_api_release_uppercase(api_client, release): url = reverse( "api-1-release-uppercase-checksum", url_args={"sha1_git": release.upper()} ) - resp = api_client.get(url) - assert resp.status_code == 302 + resp = check_http_get_response(api_client, url, status_code=302) redirect_url = reverse( "api-1-release-uppercase-checksum", url_args={"sha1_git": release} ) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index f09f34da..145cb2a1 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,200 +1,197 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.web.api.utils import enrich_revision from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import revision +from swh.web.tests.utils import check_api_get_responses, check_http_get_response @given(revision()) def test_api_revision(api_client, archive_data, revision): url = reverse("api-1-revision", url_args={"sha1_git": revision}) rv = check_api_get_responses(api_client, url, status_code=200) expected_revision = archive_data.revision_get(revision) enrich_revision(expected_revision, rv.wsgi_request) assert rv.data == expected_revision def test_api_revision_not_found(api_client): unknown_revision_ = random_sha1() url = reverse("api-1-revision", url_args={"sha1_git": unknown_revision_}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } @given(revision()) def test_api_revision_raw_ok(api_client, archive_data, revision): url = reverse("api-1-revision-raw-message", url_args={"sha1_git": revision}) - rv = api_client.get(url) expected_message = archive_data.revision_get(revision)["message"] - assert rv.status_code == 200 + rv = check_http_get_response(api_client, url, status_code=200) assert rv["Content-Type"] == "application/octet-stream" - assert rv.content == expected_message.encode() def test_api_revision_raw_ko_no_rev(api_client): unknown_revision_ = random_sha1() url = reverse( "api-1-revision-raw-message", url_args={"sha1_git": unknown_revision_} ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } @given(revision()) def test_api_revision_log(api_client, archive_data, revision): limit = 10 url = reverse( "api-1-revision-log", url_args={"sha1_git": revision}, query_params={"limit": limit}, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_log = archive_data.revision_log(revision, limit=limit) expected_log = list( map(enrich_revision, expected_log, [rv.wsgi_request] * len(expected_log)) ) assert rv.data == expected_log def test_api_revision_log_not_found(api_client): unknown_revision_ = random_sha1() url = reverse("api-1-revision-log", url_args={"sha1_git": unknown_revision_}) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == { "exception": "NotFoundExc", "reason": "Revision with sha1_git %s not found." % unknown_revision_, } assert not rv.has_header("Link") def test_api_revision_directory_ko_not_found(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") mock_rev_dir.side_effect = NotFoundExc("Not found") url = "/api/1/revision/999/directory/some/path/to/dir/" rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data == {"exception": "NotFoundExc", "reason": "Not found"} mock_rev_dir.assert_called_with( {"sha1_git": "999"}, "some/path/to/dir", url, with_data=False, ) def test_api_revision_directory_ok_returns_dir_entries(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") stub_dir = { "type": "dir", "revision": "999", "content": [ { "sha1_git": "789", "type": "file", "target": "101", "target_url": "/api/1/content/sha1_git:101/", "name": "somefile", "file_url": "/api/1/revision/999/directory/some/path/" "somefile/", }, { "sha1_git": "123", "type": "dir", "target": "456", "target_url": "/api/1/directory/456/", "name": "to-subdir", "dir_url": "/api/1/revision/999/directory/some/path/" "to-subdir/", }, ], } mock_rev_dir.return_value = stub_dir url = "/api/1/revision/999/directory/some/path/" rv = check_api_get_responses(api_client, url, status_code=200) stub_dir["content"][0]["target_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][0]["target_url"] ) stub_dir["content"][0]["file_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][0]["file_url"] ) stub_dir["content"][1]["target_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][1]["target_url"] ) stub_dir["content"][1]["dir_url"] = rv.wsgi_request.build_absolute_uri( stub_dir["content"][1]["dir_url"] ) assert rv.data == stub_dir mock_rev_dir.assert_called_with( {"sha1_git": "999"}, "some/path", url, with_data=False, ) def test_api_revision_directory_ok_returns_content(api_client, mocker): mock_rev_dir = mocker.patch("swh.web.api.views.revision._revision_directory_by") stub_content = { "type": "file", "revision": "999", "content": { "sha1_git": "789", "sha1": "101", "data_url": "/api/1/content/101/raw/", }, } mock_rev_dir.return_value = stub_content url = "/api/1/revision/666/directory/some/other/path/" rv = check_api_get_responses(api_client, url, status_code=200) stub_content["content"]["data_url"] = rv.wsgi_request.build_absolute_uri( stub_content["content"]["data_url"] ) assert rv.data == stub_content mock_rev_dir.assert_called_with( {"sha1_git": "666"}, "some/other/path", url, with_data=False ) @given(revision()) def test_api_revision_uppercase(api_client, revision): url = reverse( "api-1-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()} ) - resp = api_client.get(url) - assert resp.status_code == 302 + resp = check_http_get_response(api_client, url, status_code=302) redirect_url = reverse("api-1-revision", url_args={"sha1_git": revision}) assert resp["location"] == redirect_url diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py index f44ab008..65047568 100644 --- a/swh/web/tests/api/views/test_snapshot.py +++ b/swh/web/tests/api/views/test_snapshot.py @@ -1,153 +1,152 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.model.hashutil import hash_to_hex from swh.model.model import Snapshot from swh.web.api.utils import enrich_snapshot from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import new_snapshot, snapshot +from swh.web.tests.utils import check_api_get_responses, check_http_get_response @given(snapshot()) def test_api_snapshot(api_client, archive_data, snapshot): url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) rv = check_api_get_responses(api_client, url, status_code=200) expected_data = {**archive_data.snapshot_get(snapshot), "next_branch": None} expected_data = enrich_snapshot(expected_data, rv.wsgi_request) assert rv.data == expected_data @given(snapshot()) def test_api_snapshot_paginated(api_client, archive_data, snapshot): branches_offset = 0 branches_count = 2 snapshot_branches = [] for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()): snapshot_branches.append( {"name": k, "target_type": v["target_type"], "target": v["target"]} ) whole_snapshot = {"id": snapshot, "branches": {}, "next_branch": None} while branches_offset < len(snapshot_branches): branches_from = snapshot_branches[branches_offset]["name"] url = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={ "branches_from": branches_from, "branches_count": branches_count, }, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.snapshot_get_branches( snapshot, branches_from, branches_count ) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) branches_offset += branches_count if branches_offset < len(snapshot_branches): next_branch = snapshot_branches[branches_offset]["name"] expected_data["next_branch"] = next_branch else: expected_data["next_branch"] = None assert rv.data == expected_data whole_snapshot["branches"].update(expected_data["branches"]) if branches_offset < len(snapshot_branches): next_url = rv.wsgi_request.build_absolute_uri( reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={ "branches_from": next_branch, "branches_count": branches_count, }, ) ) assert rv["Link"] == '<%s>; rel="next"' % next_url else: assert not rv.has_header("Link") url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot}) rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == whole_snapshot @given(snapshot()) def test_api_snapshot_filtered(api_client, archive_data, snapshot): snapshot_branches = [] for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()): snapshot_branches.append( {"name": k, "target_type": v["target_type"], "target": v["target"]} ) target_type = random.choice(snapshot_branches)["target_type"] url = reverse( "api-1-snapshot", url_args={"snapshot_id": snapshot}, query_params={"target_types": target_type}, ) rv = check_api_get_responses(api_client, url, status_code=200) expected_data = archive_data.snapshot_get_branches( snapshot, target_types=target_type ) expected_data = enrich_snapshot(expected_data, rv.wsgi_request) assert rv.data == expected_data def test_api_snapshot_errors(api_client): unknown_snapshot_ = random_sha1() url = reverse("api-1-snapshot", url_args={"snapshot_id": "63ce369"}) check_api_get_responses(api_client, url, status_code=400) url = reverse("api-1-snapshot", url_args={"snapshot_id": unknown_snapshot_}) check_api_get_responses(api_client, url, status_code=404) @given(snapshot()) def test_api_snapshot_uppercase(api_client, snapshot): url = reverse( "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot.upper()} ) - resp = api_client.get(url) - assert resp.status_code == 302 + resp = check_http_get_response(api_client, url, status_code=302) redirect_url = reverse( "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot} ) assert resp["location"] == redirect_url @given(new_snapshot(min_size=4)) def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot): snp_dict = new_snapshot.to_dict() snp_id = hash_to_hex(snp_dict["id"]) for branch in snp_dict["branches"].keys(): snp_dict["branches"][branch] = None break archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) url = reverse("api-1-snapshot", url_args={"snapshot_id": snp_id}) check_api_get_responses(api_client, url, status_code=200) diff --git a/swh/web/tests/api/views/test_stat.py b/swh/web/tests/api/views/test_stat.py index 6030147a..28ad3cee 100644 --- a/swh/web/tests/api/views/test_stat.py +++ b/swh/web/tests/api/views/test_stat.py @@ -1,59 +1,59 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage.exc import StorageAPIError, StorageDBError from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses +from swh.web.tests.utils import check_api_get_responses def test_api_1_stat_counters_raise_error(api_client, mocker): mock_archive = mocker.patch("swh.web.api.views.stat.archive") mock_archive.stat_counters.side_effect = BadInputExc( "voluntary error to check the bad request middleware." ) url = reverse("api-1-stat-counters") rv = check_api_get_responses(api_client, url, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": "voluntary error to check the bad request middleware.", } def test_api_1_stat_counters_raise_from_db(api_client, mocker): mock_archive = mocker.patch("swh.web.api.views.stat.archive") mock_archive.stat_counters.side_effect = StorageDBError( "Storage exploded! Will be back online shortly!" ) url = reverse("api-1-stat-counters") rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageDBError", "reason": "An unexpected error occurred in the backend: " "Storage exploded! Will be back online shortly!", } def test_api_1_stat_counters_raise_from_api(api_client, mocker): mock_archive = mocker.patch("swh.web.api.views.stat.archive") mock_archive.stat_counters.side_effect = StorageAPIError( "Storage API dropped dead! Will resurrect from its ashes asap!" ) url = reverse("api-1-stat-counters") rv = check_api_get_responses(api_client, url, status_code=503) assert rv.data == { "exception": "StorageAPIError", "reason": "An unexpected error occurred in the api backend: " "Storage API dropped dead! Will resurrect from its ashes asap!", } def test_api_1_stat_counters(api_client, archive_data): url = reverse("api-1-stat-counters") rv = check_api_get_responses(api_client, url, status_code=200) assert rv.data == archive_data.stat_counters() diff --git a/swh/web/tests/api/views/test_vault.py b/swh/web/tests/api/views/test_vault.py index 71772bb5..d1d4da13 100644 --- a/swh/web/tests/api/views/test_vault.py +++ b/swh/web/tests/api/views/test_vault.py @@ -1,166 +1,167 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from hypothesis import given from swh.model import hashutil from swh.vault.exc import NotFoundExc from swh.web.common.utils import reverse -from swh.web.tests.api.views import check_api_get_responses, check_api_post_responses from swh.web.tests.strategies import ( directory, revision, unknown_directory, unknown_revision, ) +from swh.web.tests.utils import ( + check_api_get_responses, + check_api_post_responses, + check_http_get_response, + check_http_post_response, +) @given(directory(), revision()) def test_api_vault_cook(api_client, mocker, directory, revision): mock_archive = mocker.patch("swh.web.api.views.vault.archive") for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): fetch_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) stub_cook = { "fetch_url": fetch_url, "obj_id": obj_id, "obj_type": obj_type, "progress_message": None, "status": "done", "task_uuid": "de75c902-5ee5-4739-996e-448376a93eff", } stub_fetch = b"content" mock_archive.vault_cook.return_value = stub_cook mock_archive.vault_fetch.return_value = stub_fetch email = "test@test.mail" url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, query_params={"email": email}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=200) stub_cook["fetch_url"] = rv.wsgi_request.build_absolute_uri( stub_cook["fetch_url"] ) assert rv.data == stub_cook mock_archive.vault_cook.assert_called_with( obj_type, hashutil.hash_to_bytes(obj_id), email ) - rv = api_client.get(fetch_url) - - assert rv.status_code == 200 + rv = check_http_get_response(api_client, fetch_url, status_code=200) assert rv["Content-Type"] == "application/gzip" assert rv.content == stub_fetch mock_archive.vault_fetch.assert_called_with( obj_type, hashutil.hash_to_bytes(obj_id) ) @given(directory(), revision()) def test_api_vault_cook_uppercase_hash(api_client, directory, revision): for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): url = reverse( f"api-1-vault-cook-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) - rv = api_client.post(url, {"email": "test@test.mail"}) - - assert rv.status_code == 302 + rv = check_http_post_response( + api_client, url, data={"email": "test@test.mail"}, status_code=302 + ) redirect_url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) assert rv["location"] == redirect_url fetch_url = reverse( f"api-1-vault-fetch-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) - rv = api_client.get(fetch_url) - - assert rv.status_code == 302 + rv = check_http_get_response(api_client, fetch_url, status_code=302) redirect_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) assert rv["location"] == redirect_url @given(directory(), revision(), unknown_directory(), unknown_revision()) def test_api_vault_cook_notfound( api_client, mocker, directory, revision, unknown_directory, unknown_revision ): mock_vault = mocker.patch("swh.web.common.archive.vault") mock_vault.cook.side_effect = NotFoundExc("object not found") mock_vault.fetch.side_effect = NotFoundExc("cooked archive not found") mock_vault.progress.side_effect = NotFoundExc("cooking request not found") for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): obj_name = obj_type.split("_")[0] url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert ( rv.data["reason"] == f"Cooking of {obj_name} '{obj_id}' was never requested." ) mock_vault.progress.assert_called_with(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in ( ("directory", unknown_directory), ("revision_gitfast", unknown_revision), ): obj_name = obj_type.split("_")[0] url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) rv = check_api_post_responses(api_client, url, data=None, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"{obj_name.title()} '{obj_id}' not found." mock_vault.cook.assert_called_with( obj_type, hashutil.hash_to_bytes(obj_id), email=None ) fetch_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) rv = check_api_get_responses(api_client, fetch_url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert ( rv.data["reason"] == f"Cooked archive for {obj_name} '{obj_id}' not found." ) mock_vault.fetch.assert_called_with(obj_type, hashutil.hash_to_bytes(obj_id)) diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py index f05ec593..8f800372 100644 --- a/swh/web/tests/auth/test_api_auth.py +++ b/swh/web/tests/auth/test_api_auth.py @@ -1,114 +1,107 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.contrib.auth.models import AnonymousUser, User from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse +from swh.web.tests.utils import check_api_get_responses, check_http_get_response from . import sample_data from .keycloak_mock import mock_keycloak @pytest.mark.django_db def test_drf_django_session_auth_success(mocker, client): """ Check user gets authenticated when querying the web api through a web browser. """ url = reverse("api-1-stat-counters") mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") - response = client.get(url) + response = check_http_get_response(client, url, status_code=200) request = response.wsgi_request - assert response.status_code == 200 - # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_success(mocker, api_client): """ Check user gets authenticated when querying the web api through an HTTP client using bearer token authentication. """ url = reverse("api-1-stat-counters") refresh_token = sample_data.oidc_profile["refresh_token"] mock_keycloak(mocker) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - response = api_client.get(url) + response = check_api_get_responses(api_client, url, status_code=200) request = response.wsgi_request - assert response.status_code == 200 - # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_failure(mocker, api_client): url = reverse("api-1-stat-counters") refresh_token = sample_data.oidc_profile["refresh_token"] # check for failed authentication but with expected token format mock_keycloak(mocker, auth_success=False) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - response = api_client.get(url) + response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request - assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) # check for failed authentication when token format is invalid api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") - response = api_client.get(url) + response = check_api_get_responses(api_client, url, status_code=400) request = response.wsgi_request - assert response.status_code == 400 assert isinstance(request.user, AnonymousUser) def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client): url = reverse("api-1-stat-counters") refresh_token = sample_data.oidc_profile["refresh_token"] # missing authorization type api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") - response = api_client.get(url) + response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request - assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) # invalid authorization type api_client.credentials(HTTP_AUTHORIZATION="Foo token") - response = api_client.get(url) + response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request - assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py index 636cb7b6..1c19007e 100644 --- a/swh/web/tests/auth/test_middlewares.py +++ b/swh/web/tests/auth/test_middlewares.py @@ -1,48 +1,47 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import pytest from django.test import modify_settings from swh.web.common.utils import reverse +from swh.web.tests.utils import check_html_get_response from .keycloak_mock import mock_keycloak @pytest.mark.django_db @modify_settings( MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionRefreshMiddleware"]} ) def test_oidc_session_refresh_middleware_disabled(client, mocker): # authenticate but make session expires immediately kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - resp = client.get(url) # no redirection for silent refresh - assert resp.status_code != 302 + check_html_get_response(client, url, status_code=200) @pytest.mark.django_db def test_oidc_session_refresh_middleware_enabled(client, mocker): # authenticate but make session expires immediately kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - resp = client.get(url) # should redirect for silent session refresh - assert resp.status_code == 302 + resp = check_html_get_response(client, url, status_code=302) silent_refresh_url = reverse( "oidc-login", query_params={"next_path": url, "prompt": "none"} ) assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 8305ef7e..f2b4b596 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,533 +1,527 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid from keycloak.exceptions import KeycloakError import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common.utils import reverse -from swh.web.tests.django_asserts import assert_contains, assert_template_used +from swh.web.tests.django_asserts import assert_contains +from swh.web.tests.utils import ( + check_html_get_response, + check_http_get_response, + check_http_post_response, +) from swh.web.urls import _default_view as homepage_view from . import sample_data from .keycloak_mock import mock_keycloak @pytest.mark.django_db def test_oidc_login_views_success(client, mocker): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # user initiates login process login_url = reverse("oidc-login") - response = client.get(login_url) - request = response.wsgi_request # should redirect to Keycloak authentication page in order # for a user to login with its username / password - assert response.status_code == 302 + response = check_html_get_response(client, login_url, status_code=302) + request = response.wsgi_request + assert isinstance(request.user, AnonymousUser) parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request) assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == "openid" assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) - # login process finalization - response = client.get(login_complete_url) + # login process finalization, should redirect to root url by default + response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request - # should redirect to root url by default - assert response.status_code == 302 assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, mocker): """ Simulate a successful logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") - response = client.get(oidc_logout_url) - request = response.wsgi_request # should redirect to logout page - assert response.status_code == 302 + response = check_html_get_response(client, oidc_logout_url, status_code=302) + request = response.wsgi_request + logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, mocker): """ Simulate a failed authentication with OpenID Connect. """ # mock Keycloak client mock_keycloak(mocker, auth_success=False) # user initiates login process login_url = reverse("oidc-login") - response = client.get(login_url) - request = response.wsgi_request - # should render an error page - assert response.status_code == 500 - assert_template_used(response, "error.html") + response = check_html_get_response( + client, login_url, status_code=500, template_used="error.html" + ) + request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") - response = client.get(login_url) - # should render an error page - assert_template_used(response, "error.html") + response = check_html_get_response( + client, login_url, status_code=500, template_used="error.html" + ) + assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") - response = client.get(login_url) - request = response.wsgi_request - # should render an error page - assert_template_used(response, "error.html") + response = check_html_get_response( + client, login_url, status_code=400, template_used="error.html" + ) + request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, mocker): # mock Keycloak client mock_keycloak(mocker) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) - response = client.get(login_url) - request = response.wsgi_request - # should render an error page - assert_template_used(response, "error.html") + response = check_html_get_response( + client, login_url, status_code=400, template_used="error.html" + ) + request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, mocker): # mock Keycloak client mock_keycloak(mocker, auth_success=False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) - response = client.get(login_url) - request = response.wsgi_request - # should render an error page - assert_template_used(response, "error.html") + response = check_html_get_response( + client, login_url, status_code=500, template_used="error.html" + ) + request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, mocker): """ Simulate a failed logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" kc_oidc_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") - response = client.get(logout_url) - request = response.wsgi_request - # should render an error page - assert_template_used(response, "error.html") + response = check_html_get_response( + client, logout_url, status_code=500, template_used="error.html" + ) + request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_silent_refresh_failure(client, mocker): # mock Keycloak client mock_keycloak(mocker) next_path = reverse("swh-web-homepage") # silent session refresh initialization login_url = reverse( "oidc-login", query_params={"next_path": next_path, "prompt": "none"} ) - response = client.get(login_url) + response = check_http_get_response(client, login_url, status_code=302) request = response.wsgi_request login_data = request.session["login_data"] # check prompt value has been registered in user session assert "prompt" in login_data assert login_data["prompt"] == "none" # simulate a failed silent session refresh session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "error": "login_required", "state": login_data["state"], "session_state": session_state, }, ) - # login process finalization - response = client.get(login_complete_url) + # login process finalization, should redirect to logout page + response = check_http_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request - - # should redirect to logout page - assert response.status_code == 302 logout_url = reverse( "logout", query_params={"next_path": next_path, "remote_user": 1} ) assert response["location"] == logout_url def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") - response = client.post(url, data={"password": "secret"}) - assert response.status_code == 403 + check_http_post_response(client, url, data={"password": "secret"}, status_code=403) def _generate_bearer_token(client, password): client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) url = reverse("oidc-generate-bearer-token") return client.post( url, data={"password": password}, content_type="application/json" ) @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): """ User with correct credentials should be allowed to generate a token. """ kc_mock = mock_keycloak(mocker) password = "secret" response = _generate_bearer_token(client, password) user = response.wsgi_request.user assert response.status_code == 200 assert response.content.decode("ascii") == kc_mock.offline_token( username=user.username, password=password ) @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker): """ User with wrong credentials should not be allowed to generate a token. """ response_code = 401 kc_mock = mock_keycloak(mocker) kc_mock.offline_token.side_effect = KeycloakError( error_message="Invalid password", response_code=response_code ) response = _generate_bearer_token(client, password="invalid-password") assert response.status_code == response_code def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) - response = client.get(url) - assert response.status_code == 403 + check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to list his tokens. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for _ in range(nb_tokens): response = _generate_bearer_token(client, password) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) - response = client.get(url) - assert response.status_code == 200 + + response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") - response = client.post(url) - assert response.status_code == 403 + check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, mocker): """ User with correct credentials should be allowed to display a token. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for i in range(nb_tokens): response = _generate_bearer_token(client, password) token = response.content url = reverse("oidc-get-bearer-token") - response = client.post( + + response = check_http_post_response( + client, url, + status_code=200, data={"password": password, "token_id": i + 1}, - content_type="application/json", + content_type="text/plain", ) - assert response.status_code == 200 assert response.content == token @pytest.mark.django_db def test_oidc_get_bearer_token_invalid_password(client, mocker): """ User with wrong credentials should not be allowed to display a token. """ mock_keycloak(mocker) password = "secret" _generate_bearer_token(client, password) url = reverse("oidc-get-bearer-token") - response = client.post( + check_http_post_response( + client, url, + status_code=401, data={"password": "invalid-password", "token_id": 1}, - content_type="application/json", ) - assert response.status_code == 401 def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") - response = client.post(url) - assert response.status_code == 403 + check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to revoke tokens. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for _ in range(nb_tokens): _generate_bearer_token(client, password) url = reverse("oidc-revoke-bearer-tokens") - response = client.post( - url, - data={"password": password, "token_ids": [1]}, - content_type="application/json", + + check_http_post_response( + client, url, status_code=200, data={"password": password, "token_ids": [1]}, ) - assert response.status_code == 200 assert len(OIDCUserOfflineTokens.objects.all()) == 2 - response = client.post( - url, - data={"password": password, "token_ids": [2, 3]}, - content_type="application/json", + check_http_post_response( + client, url, status_code=200, data={"password": password, "token_ids": [2, 3]}, ) - assert response.status_code == 200 assert len(OIDCUserOfflineTokens.objects.all()) == 0 @pytest.mark.django_db def test_oidc_revoke_bearer_token_invalid_password(client, mocker): """ User with wrong credentials should not be allowed to revoke tokens. """ mock_keycloak(mocker) password = "secret" _generate_bearer_token(client, password) url = reverse("oidc-revoke-bearer-tokens") - response = client.post( + + check_http_post_response( + client, url, + status_code=401, data={"password": "invalid-password", "token_ids": [1]}, - content_type="application/json", ) - assert response.status_code == 401 diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py index 835c306a..bb0dfde1 100644 --- a/swh/web/tests/browse/views/test_content.py +++ b/swh/web/tests/browse/views/test_content.py @@ -1,589 +1,588 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from django.utils.html import escape from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.browse.utils import ( _re_encode_content, get_mimetype_and_encoding_for_content, prepare_content_for_display, ) from swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import gen_path_info, reverse -from swh.web.tests.django_asserts import ( - assert_contains, - assert_not_contains, - assert_template_used, -) +from swh.web.tests.django_asserts import assert_contains, assert_not_contains from swh.web.tests.strategies import ( content, content_image_type, content_text, content_text_no_highlight, content_text_non_utf8, content_unsupported_image_type_rendering, content_utf8_detected_as_binary, invalid_sha1, origin_with_multiple_visits, unknown_content, ) +from swh.web.tests.utils import check_html_get_response, check_http_get_response @given(content_text()) def test_content_view_text(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse( "browse-content", url_args={"query_string": content["sha1"]}, query_params={"path": content["path"]}, ) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) - resp = client.get(url) + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) content_display = _process_content_for_display(archive_data, content) mimetype = content_display["mimetype"] - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") - if mimetype.startswith("text/"): assert_contains(resp, '' % content_display["language"]) assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) @given(content_text_no_highlight()) def test_content_view_text_no_highlight(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse("browse-content", url_args={"query_string": content["sha1"]}) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) - resp = client.get(url) + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) content_display = _process_content_for_display(archive_data, content) - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") - assert_contains(resp, '') assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) @given(content_text_non_utf8()) def test_content_view_no_utf8_text(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse("browse-content", url_args={"query_string": content["sha1"]}) - resp = client.get(url) + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) content_display = _process_content_for_display(archive_data, content) - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, escape(content_display["content_data"])) @given(content_image_type()) def test_content_view_image(client, archive_data, content): url = reverse("browse-content", url_args={"query_string": content["sha1"]}) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) - resp = client.get(url) + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) content_display = _process_content_for_display(archive_data, content) mimetype = content_display["mimetype"] content_data = content_display["content_data"] - - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") assert_contains(resp, '' % (mimetype, content_data)) assert_contains(resp, url_raw) @given(content_unsupported_image_type_rendering()) def test_content_view_image_no_rendering(client, archive_data, content): url = reverse("browse-content", url_args={"query_string": content["sha1"]}) - resp = client.get(url) + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) mimetype = content["mimetype"] encoding = content["encoding"] - - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") assert_contains( resp, ( f"Content with mime type {mimetype} and encoding {encoding} " "cannot be displayed." ), ) @given(content_text()) def test_content_view_text_with_path(client, archive_data, content): path = content["path"] url = reverse( "browse-content", url_args={"query_string": content["sha1"]}, query_params={"path": path}, ) - resp = client.get(url) - assert resp.status_code == 200 - assert_template_used(resp, "browse/content.html") + resp = check_html_get_response( + client, url, status_code=200, template_used="browse/content.html" + ) assert_contains(resp, '