Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/browse/views/test_origin.py
Show All 18 Lines | from swh.model.model import ( | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
from swh.web.browse.snapshot_context import process_snapshot_branches | from swh.web.browse.snapshot_context import process_snapshot_branches | ||||
from swh.web.common.exc import NotFoundExc | from swh.web.common.exc import NotFoundExc | ||||
from swh.web.common.identifiers import gen_swhid | from swh.web.common.identifiers import gen_swhid | ||||
from swh.web.common.utils import ( | from swh.web.common.utils import format_utc_iso_date, parse_iso8601_date_to_utc, reverse | ||||
format_utc_iso_date, | |||||
gen_path_info, | |||||
parse_iso8601_date_to_utc, | |||||
reverse, | |||||
) | |||||
from swh.web.tests.data import get_content | |||||
from swh.web.tests.django_asserts import assert_contains, assert_not_contains | from swh.web.tests.django_asserts import assert_contains, assert_not_contains | ||||
from swh.web.tests.strategies import new_origin, new_snapshot, visit_dates | from swh.web.tests.strategies import new_origin, new_snapshot, visit_dates | ||||
from swh.web.tests.utils import check_html_get_response | from swh.web.tests.utils import check_html_get_response | ||||
def test_origin_visits_browse(client, archive_data, origin_with_multiple_visits): | def test_origin_visits_browse(client, archive_data, origin_with_multiple_visits): | ||||
origin_url = origin_with_multiple_visits["url"] | origin_url = origin_with_multiple_visits["url"] | ||||
url = reverse("browse-origin-visits", query_params={"origin_url": origin_url}) | url = reverse("browse-origin-visits", query_params={"origin_url": origin_url}) | ||||
Show All 10 Lines | for v in visits: | ||||
"browse-origin-directory", | "browse-origin-directory", | ||||
query_params={"origin_url": origin_url, "timestamp": vdate}, | query_params={"origin_url": origin_url, "timestamp": vdate}, | ||||
) | ) | ||||
assert_contains(resp, browse_dir_url) | assert_contains(resp, browse_dir_url) | ||||
_check_origin_link(resp, origin_url) | _check_origin_link(resp, origin_url) | ||||
def test_origin_content_view( | |||||
client, archive_data, swh_scheduler, origin_with_multiple_visits | |||||
): | |||||
origin_visits = archive_data.origin_visit_get(origin_with_multiple_visits["url"]) | |||||
def _get_archive_data(visit_idx): | |||||
snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"]) | |||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | |||||
head_rev = archive_data.revision_get(head_rev_id) | |||||
dir_content = archive_data.directory_ls(head_rev["directory"]) | |||||
dir_files = [e for e in dir_content if e["type"] == "file"] | |||||
dir_file = random.choice(dir_files) | |||||
branches, releases, _ = process_snapshot_branches(snapshot) | |||||
return { | |||||
"branches": branches, | |||||
"releases": releases, | |||||
"root_dir_sha1": head_rev["directory"], | |||||
"content": get_content(dir_file["checksums"]["sha1"]), | |||||
"visit": origin_visits[visit_idx], | |||||
"snapshot_sizes": archive_data.snapshot_count_branches(snapshot["id"]), | |||||
} | |||||
tdata = _get_archive_data(-1) | |||||
_origin_content_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin_with_multiple_visits, | |||||
origin_visits[-1], | |||||
tdata["snapshot_sizes"], | |||||
tdata["branches"], | |||||
tdata["releases"], | |||||
tdata["root_dir_sha1"], | |||||
tdata["content"], | |||||
) | |||||
_origin_content_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin_with_multiple_visits, | |||||
origin_visits[-1], | |||||
tdata["snapshot_sizes"], | |||||
tdata["branches"], | |||||
tdata["releases"], | |||||
tdata["root_dir_sha1"], | |||||
tdata["content"], | |||||
timestamp=tdata["visit"]["date"], | |||||
) | |||||
_origin_content_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin_with_multiple_visits, | |||||
origin_visits[-1], | |||||
tdata["snapshot_sizes"], | |||||
tdata["branches"], | |||||
tdata["releases"], | |||||
tdata["root_dir_sha1"], | |||||
tdata["content"], | |||||
snapshot_id=tdata["visit"]["snapshot"], | |||||
) | |||||
tdata = _get_archive_data(0) | |||||
_origin_content_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin_with_multiple_visits, | |||||
origin_visits[0], | |||||
tdata["snapshot_sizes"], | |||||
tdata["branches"], | |||||
tdata["releases"], | |||||
tdata["root_dir_sha1"], | |||||
tdata["content"], | |||||
visit_id=tdata["visit"]["visit"], | |||||
) | |||||
_origin_content_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin_with_multiple_visits, | |||||
origin_visits[0], | |||||
tdata["snapshot_sizes"], | |||||
tdata["branches"], | |||||
tdata["releases"], | |||||
tdata["root_dir_sha1"], | |||||
tdata["content"], | |||||
snapshot_id=tdata["visit"]["snapshot"], | |||||
) | |||||
def test_origin_root_directory_view(client, archive_data, swh_scheduler, origin): | def test_origin_root_directory_view(client, archive_data, swh_scheduler, origin): | ||||
origin_visits = archive_data.origin_visit_get(origin["url"]) | origin_visits = archive_data.origin_visit_get(origin["url"]) | ||||
visit = origin_visits[-1] | visit = origin_visits[-1] | ||||
snapshot = archive_data.snapshot_get(visit["snapshot"]) | snapshot = archive_data.snapshot_get(visit["snapshot"]) | ||||
snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) | snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) | ||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | head_rev_id = archive_data.snapshot_get_head(snapshot) | ||||
head_rev = archive_data.revision_get(head_rev_id) | head_rev = archive_data.revision_get(head_rev_id) | ||||
▲ Show 20 Lines • Show All 365 Lines • ▼ Show 20 Lines | def test_browse_origin_directory_not_found(client, origin): | ||||
) | ) | ||||
resp = check_html_get_response( | resp = check_html_get_response( | ||||
client, url, status_code=404, template_used="browse/directory.html" | client, url, status_code=404, template_used="browse/directory.html" | ||||
) | ) | ||||
assert re.search("Directory.*not found", resp.content.decode("utf-8")) | assert re.search("Directory.*not found", resp.content.decode("utf-8")) | ||||
def test_browse_origin_content_no_visit(client, mocker, origin): | |||||
mock_get_origin_visits = mocker.patch( | |||||
"swh.web.common.origin_visits.get_origin_visits" | |||||
) | |||||
mock_get_origin_visits.return_value = [] | |||||
mock_archive = mocker.patch("swh.web.common.origin_visits.archive") | |||||
mock_archive.lookup_origin_visit_latest.return_value = None | |||||
url = reverse( | |||||
"browse-origin-content", | |||||
query_params={"origin_url": origin["url"], "path": "foo"}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert_contains(resp, "No valid visit", status_code=404) | |||||
assert not mock_get_origin_visits.called | |||||
def test_browse_origin_content_unknown_visit(client, mocker, origin): | |||||
mock_get_origin_visits = mocker.patch( | |||||
"swh.web.common.origin_visits.get_origin_visits" | |||||
) | |||||
mock_get_origin_visits.return_value = [{"visit": 1}] | |||||
url = reverse( | |||||
"browse-origin-content", | |||||
query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert re.search("Visit.*not found", resp.content.decode("utf-8")) | |||||
assert mock_get_origin_visits.called | |||||
def _add_empty_snapshot_origin(new_origin, archive_data): | def _add_empty_snapshot_origin(new_origin, archive_data): | ||||
snapshot = Snapshot(branches={}) | snapshot = Snapshot(branches={}) | ||||
archive_data.origin_add([new_origin]) | archive_data.origin_add([new_origin]) | ||||
archive_data.snapshot_add([snapshot]) | archive_data.snapshot_add([snapshot]) | ||||
visit = archive_data.origin_visit_add( | visit = archive_data.origin_visit_add( | ||||
[OriginVisit(origin=new_origin.url, date=now(), type="git",)] | [OriginVisit(origin=new_origin.url, date=now(), type="git",)] | ||||
)[0] | )[0] | ||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | ||||
origin=new_origin.url, | origin=new_origin.url, | ||||
visit=visit.visit, | visit=visit.visit, | ||||
date=now(), | date=now(), | ||||
status="full", | status="full", | ||||
snapshot=snapshot.id, | snapshot=snapshot.id, | ||||
) | ) | ||||
archive_data.origin_visit_status_add([visit_status]) | archive_data.origin_visit_status_add([visit_status]) | ||||
@pytest.mark.django_db | @pytest.mark.django_db | ||||
@pytest.mark.parametrize("object_type", ["content", "directory"]) | @pytest.mark.parametrize("object_type", ["directory"]) | ||||
@given(new_origin()) | @given(new_origin()) | ||||
def test_browse_origin_content_directory_empty_snapshot( | def test_browse_origin_content_directory_empty_snapshot( | ||||
client, staff_user, archive_data, object_type, new_origin | client, staff_user, archive_data, object_type, new_origin | ||||
): | ): | ||||
_add_empty_snapshot_origin(new_origin, archive_data) | _add_empty_snapshot_origin(new_origin, archive_data) | ||||
# to check proper generation of raw extrinsic metadata api links | # to check proper generation of raw extrinsic metadata api links | ||||
client.force_login(staff_user) | client.force_login(staff_user) | ||||
url = reverse( | url = reverse( | ||||
f"browse-origin-{object_type}", | f"browse-origin-{object_type}", | ||||
query_params={"origin_url": new_origin.url, "path": "baz"}, | query_params={"origin_url": new_origin.url, "path": "baz"}, | ||||
) | ) | ||||
resp = check_html_get_response( | resp = check_html_get_response( | ||||
client, url, status_code=200, template_used=f"browse/{object_type}.html" | client, url, status_code=200, template_used=f"browse/{object_type}.html" | ||||
) | ) | ||||
assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) | assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) | ||||
def test_browse_origin_content_not_found(client, origin): | |||||
url = reverse( | |||||
"browse-origin-content", | |||||
query_params={"origin_url": origin["url"], "path": "/invalid/file/path"}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="browse/content.html" | |||||
) | |||||
assert re.search("Directory entry.*not found", resp.content.decode("utf-8")) | |||||
def test_browse_directory_snapshot_not_found(client, mocker, origin): | def test_browse_directory_snapshot_not_found(client, mocker, origin): | ||||
mock_get_snapshot_context = mocker.patch( | mock_get_snapshot_context = mocker.patch( | ||||
"swh.web.browse.snapshot_context.get_snapshot_context" | "swh.web.browse.snapshot_context.get_snapshot_context" | ||||
) | ) | ||||
mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") | mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") | ||||
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) | url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) | ||||
resp = check_html_get_response( | resp = check_html_get_response( | ||||
▲ Show 20 Lines • Show All 152 Lines • ▼ Show 20 Lines | ): | ||||
assert_not_contains(resp, "swh-vault-download") | assert_not_contains(resp, "swh-vault-download") | ||||
# no History link | # no History link | ||||
assert_not_contains(resp, "swh-tr-link") | assert_not_contains(resp, "swh-tr-link") | ||||
# no SWHIDs for directory and revision | # no SWHIDs for directory and revision | ||||
assert_not_contains(resp, "swh:1:dir:") | assert_not_contains(resp, "swh:1:dir:") | ||||
assert_not_contains(resp, "swh:1:rev:") | assert_not_contains(resp, "swh:1:rev:") | ||||
def test_origin_content_no_path(client, origin): | |||||
url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]}) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=400, template_used="error.html" | |||||
) | |||||
assert_contains( | |||||
resp, "The path of a content must be given as query parameter.", status_code=400 | |||||
) | |||||
def test_origin_views_no_url_query_parameter(client): | def test_origin_views_no_url_query_parameter(client): | ||||
for browse_context in ( | for browse_context in ( | ||||
"content", | |||||
"directory", | "directory", | ||||
"visits", | "visits", | ||||
): | ): | ||||
url = reverse(f"browse-origin-{browse_context}") | url = reverse(f"browse-origin-{browse_context}") | ||||
resp = check_html_get_response( | resp = check_html_get_response( | ||||
client, url, status_code=400, template_used="error.html" | client, url, status_code=400, template_used="error.html" | ||||
) | ) | ||||
assert_contains( | assert_contains( | ||||
Show All 9 Lines | def test_origin_view_redirects(client, browse_context, new_origin): | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
f"browse-snapshot-{browse_context}", query_params=query_params | f"browse-snapshot-{browse_context}", query_params=query_params | ||||
) | ) | ||||
@given(new_origin()) | @given(new_origin()) | ||||
@pytest.mark.parametrize("browse_context", ["content"]) | |||||
def test_origin_content_view_redirects(client, browse_context, new_origin): | |||||
query_params = {"origin_url": new_origin.url, "path": "test.txt"} | |||||
url = reverse(f"browse-origin-{browse_context}", query_params=query_params) | |||||
resp = check_html_get_response(client, url, status_code=301) | |||||
assert resp["location"] == reverse( | |||||
f"browse-{browse_context}", query_params=query_params | |||||
) | |||||
@given(new_origin()) | |||||
@pytest.mark.parametrize("browse_context", ["log", "branches", "releases"]) | @pytest.mark.parametrize("browse_context", ["log", "branches", "releases"]) | ||||
def test_origin_view_legacy_redirects(client, browse_context, new_origin): | def test_origin_view_legacy_redirects(client, browse_context, new_origin): | ||||
# Each legacy route corresponds to two URL patterns, testing both | # Each legacy route corresponds to two URL patterns, testing both | ||||
url_args = [ | url_args = [ | ||||
{"origin_url": new_origin.url}, | {"origin_url": new_origin.url}, | ||||
{"origin_url": new_origin.url, "timestamp": "2021-01-23T22:24:10Z"}, | {"origin_url": new_origin.url, "timestamp": "2021-01-23T22:24:10Z"}, | ||||
] | ] | ||||
params = {"extra-param1": "extra-param1", "extra-param2": "extra-param2"} | params = {"extra-param1": "extra-param1", "extra-param2": "extra-param2"} | ||||
for each_arg in url_args: | for each_arg in url_args: | ||||
url = reverse( | url = reverse( | ||||
f"browse-origin-{browse_context}-legacy", | f"browse-origin-{browse_context}-legacy", | ||||
url_args=each_arg, | url_args=each_arg, | ||||
query_params=params, | query_params=params, | ||||
) | ) | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
f"browse-snapshot-{browse_context}", query_params={**each_arg, **params} | f"browse-snapshot-{browse_context}", query_params={**each_arg, **params} | ||||
) | ) | ||||
def _origin_content_view_test_helper( | @given(new_origin()) | ||||
client, | def test_origin_content_view_legacy_redirects(client, new_origin): | ||||
archive_data, | url_args = [ | ||||
origin_info, | {"origin_url": new_origin.url}, | ||||
origin_visit, | { | ||||
snapshot_sizes, | "origin_url": new_origin.url, | ||||
origin_branches, | "path": "test.txt", | ||||
origin_releases, | "timestamp": "2021-01-23T22:24:10Z", | ||||
root_dir_sha1, | }, | ||||
content, | {"origin_url": new_origin.url, "path": "test.txt"}, | ||||
visit_id=None, | ] | ||||
timestamp=None, | params = {"extra-param1": "extra-param1", "extra-param2": "extra-param2"} | ||||
snapshot_id=None, | for each_arg in url_args: | ||||
): | url = reverse( | ||||
content_path = "/".join(content["path"].split("/")[1:]) | "browse-origin-content-legacy", url_args=each_arg, query_params=params, | ||||
if not visit_id and not snapshot_id: | |||||
visit_id = origin_visit["visit"] | |||||
query_params = {"origin_url": origin_info["url"], "path": content_path} | |||||
if timestamp: | |||||
query_params["timestamp"] = timestamp | |||||
if visit_id: | |||||
query_params["visit_id"] = visit_id | |||||
elif snapshot_id: | |||||
query_params["snapshot"] = snapshot_id | |||||
url = reverse("browse-origin-content", query_params=query_params) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=200, template_used="browse/content.html" | |||||
) | |||||
assert type(content["data"]) == str | |||||
assert_contains(resp, '<code class="%s">' % content["hljs_language"]) | |||||
assert_contains(resp, escape(content["data"])) | |||||
split_path = content_path.split("/") | |||||
filename = split_path[-1] | |||||
path = content_path.replace(filename, "")[:-1] | |||||
path_info = gen_path_info(path) | |||||
del query_params["path"] | |||||
if timestamp: | |||||
query_params["timestamp"] = format_utc_iso_date( | |||||
parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ" | |||||
) | |||||
root_dir_url = reverse("browse-origin-directory", query_params=query_params) | |||||
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1) | |||||
assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7])) | |||||
for p in path_info: | |||||
query_params["path"] = p["path"] | |||||
dir_url = reverse("browse-origin-directory", query_params=query_params) | |||||
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"])) | |||||
assert_contains(resp, "<li>%s</li>" % filename) | |||||
query_string = "sha1_git:" + content["sha1_git"] | |||||
url_raw = reverse( | |||||
"browse-content-raw", | |||||
url_args={"query_string": query_string}, | |||||
query_params={"filename": filename}, | |||||
) | |||||
assert_contains(resp, url_raw) | |||||
if "path" in query_params: | |||||
del query_params["path"] | |||||
origin_branches_url = reverse("browse-origin-branches", query_params=query_params) | |||||
assert_contains(resp, f'href="{escape(origin_branches_url)}"') | |||||
assert_contains(resp, f"Branches ({snapshot_sizes['revision']})") | |||||
origin_releases_url = reverse("browse-origin-releases", query_params=query_params) | |||||
assert_contains(resp, f'href="{escape(origin_releases_url)}">') | |||||
assert_contains(resp, f"Releases ({snapshot_sizes['release']})") | |||||
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches)) | |||||
query_params["path"] = content_path | |||||
for branch in origin_branches: | |||||
root_dir_branch_url = reverse( | |||||
"browse-origin-content", | |||||
query_params={"branch": branch["name"], **query_params}, | |||||
) | |||||
assert_contains(resp, '<a href="%s">' % root_dir_branch_url) | |||||
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases)) | |||||
query_params["branch"] = None | |||||
for release in origin_releases: | |||||
root_dir_release_url = reverse( | |||||
"browse-origin-content", | |||||
query_params={"release": release["name"], **query_params}, | |||||
) | |||||
assert_contains(resp, '<a href="%s">' % root_dir_release_url) | |||||
url = reverse("browse-origin-content", query_params=query_params) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=200, template_used="browse/content.html" | |||||
) | ) | ||||
snapshot = archive_data.snapshot_get(origin_visit["snapshot"]) | resp = check_html_get_response(client, url, status_code=301) | ||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | assert resp["location"] == reverse( | ||||
"browse-content", query_params={**each_arg, **params} | |||||
swhid_context = { | |||||
"origin": origin_info["url"], | |||||
"visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]), | |||||
"anchor": gen_swhid(ObjectType.REVISION, head_rev_id), | |||||
"path": f"/{content_path}", | |||||
} | |||||
swh_cnt_id = gen_swhid( | |||||
ObjectType.CONTENT, content["sha1_git"], metadata=swhid_context | |||||
) | ) | ||||
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) | |||||
assert_contains(resp, swh_cnt_id) | |||||
assert_contains(resp, swh_cnt_id_url) | |||||
assert_contains(resp, "swh-take-new-snapshot") | |||||
_check_origin_link(resp, origin_info["url"]) | |||||
assert_not_contains(resp, "swh-metadata-popover") | |||||
def _origin_directory_view_test_helper( | def _origin_directory_view_test_helper( | ||||
client, | client, | ||||
archive_data, | archive_data, | ||||
origin_info, | origin_info, | ||||
origin_visit, | origin_visit, | ||||
snapshot_sizes, | snapshot_sizes, | ||||
▲ Show 20 Lines • Show All 162 Lines • Show Last 20 Lines |