Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/browse/views/test_origin.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | |||||
import re | |||||
from hypothesis import given | from hypothesis import given | ||||
import pytest | import pytest | ||||
from django.utils.html import escape | from django.utils.html import escape | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.model import OriginVisit, OriginVisitStatus, Snapshot | ||||
from swh.model.model import ( | |||||
OriginVisit, | |||||
OriginVisitStatus, | |||||
Snapshot, | |||||
SnapshotBranch, | |||||
TargetType, | |||||
) | |||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
from swh.web.browse.snapshot_context import process_snapshot_branches | |||||
from swh.web.common.exc import NotFoundExc | # from swh.web.browse.snapshot_context import process_snapshot_branches | ||||
# from swh.web.common.exc import NotFoundExc | |||||
from swh.web.common.identifiers import gen_swhid | from swh.web.common.identifiers import gen_swhid | ||||
from swh.web.common.utils import format_utc_iso_date, parse_iso8601_date_to_utc, reverse | from swh.web.common.utils import format_utc_iso_date, parse_iso8601_date_to_utc, reverse | ||||
from swh.web.tests.django_asserts import assert_contains, assert_not_contains | from swh.web.tests.django_asserts import assert_contains, assert_not_contains | ||||
from swh.web.tests.strategies import new_origin, new_snapshot, visit_dates | from swh.web.tests.strategies import new_origin # visit_dates, new_snapshot | ||||
from swh.web.tests.utils import check_html_get_response | from swh.web.tests.utils import check_html_get_response | ||||
def test_origin_visits_browse(client, archive_data, origin_with_multiple_visits): | def test_origin_visits_browse(client, archive_data, origin_with_multiple_visits): | ||||
origin_url = origin_with_multiple_visits["url"] | origin_url = origin_with_multiple_visits["url"] | ||||
url = reverse("browse-origin-visits", query_params={"origin_url": origin_url}) | url = reverse("browse-origin-visits", query_params={"origin_url": origin_url}) | ||||
resp = check_html_get_response( | resp = check_html_get_response( | ||||
client, url, status_code=200, template_used="browse/origin-visits.html" | client, url, status_code=200, template_used="browse/origin-visits.html" | ||||
) | ) | ||||
visits = archive_data.origin_visit_get(origin_url) | visits = archive_data.origin_visit_get(origin_url) | ||||
for v in visits: | for v in visits: | ||||
vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ") | vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ") | ||||
browse_dir_url = reverse( | browse_dir_url = reverse( | ||||
"browse-origin-directory", | "browse-origin-directory", | ||||
query_params={"origin_url": origin_url, "timestamp": vdate}, | query_params={"origin_url": origin_url, "timestamp": vdate}, | ||||
) | ) | ||||
assert_contains(resp, browse_dir_url) | assert_contains(resp, browse_dir_url) | ||||
_check_origin_link(resp, origin_url) | _check_origin_link(resp, origin_url) | ||||
def test_origin_root_directory_view(client, archive_data, swh_scheduler, origin): | |||||
origin_visits = archive_data.origin_visit_get(origin["url"]) | |||||
visit = origin_visits[-1] | |||||
snapshot = archive_data.snapshot_get(visit["snapshot"]) | |||||
snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) | |||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | |||||
head_rev = archive_data.revision_get(head_rev_id) | |||||
root_dir_sha1 = head_rev["directory"] | |||||
dir_content = archive_data.directory_ls(root_dir_sha1) | |||||
branches, releases, _ = process_snapshot_branches(snapshot) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
visit_id=visit["visit"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
timestamp=visit["date"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
snapshot_id=visit["snapshot"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
visit_id=visit["visit"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
timestamp=visit["date"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
dir_content, | |||||
snapshot_id=visit["snapshot"], | |||||
) | |||||
def test_origin_sub_directory_view(client, archive_data, swh_scheduler, origin): | |||||
origin_visits = archive_data.origin_visit_get(origin["url"]) | |||||
visit = origin_visits[-1] | |||||
snapshot = archive_data.snapshot_get(visit["snapshot"]) | |||||
snapshot_sizes = archive_data.snapshot_count_branches(snapshot["id"]) | |||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | |||||
head_rev = archive_data.revision_get(head_rev_id) | |||||
root_dir_sha1 = head_rev["directory"] | |||||
subdirs = [ | |||||
e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir" | |||||
] | |||||
branches, releases, _ = process_snapshot_branches(snapshot) | |||||
if len(subdirs) == 0: | |||||
return | |||||
subdir = random.choice(subdirs) | |||||
subdir_content = archive_data.directory_ls(subdir["target"]) | |||||
subdir_path = subdir["name"] | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
visit_id=visit["visit"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
timestamp=visit["date"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
snapshot_id=visit["snapshot"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
visit_id=visit["visit"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
timestamp=visit["date"], | |||||
) | |||||
_origin_directory_view_test_helper( | |||||
client, | |||||
archive_data, | |||||
origin, | |||||
visit, | |||||
snapshot_sizes, | |||||
branches, | |||||
releases, | |||||
root_dir_sha1, | |||||
subdir_content, | |||||
path=subdir_path, | |||||
snapshot_id=visit["snapshot"], | |||||
) | |||||
@given( | |||||
new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), | |||||
) | |||||
def test_origin_snapshot_null_branch( | |||||
client, archive_data, revisions_list, new_origin, new_snapshot, visit_dates, | |||||
): | |||||
revisions = revisions_list(size=4) | |||||
snp_dict = new_snapshot.to_dict() | |||||
archive_data.origin_add([new_origin]) | |||||
for i, branch in enumerate(snp_dict["branches"].keys()): | |||||
if i == 0: | |||||
snp_dict["branches"][branch] = None | |||||
else: | |||||
snp_dict["branches"][branch] = { | |||||
"target_type": "revision", | |||||
"target": hash_to_bytes(revisions[i - 1]), | |||||
} | |||||
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) | |||||
visit = archive_data.origin_visit_add( | |||||
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] | |||||
)[0] | |||||
visit_status = OriginVisitStatus( | |||||
origin=new_origin.url, | |||||
visit=visit.visit, | |||||
date=now(), | |||||
status="partial", | |||||
snapshot=snp_dict["id"], | |||||
) | |||||
archive_data.origin_visit_status_add([visit_status]) | |||||
url = reverse( | |||||
"browse-origin-directory", query_params={"origin_url": new_origin.url} | |||||
) | |||||
check_html_get_response( | |||||
client, url, status_code=200, template_used="browse/directory.html" | |||||
) | |||||
@given( | |||||
new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), | |||||
) | |||||
def test_origin_snapshot_invalid_branch( | |||||
client, archive_data, revisions_list, new_origin, new_snapshot, visit_dates, | |||||
): | |||||
revisions = revisions_list(size=4) | |||||
snp_dict = new_snapshot.to_dict() | |||||
archive_data.origin_add([new_origin]) | |||||
for i, branch in enumerate(snp_dict["branches"].keys()): | |||||
snp_dict["branches"][branch] = { | |||||
"target_type": "revision", | |||||
"target": hash_to_bytes(revisions[i]), | |||||
} | |||||
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)]) | |||||
visit = archive_data.origin_visit_add( | |||||
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)] | |||||
)[0] | |||||
visit_status = OriginVisitStatus( | |||||
origin=new_origin.url, | |||||
visit=visit.visit, | |||||
date=now(), | |||||
status="full", | |||||
snapshot=snp_dict["id"], | |||||
) | |||||
archive_data.origin_visit_status_add([visit_status]) | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={"origin_url": new_origin.url, "branch": "invalid_branch"}, | |||||
) | |||||
check_html_get_response(client, url, status_code=404, template_used="error.html") | |||||
@given(new_origin()) | |||||
def test_browse_visits_origin_not_found(client, new_origin): | |||||
url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url}) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert_contains( | |||||
resp, f"Origin with url {new_origin.url} not found", status_code=404 | |||||
) | |||||
def test_browse_origin_directory_no_visit(client, mocker, origin): | |||||
mock_get_origin_visits = mocker.patch( | |||||
"swh.web.common.origin_visits.get_origin_visits" | |||||
) | |||||
mock_get_origin_visits.return_value = [] | |||||
mock_archive = mocker.patch("swh.web.common.origin_visits.archive") | |||||
mock_archive.lookup_origin_visit_latest.return_value = None | |||||
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert_contains(resp, "No valid visit", status_code=404) | |||||
assert not mock_get_origin_visits.called | |||||
def test_browse_origin_directory_unknown_visit(client, mocker, origin): | |||||
mock_get_origin_visits = mocker.patch( | |||||
"swh.web.common.origin_visits.get_origin_visits" | |||||
) | |||||
mock_get_origin_visits.return_value = [{"visit": 1}] | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={"origin_url": origin["url"], "visit_id": 2}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert re.search("Visit.*not found", resp.content.decode("utf-8")) | |||||
assert mock_get_origin_visits.called | |||||
def test_browse_origin_directory_not_found(client, origin): | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="browse/directory.html" | |||||
) | |||||
assert re.search("Directory.*not found", resp.content.decode("utf-8")) | |||||
def _add_empty_snapshot_origin(new_origin, archive_data): | def _add_empty_snapshot_origin(new_origin, archive_data): | ||||
snapshot = Snapshot(branches={}) | snapshot = Snapshot(branches={}) | ||||
archive_data.origin_add([new_origin]) | archive_data.origin_add([new_origin]) | ||||
archive_data.snapshot_add([snapshot]) | archive_data.snapshot_add([snapshot]) | ||||
visit = archive_data.origin_visit_add( | visit = archive_data.origin_visit_add( | ||||
[OriginVisit(origin=new_origin.url, date=now(), type="git",)] | [OriginVisit(origin=new_origin.url, date=now(), type="git",)] | ||||
)[0] | )[0] | ||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | ||||
origin=new_origin.url, | origin=new_origin.url, | ||||
visit=visit.visit, | visit=visit.visit, | ||||
date=now(), | date=now(), | ||||
status="full", | status="full", | ||||
snapshot=snapshot.id, | snapshot=snapshot.id, | ||||
) | ) | ||||
archive_data.origin_visit_status_add([visit_status]) | archive_data.origin_visit_status_add([visit_status]) | ||||
@pytest.mark.django_db | # def test_origin_release_browse(client, archive_data, origin_with_releases): | ||||
@pytest.mark.parametrize("object_type", ["directory"]) | # origin_url = origin_with_releases["url"] | ||||
@given(new_origin()) | # snapshot = archive_data.snapshot_get_latest(origin_url) | ||||
def test_browse_origin_content_directory_empty_snapshot( | # release = [ | ||||
client, staff_user, archive_data, object_type, new_origin | # b for b in snapshot["branches"].values() if b["target_type"] == "release" | ||||
): | # ][-1] | ||||
# release_data = archive_data.release_get(release["target"]) | |||||
_add_empty_snapshot_origin(new_origin, archive_data) | # revision_data = archive_data.revision_get(release_data["target"]) | ||||
# url = reverse( | |||||
# to check proper generation of raw extrinsic metadata api links | # "browse-origin-directory", | ||||
client.force_login(staff_user) | # query_params={"origin_url": origin_url, "release": release_data["name"]}, | ||||
# ) | |||||
url = reverse( | |||||
f"browse-origin-{object_type}", | # resp = check_html_get_response( | ||||
query_params={"origin_url": new_origin.url, "path": "baz"}, | # client, url, status_code=200, template_used="browse/directory.html" | ||||
) | # ) | ||||
# assert_contains(resp, release_data["name"]) | |||||
resp = check_html_get_response( | # assert_contains(resp, release["target"]) | ||||
client, url, status_code=200, template_used=f"browse/{object_type}.html" | |||||
) | # swhid_context = { | ||||
assert re.search("snapshot.*is empty", resp.content.decode("utf-8")) | # "origin": origin_url, | ||||
# "visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]), | |||||
# "anchor": gen_swhid(ObjectType.RELEASE, release_data["id"]), | |||||
def test_browse_directory_snapshot_not_found(client, mocker, origin): | # } | ||||
mock_get_snapshot_context = mocker.patch( | |||||
"swh.web.browse.snapshot_context.get_snapshot_context" | # swh_dir_id = gen_swhid( | ||||
) | # ObjectType.DIRECTORY, revision_data["directory"], metadata=swhid_context | ||||
mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found") | # ) | ||||
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]}) | # swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) | ||||
# assert_contains(resp, swh_dir_id) | |||||
resp = check_html_get_response( | # assert_contains(resp, swh_dir_id_url) | ||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert_contains(resp, "Snapshot not found", status_code=404) | # def test_origin_release_browse_not_found(client, origin_with_releases): | ||||
assert mock_get_snapshot_context.called | |||||
# invalid_release_name = "swh-foo-bar" | |||||
# url = reverse( | |||||
@given(new_origin()) | # "browse-origin-directory", | ||||
def test_origin_empty_snapshot(client, archive_data, new_origin): | # query_params={ | ||||
# "origin_url": origin_with_releases["url"], | |||||
_add_empty_snapshot_origin(new_origin, archive_data) | # "release": invalid_release_name, | ||||
# }, | |||||
url = reverse( | # ) | ||||
"browse-origin-directory", query_params={"origin_url": new_origin.url} | |||||
) | # resp = check_html_get_response( | ||||
# client, url, status_code=404, template_used="error.html" | |||||
resp = check_html_get_response( | # ) | ||||
client, url, status_code=200, template_used="browse/directory.html" | # assert re.search( | ||||
) | # f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8") | ||||
resp_content = resp.content.decode("utf-8") | # ) | ||||
assert re.search("snapshot.*is empty", resp_content) | |||||
assert not re.search("swh-tr-link", resp_content) | |||||
# @given(new_origin()) | |||||
# def test_origin_browse_directory_branch_with_non_resolvable_revision( | |||||
@given(new_origin()) | # client, archive_data, unknown_revision, new_origin, | ||||
def test_origin_empty_snapshot_null_revision(client, archive_data, new_origin): | # ): | ||||
snapshot = Snapshot( | # branch_name = "master" | ||||
branches={ | # snapshot = Snapshot( | ||||
b"HEAD": SnapshotBranch( | # branches={ | ||||
target="refs/head/master".encode(), target_type=TargetType.ALIAS, | # branch_name.encode(): SnapshotBranch( | ||||
), | # target=hash_to_bytes(unknown_revision), | ||||
b"refs/head/master": None, | # target_type=TargetType.REVISION, | ||||
} | # ) | ||||
) | # } | ||||
archive_data.origin_add([new_origin]) | # ) | ||||
archive_data.snapshot_add([snapshot]) | # archive_data.origin_add([new_origin]) | ||||
visit = archive_data.origin_visit_add( | # archive_data.snapshot_add([snapshot]) | ||||
[OriginVisit(origin=new_origin.url, date=now(), type="git",)] | # visit = archive_data.origin_visit_add( | ||||
)[0] | # [OriginVisit(origin=new_origin.url, date=now(), type="git",)] | ||||
visit_status = OriginVisitStatus( | # )[0] | ||||
origin=new_origin.url, | # visit_status = OriginVisitStatus( | ||||
visit=visit.visit, | # origin=new_origin.url, | ||||
date=now(), | # visit=visit.visit, | ||||
status="partial", | # date=now(), | ||||
snapshot=snapshot.id, | # status="partial", | ||||
) | # snapshot=snapshot.id, | ||||
archive_data.origin_visit_status_add([visit_status]) | # ) | ||||
# archive_data.origin_visit_status_add([visit_status]) | |||||
url = reverse( | |||||
"browse-origin-directory", query_params={"origin_url": new_origin.url}, | # url = reverse( | ||||
) | # "browse-origin-directory", | ||||
# query_params={"origin_url": new_origin.url, "branch": branch_name}, | |||||
resp = check_html_get_response( | # ) | ||||
client, url, status_code=200, template_used="browse/directory.html" | |||||
) | # resp = check_html_get_response( | ||||
resp_content = resp.content.decode("utf-8") | # client, url, status_code=200, template_used="browse/directory.html" | ||||
assert re.search("snapshot.*is empty", resp_content) | # ) | ||||
assert not re.search("swh-tr-link", resp_content) | # assert_contains( | ||||
# resp, f"Revision {unknown_revision } could not be found in the archive." | |||||
# ) | |||||
def test_origin_release_browse(client, archive_data, origin_with_releases): | |||||
origin_url = origin_with_releases["url"] | # # no revision card | ||||
snapshot = archive_data.snapshot_get_latest(origin_url) | # assert_not_contains(resp, "swh-tip-revision") | ||||
release = [ | # # no Download dropdown | ||||
b for b in snapshot["branches"].values() if b["target_type"] == "release" | # assert_not_contains(resp, "swh-vault-download") | ||||
][-1] | # # no History link | ||||
release_data = archive_data.release_get(release["target"]) | # assert_not_contains(resp, "swh-tr-link") | ||||
revision_data = archive_data.revision_get(release_data["target"]) | # # no SWHIDs for directory and revision | ||||
url = reverse( | # assert_not_contains(resp, "swh:1:dir:") | ||||
"browse-origin-directory", | # assert_not_contains(resp, "swh:1:rev:") | ||||
query_params={"origin_url": origin_url, "release": release_data["name"]}, | |||||
) | |||||
# def test_origin_views_no_url_query_parameter(client): | |||||
resp = check_html_get_response( | # for browse_context in ( | ||||
client, url, status_code=200, template_used="browse/directory.html" | # "directory", | ||||
) | # "visits", | ||||
assert_contains(resp, release_data["name"]) | # ): | ||||
assert_contains(resp, release["target"]) | # url = reverse(f"browse-origin-{browse_context}") | ||||
# resp = check_html_get_response( | |||||
swhid_context = { | # client, url, status_code=400, template_used="error.html" | ||||
"origin": origin_url, | # ) | ||||
"visit": gen_swhid(ObjectType.SNAPSHOT, snapshot["id"]), | # assert_contains( | ||||
"anchor": gen_swhid(ObjectType.RELEASE, release_data["id"]), | # resp, "An origin URL must be provided as query parameter.", | ||||
} | # status_code=400, | ||||
# ) | |||||
swh_dir_id = gen_swhid( | |||||
ObjectType.DIRECTORY, revision_data["directory"], metadata=swhid_context | |||||
) | |||||
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) | |||||
assert_contains(resp, swh_dir_id) | |||||
assert_contains(resp, swh_dir_id_url) | |||||
def test_origin_release_browse_not_found(client, origin_with_releases): | |||||
invalid_release_name = "swh-foo-bar" | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={ | |||||
"origin_url": origin_with_releases["url"], | |||||
"release": invalid_release_name, | |||||
}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=404, template_used="error.html" | |||||
) | |||||
assert re.search( | |||||
f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8") | |||||
) | |||||
@given(new_origin()) | |||||
def test_origin_browse_directory_branch_with_non_resolvable_revision( | |||||
client, archive_data, unknown_revision, new_origin, | |||||
): | |||||
branch_name = "master" | |||||
snapshot = Snapshot( | |||||
branches={ | |||||
branch_name.encode(): SnapshotBranch( | |||||
target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION, | |||||
) | |||||
} | |||||
) | |||||
archive_data.origin_add([new_origin]) | |||||
archive_data.snapshot_add([snapshot]) | |||||
visit = archive_data.origin_visit_add( | |||||
[OriginVisit(origin=new_origin.url, date=now(), type="git",)] | |||||
)[0] | |||||
visit_status = OriginVisitStatus( | |||||
origin=new_origin.url, | |||||
visit=visit.visit, | |||||
date=now(), | |||||
status="partial", | |||||
snapshot=snapshot.id, | |||||
) | |||||
archive_data.origin_visit_status_add([visit_status]) | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={"origin_url": new_origin.url, "branch": branch_name}, | |||||
) | |||||
resp = check_html_get_response( | |||||
client, url, status_code=200, template_used="browse/directory.html" | |||||
) | |||||
assert_contains( | |||||
resp, f"Revision {unknown_revision } could not be found in the archive." | |||||
) | |||||
# no revision card | |||||
assert_not_contains(resp, "swh-tip-revision") | |||||
# no Download dropdown | |||||
assert_not_contains(resp, "swh-vault-download") | |||||
# no History link | |||||
assert_not_contains(resp, "swh-tr-link") | |||||
# no SWHIDs for directory and revision | |||||
assert_not_contains(resp, "swh:1:dir:") | |||||
assert_not_contains(resp, "swh:1:rev:") | |||||
def test_origin_views_no_url_query_parameter(client): | |||||
for browse_context in ( | |||||
"directory", | |||||
"visits", | |||||
): | |||||
url = reverse(f"browse-origin-{browse_context}") | |||||
resp = check_html_get_response( | |||||
client, url, status_code=400, template_used="error.html" | |||||
) | |||||
assert_contains( | |||||
resp, "An origin URL must be provided as query parameter.", status_code=400, | |||||
) | |||||
@given(new_origin()) | @given(new_origin()) | ||||
@pytest.mark.parametrize("browse_context", ["log", "branches", "releases"]) | @pytest.mark.parametrize("browse_context", ["log", "branches", "releases"]) | ||||
def test_origin_view_redirects(client, browse_context, new_origin): | def test_origin_view_redirects(client, browse_context, new_origin): | ||||
query_params = {"origin_url": new_origin.url} | query_params = {"origin_url": new_origin.url} | ||||
url = reverse(f"browse-origin-{browse_context}", query_params=query_params) | url = reverse(f"browse-origin-{browse_context}", query_params=query_params) | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
f"browse-snapshot-{browse_context}", query_params=query_params | f"browse-snapshot-{browse_context}", query_params=query_params | ||||
) | ) | ||||
@given(new_origin()) | @given(new_origin()) | ||||
@pytest.mark.parametrize("browse_context", ["content"]) | @pytest.mark.parametrize("browse_context", ["content", "directory"]) | ||||
def test_origin_content_view_redirects(client, browse_context, new_origin): | def test_origin_content_view_redirects(client, browse_context, new_origin): | ||||
query_params = {"origin_url": new_origin.url, "path": "test.txt"} | query_params = {"origin_url": new_origin.url, "path": "test.txt"} | ||||
url = reverse(f"browse-origin-{browse_context}", query_params=query_params) | url = reverse(f"browse-origin-{browse_context}", query_params=query_params) | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
f"browse-{browse_context}", query_params=query_params | f"browse-{browse_context}", query_params=query_params | ||||
) | ) | ||||
Show All 17 Lines | for each_arg in url_args: | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
f"browse-snapshot-{browse_context}", query_params={**each_arg, **params} | f"browse-snapshot-{browse_context}", query_params={**each_arg, **params} | ||||
) | ) | ||||
@given(new_origin()) | @given(new_origin()) | ||||
def test_origin_content_view_legacy_redirects(client, new_origin): | @pytest.mark.parametrize("browse_context", ["content", "directory"]) | ||||
def test_origin_content_view_legacy_redirects(client, browse_context, new_origin): | |||||
url_args = [ | url_args = [ | ||||
{"origin_url": new_origin.url}, | {"origin_url": new_origin.url}, | ||||
{ | { | ||||
"origin_url": new_origin.url, | "origin_url": new_origin.url, | ||||
"path": "test.txt", | "path": "test.txt", | ||||
"timestamp": "2021-01-23T22:24:10Z", | "timestamp": "2021-01-23T22:24:10Z", | ||||
}, | }, | ||||
{"origin_url": new_origin.url, "path": "test.txt"}, | {"origin_url": new_origin.url, "path": "test.txt"}, | ||||
] | ] | ||||
params = {"extra-param1": "extra-param1", "extra-param2": "extra-param2"} | params = {"extra-param1": "extra-param1", "extra-param2": "extra-param2"} | ||||
for each_arg in url_args: | for each_arg in url_args: | ||||
url = reverse( | url = reverse( | ||||
"browse-origin-content-legacy", url_args=each_arg, query_params=params, | f"browse-origin-{browse_context}-legacy", | ||||
url_args=each_arg, | |||||
query_params=params, | |||||
) | ) | ||||
resp = check_html_get_response(client, url, status_code=301) | resp = check_html_get_response(client, url, status_code=301) | ||||
assert resp["location"] == reverse( | assert resp["location"] == reverse( | ||||
"browse-content", query_params={**each_arg, **params} | f"browse-{browse_context}", query_params={**each_arg, **params} | ||||
) | ) | ||||
def _origin_directory_view_test_helper( | def _origin_directory_view_test_helper( | ||||
client, | client, | ||||
archive_data, | archive_data, | ||||
origin_info, | origin_info, | ||||
origin_visit, | origin_visit, | ||||
▲ Show 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | |||||
def _check_origin_link(resp, origin_url): | def _check_origin_link(resp, origin_url): | ||||
browse_origin_url = reverse( | browse_origin_url = reverse( | ||||
"browse-origin", query_params={"origin_url": origin_url} | "browse-origin", query_params={"origin_url": origin_url} | ||||
) | ) | ||||
assert_contains(resp, f'href="{browse_origin_url}"') | assert_contains(resp, f'href="{browse_origin_url}"') | ||||
def test_browse_pull_request_branch( | @given(new_origin()) | ||||
client, archive_data, origin_with_pull_request_branches | def test_browse_visits_origin_not_found(client, new_origin): | ||||
): | url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url}) | ||||
origin_url = origin_with_pull_request_branches.url | |||||
snapshot = archive_data.snapshot_get_latest(origin_url) | resp = check_html_get_response( | ||||
pr_branch = random.choice( | client, url, status_code=404, template_used="error.html" | ||||
[ | |||||
branch | |||||
for branch in snapshot["branches"].keys() | |||||
if branch.startswith("refs/pull/") | |||||
] | |||||
) | |||||
url = reverse( | |||||
"browse-origin-directory", | |||||
query_params={"origin_url": origin_url, "branch": pr_branch}, | |||||
) | ) | ||||
check_html_get_response( | assert_contains( | ||||
client, url, status_code=200, template_used="browse/directory.html" | resp, f"Origin with url {new_origin.url} not found", status_code=404 | ||||
) | ) | ||||
# def test_browse_pull_request_branch( | |||||
# client, archive_data, origin_with_pull_request_branches | |||||
# ): | |||||
# origin_url = origin_with_pull_request_branches.url | |||||
# snapshot = archive_data.snapshot_get_latest(origin_url) | |||||
# pr_branch = random.choice( | |||||
# [ | |||||
# branch | |||||
# for branch in snapshot["branches"].keys() | |||||
# if branch.startswith("refs/pull/") | |||||
# ] | |||||
# ) | |||||
# url = reverse( | |||||
# "browse-origin-directory", | |||||
# query_params={"origin_url": origin_url, "branch": pr_branch}, | |||||
# ) | |||||
# check_html_get_response( | |||||
# client, url, status_code=200, template_used="browse/directory.html" | |||||
# ) |