diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py
index 15e6d507..0163ad48 100644
--- a/swh/web/tests/api/test_utils.py
+++ b/swh/web/tests/api/test_utils.py
@@ -1,615 +1,612 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given

from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.web.api import utils
from swh.web.common.origin_visits import get_origin_visits
from swh.web.common.utils import resolve_branch_alias, reverse
-from swh.web.tests.strategies import origin, release, revision, snapshot
+from swh.web.tests.strategies import release, revision, snapshot

url_map = [
    {
        "rule": "/other/<slug>",
        "methods": set(["GET", "POST", "HEAD"]),
        "endpoint": "foo",
    },
    {
        "rule": "/some/old/url/<slug>",
        "methods": set(["GET", "POST"]),
        "endpoint": "blablafn",
    },
    {
        "rule": "/other/old/url/<int:id>",
        "methods": set(["GET", "HEAD"]),
        "endpoint": "bar",
    },
    {"rule": "/other", "methods": set([]), "endpoint": None},
    {"rule": "/other2", "methods": set([]), "endpoint": None},
]


def test_filter_field_keys_dict_unknown_keys():
    actual_res = utils.filter_field_keys(
        {"directory": 1, "file": 2, "link": 3}, {"directory1", "file2"}
    )
    assert actual_res == {}


def test_filter_field_keys_dict():
    actual_res = utils.filter_field_keys(
        {"directory": 1, "file": 2, "link": 3}, {"directory", "link"}
    )
    assert actual_res == {"directory": 1, "link": 3}


def test_filter_field_keys_list_unknown_keys():
    actual_res = utils.filter_field_keys(
        [{"directory": 1, "file": 2, "link": 3}, {"1": 1, "2": 2, "link": 3}], {"d"}
    )
    assert actual_res == [{}, {}]


def test_filter_field_keys_map():
    actual_res = utils.filter_field_keys(
        map(
            lambda x: {"i": x["i"] + 1, "j": x["j"]},
            [{"i": 1, "j": None}, {"i": 2, "j": None}, {"i": 3, "j": None}],
        ),
        {"i"},
    )
    assert list(actual_res) == [{"i": 2}, {"i": 3}, {"i": 4}]


def test_filter_field_keys_list():
    actual_res = utils.filter_field_keys(
        [{"directory": 1, "file": 2, "link": 3}, {"dir": 1, "fil": 2, "lin": 3}],
        {"directory", "dir"},
    )
    assert actual_res == [{"directory": 1}, {"dir": 1}]


def test_filter_field_keys_other():
    input_set = {1, 2}
    actual_res = utils.filter_field_keys(input_set, {"a", "1"})
    assert actual_res == input_set


def test_person_to_string():
    assert (
        utils.person_to_string({"name": "raboof", "email": "foo@bar"})
        == "raboof <foo@bar>"
    )


def test_enrich_release_empty():
    actual_release = utils.enrich_release({})
    assert actual_release == {}


@given(release())
def test_enrich_release_content_target(api_request_factory, archive_data, release):
    release_data = archive_data.release_get(release)
    release_data["target_type"] = "content"
    url = reverse("api-1-release", url_args={"sha1_git": release})
    request = api_request_factory.get(url)
    actual_release = utils.enrich_release(release_data, request)
    release_data["target_url"] = reverse(
        "api-1-content",
        url_args={"q": f'sha1_git:{release_data["target"]}'},
        request=request,
    )
    assert actual_release == release_data


@given(release())
def test_enrich_release_directory_target(api_request_factory, archive_data, release):
    release_data = archive_data.release_get(release)
    release_data["target_type"] = "directory"
    url = reverse("api-1-release", url_args={"sha1_git": release})
    request = api_request_factory.get(url)
    actual_release = utils.enrich_release(release_data, request)
    release_data["target_url"] = reverse(
        "api-1-directory",
        url_args={"sha1_git": release_data["target"]},
        request=request,
    )
    assert actual_release == release_data


@given(release())
def test_enrich_release_revision_target(api_request_factory, archive_data, release):
    release_data = archive_data.release_get(release)
    release_data["target_type"] = "revision"
    url = reverse("api-1-release", url_args={"sha1_git": release})
    request = api_request_factory.get(url)
    actual_release = utils.enrich_release(release_data, request)
    release_data["target_url"] = reverse(
        "api-1-revision", url_args={"sha1_git": release_data["target"]}, request=request
    )
    assert actual_release == release_data


@given(release())
def test_enrich_release_release_target(api_request_factory, archive_data, release):
    release_data = archive_data.release_get(release)
    release_data["target_type"] = "release"
    url = reverse("api-1-release", url_args={"sha1_git": release})
    request = api_request_factory.get(url)
    actual_release = utils.enrich_release(release_data, request)
    release_data["target_url"] = reverse(
        "api-1-release", url_args={"sha1_git": release_data["target"]}, request=request
    )
    assert actual_release == release_data


def test_enrich_directory_entry_no_type():
    assert utils.enrich_directory_entry({"id": "dir-id"}) == {"id": "dir-id"}


def test_enrich_directory_entry_with_type(
    api_request_factory, archive_data, directory
):
    dir_content = archive_data.directory_ls(directory)
    dir_entry = random.choice(dir_content)
    url = reverse("api-1-directory", url_args={"sha1_git": directory})
    request = api_request_factory.get(url)
    actual_directory = utils.enrich_directory_entry(dir_entry, request)
    if dir_entry["type"] == "file":
        dir_entry["target_url"] = reverse(
            "api-1-content",
            url_args={"q": f'sha1_git:{dir_entry["target"]}'},
            request=request,
        )
    elif dir_entry["type"] == "dir":
        dir_entry["target_url"] = reverse(
            "api-1-directory",
            url_args={"sha1_git": dir_entry["target"]},
            request=request,
        )
    elif dir_entry["type"] == "rev":
        dir_entry["target_url"] = reverse(
            "api-1-revision",
            url_args={"sha1_git": dir_entry["target"]},
            request=request,
        )
    assert actual_directory == dir_entry


def test_enrich_content_without_hashes():
    assert utils.enrich_content({"id": "123"}) == {"id": "123"}


def test_enrich_content_with_hashes(api_request_factory, content):
    for algo in DEFAULT_ALGORITHMS:
        content_data = dict(content)
        query_string = "%s:%s" % (algo, content_data[algo])
        url = reverse("api-1-content", url_args={"q": query_string})
        request = api_request_factory.get(url)
        enriched_content = utils.enrich_content(
            content_data, query_string=query_string, request=request
        )
        content_data["data_url"] = reverse(
            "api-1-content-raw", url_args={"q": query_string}, request=request
        )
        content_data["filetype_url"] = reverse(
            "api-1-content-filetype", url_args={"q": query_string}, request=request
        )
        content_data["language_url"] = reverse(
            "api-1-content-language", url_args={"q": query_string}, request=request
        )
        content_data["license_url"] = reverse(
            "api-1-content-license", url_args={"q": query_string}, request=request
        )
        assert enriched_content == content_data


def test_enrich_content_with_hashes_and_top_level_url(api_request_factory, content):
    for algo in DEFAULT_ALGORITHMS:
        content_data = dict(content)
        query_string = "%s:%s" % (algo, content_data[algo])
        url = reverse("api-1-content", url_args={"q": query_string})
        request = api_request_factory.get(url)
        enriched_content = utils.enrich_content(
            content_data, query_string=query_string, top_url=True, request=request
        )
        content_data["content_url"] = reverse(
            "api-1-content", url_args={"q": query_string}, request=request
        )
        content_data["data_url"] = reverse(
            "api-1-content-raw", url_args={"q": query_string}, request=request
        )
        content_data["filetype_url"] = reverse(
            "api-1-content-filetype", url_args={"q": query_string}, request=request
        )
        content_data["language_url"] = reverse(
            "api-1-content-language", url_args={"q": query_string}, request=request
        )
        content_data["license_url"] = reverse(
            "api-1-content-license", url_args={"q": query_string}, request=request
        )
        assert enriched_content == content_data


@given(revision())
def test_enrich_revision_without_children_or_parent(
    api_request_factory, archive_data, revision
):
    revision_data = archive_data.revision_get(revision)
    del revision_data["parents"]
    url = reverse("api-1-revision", url_args={"sha1_git": revision})
    request = api_request_factory.get(url)
    actual_revision = utils.enrich_revision(revision_data, request)
    revision_data["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision}, request=request
    )
    revision_data["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision}, request=request
    )
    revision_data["directory_url"] = reverse(
        "api-1-directory",
        url_args={"sha1_git": revision_data["directory"]},
        request=request,
    )
    assert actual_revision == revision_data


