Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/browse/views/test_content.py
# Copyright (C) 2017-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random | import random | ||||
import re | |||||
import pytest | import pytest | ||||
from django.utils.html import escape | from django.utils.html import escape | ||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.web.browse.snapshot_context import process_snapshot_branches | from swh.web.browse.snapshot_context import process_snapshot_branches | ||||
from swh.web.browse.utils import ( | from swh.web.browse.utils import ( | ||||
_re_encode_content, | _re_encode_content, | ||||
get_mimetype_and_encoding_for_content, | get_mimetype_and_encoding_for_content, | ||||
prepare_content_for_display, | prepare_content_for_display, | ||||
) | ) | ||||
from swh.web.common.exc import NotFoundExc | from swh.web.common.exc import NotFoundExc | ||||
from swh.web.common.identifiers import gen_swhid | from swh.web.common.identifiers import gen_swhid | ||||
from swh.web.common.utils import gen_path_info, reverse | from swh.web.common.utils import ( | ||||
format_utc_iso_date, | |||||
gen_path_info, | |||||
parse_iso8601_date_to_utc, | |||||
reverse, | |||||
) | |||||
from swh.web.tests.data import get_content | |||||
from swh.web.tests.django_asserts import assert_contains, assert_not_contains | from swh.web.tests.django_asserts import assert_contains, assert_not_contains | ||||
from swh.web.tests.utils import check_html_get_response, check_http_get_response | from swh.web.tests.utils import check_html_get_response, check_http_get_response | ||||
def test_content_view_text(client, archive_data, content_text): | def test_content_view_text(client, archive_data, content_text): | ||||
sha1_git = content_text["sha1_git"] | sha1_git = content_text["sha1_git"] | ||||
url = reverse( | url = reverse( | ||||
▲ Show 20 Lines • Show All 596 Lines • ▼ Show 20 Lines | def _process_content_for_display(archive_data, content): | ||||
content_display = prepare_content_for_display( | content_display = prepare_content_for_display( | ||||
content_data, mime_type, content["path"] | content_data, mime_type, content["path"] | ||||
) | ) | ||||
assert type(content_display["content_data"]) == str | assert type(content_display["content_data"]) == str | ||||
return content_display | return content_display | ||||
def test_content_dispaly_empty_query_string_missing_path(client):
    """Browsing a content with an origin URL but no ``path`` query parameter
    (and no query string in the URL) must render the error page with a 400
    status and an explicit message."""
    params = {"origin_url": "http://example.com"}
    browse_url = reverse("browse-content", query_params=params)

    response = check_html_get_response(
        client, browse_url, status_code=400, template_used="error.html"
    )

    expected_message = "The path query parameter must be provided."
    assert_contains(response, expected_message, status_code=400)
def test_content_dispaly_empty_query_string_and_snapshot_origin(client):
    """Browsing a content with only a ``path`` query parameter (neither origin
    URL nor snapshot, and no query string in the URL) must answer with a 400
    status and an explicit message."""
    browse_url = reverse("browse-content", query_params={"path": "test.txt"})

    response = check_html_get_response(client, browse_url, status_code=400)

    expected_message = "The origin_url or snapshot query parameters must be provided."
    assert_contains(response, expected_message, status_code=400)
def test_content_dispaly_empty_query_string_with_origin(
    client, archive_data, origin_with_multiple_visits
):
    """Browsing a content by origin URL and path only (no query string in the
    URL) must redirect (302) to the content URL qualified by the ``sha1_git``
    of the file, keeping the same query parameters."""
    origin_url = origin_with_multiple_visits["url"]

    # Pick a random file from the root directory of the head revision of the
    # latest snapshot of the origin.
    latest_snapshot = archive_data.snapshot_get_latest(origin_url)
    head_revision = archive_data.revision_get(
        archive_data.snapshot_get_head(latest_snapshot)
    )
    root_entries = archive_data.directory_ls(head_revision["directory"])
    picked_file = random.choice(
        [entry for entry in root_entries if entry["type"] == "file"]
    )

    params = {"origin_url": origin_url, "path": picked_file["name"]}
    browse_url = reverse("browse-content", query_params=params)
    response = check_html_get_response(client, browse_url, status_code=302)

    sha1_git = picked_file["checksums"]["sha1_git"]
    expected_redirect = reverse(
        "browse-content",
        url_args={"query_string": f"sha1_git:{sha1_git}"},
        query_params=params,
    )
    assert response.url == expected_redirect
def test_content_dispaly_empty_query_string_with_snapshot(
    client, archive_data, origin_with_multiple_visits
):
    """Browsing a content by snapshot id and path only (no query string in the
    URL) must redirect (302) to the content URL qualified by the ``sha1_git``
    of the file, keeping the same query parameters."""
    origin_url = origin_with_multiple_visits["url"]

    # Pick a random file from the root directory of the head revision of the
    # latest snapshot of the origin.
    latest_snapshot = archive_data.snapshot_get_latest(origin_url)
    head_revision = archive_data.revision_get(
        archive_data.snapshot_get_head(latest_snapshot)
    )
    root_entries = archive_data.directory_ls(head_revision["directory"])
    picked_file = random.choice(
        [entry for entry in root_entries if entry["type"] == "file"]
    )

    params = {"snapshot": latest_snapshot["id"], "path": picked_file["name"]}
    browse_url = reverse("browse-content", query_params=params)
    response = check_html_get_response(client, browse_url, status_code=302)

    sha1_git = picked_file["checksums"]["sha1_git"]
    expected_redirect = reverse(
        "browse-content",
        url_args={"query_string": f"sha1_git:{sha1_git}"},
        query_params=params,
    )
    assert response.url == expected_redirect
def test_browse_origin_content_no_visit(client, mocker, origin):
    """When the latest-visit lookup returns nothing, browsing a content by
    origin must render a 404 error page mentioning "No valid visit", without
    ever calling ``get_origin_visits``."""
    visits_getter_mock = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits", return_value=[]
    )
    archive_mock = mocker.patch("swh.web.common.origin_visits.archive")
    archive_mock.lookup_origin_visit_latest.return_value = None

    params = {"origin_url": origin["url"], "path": "foo"}
    browse_url = reverse("browse-content", query_params=params)

    response = check_html_get_response(
        client, browse_url, status_code=404, template_used="error.html"
    )

    assert_contains(response, "No valid visit", status_code=404)
    assert not visits_getter_mock.called
def test_browse_origin_content_unknown_visit(client, mocker, origin):
    """Requesting a visit id absent from the (mocked) list of origin visits
    must render a 404 error page containing "Resource not found"."""
    mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits",
        return_value=[{"visit": 1}],
    )

    # Visit 2 is not part of the mocked visits above.
    params = {"origin_url": origin["url"], "path": "foo", "visit_id": 2}
    browse_url = reverse("browse-content", query_params=params)

    response = check_html_get_response(
        client, browse_url, status_code=404, template_used="error.html"
    )

    page = response.content.decode("utf-8")
    assert re.search("Resource not found", page)
