' + root_directory_sha1[:7] + ""
)
assert_contains(resp, '<td class="swh-directory">', count=len(dirs))
assert_contains(resp, '<td class="swh-content">', count=len(files))
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1},
query_params={"path": dir_path},
)
assert_contains(resp, dir_url)
for f in files:
file_path = "%s/%s" % (root_directory_sha1, f["name"])
if path:
file_path = "%s/%s/%s" % (root_directory_sha1, path, f["name"])
query_string = "sha1_git:" + f["target"]
file_url = reverse(
"browse-content",
url_args={"query_string": query_string},
query_params={"path": file_path},
)
assert_contains(resp, file_url)
path_info = gen_path_info(path)
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)
assert_contains(
resp, '<a href="%s">%s</a>' % (root_dir_url, root_directory_sha1[:7])
)
for p in path_info:
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1},
query_params={"path": p["path"]},
)
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))
assert_contains(resp, "vault-cook-directory")
swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
+
+ assert_contains(
+ resp,
+ textwrap.indent(
+ (
+ f"Browse archived directory\n"
+ f'\n'
+ f" {swh_dir_id}\n"
+ f" "
+ ),
+ " " * 4,
+ ),
+ )
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 1d6fd25b..e6ec17e3 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,1110 +1,1140 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import random
import re
import string
+import textwrap
from django.utils.html import escape
from hypothesis import given
from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import (
reverse,
gen_path_info,
format_utc_iso_date,
parse_timestamp,
)
from swh.web.config import get_config
from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
origin,
origin_with_multiple_visits,
new_origin,
new_snapshot,
visit_dates,
revisions,
origin_with_releases,
release as existing_release,
unknown_revision,
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
visits = archive_data.origin_visit_get(origin["url"])
for v in visits:
vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ")
browse_dir_url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "timestamp": vdate},
)
assert_contains(resp, browse_dir_url)
+ _check_origin_view_title(resp, origin["url"], "visits")
+
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
def _get_archive_data(visit_idx):
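"""Return branches, releases, root directory and a random file content
for the origin visit at the given index."""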
snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
dir_content = archive_data.directory_ls(head_rev["directory"])
dir_files = [e for e in dir_content if e["type"] == "file"]
dir_file = random.choice(dir_files)
branches, releases = process_snapshot_branches(snapshot)
return {
"branches": branches,
"releases": releases,
"root_dir_sha1": head_rev["directory"],
"content": get_content(dir_file["checksums"]["sha1"]),
"visit": origin_visits[visit_idx],
}
tdata = _get_archive_data(-1)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=tdata["visit"]["date"],
)
visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=visit_unix_ts,
)
tdata = _get_archive_data(0)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
visit_id=tdata["visit"]["visit"],
)
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
dir_content = archive_data.directory_ls(root_dir_sha1)
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_directory_view_test_helper(
client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
subdirs = [
e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir"
]
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
if len(subdirs) == 0:
return
subdir = random.choice(subdirs)
subdir_content = archive_data.directory_ls(subdir["target"])
subdir_path = subdir["name"]
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
@given(origin())
def test_origin_branches(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_branches_test_helper(client, origin, snapshot_content)
origin = dict(origin)
origin["type"] = None
_origin_branches_test_helper(client, origin, snapshot_content)
@given(origin())
def test_origin_releases(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_releases_test_helper(client, origin, snapshot_content)
origin = dict(origin)
origin["type"] = None
_origin_releases_test_helper(client, origin, snapshot_content)
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=3, max_size=3),
)
def test_origin_snapshot_null_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
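"""Check that browsing the directory of an origin whose snapshot contains
a null (dangling) branch still returns a 200 response."""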
snp_dict = new_snapshot.to_dict()
new_origin = archive_data.origin_add([new_origin])[0]
for i, branch in enumerate(snp_dict["branches"].keys()):
if i == 0:
snp_dict["branches"][branch] = None
else:
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i - 1]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="partial", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-directory", query_params={"origin_url": new_origin["url"]}
)
rv = client.get(url)
assert rv.status_code == 200
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=4, max_size=4),
)
def test_origin_snapshot_invalid_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
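"""Check that requesting a branch name that does not exist in the snapshot
returns a 404 response."""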
snp_dict = new_snapshot.to_dict()
new_origin = archive_data.origin_add([new_origin])[0]
for i, branch in enumerate(snp_dict["branches"].keys()):
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin["url"], "branch": "invalid_branch"},
)
rv = client.get(url)
assert rv.status_code == 404
@given(new_origin())
def test_browse_visits_origin_not_found(client, new_origin):
url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, f"Origin with url {new_origin.url} not found", status_code=404
)
@given(origin())
def test_browse_origin_directory_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_not_found(client, origin):
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_origin_content_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin):
mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_snapshot_service.lookup_origin.return_value = origin
mock_snapshot_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
for browse_context in ("content", "directory"):
url = reverse(
f"browse-origin-{browse_context}",
query_params={"origin_url": origin["url"], "path": "baz"},
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, f"browse/{browse_context}.html")
assert re.search("snapshot.*is empty", resp.content.decode("utf-8"))
assert mock_get_origin_visit_snapshot.called
assert mock_snapshot_service.lookup_origin.called
assert mock_snapshot_service.lookup_snapshot_sizes.called
@given(origin())
def test_browse_origin_content_not_found(client, origin):
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "/invalid/file/path"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory entry.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_directory_snapshot_not_found(client, mocker, origin):
mock_get_snapshot_context = mocker.patch(
"swh.web.browse.snapshot_context.get_snapshot_context"
)
mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found")
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "Snapshot not found", status_code=404)
assert mock_get_snapshot_context.called
@given(origin())
def test_origin_empty_snapshot(client, mocker, origin):
mock_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
mock_service.lookup_origin.return_value = origin
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
resp_content = resp.content.decode("utf-8")
assert re.search("snapshot.*is empty", resp_content)
assert not re.search("swh-tr-link", resp_content)
assert mock_get_origin_visit_snapshot.called
assert mock_service.lookup_snapshot_sizes.called
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
# for swh.web.browse.snapshot_context.get_snapshot_content to only return one branch
config = get_config()
snapshot_max_size = int(config["snapshot_content_max_size"])
config["snapshot_content_max_size"] = 1
try:
snapshot = archive_data.snapshot_get_latest(origin["url"])
release = [
b for b in snapshot["branches"].values() if b["target_type"] == "release"
][-1]
release_data = archive_data.release_get(release["target"])
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": release_data["name"]},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(resp, release_data["name"])
assert_contains(resp, release["target"])
finally:
config["snapshot_content_max_size"] = snapshot_max_size
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, origin):
invalid_release_name = "swh-foo-bar"
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": invalid_release_name},
)
resp = client.get(url)
assert resp.status_code == 404
assert re.search(
f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8")
)
@given(new_origin(), unknown_revision())
def test_origin_browse_directory_branch_with_non_resolvable_revision(
client, archive_data, new_origin, unknown_revision
):
branch_name = "master"
snapshot = Snapshot(
branches={
branch_name.encode(): SnapshotBranch(
target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION,
)
}
)
new_origin = archive_data.origin_add([new_origin])[0]
archive_data.snapshot_add([snapshot])
visit = archive_data.origin_visit_add(new_origin["url"], datetime.now(), type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snapshot.id
)
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin["url"], "branch": branch_name},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(
resp, f"Revision {unknown_revision } could not be found in the archive."
)
@given(origin())
def test_origin_content_no_path(client, origin):
url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "The path of a content must be given as query parameter.", status_code=400
)
def test_origin_views_no_url_query_parameter(client):
for browse_context in (
"content",
"directory",
"log",
"branches",
"releases",
"visits",
):
url = reverse(f"browse-origin-{browse_context}")
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "An origin URL must be provided as query parameter.", status_code=400
)
def _origin_content_view_test_helper(
client,
origin_info,
origin_visits,
origin_branches,
origin_releases,
root_dir_sha1,
content,
visit_id=None,
timestamp=None,
):
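"""Check the origin content browse view for the given origin and visit:
the highlighted content, breadcrumbs, raw content link, branches/releases
dropdowns and the content SWHID must all be rendered."""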
content_path = "/".join(content["path"].split("/")[1:])
if not visit_id:
visit_id = origin_visits[-1]["visit"]
query_params = {"origin_url": origin_info["url"], "path": content_path}
if timestamp:
query_params["timestamp"] = timestamp
if visit_id:
query_params["visit_id"] = visit_id
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
assert type(content["data"]) == str
assert_contains(resp, '<code class="%s">' % content["hljs_language"])
assert_contains(resp, escape(content["data"]))
split_path = content_path.split("/")
filename = split_path[-1]
path = content_path.replace(filename, "")[:-1]
path_info = gen_path_info(path)
del query_params["path"]
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)
assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7]))
for p in path_info:
query_params["path"] = p["path"]
dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))
assert_contains(resp, " %s " % filename)
query_string = "sha1_git:" + content["sha1_git"]
url_raw = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
assert_contains(resp, url_raw)
if "path" in query_params:
del query_params["path"]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>'
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Releases (%s)</a>'
% (escape(origin_releases_url), len(origin_releases)),
)
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
query_params["path"] = content_path
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-content", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-content", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
swh_cnt_id = get_swh_persistent_id("content", content["sha1_git"])
swh_cnt_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
+ _check_origin_view_title(resp, origin_info["url"], "content")
+
def _origin_directory_view_test_helper(
client,
origin_info,
origin_visits,
origin_branches,
origin_releases,
root_directory_sha1,
directory_entries,
visit_id=None,
timestamp=None,
path=None,
):
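"""Check the origin directory browse view for the given origin and visit:
directory entries, breadcrumbs, branches/releases dropdowns, vault cooking
buttons and the directory SWHID must all be rendered."""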
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
if not visit_id:
visit_id = origin_visits[-1]["visit"]
query_params = {"origin_url": origin_info["url"]}
if timestamp:
query_params["timestamp"] = timestamp
else:
query_params["visit_id"] = visit_id
if path:
query_params["path"] = path
url = reverse("browse-origin-directory", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert_contains(resp, '<td class="swh-directory">', count=len(dirs))
assert_contains(resp, '<td class="swh-content">', count=len(files))
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
query_params["path"] = dir_path
dir_url = reverse("browse-origin-directory", query_params=query_params,)
assert_contains(resp, dir_url)
for f in files:
file_path = f["name"]
if path:
file_path = "%s/%s" % (path, f["name"])
query_params["path"] = file_path
file_url = reverse("browse-origin-content", query_params=query_params)
assert_contains(resp, file_url)
if "path" in query_params:
del query_params["path"]
root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
nb_bc_paths = 1
if path:
nb_bc_paths = len(path.split("/")) + 1
assert_contains(resp, '<li class="swh-path">', count=nb_bc_paths)
assert_contains(
resp, '<a href="%s">%s</a>' % (root_dir_branch_url, root_directory_sha1[:7])
)
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>'
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp,
'<a href="%s">Releases (%s)</a>'
% (escape(origin_releases_url), nb_releases),
)
if path:
query_params["path"] = path
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_release_url)
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
+ _check_origin_view_title(resp, origin_info["url"], "directory")
+
def _origin_branches_test_helper(client, origin_info, origin_snapshot):
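"""Check the origin branches view: the branches and releases counters and,
for each branch, links to its directory and revision must be rendered."""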
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-branches", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)
)
assert_contains(resp, '<a href="%s">' % escape(browse_branch_url))
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
query_params={"origin_url": origin_info["url"]},
)
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
+ _check_origin_view_title(resp, origin_info["url"], "branches")
+
def _origin_releases_test_helper(client, origin_info, origin_snapshot):
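"""Check the origin releases view: the branches and releases counters and,
for each release, links to the release and its target revision must be
rendered."""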
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-releases", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/releases.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)
)
assert_contains(resp, '<a href="%s">' % escape(browse_release_url))
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
+ _check_origin_view_title(resp, origin_info["url"], "releases")
+
@given(
new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
"""
When a snapshot contains a branch or a release alias, pagination links
in the branches / releases view should be displayed.
"""
mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)
snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
for i in range(len(revisions)):
branch = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][branch.encode()] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
release = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][b"RELEASE_ALIAS"] = {
"target_type": "alias",
"target": release.encode(),
}
snp_dict["branches"][release.encode()] = {
"target_type": "release",
"target": hash_to_bytes(existing_release),
}
new_origin = archive_data.origin_add([new_origin])[0]
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-branches", query_params={"origin_url": new_origin["url"]}
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
assert_contains(resp, ' "
+ ),
+ " " * 6,
+ ),
+ )
diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/tests/browse/views/test_release.py
index 8b1d3892..abdb7cd2 100644
--- a/swh/web/tests/browse/views/test_release.py
+++ b/swh/web/tests/browse/views/test_release.py
@@ -1,121 +1,146 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
+import textwrap
from hypothesis import given
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import reverse, format_utc_iso_date
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import release, origin_with_releases, unknown_release
@given(release())
def test_release_browse(client, archive_data, release):
url = reverse("browse-release", url_args={"sha1_git": release})
release_data = archive_data.release_get(release)
resp = client.get(url)
_release_browse_checks(resp, release_data, archive_data)
@given(origin_with_releases())
def test_release_browse_with_origin(client, archive_data, origin):
snapshot = archive_data.snapshot_get_latest(origin["url"])
release = random.choice(
[b for b in snapshot["branches"].values() if b["target_type"] == "release"]
)
url = reverse(
"browse-release",
url_args={"sha1_git": release["target"]},
query_params={"origin_url": origin["url"]},
)
release_data = archive_data.release_get(release["target"])
resp = client.get(url)
_release_browse_checks(resp, release_data, archive_data, origin)
@given(unknown_release())
def test_release_browse_not_found(client, archive_data, unknown_release):
url = reverse("browse-release", url_args={"sha1_git": unknown_release})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
err_msg = "Release with sha1_git %s not found" % unknown_release
assert_contains(resp, err_msg, status_code=404)
@given(release())
def test_release_uppercase(client, release):
url = reverse(
"browse-release-uppercase-checksum", url_args={"sha1_git": release.upper()}
)
resp = client.get(url)
assert resp.status_code == 302
redirect_url = reverse("browse-release", url_args={"sha1_git": release})
assert resp["location"] == redirect_url
def _release_browse_checks(resp, release_data, archive_data, origin_info=None):
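"""Common checks for the release browse view: author, date, message,
target link, release SWHID, heading title and, for revision targets, the
target directory link must be rendered."""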
query_params = {}
if origin_info:
query_params["origin_url"] = origin_info["url"]
release_id = release_data["id"]
release_name = release_data["name"]
author_name = release_data["author"]["name"]
release_date = release_data["date"]
message = release_data["message"]
target_type = release_data["target_type"]
target = release_data["target"]
target_url = reverse(
"browse-revision", url_args={"sha1_git": target}, query_params=query_params
)
message_lines = message.split("\n")
assert resp.status_code == 200
assert_template_used(resp, "browse/release.html")
assert_contains(resp, author_name)
assert_contains(resp, format_utc_iso_date(release_date))
assert_contains(
resp,
"%s %s" % (message_lines[0] or "None", "\n".join(message_lines[1:])),
)
assert_contains(resp, release_id)
assert_contains(resp, release_name)
assert_contains(resp, target_type)
assert_contains(resp, '<a href="%s">%s</a>' % (target_url, target))
swh_rel_id = get_swh_persistent_id("release", release_id)
swh_rel_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rel_id})
assert_contains(resp, swh_rel_id)
assert_contains(resp, swh_rel_id_url)
+ if origin_info:
+ browse_origin_url = reverse(
+ "browse-origin", query_params={"origin_url": origin_info["url"]}
+ )
+ title = (
+ f"Browse archived release for origin\n"
+ f'<a href="{browse_origin_url}">\n'
+ f'  {origin_info["url"]}\n'
+ f"</a>"
+ )
+ indent = " " * 6
+ else:
+ title = (
+ f"Browse archived release\n"
+ f'<a href="{swh_rel_id_url}">\n'
+ f"  {swh_rel_id}\n"
+ f"</a>"
+ )
+ indent = " " * 4
+
+ assert_contains(
+ resp, textwrap.indent(title, indent),
+ )
+
if release_data["target_type"] == "revision":
if origin_info:
directory_url = reverse(
"browse-origin-directory",
query_params={
"origin_url": origin_info["url"],
"release": release_data["name"],
},
)
else:
rev = archive_data.revision_get(release_data["target"])
directory_url = reverse(
"browse-directory", url_args={"sha1_git": rev["directory"]}
)
assert_contains(resp, directory_url)
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index 8f8378f3..4c58c6f3 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,248 +1,282 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import textwrap
+
from django.utils.html import escape
from hypothesis import given
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import origin, revision, unknown_revision, new_origin
@given(revision())
def test_revision_browse(client, archive_data, revision):
url = reverse("browse-revision", url_args={"sha1_git": revision})
revision_data = archive_data.revision_get(revision)
author_name = revision_data["author"]["name"]
committer_name = revision_data["committer"]["name"]
dir_id = revision_data["directory"]
directory_url = reverse("browse-directory", url_args={"sha1_git": dir_id})
history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision.html")
assert_contains(resp, author_name)
assert_contains(resp, committer_name)
assert_contains(resp, directory_url)
assert_contains(resp, history_url)
for parent in revision_data["parents"]:
parent_url = reverse("browse-revision", url_args={"sha1_git": parent})
assert_contains(resp, '<a href="%s">%s</a>' % (parent_url, parent))
author_date = revision_data["date"]
committer_date = revision_data["committer_date"]
message_lines = revision_data["message"].split("\n")
assert_contains(resp, format_utc_iso_date(author_date))
assert_contains(resp, format_utc_iso_date(committer_date))
assert_contains(resp, escape(message_lines[0]))
assert_contains(resp, escape("\n".join(message_lines[1:])))
+ swh_rev_id = get_swh_persistent_id("revision", revision)
+ swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
+
+ assert_contains(
+ resp,
+ textwrap.indent(
+ (
+ f"Browse archived revision\n"
+ f'<a href="{swh_rev_id_url}">\n'
+ f"  {swh_rev_id}\n"
+ f"</a>"
+ ),
+ " " * 4,
+ ),
+ )
+
@given(origin())
def test_revision_origin_browse(client, archive_data, origin):
snapshot = archive_data.snapshot_get_latest(origin["url"])
revision = archive_data.snapshot_get_head(snapshot)
revision_data = archive_data.revision_get(revision)
dir_id = revision_data["directory"]
origin_revision_log_url = reverse(
"browse-origin-log",
query_params={"origin_url": origin["url"], "revision": revision},
)
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
query_params={"origin_url": origin["url"]},
)
resp = client.get(url)
assert_contains(resp, origin_revision_log_url)
for parent in revision_data["parents"]:
parent_url = reverse(
"browse-revision",
url_args={"sha1_git": parent},
query_params={"origin_url": origin["url"]},
)
assert_contains(resp, '<a href="%s">%s</a>' % (parent_url, parent))
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_rev_id = get_swh_persistent_id("revision", revision)
swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swh_dir_id = get_swh_persistent_id("directory", dir_id)
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
@given(revision())
def test_revision_log_browse(client, archive_data, revision):
per_page = 10
revision_log = archive_data.revision_log(revision)
revision_log_sorted = sorted(
revision_log,
key=lambda rev: -parse_timestamp(rev["committer_date"]).timestamp(),
)
url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
resp = client.get(url)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
nb_log_entries = per_page
if len(revision_log_sorted) < per_page:
nb_log_entries = len(revision_log_sorted)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(resp, '<a class="page-link">Newer</a>')
if len(revision_log_sorted) > per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
for log in revision_log_sorted[:per_page]:
revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
assert_contains(resp, log["id"][:7])
assert_contains(resp, log["author"]["name"])
assert_contains(resp, format_utc_iso_date(log["date"]))
assert_contains(resp, escape(log["message"]))
assert_contains(resp, format_utc_iso_date(log["committer_date"]))
assert_contains(resp, revision_url)
if len(revision_log_sorted) <= per_page:
return
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 2 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(
resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
)
if len(revision_log_sorted) > 2 * per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
if len(revision_log_sorted) <= 2 * per_page:
return
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 3 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - 2 * per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
assert_contains(
resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
)
if len(revision_log_sorted) > 3 * per_page:
assert_contains(
resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
)
+ swh_rev_id = get_swh_persistent_id("revision", revision)
+ swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
+
+ assert_contains(
+ resp,
+ textwrap.indent(
+ (
+ f"Browse archived revisions history\n"
+ f'<a href="{swh_rev_id_url}">\n'
+ f"  {swh_rev_id}\n"
+ f"</a>"
+ ),
+ " " * 4,
+ ),
+ )
+
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
url = reverse("browse-revision", url_args={"sha1_git": unknown_revision})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "Revision with sha1_git %s not found" % unknown_revision, status_code=404
)
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
query_params={"origin_url": new_origin.url},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "the origin mentioned in your request" " appears broken", status_code=404
)
@given(revision())
def test_revision_uppercase(client, revision):
url = reverse(
"browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
)
resp = client.get(url)
assert resp.status_code == 302
redirect_url = reverse("browse-revision", url_args={"sha1_git": revision})
assert resp["location"] == redirect_url