@given(revision(), revision(), revision())
def test_enrich_revision_with_children_and_parent_no_dir(
    api_request_factory, archive_data, revision, parent_revision, child_revision
):
    revision_data = archive_data.revision_get(revision)
    del revision_data["directory"]
    revision_data["parents"] = revision_data["parents"] + (parent_revision,)
    revision_data["children"] = child_revision
    url = reverse("api-1-revision", url_args={"sha1_git": revision})
    request = api_request_factory.get(url)
    actual_revision = utils.enrich_revision(revision_data, request)
    revision_data["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision}, request=request
    )
    revision_data["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision}, request=request
    )
    revision_data["parents"] = tuple(
        {
            "id": p["id"],
            "url": reverse(
                "api-1-revision", url_args={"sha1_git": p["id"]}, request=request
            ),
        }
        for p in revision_data["parents"]
    )
    revision_data["children_urls"] = [
        reverse(
            "api-1-revision", url_args={"sha1_git": child_revision}, request=request
        )
    ]
    assert actual_revision == revision_data


@given(revision(), revision(), revision())
def test_enrich_revision_no_context(
    api_request_factory, revision, parent_revision, child_revision
):
    revision_data = {
        "id": revision,
        "parents": [parent_revision],
        "children": [child_revision],
    }
    url = reverse("api-1-revision", url_args={"sha1_git": revision})
    request = api_request_factory.get(url)
    actual_revision = utils.enrich_revision(revision_data, request)
    revision_data["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision}, request=request
    )
    revision_data["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision}, request=request
    )
    revision_data["parents"] = tuple(
        {
            "id": parent_revision,
            "url": reverse(
                "api-1-revision",
                url_args={"sha1_git": parent_revision},
                request=request,
            ),
        }
    )
    revision_data["children_urls"] = [
        reverse(
            "api-1-revision", url_args={"sha1_git": child_revision}, request=request
        )
    ]
    assert actual_revision == revision_data


@given(revision(), revision(), revision())
def test_enrich_revision_with_no_message(
    api_request_factory, archive_data, revision, parent_revision, child_revision
):
    revision_data = archive_data.revision_get(revision)
    revision_data["message"] = None
    revision_data["parents"] = revision_data["parents"] + (parent_revision,)
    revision_data["children"] = child_revision
    url = reverse("api-1-revision", url_args={"sha1_git": revision})
    request = api_request_factory.get(url)
    actual_revision = utils.enrich_revision(revision_data, request)
    revision_data["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision}, request=request
    )
    revision_data["directory_url"] = reverse(
        "api-1-directory",
        url_args={"sha1_git": revision_data["directory"]},
        request=request,
    )
    revision_data["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision}, request=request
    )
    revision_data["parents"] = tuple(
        {
            "id": p["id"],
            "url": reverse(
                "api-1-revision", url_args={"sha1_git": p["id"]}, request=request
            ),
        }
        for p in revision_data["parents"]
    )
    revision_data["children_urls"] = [
        reverse(
            "api-1-revision", url_args={"sha1_git": child_revision}, request=request
        )
    ]
    assert actual_revision == revision_data


@given(revision(), revision(), revision())
def test_enrich_revision_with_invalid_message(
    api_request_factory, archive_data, revision, parent_revision, child_revision
):
    revision_data = archive_data.revision_get(revision)
    revision_data["decoding_failures"] = ["message"]
    revision_data["parents"] = revision_data["parents"] + (parent_revision,)
    revision_data["children"] = child_revision
    url = reverse("api-1-revision", url_args={"sha1_git": revision})
    request = api_request_factory.get(url)
    actual_revision = utils.enrich_revision(revision_data, request)
    revision_data["url"] = reverse(
        "api-1-revision", url_args={"sha1_git": revision}, request=request
    )
    revision_data["message_url"] = reverse(
        "api-1-revision-raw-message", url_args={"sha1_git": revision}, request=request
    )
    revision_data["directory_url"] = reverse(
        "api-1-directory",
        url_args={"sha1_git": revision_data["directory"]},
        request=request,
    )
    revision_data["history_url"] = reverse(
        "api-1-revision-log", url_args={"sha1_git": revision}, request=request
    )
    revision_data["parents"] = tuple(
        {
            "id": p["id"],
            "url": reverse(
                "api-1-revision", url_args={"sha1_git": p["id"]}, request=request
            ),
        }
        for p in revision_data["parents"]
    )
    revision_data["children_urls"] = [
        reverse(
            "api-1-revision", url_args={"sha1_git": child_revision}, request=request
        )
    ]
    assert actual_revision == revision_data


@given(snapshot())
def test_enrich_snapshot(api_request_factory, archive_data, snapshot):
    snapshot_data = archive_data.snapshot_get(snapshot)
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot})
    request = api_request_factory.get(url)
    actual_snapshot = utils.enrich_snapshot(snapshot_data, request)
    for _, b in snapshot_data["branches"].items():
        if b["target_type"] in ("directory", "revision", "release"):
            b["target_url"] = reverse(
                f'api-1-{b["target_type"]}',
                url_args={"sha1_git": b["target"]},
                request=request,
            )
        elif b["target_type"] == "content":
            b["target_url"] = reverse(
                "api-1-content",
                url_args={"q": f'sha1_git:{b["target"]}'},
                request=request,
            )
    for _, b in snapshot_data["branches"].items():
        if b["target_type"] == "alias":
            target = resolve_branch_alias(snapshot_data, b)
            b["target_url"] = target["target_url"]
    assert actual_snapshot == snapshot_data


-@given(origin())
def test_enrich_origin(api_request_factory, origin):
    url = reverse("api-1-origin", url_args={"origin_url": origin["url"]})
    request = api_request_factory.get(url)
    origin_data = {"url": origin["url"]}
    actual_origin = utils.enrich_origin(origin_data, request)
    origin_data["origin_visits_url"] = reverse(
        "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request
    )
    assert actual_origin == origin_data


-@given(origin())
def test_enrich_origin_search_result(api_request_factory, origin):
    url = reverse("api-1-origin-search", url_args={"url_pattern": origin["url"]})
    request = api_request_factory.get(url)
    origin_visits_url = reverse(
        "api-1-origin-visits", url_args={"origin_url": origin["url"]}, request=request
    )
    origin_search_result_data = (
        [{"url": origin["url"]}],
        None,
    )
    enriched_origin_search_result = (
        [{"url": origin["url"], "origin_visits_url": origin_visits_url}],
        None,
    )
    assert (
        utils.enrich_origin_search_result(origin_search_result_data, request=request)
        == enriched_origin_search_result
    )


-@given(origin())
def test_enrich_origin_visit(api_request_factory, origin):
    origin_visit = random.choice(get_origin_visits(origin))
    url = reverse(
        "api-1-origin-visit",
        url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]},
    )
    request = api_request_factory.get(url)
    actual_origin_visit = utils.enrich_origin_visit(
        origin_visit,
        with_origin_link=True,
        with_origin_visit_link=True,
        request=request,
    )
    origin_visit["origin_url"] = reverse(
        "api-1-origin", url_args={"origin_url": origin["url"]}, request=request
    )
    origin_visit["origin_visit_url"] = reverse(
        "api-1-origin-visit",
        url_args={"origin_url": origin["url"], "visit_id": origin_visit["visit"]},
        request=request,
    )
    origin_visit["snapshot_url"] = reverse(
        "api-1-snapshot",
        url_args={"snapshot_id": origin_visit["snapshot"]},
        request=request,
    )
    assert actual_origin_visit == origin_visit
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py
index 63d7a25c..81bcd290 100644
--- a/swh/web/tests/api/views/test_graph.py
+++ b/swh/web/tests/api/views/test_graph.py
@@ -1,261 +1,257 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import hashlib
import json
import textwrap

-from hypothesis import given
-
from django.http.response import StreamingHttpResponse

from swh.model.hashutil import hash_to_bytes
from swh.model.swhids import ExtendedObjectType, ExtendedSWHID
from swh.web.api.views.graph import API_GRAPH_PERM
from swh.web.common.utils import reverse
from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config
-from swh.web.tests.strategies import origin
from swh.web.tests.utils import check_http_get_response


def test_graph_endpoint_no_authentication_for_vpn_users(api_client, requests_mock):
    graph_query = "stats"
    url = reverse("api-1-graph", url_args={"graph_query": graph_query})
    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        json={},
        headers={"Content-Type": "application/json"},
    )
    check_http_get_response(
        api_client, url, status_code=200, server_name=SWH_WEB_INTERNAL_SERVER_NAME
    )


def test_graph_endpoint_needs_authentication(api_client):
    url = reverse("api-1-graph", url_args={"graph_query": "stats"})
    check_http_get_response(api_client, url, status_code=401)


def _authenticate_graph_user(api_client, keycloak_oidc):
    keycloak_oidc.client_permissions = [API_GRAPH_PERM]
    oidc_profile = keycloak_oidc.login()
    api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")


def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock):
    graph_query = "stats"
    url = reverse("api-1-graph", url_args={"graph_query": graph_query})
    oidc_profile = keycloak_oidc.login()
    api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
    check_http_get_response(api_client, url, status_code=403)

    _authenticate_graph_user(api_client, keycloak_oidc)
    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        json={},
        headers={"Content-Type": "application/json"},
    )
    check_http_get_response(api_client, url, status_code=200)