def test_browse_origin_content_not_found(client, origin):
    """Browsing the path ``/invalid/file/path`` for the ``origin`` fixture
    must render a 404 error page containing "Resource not found"."""
    params = {"origin_url": origin["url"], "path": "/invalid/file/path"}
    browse_url = reverse("browse-content", query_params=params)

    response = check_html_get_response(
        client, browse_url, status_code=404, template_used="error.html"
    )

    page = response.content.decode("utf-8")
    assert re.search("Resource not found", page)
def test_browse_content_invalid_origin(client):
    """Browsing a content for the origin URL ``http://invalid-origin`` must
    render a 404 error page containing "Resource not found"."""
    params = {
        "origin_url": "http://invalid-origin",
        "path": "/invalid/file/path",
    }
    browse_url = reverse("browse-content", query_params=params)

    response = check_html_get_response(
        client, browse_url, status_code=404, template_used="error.html"
    )

    page = response.content.decode("utf-8")
    assert re.search("Resource not found", page)
def test_origin_content_view(
    client, archive_data, swh_scheduler, origin_with_multiple_visits
):
    """Check the content view in an origin context for the last and the first
    visit of an origin, selecting the visit implicitly, by timestamp, by
    visit id or by snapshot id."""
    origin_visits = archive_data.origin_visit_get(origin_with_multiple_visits["url"])

    def _collect_visit_data(visit_idx):
        # Gather, for the visit at the given index, everything the checks
        # need: snapshot branches/releases, root directory, one random file
        # of that directory and the snapshot branch counters.
        visit = origin_visits[visit_idx]
        snapshot = archive_data.snapshot_get(visit["snapshot"])
        head_rev = archive_data.revision_get(archive_data.snapshot_get_head(snapshot))
        root_entries = archive_data.directory_ls(head_rev["directory"])
        picked_file = random.choice(
            [entry for entry in root_entries if entry["type"] == "file"]
        )
        branches, releases, _ = process_snapshot_branches(snapshot)
        return {
            "branches": branches,
            "releases": releases,
            "root_dir_sha1": head_rev["directory"],
            "content": get_content(picked_file["checksums"]["sha1"]),
            "visit": visit,
            "snapshot_sizes": archive_data.snapshot_count_branches(snapshot["id"]),
        }

    def _check_view(visit, data, **visit_selector):
        # Run the shared assertions; ``visit_selector`` carries at most one of
        # visit_id / timestamp / snapshot_id.
        _origin_content_view_test_helper(
            client,
            archive_data,
            origin_with_multiple_visits,
            visit,
            data["snapshot_sizes"],
            data["branches"],
            data["releases"],
            data["root_dir_sha1"],
            data["content"],
            **visit_selector,
        )

    last_visit = origin_visits[-1]
    last_data = _collect_visit_data(-1)
    _check_view(last_visit, last_data)
    _check_view(last_visit, last_data, timestamp=last_data["visit"]["date"])
    _check_view(last_visit, last_data, snapshot_id=last_data["visit"]["snapshot"])

    first_visit = origin_visits[0]
    first_data = _collect_visit_data(0)
    _check_view(first_visit, first_data, visit_id=first_data["visit"]["visit"])
    _check_view(first_visit, first_data, snapshot_id=first_data["visit"]["snapshot"])
