diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py index 7d842412..30312470 100644 --- a/swh/web/tests/api/views/test_identifiers.py +++ b/swh/web/tests/api/views/test_identifiers.py @@ -1,179 +1,164 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from hypothesis import given from swh.model.swhids import ObjectType from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import reverse from swh.web.tests.data import random_sha1 -from swh.web.tests.strategies import ( - unknown_content, - unknown_directory, - unknown_release, - unknown_revision, - unknown_snapshot, -) from swh.web.tests.utils import check_api_get_responses, check_api_post_responses def test_swhid_resolve_success( api_client, content, directory, origin, release, revision, snapshot ): for obj_type, obj_id in ( (ObjectType.CONTENT, content["sha1_git"]), (ObjectType.DIRECTORY, directory), (ObjectType.RELEASE, release), (ObjectType.REVISION, revision), (ObjectType.SNAPSHOT, snapshot), ): swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]}) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) resp = check_api_get_responses(api_client, url, status_code=200) if obj_type == ObjectType.CONTENT: url_args = {"query_string": "sha1_git:%s" % obj_id} elif obj_type == ObjectType.SNAPSHOT: url_args = {"snapshot_id": obj_id} else: url_args = {"sha1_git": obj_id} obj_type_str = obj_type.name.lower() browse_rev_url = reverse( f"browse-{obj_type_str}", url_args=url_args, query_params={"origin_url": origin["url"]}, request=resp.wsgi_request, ) expected_result = { "browse_url": browse_rev_url, "metadata": {"origin": origin["url"]}, "namespace": "swh", "object_id": obj_id, "object_type": obj_type_str, "scheme_version": 1, } assert resp.data == expected_result def test_swhid_resolve_invalid(api_client): rev_id_invalid = "96db9023b8_foo_50d6c108e9a3" swhid = "swh:1:rev:%s" % rev_id_invalid url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) check_api_get_responses(api_client, url, status_code=400) -@given( - unknown_content(), - unknown_directory(), - unknown_release(), - unknown_revision(), - unknown_snapshot(), -) def test_swhid_resolve_not_found( api_client, unknown_content, unknown_directory, unknown_release, unknown_revision, unknown_snapshot, ): for obj_type, obj_id in ( (ObjectType.CONTENT, unknown_content["sha1_git"]), (ObjectType.DIRECTORY, unknown_directory), (ObjectType.RELEASE, unknown_release), (ObjectType.REVISION, unknown_revision), (ObjectType.SNAPSHOT, unknown_snapshot), ): swhid = gen_swhid(obj_type, obj_id) url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid}) check_api_get_responses(api_client, url, status_code=404) def test_swh_origin_id_not_resolvable(api_client): ori_swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb" url = reverse("api-1-resolve-swhid", url_args={"swhid": ori_swhid}) check_api_get_responses(api_client, url, status_code=400) def test_api_known_swhid_all_present( api_client, content, directory, release, revision, snapshot ): input_swhids = [ gen_swhid(ObjectType.CONTENT, content["sha1_git"]), gen_swhid(ObjectType.DIRECTORY, directory), gen_swhid(ObjectType.REVISION, revision), gen_swhid(ObjectType.RELEASE, release), gen_swhid(ObjectType.SNAPSHOT, snapshot), ] url = reverse("api-1-known") resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200) assert resp.data == {swhid: {"known": True} for swhid in input_swhids} def test_api_known_swhid_some_present(api_client, content, directory): content_ = gen_swhid(ObjectType.CONTENT, content["sha1_git"]) directory_ = gen_swhid(ObjectType.DIRECTORY, directory) unknown_revision_ = gen_swhid(ObjectType.REVISION, random_sha1()) unknown_release_ = gen_swhid(ObjectType.RELEASE, random_sha1()) unknown_snapshot_ = gen_swhid(ObjectType.SNAPSHOT, random_sha1()) input_swhids = [ content_, directory_, unknown_revision_, unknown_release_, unknown_snapshot_, ] url = reverse("api-1-known") resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200) assert resp.data == { content_: {"known": True}, directory_: {"known": True}, unknown_revision_: {"known": False}, unknown_release_: {"known": False}, unknown_snapshot_: {"known": False}, } def test_api_known_invalid_swhid(api_client): invalid_swhid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"] invalid_swhid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"] url = reverse("api-1-known") check_api_post_responses(api_client, url, data=invalid_swhid_sha1, status_code=400) check_api_post_responses(api_client, url, data=invalid_swhid_type, status_code=400) def test_api_known_raises_large_payload_error(api_client): random_swhid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb" limit = 10000 err_msg = "The maximum number of SWHIDs this endpoint can receive is 1000" swhids = [random_swhid for i in range(limit)] url = reverse("api-1-known") resp = check_api_post_responses(api_client, url, data=swhids, status_code=413) assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg} diff --git a/swh/web/tests/api/views/test_vault.py b/swh/web/tests/api/views/test_vault.py index d8e6117b..5945ce80 100644 --- a/swh/web/tests/api/views/test_vault.py +++ b/swh/web/tests/api/views/test_vault.py @@ -1,329 +1,325 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re -from hypothesis import given import pytest from swh.model.swhids import CoreSWHID from swh.vault.exc import NotFoundExc from swh.web.common.utils import reverse -from swh.web.tests.strategies import unknown_directory, unknown_revision from swh.web.tests.utils import ( check_api_get_responses, check_api_post_responses, check_http_get_response, check_http_post_response, ) ##################### # Current API: def test_api_vault_cook(api_client, mocker, directory, revision): mock_archive = mocker.patch("swh.web.api.views.vault.archive") for bundle_type, swhid, content_type, in ( ("flat", f"swh:1:dir:{directory}", "application/gzip"), ("gitfast", f"swh:1:rev:{revision}", "application/gzip"), ("git_bare", f"swh:1:rev:{revision}", "application/x-tar"), ): swhid = CoreSWHID.from_string(swhid) fetch_url = reverse( f"api-1-vault-fetch-{bundle_type.replace('_', '-')}", url_args={"swhid": str(swhid)}, ) stub_cook = { "type": bundle_type, "progress_msg": None, "task_id": 1, "task_status": "done", "swhid": swhid, } stub_fetch = b"content" mock_archive.vault_cook.return_value = stub_cook mock_archive.vault_fetch.return_value = stub_fetch email = "test@test.mail" url = reverse( f"api-1-vault-cook-{bundle_type.replace('_', '-')}", url_args={"swhid": str(swhid)}, query_params={"email": email}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=200) assert rv.data == { "fetch_url": rv.wsgi_request.build_absolute_uri(fetch_url), "progress_message": None, "id": 1, "status": "done", "swhid": str(swhid), } mock_archive.vault_cook.assert_called_with(bundle_type, swhid, email) rv = check_http_get_response(api_client, fetch_url, status_code=200) assert rv["Content-Type"] == content_type assert rv.content == stub_fetch mock_archive.vault_fetch.assert_called_with(bundle_type, swhid) -@given(unknown_directory(), unknown_revision()) def test_api_vault_cook_notfound( api_client, mocker, directory, revision, unknown_directory, unknown_revision ): mock_vault = mocker.patch("swh.web.common.archive.vault") mock_vault.cook.side_effect = NotFoundExc("object not found") mock_vault.fetch.side_effect = NotFoundExc("cooked archive not found") mock_vault.progress.side_effect = NotFoundExc("cooking request not found") for bundle_type, swhid in ( ("flat", f"swh:1:dir:{directory}"), ("gitfast", f"swh:1:rev:{revision}"), ("git_bare", f"swh:1:rev:{revision}"), ): swhid = CoreSWHID.from_string(swhid) url = reverse( f"api-1-vault-cook-{bundle_type.replace('_', '-')}", url_args={"swhid": str(swhid)}, ) rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"Cooking of {swhid} was never requested." mock_vault.progress.assert_called_with(bundle_type, swhid) for bundle_type, swhid in ( ("flat", f"swh:1:dir:{unknown_directory}"), ("gitfast", f"swh:1:rev:{unknown_revision}"), ("git_bare", f"swh:1:rev:{unknown_revision}"), ): swhid = CoreSWHID.from_string(swhid) url = reverse( f"api-1-vault-cook-{bundle_type.replace('_', '-')}", url_args={"swhid": str(swhid)}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"{swhid} not found." mock_vault.cook.assert_called_with(bundle_type, swhid, email=None) fetch_url = reverse( f"api-1-vault-fetch-{bundle_type.replace('_', '-')}", url_args={"swhid": str(swhid)}, ) rv = check_api_get_responses(api_client, fetch_url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"Cooked archive for {swhid} not found." mock_vault.fetch.assert_called_with(bundle_type, swhid) @pytest.mark.parametrize("bundle_type", ["flat", "gitfast", "git_bare"]) def test_api_vault_cook_error_content(api_client, mocker, bundle_type): swhid = "swh:1:cnt:" + "0" * 40 email = "test@test.mail" url = reverse( f"api-1-vault-cook-{bundle_type.replace('_', '-')}", url_args={"swhid": swhid}, query_params={"email": email}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=400) assert rv.data == { "exception": "BadInputExc", "reason": ( "Content objects do not need to be cooked, " "use `/api/1/content/raw/` instead." ), } @pytest.mark.parametrize( "bundle_type,swhid_type,hint", [ ("flat", "rev", True), ("flat", "rel", False), ("flat", "snp", False), ("gitfast", "dir", True), ("gitfast", "rel", False), ("gitfast", "snp", False), ("git_bare", "dir", True), ("git_bare", "rel", False), ("git_bare", "snp", False), ], ) def test_api_vault_cook_error(api_client, mocker, bundle_type, swhid_type, hint): swhid = f"swh:1:{swhid_type}:" + "0" * 40 email = "test@test.mail" url = reverse( f"api-1-vault-cook-{bundle_type.replace('_', '-')}", url_args={"swhid": swhid}, query_params={"email": email}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=400) assert rv.data["exception"] == "BadInputExc" if hint: assert re.match( r"Only .* can be cooked as .* bundles\. Use .*", rv.data["reason"] ) else: assert re.match(r"Only .* can be cooked as .* bundles\.", rv.data["reason"]) ##################### # Legacy API: def test_api_vault_cook_legacy(api_client, mocker, directory, revision): mock_archive = mocker.patch("swh.web.api.views.vault.archive") for obj_type, bundle_type, response_obj_type, obj_id in ( ("directory", "flat", "directory", directory), ("revision_gitfast", "gitfast", "revision", revision), ): swhid = CoreSWHID.from_string(f"swh:1:{obj_type[:3]}:{obj_id}") fetch_url = reverse( f"api-1-vault-fetch-{bundle_type}", url_args={"swhid": str(swhid)}, ) stub_cook = { "type": obj_type, "progress_msg": None, "task_id": 1, "task_status": "done", "swhid": swhid, "obj_type": response_obj_type, "obj_id": obj_id, } stub_fetch = b"content" mock_archive.vault_cook.return_value = stub_cook mock_archive.vault_fetch.return_value = stub_fetch email = "test@test.mail" url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, query_params={"email": email}, ) rv = check_api_post_responses(api_client, url, data=None, status_code=200) assert rv.data == { "fetch_url": rv.wsgi_request.build_absolute_uri(fetch_url), "progress_message": None, "id": 1, "status": "done", "swhid": str(swhid), "obj_type": response_obj_type, "obj_id": obj_id, } mock_archive.vault_cook.assert_called_with(bundle_type, swhid, email) rv = check_http_get_response(api_client, fetch_url, status_code=200) assert rv["Content-Type"] == "application/gzip" assert rv.content == stub_fetch mock_archive.vault_fetch.assert_called_with(bundle_type, swhid) def test_api_vault_cook_uppercase_hash_legacy(api_client, directory, revision): for obj_type, obj_id in ( ("directory", directory), ("revision_gitfast", revision), ): url = reverse( f"api-1-vault-cook-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) rv = check_http_post_response( api_client, url, data={"email": "test@test.mail"}, status_code=302 ) redirect_url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) assert rv["location"] == redirect_url fetch_url = reverse( f"api-1-vault-fetch-{obj_type}-uppercase-checksum", url_args={f"{obj_type[:3]}_id": obj_id.upper()}, ) rv = check_http_get_response(api_client, fetch_url, status_code=302) redirect_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) assert rv["location"] == redirect_url -@given(unknown_directory(), unknown_revision()) def test_api_vault_cook_notfound_legacy( api_client, mocker, directory, revision, unknown_directory, unknown_revision ): mock_vault = mocker.patch("swh.web.common.archive.vault") mock_vault.cook.side_effect = NotFoundExc("object not found") mock_vault.fetch.side_effect = NotFoundExc("cooked archive not found") mock_vault.progress.side_effect = NotFoundExc("cooking request not found") for obj_type, bundle_type, obj_id in ( ("directory", "flat", directory), ("revision_gitfast", "gitfast", revision), ): url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) swhid = CoreSWHID.from_string(f"swh:1:{obj_type[:3]}:{obj_id}") rv = check_api_get_responses(api_client, url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"Cooking of {swhid} was never requested." mock_vault.progress.assert_called_with(bundle_type, swhid) for obj_type, bundle_type, obj_id in ( ("directory", "flat", unknown_directory), ("revision_gitfast", "gitfast", unknown_revision), ): swhid = CoreSWHID.from_string(f"swh:1:{obj_type[:3]}:{obj_id}") url = reverse( f"api-1-vault-cook-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id} ) rv = check_api_post_responses(api_client, url, data=None, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"{swhid} not found." mock_vault.cook.assert_called_with(bundle_type, swhid, email=None) fetch_url = reverse( f"api-1-vault-fetch-{obj_type}", url_args={f"{obj_type[:3]}_id": obj_id}, ) # Redirected to the current 'fetch' url rv = check_http_get_response(api_client, fetch_url, status_code=302) redirect_url = reverse( f"api-1-vault-fetch-{bundle_type}", url_args={"swhid": str(swhid)}, ) assert rv["location"] == redirect_url rv = check_api_get_responses(api_client, redirect_url, status_code=404) assert rv.data["exception"] == "NotFoundExc" assert rv.data["reason"] == f"Cooked archive for {swhid} not found." mock_vault.fetch.assert_called_with(bundle_type, swhid) diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py index 87222246..60583cb5 100644 --- a/swh/web/tests/browse/views/test_content.py +++ b/swh/web/tests/browse/views/test_content.py @@ -1,627 +1,623 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random -from hypothesis import given - from django.utils.html import escape from swh.model.swhids import ObjectType from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.browse.utils import ( _re_encode_content, get_mimetype_and_encoding_for_content, prepare_content_for_display, ) from swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import gen_path_info, reverse from swh.web.tests.django_asserts import assert_contains, assert_not_contains -from swh.web.tests.strategies import invalid_sha1, unknown_content from swh.web.tests.utils import check_html_get_response, check_http_get_response def test_content_view_text(client, archive_data, content_text): sha1_git = content_text["sha1_git"] url = reverse( "browse-content", url_args={"query_string": content_text["sha1"]}, query_params={"path": content_text["path"]}, ) url_raw = reverse( "browse-content-raw", url_args={"query_string": content_text["sha1"]} ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content_text) mimetype = content_display["mimetype"] if mimetype.startswith("text/"): assert_contains(resp, '' % content_display["language"]) assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_not_contains(resp, "swh-metadata-popover") def test_content_view_no_highlight( client, archive_data, content_application_no_highlight, content_text_no_highlight ): for content_ in (content_application_no_highlight, content_text_no_highlight): content = content_ sha1_git = content["sha1_git"] url = reverse("browse-content", url_args={"query_string": content["sha1"]}) url_raw = reverse( "browse-content-raw", url_args={"query_string": content["sha1"]} ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content) assert_contains(resp, '') assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) def test_content_view_no_utf8_text(client, archive_data, content_text_non_utf8): sha1_git = content_text_non_utf8["sha1_git"] url = reverse( "browse-content", url_args={"query_string": content_text_non_utf8["sha1"]} ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content_text_non_utf8) swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, escape(content_display["content_data"])) def test_content_view_image(client, archive_data, content_image_type): url = reverse( "browse-content", url_args={"query_string": content_image_type["sha1"]} ) url_raw = reverse( "browse-content-raw", url_args={"query_string": content_image_type["sha1"]} ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content_image_type) mimetype = content_display["mimetype"] content_data = content_display["content_data"] assert_contains(resp, '' % (mimetype, content_data)) assert_contains(resp, url_raw) def test_content_view_image_no_rendering( client, archive_data, content_unsupported_image_type_rendering ): url = reverse( "browse-content", url_args={"query_string": content_unsupported_image_type_rendering["sha1"]}, ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) mimetype = content_unsupported_image_type_rendering["mimetype"] encoding = content_unsupported_image_type_rendering["encoding"] assert_contains( resp, ( f"Content with mime type {mimetype} and encoding {encoding} " "cannot be displayed." ), ) def test_content_view_text_with_path(client, archive_data, content_text): path = content_text["path"] url = reverse( "browse-content", url_args={"query_string": content_text["sha1"]}, query_params={"path": path}, ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) assert_contains(resp, '