def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock):
    _authenticate_graph_user(api_client, keycloak_oidc)

    graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323"

    response_text = textwrap.dedent(
        """\
        swh:1:cnt:1d3dace0a825b0535c37c53ed669ef817e9c1b47
        swh:1:cnt:6d5b280f4e33589ae967a7912a587dd5cb8dedaa
        swh:1:cnt:91bef238bf01356a550d416d14bb464c576ac6f4
        swh:1:cnt:58a8b925a463b87d49639fda282b8f836546e396
        swh:1:cnt:fd32ee0a87e16ccc853dfbeb7018674f9ce008c0
        swh:1:cnt:ab7c39871872589a4fc9e249ebc927fb1042c90d
        swh:1:cnt:93073c02bf3869845977527de16af4d54765838d
        swh:1:cnt:4251f795b52c54c447a97c9fe904d8b1f993b1e0
        swh:1:cnt:c6e7055424332006d07876ffeba684e7e284b383
        swh:1:cnt:8459d8867dc3b15ef7ae9683e21cccc9ab2ec887
        swh:1:cnt:5f9981d52202815aa947f85b9dfa191b66f51138
        swh:1:cnt:00a685ec51bcdf398c15d588ecdedb611dbbab4b
        swh:1:cnt:e1cf1ea335106a0197a2f92f7804046425a7d3eb
        swh:1:cnt:07069b38087f88ec192d2c9aff75a502476fd17d
        swh:1:cnt:f045ee845c7f14d903a2c035b2691a7c400c01f0
        """
    )

    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        text=response_text,
        headers={"Content-Type": "text/plain", "Transfer-Encoding": "chunked"},
    )

    url = reverse("api-1-graph", url_args={"graph_query": graph_query})

    resp = check_http_get_response(
        api_client, url, status_code=200, content_type="text/plain"
    )
    assert isinstance(resp, StreamingHttpResponse)
    assert b"".join(resp.streaming_content) == response_text.encode()


_response_json = {
    "counts": {"nodes": 17075708289, "edges": 196236587976},
    "ratios": {
        "compression": 0.16,
        "bits_per_node": 58.828,
        "bits_per_edge": 5.119,
        "avg_locality": 2184278529.729,
    },
    "indegree": {"min": 0, "max": 263180117, "avg": 11.4921492364925},
    "outdegree": {"min": 0, "max": 1033207, "avg": 11.4921492364925},
}


def test_graph_json_response(api_client, keycloak_oidc, requests_mock):
    _authenticate_graph_user(api_client, keycloak_oidc)

    graph_query = "stats"

    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        json=_response_json,
        headers={"Content-Type": "application/json"},
    )

    url = reverse("api-1-graph", url_args={"graph_query": graph_query})

    resp = check_http_get_response(api_client, url, status_code=200)
    assert resp.content_type == "application/json"
    assert resp.content == json.dumps(_response_json).encode()


def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock):
    _authenticate_graph_user(api_client, keycloak_oidc)

    graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb"

    response_ndjson = textwrap.dedent(
        """\
        ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\
        "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"]
        ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\
        "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"]
        ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\
        "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"]
        """
    )

    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        text=response_ndjson,
        headers={
            "Content-Type": "application/x-ndjson",
            "Transfer-Encoding": "chunked",
        },
    )

    url = reverse("api-1-graph", url_args={"graph_query": graph_query})

    resp = check_http_get_response(api_client, url, status_code=200)
    assert isinstance(resp, StreamingHttpResponse)
    assert resp["Content-Type"] == "application/x-ndjson"
    assert b"".join(resp.streaming_content) == response_ndjson.encode()


-@given(origin())
def test_graph_response_resolve_origins(
    archive_data, api_client, keycloak_oidc, requests_mock, origin
):
    hasher = hashlib.sha1()
    hasher.update(origin["url"].encode())
    origin_sha1 = hasher.digest()
    origin_swhid = str(
        ExtendedSWHID(object_type=ExtendedObjectType.ORIGIN, object_id=origin_sha1)
    )

    snapshot = archive_data.snapshot_get_latest(origin["url"])["id"]
    snapshot_swhid = str(
        ExtendedSWHID(
            object_type=ExtendedObjectType.SNAPSHOT, object_id=hash_to_bytes(snapshot)
        )
    )

    _authenticate_graph_user(api_client, keycloak_oidc)

    for graph_query, response_text, content_type in (
        (
            f"visit/nodes/{snapshot_swhid}",
            f"{snapshot_swhid}\n{origin_swhid}\n",
            "text/plain",
        ),
        (
            f"visit/edges/{snapshot_swhid}",
            f"{snapshot_swhid} {origin_swhid}\n",
            "text/plain",
        ),
        (
            f"visit/paths/{snapshot_swhid}",
            f'["{snapshot_swhid}", "{origin_swhid}"]\n',
            "application/x-ndjson",
        ),
    ):

        # set two lines response to check resolved origins cache
        response_text = response_text + response_text

        requests_mock.get(
            get_config()["graph"]["server_url"] + graph_query,
            text=response_text,
            headers={"Content-Type": content_type, "Transfer-Encoding": "chunked"},
        )

        url = reverse(
            "api-1-graph",
            url_args={"graph_query": graph_query},
            query_params={"direction": "backward"},
        )

        resp = check_http_get_response(api_client, url, status_code=200)
        assert isinstance(resp, StreamingHttpResponse)
        assert resp["Content-Type"] == content_type
        assert b"".join(resp.streaming_content) == response_text.encode()

        url = reverse(
            "api-1-graph",
            url_args={"graph_query": graph_query},
            query_params={"direction": "backward", "resolve_origins": "true"},
        )

        resp = check_http_get_response(api_client, url, status_code=200)
        assert isinstance(resp, StreamingHttpResponse)
        assert resp["Content-Type"] == content_type
        assert (
            b"".join(resp.streaming_content)
            == response_text.replace(origin_swhid, origin["url"]).encode()
        )


def test_graph_response_resolve_origins_nothing_to_do(
    api_client, keycloak_oidc, requests_mock
):
    _authenticate_graph_user(api_client, keycloak_oidc)

    graph_query = "stats"

    requests_mock.get(
        get_config()["graph"]["server_url"] + graph_query,
        json=_response_json,
        headers={"Content-Type": "application/json"},
    )

    url = reverse(
        "api-1-graph",
        url_args={"graph_query": graph_query},
        query_params={"resolve_origins": "true"},
    )

    resp = check_http_get_response(api_client, url, status_code=200)
    assert resp.content_type == "application/json"
    assert resp.content == json.dumps(_response_json).encode()
diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py
index 8be60ce8..79c3b7f3 100644
--- a/swh/web/tests/api/views/test_identifiers.py
+++ b/swh/web/tests/api/views/test_identifiers.py
@@ -1,185 +1,184 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from hypothesis import given

from swh.model.swhids import ObjectType
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import (
-    origin,
    release,
    revision,
    snapshot,
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
)
from swh.web.tests.utils import check_api_get_responses, check_api_post_responses


-@given(origin(), release(), revision(), snapshot())
+@given(release(), revision(), snapshot())
def test_swhid_resolve_success(
    api_client, client, content, directory, origin, release, revision, snapshot
):
    for obj_type, obj_id in (
        (ObjectType.CONTENT, content["sha1_git"]),
        (ObjectType.DIRECTORY, directory),
        (ObjectType.RELEASE, release),
        (ObjectType.REVISION, revision),
        (ObjectType.SNAPSHOT, snapshot),
    ):
        swhid = gen_swhid(obj_type, obj_id, metadata={"origin": origin["url"]})
        url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid})

        resp = check_api_get_responses(api_client, url, status_code=200)

        if obj_type == ObjectType.CONTENT:
            url_args = {"query_string": "sha1_git:%s" % obj_id}
        elif obj_type == ObjectType.SNAPSHOT:
            url_args = {"snapshot_id": obj_id}
        else:
            url_args = {"sha1_git": obj_id}
        obj_type_str = obj_type.name.lower()

        browse_rev_url = reverse(
            f"browse-{obj_type_str}",
            url_args=url_args,
            query_params={"origin_url": origin["url"]},
            request=resp.wsgi_request,
        )

        expected_result = {
            "browse_url": browse_rev_url,
            "metadata": {"origin": origin["url"]},
            "namespace": "swh",
            "object_id": obj_id,
            "object_type": obj_type_str,
            "scheme_version": 1,
        }

        assert resp.data == expected_result


def test_swhid_resolve_invalid(api_client):
    rev_id_invalid = "96db9023b8_foo_50d6c108e9a3"
    swhid = "swh:1:rev:%s" % rev_id_invalid
    url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid})
    check_api_get_responses(api_client, url, status_code=400)