def _origin_content_view_test_helper(
    client,
    archive_data,
    origin_info,
    origin_visit,
    snapshot_sizes,
    origin_branches,
    origin_releases,
    root_dir_sha1,
    content,
    visit_id=None,
    timestamp=None,
    snapshot_id=None,
):
    """Request the content view in an origin context and check the page.

    Args:
        client: test client handed to ``check_html_get_response``
        archive_data: fixture used to fetch the snapshot and its head revision
        origin_info: dict holding the origin ``url``
        origin_visit: dict holding the ``visit`` id and ``snapshot`` id
        snapshot_sizes: mapping with ``revision`` and ``release`` counters
        origin_branches: sized iterable of dicts with a ``name`` key
        origin_releases: sized iterable of dicts with a ``name`` key
        root_dir_sha1: identifier of the root directory of the head revision
        content: dict with ``path``, ``sha1_git``, ``data`` and
            ``hljs_language`` keys
        visit_id: optional visit id put in the query parameters
        timestamp: optional visit date put in the query parameters
        snapshot_id: optional snapshot id put in the query parameters (only
            used when no visit id is given)

    When neither ``visit_id`` nor ``snapshot_id`` is given, the visit id of
    ``origin_visit`` is used.
    """
    # Drop the first component of the content path.
    # NOTE(review): presumably the name of the root directory of the test
    # archive -- confirm against the test data.
    content_path = "/".join(content["path"].split("/")[1:])

    if not visit_id and not snapshot_id:
        visit_id = origin_visit["visit"]

    query_params = {"origin_url": origin_info["url"], "path": content_path}

    if timestamp:
        query_params["timestamp"] = timestamp

    if visit_id:
        query_params["visit_id"] = visit_id
    elif snapshot_id:
        query_params["snapshot"] = snapshot_id

    url = reverse(
        "browse-content",
        url_args={"query_string": f"sha1_git:{content['sha1_git']}"},
        query_params=query_params,
    )

    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/content.html"
    )

    # The content itself must be displayed, escaped and tagged for
    # highlighting.
    assert isinstance(content["data"], str)
    assert_contains(resp, '<code class="%s">' % content["hljs_language"])
    assert_contains(resp, escape(content["data"]))

    # Split the path into parent directory and file name. The parent is
    # rebuilt from the components: stripping the file name with str.replace
    # would also remove it from any directory name containing it.
    split_path = content_path.split("/")
    filename = split_path[-1]
    path = "/".join(split_path[:-1])

    path_info = gen_path_info(path)

    del query_params["path"]

    if timestamp:
        # Links in the page are expected to carry the timestamp normalized to
        # UTC with the format below.
        query_params["timestamp"] = format_utc_iso_date(
            parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
        )

    root_dir_url = reverse(
        "browse-directory",
        url_args={"sha1_git": root_dir_sha1},
        query_params=query_params,
    )

    # Breadcrumbs: one entry for the root directory plus one per parent
    # directory, followed by the file name.
    assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)

    assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7]))

    for p in path_info:
        query_params["path"] = p["path"]
        dir_url = reverse("browse-origin-directory", query_params=query_params)
        assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))

    assert_contains(resp, "<li>%s</li>" % filename)

    # Link to the raw content.
    query_string = "sha1_git:" + content["sha1_git"]

    url_raw = reverse(
        "browse-content-raw",
        url_args={"query_string": query_string},
        query_params={"filename": filename},
    )
    assert_contains(resp, url_raw)

    # The loop above may have left a "path" entry behind.
    if "path" in query_params:
        del query_params["path"]

    # Links to the branches and releases views, with their counters.
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)

    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")

    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)

    assert_contains(resp, f'href="{escape(origin_releases_url)}">')
    assert_contains(resp, f"Releases ({snapshot_sizes['release']})")

    # One entry and one link per branch.
    assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))

    query_params["path"] = content_path

    for branch in origin_branches:
        root_dir_branch_url = reverse(
            "browse-origin-content",
            query_params={"branch": branch["name"], **query_params},
        )

        assert_contains(resp, '<a href="%s">' % root_dir_branch_url)

    # One entry and one link per release.
    assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))

    # NOTE(review): presumably reverse() ignores None-valued query
    # parameters -- confirm.
    query_params["branch"] = None
    for release in origin_releases:
        root_dir_release_url = reverse(
            "browse-origin-content",
            query_params={"release": release["name"], **query_params},
        )

        assert_contains(resp, '<a href="%s">' % root_dir_release_url)

    # Request the page again and check the contextual SWHID of the content.
    url = reverse(
        "browse-content",
        url_args={"query_string": query_string},
        query_params=query_params,
    )

    resp = check_html_get_response(
        client, url, status_code=200, template_used="browse/content.html"
    )

    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)

    swhid_context = {
        "origin": origin_info["url"],
        "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]),
        "anchor": gen_swhid(ObjectType.REVISION, head_rev_id),
        "path": f"/{content_path}",
    }

    swh_cnt_id = gen_swhid(
        ObjectType.CONTENT, content["sha1_git"], metadata=swhid_context
    )
    swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
    assert_contains(resp, swh_cnt_id)
    assert_contains(resp, swh_cnt_id_url)

    assert_contains(resp, "swh-take-new-snapshot")

    _check_origin_link(resp, origin_info["url"])

    assert_not_contains(resp, "swh-metadata-popover")
def _check_origin_link(resp, origin_url):
    """Assert that the response contains an ``href`` pointing to the
    ``browse-origin`` URL of the given origin."""
    origin_params = {"origin_url": origin_url}
    expected_href = reverse("browse-origin", query_params=origin_params)
    assert_contains(resp, f'href="{expected_href}"')