@given(
    unknown_content(),
    unknown_directory(),
    unknown_release(),
    unknown_revision(),
    unknown_snapshot(),
)
def test_swhid_resolve_not_found(
    api_client,
    unknown_content,
    unknown_directory,
    unknown_release,
    unknown_revision,
    unknown_snapshot,
):
    for obj_type, obj_id in (
        (ObjectType.CONTENT, unknown_content["sha1_git"]),
        (ObjectType.DIRECTORY, unknown_directory),
        (ObjectType.RELEASE, unknown_release),
        (ObjectType.REVISION, unknown_revision),
        (ObjectType.SNAPSHOT, unknown_snapshot),
    ):
        swhid = gen_swhid(obj_type, obj_id)
        url = reverse("api-1-resolve-swhid", url_args={"swhid": swhid})
        check_api_get_responses(api_client, url, status_code=404)


def test_swh_origin_id_not_resolvable(api_client):
    ori_swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb"
    url = reverse("api-1-resolve-swhid", url_args={"swhid": ori_swhid})
    check_api_get_responses(api_client, url, status_code=400)


@given(release(), revision(), snapshot())
def test_api_known_swhid_all_present(
    api_client, content, directory, release, revision, snapshot
):
    input_swhids = [
        gen_swhid(ObjectType.CONTENT, content["sha1_git"]),
        gen_swhid(ObjectType.DIRECTORY, directory),
        gen_swhid(ObjectType.REVISION, revision),
        gen_swhid(ObjectType.RELEASE, release),
        gen_swhid(ObjectType.SNAPSHOT, snapshot),
    ]

    url = reverse("api-1-known")

    resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200)

    assert resp.data == {swhid: {"known": True} for swhid in input_swhids}


def test_api_known_swhid_some_present(api_client, content, directory):
    content_ = gen_swhid(ObjectType.CONTENT, content["sha1_git"])
    directory_ = gen_swhid(ObjectType.DIRECTORY, directory)
    unknown_revision_ = gen_swhid(ObjectType.REVISION, random_sha1())
    unknown_release_ = gen_swhid(ObjectType.RELEASE, random_sha1())
    unknown_snapshot_ = gen_swhid(ObjectType.SNAPSHOT, random_sha1())

    input_swhids = [
        content_,
        directory_,
        unknown_revision_,
        unknown_release_,
        unknown_snapshot_,
    ]

    url = reverse("api-1-known")

    resp = check_api_post_responses(api_client, url, data=input_swhids, status_code=200)

    assert resp.data == {
        content_: {"known": True},
        directory_: {"known": True},
        unknown_revision_: {"known": False},
        unknown_release_: {"known": False},
        unknown_snapshot_: {"known": False},
    }


def test_api_known_invalid_swhid(api_client):
    invalid_swhid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"]
    invalid_swhid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"]

    url = reverse("api-1-known")

    check_api_post_responses(api_client, url, data=invalid_swhid_sha1, status_code=400)

    check_api_post_responses(api_client, url, data=invalid_swhid_type, status_code=400)


def test_api_known_raises_large_payload_error(api_client):
    random_swhid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb"
    limit = 10000
    err_msg = "The maximum number of SWHIDs this endpoint can receive is 1000"

    swhids = [random_swhid for i in range(limit)]
    url = reverse("api-1-known")
    resp = check_api_post_responses(api_client, url, data=swhids, status_code=413)
    assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg}
diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index 90f2424b..eca8448f 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,740 +1,737 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import timedelta
import json

from hypothesis import given
import pytest

from swh.indexer.storage.model import OriginIntrinsicMetadataRow
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Origin, OriginVisit, OriginVisitStatus
from swh.search.interface import PagedResult
from swh.storage.exc import StorageAPIError, StorageDBError
from swh.storage.utils import now
from swh.web.api.utils import enrich_origin, enrich_origin_visit
from swh.web.common.exc import BadInputExc
from swh.web.common.origin_visits import get_origin_visits
from swh.web.common.utils import reverse
from swh.web.tests.api.views.utils import scroll_results
from swh.web.tests.data import (
    INDEXER_TOOL,
    ORIGIN_MASTER_REVISION,
    ORIGIN_METADATA_KEY,
    ORIGIN_METADATA_VALUE,
)
-from swh.web.tests.strategies import new_origin, new_snapshots, origin, visit_dates
+from swh.web.tests.strategies import new_origin, new_snapshots, visit_dates
from swh.web.tests.utils import check_api_get_responses


def test_api_lookup_origin_visits_raise_error(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "voluntary error to check the bad request middleware."

    mock_get_origin_visits.side_effect = BadInputExc(err_msg)

    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=400)
    assert rv.data == {"exception": "BadInputExc", "reason": err_msg}


def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "Storage exploded! Will be back online shortly!"

    mock_get_origin_visits.side_effect = StorageDBError(err_msg)

    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=503)
    assert rv.data == {
        "exception": "StorageDBError",
        "reason": "An unexpected error occurred in the backend: %s" % err_msg,
    }


def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client, mocker):
    mock_get_origin_visits = mocker.patch("swh.web.api.views.origin.get_origin_visits")
    err_msg = "Storage API dropped dead! Will resurrect asap!"

    mock_get_origin_visits.side_effect = StorageAPIError(err_msg)

    url = reverse("api-1-origin-visits", url_args={"origin_url": "http://foo"})
    rv = check_api_get_responses(api_client, url, status_code=503)
    assert rv.data == {
        "exception": "StorageAPIError",
        "reason": "An unexpected error occurred in the api backend: %s" % err_msg,
    }


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits(
    api_client, subtest, new_origin, visit_dates, new_snapshots
):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])
        for i, visit_date in enumerate(visit_dates):
            origin_visit = archive_data.origin_visit_add(
                [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
            )[0]
            archive_data.snapshot_add([new_snapshots[i]])
            visit_status = OriginVisitStatus(
                origin=new_origin.url,
                visit=origin_visit.visit,
                date=now(),
                status="full",
                snapshot=new_snapshots[i].id,
            )
            archive_data.origin_visit_status_add([visit_status])

        all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

        for last_visit, expected_visits in (
            (None, all_visits[:2]),
            (all_visits[1]["visit"], all_visits[2:]),
        ):
            url = reverse(
                "api-1-origin-visits",
                url_args={"origin_url": new_origin.url},
                query_params={"per_page": 2, "last_visit": last_visit},
            )

            rv = check_api_get_responses(api_client, url, status_code=200)

            for i in range(len(expected_visits)):
                expected_visits[i] = enrich_origin_visit(
                    expected_visits[i],
                    with_origin_link=False,
                    with_origin_visit_link=True,
                    request=rv.wsgi_request,
                )

            assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits_by_id(
    api_client, subtest, new_origin, visit_dates, new_snapshots
):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])
        for i, visit_date in enumerate(visit_dates):
            origin_visit = archive_data.origin_visit_add(
                [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
            )[0]
            archive_data.snapshot_add([new_snapshots[i]])
            visit_status = OriginVisitStatus(
                origin=new_origin.url,
                visit=origin_visit.visit,
                date=now(),
                status="full",
                snapshot=new_snapshots[i].id,
            )
            archive_data.origin_visit_status_add([visit_status])

        all_visits = list(reversed(get_origin_visits(new_origin.to_dict())))

        for last_visit, expected_visits in (
            (None, all_visits[:2]),
            (all_visits[1]["visit"], all_visits[2:4]),
        ):
            url = reverse(
                "api-1-origin-visits",
                url_args={"origin_url": new_origin.url},
                query_params={"per_page": 2, "last_visit": last_visit},
            )

            rv = check_api_get_responses(api_client, url, status_code=200)

            for i in range(len(expected_visits)):
                expected_visits[i] = enrich_origin_visit(
                    expected_visits[i],
                    with_origin_link=False,
                    with_origin_visit_link=True,
                    request=rv.wsgi_request,
                )

            assert rv.data == expected_visits


@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visit(
    api_client, subtest, new_origin, visit_dates, new_snapshots
):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])
        for i, visit_date in enumerate(visit_dates):
            origin_visit = archive_data.origin_visit_add(
                [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
            )[0]
            visit_id = origin_visit.visit
            archive_data.snapshot_add([new_snapshots[i]])
            visit_status = OriginVisitStatus(
                origin=new_origin.url,
                visit=origin_visit.visit,
                date=visit_date + timedelta(minutes=5),
                status="full",
                snapshot=new_snapshots[i].id,
            )
            archive_data.origin_visit_status_add([visit_status])

        url = reverse(
            "api-1-origin-visit",
            url_args={"origin_url": new_origin.url, "visit_id": visit_id},
        )

        rv = check_api_get_responses(api_client, url, status_code=200)

        expected_visit = archive_data.origin_visit_get_by(new_origin.url, visit_id)

        expected_visit = enrich_origin_visit(
            expected_visit,
            with_origin_link=True,
            with_origin_visit_link=False,
            request=rv.wsgi_request,
        )

        assert rv.data == expected_visit


@given(new_origin())
def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data, new_origin):
    archive_data.origin_add([new_origin])

    url = reverse("api-1-origin-visit-latest", url_args={"origin_url": new_origin.url})

    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "No visit for origin %s found" % new_origin.url,
    }


@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest(
    api_client, subtest, new_origin, visit_dates, new_snapshots
):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])
        visit_dates.sort()
        visit_ids = []
        for i, visit_date in enumerate(visit_dates):
            origin_visit = archive_data.origin_visit_add(
                [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
            )[0]
            visit_ids.append(origin_visit.visit)

        archive_data.snapshot_add([new_snapshots[0]])

        visit_status = OriginVisitStatus(
            origin=new_origin.url,
            visit=visit_ids[0],
            date=now(),
            status="full",
            snapshot=new_snapshots[0].id,
        )
        archive_data.origin_visit_status_add([visit_status])

        url = reverse(
            "api-1-origin-visit-latest", url_args={"origin_url": new_origin.url}
        )

        rv = check_api_get_responses(api_client, url, status_code=200)

        expected_visit = archive_data.origin_visit_status_get_latest(
            new_origin.url, type="git"
        )

        expected_visit = enrich_origin_visit(
            expected_visit,
            with_origin_link=True,
            with_origin_visit_link=False,
            request=rv.wsgi_request,
        )

        assert rv.data == expected_visit


@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest_with_snapshot(
    api_client, subtest, new_origin, visit_dates, new_snapshots
):
    # ensure archive_data fixture will be reset between each hypothesis
    # example test run
    @subtest
    def test_inner(archive_data):
        archive_data.origin_add([new_origin])
        visit_dates.sort()
        visit_ids = []
        for i, visit_date in enumerate(visit_dates):
            origin_visit = archive_data.origin_visit_add(
                [OriginVisit(origin=new_origin.url, date=visit_date, type="git",)]
            )[0]
            visit_ids.append(origin_visit.visit)

        archive_data.snapshot_add([new_snapshots[0]])

        # Add snapshot to the latest visit
        visit_id = visit_ids[-1]
        visit_status = OriginVisitStatus(
            origin=new_origin.url,
            visit=visit_id,
            date=now(),
            status="full",
            snapshot=new_snapshots[0].id,
        )
        archive_data.origin_visit_status_add([visit_status])

        url = reverse(
            "api-1-origin-visit-latest",
            url_args={"origin_url": new_origin.url},
            query_params={"require_snapshot": True},
        )

        rv = check_api_get_responses(api_client, url, status_code=200)

        expected_visit = archive_data.origin_visit_status_get_latest(
            new_origin.url, type="git", require_snapshot=True
        )

        expected_visit = enrich_origin_visit(
            expected_visit,
            with_origin_link=True,
            with_origin_visit_link=False,
            request=rv.wsgi_request,
        )

        assert rv.data == expected_visit


-@given(origin())
def test_api_lookup_origin_visit_not_found(api_client, origin):
    all_visits = list(reversed(get_origin_visits(origin)))

    max_visit_id = max([v["visit"] for v in all_visits])

    url = reverse(
        "api-1-origin-visit",
        url_args={"origin_url": origin["url"], "visit_id": max_visit_id + 1},
    )

    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "Origin %s or its visit with id %s not found!"
        % (origin["url"], max_visit_id + 1),
    }


def test_api_origins_wrong_input(api_client, archive_data):
    """Should fail with 400 if the input is deprecated.

    """
    # fail if wrong input
    url = reverse("api-1-origins", query_params={"origin_from": 1})
    rv = check_api_get_responses(api_client, url, status_code=400)
    assert rv.data == {
        "exception": "BadInputExc",
        "reason": "Please use the Link header to browse through result",
    }


def test_api_origins(api_client, archive_data):
    page_result = archive_data.origin_list(limit=10000)
    origins = page_result.results
    origin_urls = {origin.url for origin in origins}

    # Get only one
    url = reverse("api-1-origins", query_params={"origin_count": 1})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} <= origin_urls

    # Get all
    url = reverse("api-1-origins", query_params={"origin_count": len(origins)})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(origins)
    assert {origin["url"] for origin in rv.data} == origin_urls

    # Get "all + 10"
    url = reverse("api-1-origins", query_params={"origin_count": len(origins) + 10})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(origins)
    assert {origin["url"] for origin in rv.data} == origin_urls


@pytest.mark.parametrize("origin_count", [1, 2, 10, 100])
def test_api_origins_scroll(api_client, archive_data, origin_count):
    page_result = archive_data.origin_list(limit=10000)
    origins = page_result.results
    origin_urls = {origin.url for origin in origins}

    url = reverse("api-1-origins", query_params={"origin_count": origin_count})

    results = scroll_results(api_client, url)

    assert len(results) == len(origins)
    assert {origin["url"] for origin in results} == origin_urls


-@given(origin())
def test_api_origin_by_url(api_client, archive_data, origin):
    origin_url = origin["url"]
    url = reverse("api-1-origin", url_args={"origin_url": origin_url})
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_origin = archive_data.origin_get([origin_url])[0]
    expected_origin = enrich_origin(expected_origin, rv.wsgi_request)
    assert rv.data == expected_origin


@given(new_origin())
def test_api_origin_not_found(api_client, new_origin):
    url = reverse("api-1-origin", url_args={"origin_url": new_origin.url})
    rv = check_api_get_responses(api_client, url, status_code=404)
    assert rv.data == {
        "exception": "NotFoundExc",
        "reason": "Origin with url %s not found!" % new_origin.url,
    }


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    # Search for 'github.com', get only one
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 1},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} <= expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]

    # Search for 'github.com', get all
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]

    # Search for 'github.com', get more than available
    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": 10},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins
    assert rv.data == [
        enrich_origin({"url": origin["url"]}, request=rv.wsgi_request)
        for origin in rv.data
    ]


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_words(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "com github"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "memononen libtess2"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} == {
        "https://github.com/memononen/libtess2"
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "libtess2 memononen"},
        query_params={"limit": 2},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1
    assert {origin["url"] for origin in rv.data} == {
        "https://github.com/memononen/libtess2"
    }


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_visit_type(api_client, mocker, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com",},
        query_params={"visit_type": "git"},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github com",},
        query_params={"visit_type": "foo"},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert rv.data == []


def test_api_origin_search_use_ql(api_client, mocker):
    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }
    ORIGINS = [{"url": origin} for origin in expected_origins]

    mock_archive_search = mocker.patch("swh.web.common.archive.search")
    mock_archive_search.origin_search.return_value = PagedResult(
        results=ORIGINS, next_page_token=None,
    )

    query = "origin = 'github.com'"

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": query},
        query_params={"visit_type": "git", "use_ql": "true"},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert {origin["url"] for origin in rv.data} == expected_origins

    mock_archive_search.origin_search.assert_called_with(
        query=query, page_token=None, with_visit=False, visit_types=["git"], limit=70
    )


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
@pytest.mark.parametrize("limit", [1, 2, 3, 10])
def test_api_origin_search_scroll(api_client, archive_data, mocker, limit, backend):
    if backend != "swh-search":
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)

    expected_origins = {
        "https://github.com/wcoder/highlightjs-line-numbers.js",
        "https://github.com/memononen/libtess2",
    }

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "github.com"},
        query_params={"limit": limit},
    )

    results = scroll_results(api_client, url)

    assert {origin["url"] for origin in results} == expected_origins


@pytest.mark.parametrize("backend", ["swh-search", "swh-storage"])
def test_api_origin_search_limit(
    api_client, archive_data, tests_data, mocker, backend
):
    if backend == "swh-search":
        tests_data["search"].origin_update(
            [{"url": "http://foobar/{}".format(i)} for i in range(2000)]
        )
    else:
        # equivalent to not configuring search in the config
        mocker.patch("swh.web.common.archive.search", None)
        archive_data.origin_add(
            [Origin(url="http://foobar/{}".format(i)) for i in range(2000)]
        )

    url = reverse(
        "api-1-origin-search",
        url_args={"url_pattern": "foobar"},
        query_params={"limit": 1050},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == 1000


@pytest.mark.parametrize("backend", ["swh-search", "swh-indexer-storage"])
def test_api_origin_metadata_search(api_client, mocker, backend):
    mock_config = mocker.patch("swh.web.common.archive.config")
    mock_config.get_config.return_value = {
        "search_config": {"metadata_backend": backend}
    }

    url = reverse(
        "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)

    rv.data = sorted(rv.data, key=lambda d: d["url"])

    expected_data = sorted(
        [
            {
                "url": origin_url,
                "metadata": {
                    "from_revision": ORIGIN_MASTER_REVISION[origin_url],
                    "tool": {
                        "name": INDEXER_TOOL["tool_name"],
                        "version": INDEXER_TOOL["tool_version"],
                        "configuration": INDEXER_TOOL["tool_configuration"],
                        "id": INDEXER_TOOL["id"],
                    },
                    "mappings": [],
                },
            }
            for origin_url in sorted(ORIGIN_MASTER_REVISION.keys())
        ],
        key=lambda d: d["url"],
    )

    for i in range(len(expected_data)):
        expected = expected_data[i]
        response = rv.data[i]
        metadata = response["metadata"].pop("metadata")
        assert any(
            [ORIGIN_METADATA_VALUE in json.dumps(val) for val in metadata.values()]
        )
        assert response == expected


def test_api_origin_metadata_search_limit(api_client, mocker):
    mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage")
    oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext

    oimsft.side_effect = lambda conjunction, limit: [
        OriginIntrinsicMetadataRow(
            id=origin_url,
            from_revision=hash_to_bytes(master_rev),
            indexer_configuration_id=INDEXER_TOOL["id"],
            metadata={ORIGIN_METADATA_KEY: ORIGIN_METADATA_VALUE},
            mappings=[],
        )
        for origin_url, master_rev in ORIGIN_MASTER_REVISION.items()
    ]

    url = reverse(
        "api-1-origin-metadata-search", query_params={"fulltext": ORIGIN_METADATA_VALUE}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=70)

    url = reverse(
        "api-1-origin-metadata-search",
        query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 10},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=10)

    url = reverse(
        "api-1-origin-metadata-search",
        query_params={"fulltext": ORIGIN_METADATA_VALUE, "limit": 987},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert len(rv.data) == len(ORIGIN_MASTER_REVISION)
    oimsft.assert_called_with(conjunction=[ORIGIN_METADATA_VALUE], limit=100)


-@given(origin())
def test_api_origin_intrinsic_metadata(api_client, origin):
    url = reverse(
        "api-origin-intrinsic-metadata", url_args={"origin_url": origin["url"]}
    )
    rv = check_api_get_responses(api_client, url, status_code=200)

    assert ORIGIN_METADATA_KEY in rv.data
    assert rv.data[ORIGIN_METADATA_KEY] == ORIGIN_METADATA_VALUE


def test_api_origin_metadata_search_invalid(api_client, mocker):
    mock_idx_storage = mocker.patch("swh.web.common.archive.idx_storage")
    url = reverse("api-1-origin-metadata-search")
    check_api_get_responses(api_client, url, status_code=400)
    mock_idx_storage.assert_not_called()


@pytest.mark.parametrize("backend", ["swh-counters", "swh-storage"])
def test_api_stat_counters(api_client, mocker, backend):
    mock_config = mocker.patch("swh.web.common.archive.config")
    mock_config.get_config.return_value = {"counters_backend": backend}

    url = reverse("api-1-stat-counters")
    rv = check_api_get_responses(api_client, url, status_code=200)
    counts = json.loads(rv.content)
    for obj in ["content", "origin", "release", "directory", "revision"]:
        assert counts.get(obj, 0) > 0
diff --git a/swh/web/tests/api/views/test_snapshot.py b/swh/web/tests/api/views/test_snapshot.py
index 6280dac2..0107a1c0 100644
--- a/swh/web/tests/api/views/test_snapshot.py
+++ b/swh/web/tests/api/views/test_snapshot.py
@@ -1,168 +1,163 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given

from swh.model.hashutil import hash_to_hex
from swh.model.model import Snapshot
from swh.web.api.utils import enrich_snapshot
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
-from swh.web.tests.strategies import (
-    new_snapshot,
-    origin_with_pull_request_branches,
-    snapshot,
-)
+from swh.web.tests.strategies import new_snapshot, snapshot
from swh.web.tests.utils import check_api_get_responses, check_http_get_response


@given(snapshot())
def test_api_snapshot(api_client, archive_data, snapshot):
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot})
    rv = check_api_get_responses(api_client, url, status_code=200)
    expected_data = {**archive_data.snapshot_get(snapshot), "next_branch": None}
    expected_data = enrich_snapshot(expected_data, rv.wsgi_request)
    assert rv.data == expected_data


@given(snapshot())
def test_api_snapshot_paginated(api_client, archive_data, snapshot):
    branches_offset = 0
    branches_count = 2

    snapshot_branches = []

    for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()):
        snapshot_branches.append(
            {"name": k, "target_type": v["target_type"], "target": v["target"]}
        )

    whole_snapshot = {"id": snapshot, "branches": {}, "next_branch": None}

    while branches_offset < len(snapshot_branches):
        branches_from = snapshot_branches[branches_offset]["name"]
        url = reverse(
            "api-1-snapshot",
            url_args={"snapshot_id": snapshot},
            query_params={
                "branches_from": branches_from,
                "branches_count": branches_count,
            },
        )
        rv = check_api_get_responses(api_client, url, status_code=200)
        expected_data = archive_data.snapshot_get_branches(
            snapshot, branches_from, branches_count
        )
        expected_data = enrich_snapshot(expected_data, rv.wsgi_request)

        branches_offset += branches_count
        if branches_offset < len(snapshot_branches):
            next_branch = snapshot_branches[branches_offset]["name"]
            expected_data["next_branch"] = next_branch
        else:
            expected_data["next_branch"] = None

        assert rv.data == expected_data
        whole_snapshot["branches"].update(expected_data["branches"])

        if branches_offset < len(snapshot_branches):
            next_url = rv.wsgi_request.build_absolute_uri(
                reverse(
                    "api-1-snapshot",
                    url_args={"snapshot_id": snapshot},
                    query_params={
                        "branches_from": next_branch,
                        "branches_count": branches_count,
                    },
                )
            )
            assert rv["Link"] == '<%s>; rel="next"' % next_url
        else:
            assert not rv.has_header("Link")

    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot})
    rv = check_api_get_responses(api_client, url, status_code=200)
    assert rv.data == whole_snapshot


@given(snapshot())
def test_api_snapshot_filtered(api_client, archive_data, snapshot):
    snapshot_branches = []

    for k, v in sorted(archive_data.snapshot_get(snapshot)["branches"].items()):
        snapshot_branches.append(
            {"name": k, "target_type": v["target_type"], "target": v["target"]}
        )

    target_type = random.choice(snapshot_branches)["target_type"]

    url = reverse(
        "api-1-snapshot",
        url_args={"snapshot_id": snapshot},
        query_params={"target_types": target_type},
    )
    rv = check_api_get_responses(api_client, url, status_code=200)

    expected_data = archive_data.snapshot_get_branches(
        snapshot, target_types=target_type
    )
    expected_data = enrich_snapshot(expected_data, rv.wsgi_request)
    assert rv.data == expected_data


def test_api_snapshot_errors(api_client):
    unknown_snapshot_ = random_sha1()

    url = reverse("api-1-snapshot", url_args={"snapshot_id": "63ce369"})
    check_api_get_responses(api_client, url, status_code=400)

    url = reverse("api-1-snapshot", url_args={"snapshot_id": unknown_snapshot_})
    check_api_get_responses(api_client, url, status_code=404)


@given(snapshot())
def test_api_snapshot_uppercase(api_client, snapshot):
    url = reverse(
        "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot.upper()}
    )

    resp = check_http_get_response(api_client, url, status_code=302)

    redirect_url = reverse(
        "api-1-snapshot-uppercase-checksum", url_args={"snapshot_id": snapshot}
    )

    assert resp["location"] == redirect_url


@given(new_snapshot(min_size=4))
def test_api_snapshot_null_branch(api_client, archive_data, new_snapshot):
    snp_dict = new_snapshot.to_dict()
    snp_id = hash_to_hex(snp_dict["id"])
    for branch in snp_dict["branches"].keys():
        snp_dict["branches"][branch] = None
        break
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snp_id})
    check_api_get_responses(api_client, url, status_code=200)


-@given(origin_with_pull_request_branches())
def test_api_snapshot_no_pull_request_branches_filtering(
-    api_client, archive_data, origin
+    api_client, archive_data, origin_with_pull_request_branches
):
    """Pull request branches should not be filtered out when querying
    a snapshot with the Web API."""
-    snapshot = archive_data.snapshot_get_latest(origin.url)
+    snapshot = archive_data.snapshot_get_latest(origin_with_pull_request_branches.url)
    url = reverse("api-1-snapshot", url_args={"snapshot_id": snapshot["id"]})
    resp = check_api_get_responses(api_client, url, status_code=200)
    assert any([b.startswith("refs/pull/") for b in resp.data["branches"]])
diff --git a/swh/web/tests/browse/test_snapshot_context.py b/swh/web/tests/browse/test_snapshot_context.py
index 6a5b60cd..629e7d06 100644
--- a/swh/web/tests/browse/test_snapshot_context.py
+++ b/swh/web/tests/browse/test_snapshot_context.py
@@ -1,415 +1,410 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given

from swh.model.swhids import ObjectType
from swh.web.browse.snapshot_context import (
    _get_release,
    get_origin_visit_snapshot,
    get_snapshot_content,
    get_snapshot_context,
)
from swh.web.browse.utils import gen_revision_url
from swh.web.common.identifiers import gen_swhid
from swh.web.common.origin_visits import get_origin_visit, get_origin_visits
from swh.web.common.typing import (
    SnapshotBranchInfo,
    SnapshotContext,
    SnapshotReleaseInfo,
)
from swh.web.common.utils import format_utc_iso_date, reverse
-from swh.web.tests.strategies import (
-    origin_with_multiple_visits,
-    origin_with_releases,
-    snapshot,
-)
+from swh.web.tests.strategies import snapshot


-@given(origin_with_multiple_visits())
-def test_get_origin_visit_snapshot_simple(archive_data, origin):
-    visits = archive_data.origin_visit_get(origin["url"])
+def test_get_origin_visit_snapshot_simple(archive_data, origin_with_multiple_visits):
+    visits = archive_data.origin_visit_get(origin_with_multiple_visits["url"])

    for visit in visits:
        snapshot = archive_data.snapshot_get(visit["snapshot"])
        branches = []
        releases = []

        def _process_branch_data(branch, branch_data, alias=False):
            if branch_data["target_type"] == "revision":
                rev_data = archive_data.revision_get(branch_data["target"])
                branches.append(
                    SnapshotBranchInfo(
                        name=branch,
                        alias=alias,
                        revision=branch_data["target"],
                        directory=rev_data["directory"],
                        date=format_utc_iso_date(rev_data["date"]),
                        message=rev_data["message"],
                        url=None,
                    )
                )
            elif branch_data["target_type"] == "release":
                rel_data = archive_data.release_get(branch_data["target"])
                rev_data = archive_data.revision_get(rel_data["target"])
                releases.append(
                    SnapshotReleaseInfo(
                        name=rel_data["name"],
                        alias=alias,
                        branch_name=branch,
                        date=format_utc_iso_date(rel_data["date"]),
                        id=rel_data["id"],
                        message=rel_data["message"],
target_type=rel_data["target_type"], target=rel_data["target"], directory=rev_data["directory"], url=None, ) ) aliases = {} for branch in sorted(snapshot["branches"].keys()): branch_data = snapshot["branches"][branch] if branch_data["target_type"] == "alias": target_data = snapshot["branches"][branch_data["target"]] aliases[branch] = target_data _process_branch_data(branch, target_data, alias=True) else: _process_branch_data(branch, branch_data) assert branches and releases, "Incomplete test data." origin_visit_branches = get_origin_visit_snapshot( - origin, visit_id=visit["visit"] + origin_with_multiple_visits, visit_id=visit["visit"] ) assert origin_visit_branches == (branches, releases, aliases) @given(snapshot()) def test_get_snapshot_context_no_origin(archive_data, snapshot): for browse_context, kwargs in ( ("content", {"snapshot_id": snapshot, "path": "/some/path"}), ("directory", {"snapshot_id": snapshot}), ("log", {"snapshot_id": snapshot}), ): url_args = {"snapshot_id": snapshot} query_params = dict(kwargs) query_params.pop("snapshot_id") snapshot_context = get_snapshot_context(**kwargs, browse_context=browse_context) branches, releases, _ = get_snapshot_content(snapshot) releases = list(reversed(releases)) revision_id = None root_directory = None for branch in branches: if branch["name"] == "HEAD": revision_id = branch["revision"] root_directory = branch["directory"] branch["url"] = reverse( f"browse-snapshot-{browse_context}", url_args=url_args, query_params={"branch": branch["name"], **query_params}, ) for release in releases: release["url"] = reverse( f"browse-snapshot-{browse_context}", url_args=url_args, query_params={"release": release["name"], **query_params}, ) branches_url = reverse("browse-snapshot-branches", url_args=url_args) releases_url = reverse("browse-snapshot-releases", url_args=url_args) directory_url = reverse("browse-snapshot-directory", url_args=url_args) is_empty = not branches and not releases snapshot_swhid = gen_swhid(ObjectType.SNAPSHOT, snapshot) snapshot_sizes = archive_data.snapshot_count_branches(snapshot) expected = SnapshotContext( branch="HEAD", branch_alias=True, branches=branches, branches_url=branches_url, is_empty=is_empty, origin_info=None, origin_visits_url=None, release=None, release_alias=False, release_id=None, query_params=query_params, releases=releases, releases_url=releases_url, revision_id=revision_id, revision_info=_get_revision_info(archive_data, revision_id), root_directory=root_directory, snapshot_id=snapshot, snapshot_sizes=snapshot_sizes, snapshot_swhid=snapshot_swhid, url_args=url_args, visit_info=None, directory_url=directory_url, ) if revision_id: expected["revision_info"]["revision_url"] = gen_revision_url( revision_id, snapshot_context ) assert snapshot_context == expected _check_branch_release_revision_parameters( archive_data, expected, browse_context, kwargs, branches, releases ) -@given(origin_with_multiple_visits()) -def test_get_snapshot_context_with_origin(archive_data, origin): +def test_get_snapshot_context_with_origin(archive_data, origin_with_multiple_visits): - origin_visits = get_origin_visits(origin) + origin_visits = get_origin_visits(origin_with_multiple_visits) timestamp = format_utc_iso_date(origin_visits[0]["date"], "%Y-%m-%dT%H:%M:%SZ") visit_id = origin_visits[1]["visit"] + origin_url = origin_with_multiple_visits["url"] + for browse_context, kwargs in ( - ("content", {"origin_url": origin["url"], "path": "/some/path"}), - ("directory", {"origin_url": origin["url"]}), - ("log", {"origin_url": 
origin["url"]}), - ("directory", {"origin_url": origin["url"], "timestamp": timestamp,},), - ("directory", {"origin_url": origin["url"], "visit_id": visit_id,},), + ("content", {"origin_url": origin_url, "path": "/some/path"}), + ("directory", {"origin_url": origin_url}), + ("log", {"origin_url": origin_url}), + ("directory", {"origin_url": origin_url, "timestamp": timestamp,},), + ("directory", {"origin_url": origin_url, "visit_id": visit_id,},), ): visit_id = kwargs["visit_id"] if "visit_id" in kwargs else None visit_ts = kwargs["timestamp"] if "timestamp" in kwargs else None visit_info = get_origin_visit( {"url": kwargs["origin_url"]}, visit_ts=visit_ts, visit_id=visit_id ) snapshot = visit_info["snapshot"] snapshot_context = get_snapshot_context(**kwargs, browse_context=browse_context) query_params = dict(kwargs) branches, releases, _ = get_snapshot_content(snapshot) releases = list(reversed(releases)) revision_id = None root_directory = None for branch in branches: if branch["name"] == "HEAD": revision_id = branch["revision"] root_directory = branch["directory"] branch["url"] = reverse( f"browse-origin-{browse_context}", query_params={"branch": branch["name"], **query_params}, ) for release in releases: release["url"] = reverse( f"browse-origin-{browse_context}", query_params={"release": release["name"], **query_params}, ) query_params.pop("path", None) branches_url = reverse("browse-origin-branches", query_params=query_params) releases_url = reverse("browse-origin-releases", query_params=query_params) origin_visits_url = reverse( "browse-origin-visits", query_params={"origin_url": kwargs["origin_url"]} ) is_empty = not branches and not releases snapshot_swhid = gen_swhid(ObjectType.SNAPSHOT, snapshot) snapshot_sizes = archive_data.snapshot_count_branches(snapshot) visit_info["url"] = directory_url = reverse( "browse-origin-directory", query_params=query_params ) visit_info["formatted_date"] = format_utc_iso_date(visit_info["date"]) if "path" in kwargs: query_params["path"] = kwargs["path"] expected = SnapshotContext( branch="HEAD", branch_alias=True, branches=branches, branches_url=branches_url, is_empty=is_empty, - origin_info={"url": origin["url"]}, + origin_info={"url": origin_url}, origin_visits_url=origin_visits_url, release=None, release_alias=False, release_id=None, query_params=query_params, releases=releases, releases_url=releases_url, revision_id=revision_id, revision_info=_get_revision_info(archive_data, revision_id), root_directory=root_directory, snapshot_id=snapshot, snapshot_sizes=snapshot_sizes, snapshot_swhid=snapshot_swhid, url_args={}, visit_info=visit_info, directory_url=directory_url, ) if revision_id: expected["revision_info"]["revision_url"] = gen_revision_url( revision_id, snapshot_context ) assert snapshot_context == expected _check_branch_release_revision_parameters( archive_data, expected, browse_context, kwargs, branches, releases ) def _check_branch_release_revision_parameters( archive_data, base_expected_context, browse_context, kwargs, branches, releases, ): branch = random.choice(branches) snapshot_context = get_snapshot_context( **kwargs, browse_context=browse_context, branch_name=branch["name"] ) url_args = dict(kwargs) url_args.pop("path", None) url_args.pop("timestamp", None) url_args.pop("visit_id", None) url_args.pop("origin_url", None) query_params = dict(kwargs) query_params.pop("snapshot_id", None) expected_branch = dict(base_expected_context) expected_branch["branch"] = branch["name"] expected_branch["branch_alias"] = branch["alias"] 
     expected_branch["revision_id"] = branch["revision"]
     expected_branch["revision_info"] = _get_revision_info(
         archive_data, branch["revision"]
     )
     expected_branch["root_directory"] = branch["directory"]
     expected_branch["query_params"] = {"branch": branch["name"], **query_params}
     expected_branch["revision_info"]["revision_url"] = gen_revision_url(
         branch["revision"], expected_branch
     )

     assert snapshot_context == expected_branch

     if releases:
         release = random.choice(releases)

         snapshot_context = get_snapshot_context(
             **kwargs, browse_context=browse_context, release_name=release["name"]
         )

         expected_release = dict(base_expected_context)
         expected_release["branch"] = None
         expected_release["branch_alias"] = False
         expected_release["release"] = release["name"]
         expected_release["release_id"] = release["id"]
         if release["target_type"] == "revision":
             expected_release["revision_id"] = release["target"]
             expected_release["revision_info"] = _get_revision_info(
                 archive_data, release["target"]
             )
         expected_release["root_directory"] = release["directory"]
         expected_release["query_params"] = {"release": release["name"], **query_params}
         expected_release["revision_info"]["revision_url"] = gen_revision_url(
             release["target"], expected_release
         )

         assert snapshot_context == expected_release

     revision_log = archive_data.revision_log(branch["revision"])
     revision = revision_log[-1]

     snapshot_context = get_snapshot_context(
         **kwargs, browse_context=browse_context, revision_id=revision["id"]
     )

     if "origin_url" in kwargs:
         view_name = f"browse-origin-{browse_context}"
     else:
         view_name = f"browse-snapshot-{browse_context}"

     kwargs.pop("visit_id", None)

     revision_browse_url = reverse(
         view_name,
         url_args=url_args,
         query_params={"revision": revision["id"], **query_params},
     )

     branches.append(
         SnapshotBranchInfo(
             name=revision["id"],
             alias=False,
             revision=revision["id"],
             directory=revision["directory"],
             date=revision["date"],
             message=revision["message"],
             url=revision_browse_url,
         )
     )

     expected_revision = dict(base_expected_context)
     expected_revision["branch"] = None
     expected_revision["branch_alias"] = False
     expected_revision["branches"] = branches
     expected_revision["revision_id"] = revision["id"]
     expected_revision["revision_info"] = _get_revision_info(
         archive_data, revision["id"]
     )
     expected_revision["root_directory"] = revision["directory"]
     expected_revision["query_params"] = {"revision": revision["id"], **query_params}
     expected_revision["revision_info"]["revision_url"] = gen_revision_url(
         revision["id"], expected_revision
     )

     assert snapshot_context == expected_revision


-@given(origin_with_releases())
-def test_get_release_large_snapshot(archive_data, origin):
-    snapshot = archive_data.snapshot_get_latest(origin["url"])
+def test_get_release_large_snapshot(archive_data, origin_with_releases):
+    snapshot = archive_data.snapshot_get_latest(origin_with_releases["url"])
     release_id = random.choice(
         [
             v["target"]
             for v in snapshot["branches"].values()
             if v["target_type"] == "release"
         ]
     )

     release_data = archive_data.release_get(release_id)

     # simulate large snapshot processing by providing releases parameter
     # as an empty list
     release = _get_release(
         releases=[], release_name=release_data["name"], snapshot_id=snapshot["id"]
     )

     assert release_data["name"] == release["name"]
     assert release_data["id"] == release["id"]


 def _get_revision_info(archive_data, revision_id):
     revision_info = None
     if revision_id:
         revision_info = archive_data.revision_get(revision_id)
         revision_info["message_header"] = revision_info["message"].split("\n")[0]
         revision_info["date"] = format_utc_iso_date(revision_info["date"])
         revision_info["committer_date"] = format_utc_iso_date(
             revision_info["committer_date"]
         )
     return revision_info
diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py
index df433881..87222246 100644
--- a/swh/web/tests/browse/views/test_content.py
+++ b/swh/web/tests/browse/views/test_content.py
@@ -1,627 +1,627 @@
 # Copyright (C) 2017-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from hypothesis import given

 from django.utils.html import escape

 from swh.model.swhids import ObjectType
 from swh.web.browse.snapshot_context import process_snapshot_branches
 from swh.web.browse.utils import (
     _re_encode_content,
     get_mimetype_and_encoding_for_content,
     prepare_content_for_display,
 )
 from swh.web.common.exc import NotFoundExc
 from swh.web.common.identifiers import gen_swhid
 from swh.web.common.utils import gen_path_info, reverse
 from swh.web.tests.django_asserts import assert_contains, assert_not_contains
-from swh.web.tests.strategies import (
-    invalid_sha1,
-    origin_with_multiple_visits,
-    unknown_content,
-)
+from swh.web.tests.strategies import invalid_sha1, unknown_content
 from swh.web.tests.utils import check_html_get_response, check_http_get_response


 def test_content_view_text(client, archive_data, content_text):
     sha1_git = content_text["sha1_git"]

     url = reverse(
         "browse-content",
         url_args={"query_string": content_text["sha1"]},
         query_params={"path": content_text["path"]},
     )

     url_raw = reverse(
         "browse-content-raw", url_args={"query_string": content_text["sha1"]}
     )

     resp = check_html_get_response(
         client, url, status_code=200, template_used="browse/content.html"
     )

     content_display = _process_content_for_display(archive_data, content_text)
     mimetype = content_display["mimetype"]

     if mimetype.startswith("text/"):
         assert_contains(resp, '<code class="%s">' % content_display["language"])
         assert_contains(resp, escape(content_display["content_data"]))
     assert_contains(resp, url_raw)

     swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
     swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
     assert_contains(resp, swh_cnt_id)
     assert_contains(resp, swh_cnt_id_url)
     assert_not_contains(resp, "swh-metadata-popover")


 def test_content_view_no_highlight(
     client, archive_data, content_application_no_highlight, content_text_no_highlight
 ):
     for content_ in (content_application_no_highlight, content_text_no_highlight):
         content = content_
         sha1_git = content["sha1_git"]

         url = reverse("browse-content", url_args={"query_string": content["sha1"]})

         url_raw = reverse(
             "browse-content-raw", url_args={"query_string": content["sha1"]}
         )

         resp = check_html_get_response(
             client, url, status_code=200, template_used="browse/content.html"
         )

         content_display = _process_content_for_display(archive_data, content)

         assert_contains(resp, '<code class="nohighlight">')
         assert_contains(resp, escape(content_display["content_data"]))
         assert_contains(resp, url_raw)

         swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
         swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
         assert_contains(resp, swh_cnt_id)
         assert_contains(resp, swh_cnt_id_url)


 def test_content_view_no_utf8_text(client, archive_data, content_text_non_utf8):
     sha1_git = content_text_non_utf8["sha1_git"]

     url = reverse(
         "browse-content", url_args={"query_string": content_text_non_utf8["sha1"]}
     )

     resp = check_html_get_response(
         client, url, status_code=200, template_used="browse/content.html"
     )

     content_display = _process_content_for_display(archive_data, content_text_non_utf8)

     swh_cnt_id = gen_swhid(ObjectType.CONTENT, sha1_git)
     swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
     assert_contains(resp, swh_cnt_id_url)
     assert_contains(resp, escape(content_display["content_data"]))


 def test_content_view_image(client, archive_data, content_image_type):
     url = reverse(
         "browse-content", url_args={"query_string": content_image_type["sha1"]}
     )

     url_raw = reverse(
         "browse-content-raw", url_args={"query_string": content_image_type["sha1"]}
     )

     resp = check_html_get_response(
         client, url, status_code=200, template_used="browse/content.html"
     )

     content_display = _process_content_for_display(archive_data, content_image_type)
     mimetype = content_display["mimetype"]
     content_data = content_display["content_data"]

     assert_contains(resp, '<img src="data:%s;base64,%s"/>' % (mimetype, content_data))
     assert_contains(resp, url_raw)


 def test_content_view_image_no_rendering(
     client, archive_data, content_unsupported_image_type_rendering
 ):
     url = reverse(
         "browse-content",
         url_args={"query_string": content_unsupported_image_type_rendering["sha1"]},
     )

     resp = check_html_get_response(
         client, url, status_code=200, template_used="browse/content.html"
     )

     mimetype = content_unsupported_image_type_rendering["mimetype"]
     encoding = content_unsupported_image_type_rendering["encoding"]

     assert_contains(
         resp,
         (
             f"Content with mime type {mimetype} and encoding {encoding} "
             "cannot be displayed."
         ),
     )


 def test_content_view_text_with_path(client, archive_data, content_text):
     path = content_text["path"]

     url = reverse(
         "browse-content",
         url_args={"query_string": content_text["sha1"]},
         query_params={"path": path},
     )

     resp = check_html_get_response(
         client, url, status_code=200, template_used="browse/content.html"
     )

     assert_contains(resp, '