' + root_directory_sha1[:7] + ""
)
assert_contains(resp, '', count=len(dirs))
assert_contains(resp, ' ', count=len(files))
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1},
query_params={"path": dir_path},
)
assert_contains(resp, dir_url)
for f in files:
file_path = "%s/%s" % (root_directory_sha1, f["name"])
if path:
file_path = "%s/%s/%s" % (root_directory_sha1, path, f["name"])
query_string = "sha1_git:" + f["target"]
file_url = reverse(
"browse-content",
url_args={"query_string": query_string},
query_params={"path": file_path},
)
assert_contains(resp, file_url)
path_info = gen_path_info(path)
assert_contains(resp, '', count=len(path_info) + 1)
assert_contains(
resp, '%s ' % (root_dir_url, root_directory_sha1[:7])
)
for p in path_info:
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1},
query_params={"path": p["path"]},
)
assert_contains(resp, '%s ' % (dir_url, p["name"]))
assert_contains(resp, "vault-cook-directory")
- swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
+ swh_dir_id = get_swh_persistent_id(DIRECTORY, directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
- assert_contains(resp, swh_dir_id)
- assert_contains(resp, swh_dir_id_url)
assert_contains(
resp,
textwrap.indent(
(
f"Browse archived directory\n"
f'\n'
f" {swh_dir_id}\n"
f" "
),
" " * 4,
),
)
+
+ swhid_context = {}
+ if root_directory_sha1 != directory_entries[0]["dir_id"]:
+ swhid_context["anchor"] = get_swh_persistent_id(DIRECTORY, root_directory_sha1)
+
+ swhid_context["path"] = f"/{path}/" if path else "/"
+
+ if root_directory_sha1 != directory_entries[0]["dir_id"]:
+ swhid_context["anchor"] = get_swh_persistent_id(DIRECTORY, root_directory_sha1)
+
+ swh_dir_id = get_swh_persistent_id(
+ DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
+ )
+ swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
+ assert_contains(resp, swh_dir_id)
+ assert_contains(resp, swh_dir_id_url)
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 5c8ff562..ed6a7858 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,1142 +1,1208 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import random
import re
import string
import textwrap
from django.utils.html import escape
from hypothesis import given
from swh.model.hashutil import hash_to_bytes
+from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.model.model import (
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import (
reverse,
gen_path_info,
format_utc_iso_date,
parse_timestamp,
)
-from swh.web.config import get_config
from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
origin,
origin_with_multiple_visits,
new_origin,
new_snapshot,
visit_dates,
revisions,
origin_with_releases,
release as existing_release,
unknown_revision,
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
    """The origin visits view must render and list a browse-directory
    link for each visit of the origin.

    Note: the original body issued the exact same GET request and
    assertions twice in a row; the duplicate was removed.
    """
    url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/origin-visits.html")
    visits = archive_data.origin_visit_get(origin["url"])
    for v in visits:
        # each visit links to the directory view at the visit's timestamp
        vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ")
        browse_dir_url = reverse(
            "browse-origin-directory",
            query_params={"origin_url": origin["url"], "timestamp": vdate},
        )
        assert_contains(resp, browse_dir_url)
    _check_origin_view_title(resp, origin["url"], "visits")
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
def _get_archive_data(visit_idx):
snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
dir_content = archive_data.directory_ls(head_rev["directory"])
dir_files = [e for e in dir_content if e["type"] == "file"]
dir_file = random.choice(dir_files)
branches, releases = process_snapshot_branches(snapshot)
return {
"branches": branches,
"releases": releases,
"root_dir_sha1": head_rev["directory"],
"content": get_content(dir_file["checksums"]["sha1"]),
"visit": origin_visits[visit_idx],
}
tdata = _get_archive_data(-1)
_origin_content_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
)
_origin_content_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=tdata["visit"]["date"],
)
visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_content_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ origin_visits[-1],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=visit_unix_ts,
)
tdata = _get_archive_data(0)
_origin_content_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ origin_visits[0],
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
visit_id=tdata["visit"]["visit"],
)
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
dir_content = archive_data.directory_ls(root_dir_sha1)
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_directory_view_test_helper(
- client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
+ client,
+ archive_data,
+ origin,
+ visit,
+ branches,
+ releases,
+ root_dir_sha1,
+ dir_content,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
- client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
+ client,
+ archive_data,
+ origin,
+ visit,
+ branches,
+ releases,
+ root_dir_sha1,
+ dir_content,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
subdirs = [
e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir"
]
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
if len(subdirs) == 0:
return
subdir = random.choice(subdirs)
subdir_content = archive_data.directory_ls(subdir["target"])
subdir_path = subdir["name"]
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
+ archive_data,
origin,
- origin_visits,
+ visit,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
@given(origin())
def test_origin_branches(client, archive_data, origin):
    """Exercise the branches view for the latest visit, both with the
    origin's type set and with it unset."""
    latest_visit = archive_data.origin_visit_get(origin["url"])[-1]
    snapshot_content = process_snapshot_branches(
        archive_data.snapshot_get(latest_visit["snapshot"])
    )
    _origin_branches_test_helper(client, origin, snapshot_content)
    # the view must behave identically when the origin type is None
    origin = {**origin, "type": None}
    _origin_branches_test_helper(client, origin, snapshot_content)
@given(origin())
def test_origin_releases(client, archive_data, origin):
    """Exercise the releases view for the latest visit, both with the
    origin's type set and with it unset."""
    latest_visit = archive_data.origin_visit_get(origin["url"])[-1]
    snapshot_content = process_snapshot_branches(
        archive_data.snapshot_get(latest_visit["snapshot"])
    )
    _origin_releases_test_helper(client, origin, snapshot_content)
    # the view must behave identically when the origin type is None
    origin = {**origin, "type": None}
    _origin_releases_test_helper(client, origin, snapshot_content)
@given(
    new_origin(),
    new_snapshot(min_size=4, max_size=4),
    visit_dates(),
    revisions(min_size=3, max_size=3),
)
def test_origin_snapshot_null_branch(
    client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
    """A snapshot whose first branch targets nothing (a null branch, e.g.
    a dangling symbolic ref) must still render the directory view (200).
    """
    snp_dict = new_snapshot.to_dict()
    new_origin = archive_data.origin_add([new_origin])[0]
    for i, branch in enumerate(snp_dict["branches"].keys()):
        if i == 0:
            # first branch is deliberately made null
            snp_dict["branches"][branch] = None
        else:
            snp_dict["branches"][branch] = {
                "target_type": "revision",
                "target": hash_to_bytes(revisions[i - 1]),
            }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
    # "partial" status reflects that the null branch could not be fully loaded
    archive_data.origin_visit_update(
        new_origin["url"], visit.visit, status="partial", snapshot=snp_dict["id"]
    )
    url = reverse(
        "browse-origin-directory", query_params={"origin_url": new_origin["url"]}
    )
    rv = client.get(url)
    assert rv.status_code == 200
@given(
    new_origin(),
    new_snapshot(min_size=4, max_size=4),
    visit_dates(),
    revisions(min_size=4, max_size=4),
)
def test_origin_snapshot_invalid_branch(
    client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
    """Requesting a branch name that does not exist in an otherwise valid
    snapshot must yield a 404.
    """
    snp_dict = new_snapshot.to_dict()
    new_origin = archive_data.origin_add([new_origin])[0]
    # point every branch at a known revision so the snapshot itself is valid
    for i, branch in enumerate(snp_dict["branches"].keys()):
        snp_dict["branches"][branch] = {
            "target_type": "revision",
            "target": hash_to_bytes(revisions[i]),
        }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
    archive_data.origin_visit_update(
        new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
    )
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": new_origin["url"], "branch": "invalid_branch"},
    )
    rv = client.get(url)
    assert rv.status_code == 404
@given(new_origin())
def test_browse_visits_origin_not_found(client, new_origin):
    """Requesting the visits view of an unarchived origin yields a 404."""
    visits_url = reverse(
        "browse-origin-visits", query_params={"origin_url": new_origin.url}
    )
    resp = client.get(visits_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    expected_message = f"Origin with url {new_origin.url} not found"
    assert_contains(resp, expected_message, status_code=404)
@given(origin())
def test_browse_origin_directory_no_visit(client, mocker, origin):
    """The directory view returns a 404 when the origin has no visit."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    mock_get_origin_visits.return_value = []
    dir_url = reverse(
        "browse-origin-directory", query_params={"origin_url": origin["url"]}
    )
    resp = client.get(dir_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "No visit", status_code=404)
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_unknown_visit(client, mocker, origin):
    """Asking the directory view for an unknown visit id yields a 404."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    # only visit 1 exists; visit 2 is requested below
    mock_get_origin_visits.return_value = [{"visit": 1}]
    dir_url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": origin["url"], "visit_id": 2},
    )
    resp = client.get(dir_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    page = resp.content.decode("utf-8")
    assert re.search("Visit.*not found", page)
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_not_found(client, origin):
    """Browsing a non-existent directory path in an origin yields a 404."""
    resp = client.get(
        reverse(
            "browse-origin-directory",
            query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"},
        )
    )
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Directory.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_origin_content_no_visit(client, mocker, origin):
    """The content view returns a 404 when the origin has no visit."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    mock_get_origin_visits.return_value = []
    content_url = reverse(
        "browse-origin-content",
        query_params={"origin_url": origin["url"], "path": "foo"},
    )
    resp = client.get(content_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "No visit", status_code=404)
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_unknown_visit(client, mocker, origin):
    """Asking the content view for an unknown visit id yields a 404."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    # only visit 1 exists; visit 2 is requested below
    mock_get_origin_visits.return_value = [{"visit": 1}]
    content_url = reverse(
        "browse-origin-content",
        query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2},
    )
    resp = client.get(content_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    page = resp.content.decode("utf-8")
    assert re.search("Visit.*not found", page)
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin):
    """Content and directory views must render successfully (200, with an
    "empty snapshot" message) when the origin's snapshot has no branches
    or releases.
    """
    mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service")
    mock_get_origin_visit_snapshot = mocker.patch(
        "swh.web.browse.snapshot_context.get_origin_visit_snapshot"
    )
    # no branches and no releases -> empty snapshot
    mock_get_origin_visit_snapshot.return_value = ([], [])
    mock_snapshot_service.lookup_origin.return_value = origin
    mock_snapshot_service.lookup_snapshot_sizes.return_value = {
        "revision": 0,
        "release": 0,
    }
    for browse_context in ("content", "directory"):
        url = reverse(
            f"browse-origin-{browse_context}",
            query_params={"origin_url": origin["url"], "path": "baz"},
        )
        resp = client.get(url)
        assert resp.status_code == 200
        assert_template_used(resp, f"browse/{browse_context}.html")
        assert re.search("snapshot.*is empty", resp.content.decode("utf-8"))
    assert mock_get_origin_visit_snapshot.called
    assert mock_snapshot_service.lookup_origin.called
    assert mock_snapshot_service.lookup_snapshot_sizes.called
@given(origin())
def test_browse_origin_content_not_found(client, origin):
    """Browsing a non-existent file path in an origin yields a 404."""
    resp = client.get(
        reverse(
            "browse-origin-content",
            query_params={"origin_url": origin["url"], "path": "/invalid/file/path"},
        )
    )
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Directory entry.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_directory_snapshot_not_found(client, mocker, origin):
    """A missing snapshot surfaces as a 404 carrying the raised message."""
    mock_get_snapshot_context = mocker.patch(
        "swh.web.browse.snapshot_context.get_snapshot_context"
    )
    mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found")
    dir_url = reverse(
        "browse-origin-directory", query_params={"origin_url": origin["url"]}
    )
    resp = client.get(dir_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "Snapshot not found", status_code=404)
    assert mock_get_snapshot_context.called
@given(origin())
def test_origin_empty_snapshot(client, mocker, origin):
    """An origin whose latest visit produced an empty snapshot renders the
    directory view with an "empty snapshot" notice.
    """
    mock_service = mocker.patch("swh.web.browse.snapshot_context.service")
    mock_get_origin_visit_snapshot = mocker.patch(
        "swh.web.browse.snapshot_context.get_origin_visit_snapshot"
    )
    # empty branch/release lists simulate an empty snapshot
    mock_get_origin_visit_snapshot.return_value = ([], [])
    mock_service.lookup_snapshot_sizes.return_value = {
        "revision": 0,
        "release": 0,
    }
    mock_service.lookup_origin.return_value = origin
    url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/directory.html")
    resp_content = resp.content.decode("utf-8")
    assert re.search("snapshot.*is empty", resp_content)
    # NOTE(review): "swh-tr-link" is presumably the revision/log link markup,
    # which must be absent for an empty snapshot — confirm against template
    assert not re.search("swh-tr-link", resp_content)
    assert mock_get_origin_visit_snapshot.called
    assert mock_service.lookup_snapshot_sizes.called
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
- # for swh.web.browse.snapshot_context.get_snapshot_content to only return one branch
- config = get_config()
- snapshot_max_size = int(config["snapshot_content_max_size"])
- config["snapshot_content_max_size"] = 1
- try:
- snapshot = archive_data.snapshot_get_latest(origin["url"])
- release = [
- b for b in snapshot["branches"].values() if b["target_type"] == "release"
- ][-1]
- release_data = archive_data.release_get(release["target"])
- url = reverse(
- "browse-origin-directory",
- query_params={"origin_url": origin["url"], "release": release_data["name"]},
- )
+ snapshot = archive_data.snapshot_get_latest(origin["url"])
+ release = [
+ b for b in snapshot["branches"].values() if b["target_type"] == "release"
+ ][-1]
+ release_data = archive_data.release_get(release["target"])
+ revision_data = archive_data.revision_get(release_data["target"])
+ url = reverse(
+ "browse-origin-directory",
+ query_params={"origin_url": origin["url"], "release": release_data["name"]},
+ )
- resp = client.get(url)
- assert resp.status_code == 200
- assert_contains(resp, release_data["name"])
- assert_contains(resp, release["target"])
- finally:
- config["snapshot_content_max_size"] = snapshot_max_size
+ resp = client.get(url)
+ assert resp.status_code == 200
+ assert_contains(resp, release_data["name"])
+ assert_contains(resp, release["target"])
+
+ swhid_context = {
+ "origin": origin["url"],
+ "visit": get_swh_persistent_id(SNAPSHOT, snapshot["id"]),
+ "anchor": get_swh_persistent_id(RELEASE, release_data["id"]),
+ "path": f"/",
+ }
+
+ swh_dir_id = get_swh_persistent_id(
+ DIRECTORY, revision_data["directory"], metadata=swhid_context
+ )
+ swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
+ assert_contains(resp, swh_dir_id)
+ assert_contains(resp, swh_dir_id_url)
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, origin):
    """Browsing an origin with an unknown release name yields a 404."""
    invalid_release_name = "swh-foo-bar"
    resp = client.get(
        reverse(
            "browse-origin-directory",
            query_params={
                "origin_url": origin["url"],
                "release": invalid_release_name,
            },
        )
    )
    assert resp.status_code == 404
    page = resp.content.decode("utf-8")
    assert re.search(f"Release {invalid_release_name}.*not found", page)
@given(new_origin(), unknown_revision())
def test_origin_browse_directory_branch_with_non_resolvable_revision(
    client, archive_data, new_origin, unknown_revision
):
    """Browsing a branch whose target revision is absent from the archive
    must not crash: the page renders (200) with an explanatory message.
    """
    branch_name = "master"
    # snapshot pointing master at a revision id that was never loaded
    snapshot = Snapshot(
        branches={
            branch_name.encode(): SnapshotBranch(
                target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION,
            )
        }
    )
    new_origin = archive_data.origin_add([new_origin])[0]
    archive_data.snapshot_add([snapshot])
    visit = archive_data.origin_visit_add(
        new_origin["url"], datetime.now(tz=timezone.utc), type="git"
    )
    archive_data.origin_visit_update(
        new_origin["url"], visit.visit, status="full", snapshot=snapshot.id
    )
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": new_origin["url"], "branch": branch_name},
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_contains(
        resp, f"Revision {unknown_revision } could not be found in the archive."
    )
@given(origin())
def test_origin_content_no_path(client, origin):
    """Requesting the content view without a path parameter is a 400."""
    content_url = reverse(
        "browse-origin-content", query_params={"origin_url": origin["url"]}
    )
    resp = client.get(content_url)
    assert resp.status_code == 400
    assert_contains(
        resp, "The path of a content must be given as query parameter.", status_code=400
    )
def test_origin_views_no_url_query_parameter(client):
    """Every origin browse view must reject requests lacking an origin URL."""
    browse_contexts = (
        "content",
        "directory",
        "log",
        "branches",
        "releases",
        "visits",
    )
    for browse_context in browse_contexts:
        resp = client.get(reverse(f"browse-origin-{browse_context}"))
        assert resp.status_code == 400
        assert_contains(
            resp, "An origin URL must be provided as query parameter.", status_code=400
        )
def _origin_content_view_test_helper(
client,
+ archive_data,
origin_info,
- origin_visits,
+ origin_visit,
origin_branches,
origin_releases,
root_dir_sha1,
content,
visit_id=None,
timestamp=None,
):
content_path = "/".join(content["path"].split("/")[1:])
if not visit_id:
- visit_id = origin_visits[-1]["visit"]
+ visit_id = origin_visit["visit"]
query_params = {"origin_url": origin_info["url"], "path": content_path}
if timestamp:
query_params["timestamp"] = timestamp
if visit_id:
query_params["visit_id"] = visit_id
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
assert type(content["data"]) == str
assert_contains(resp, '' % content["hljs_language"])
assert_contains(resp, escape(content["data"]))
split_path = content_path.split("/")
filename = split_path[-1]
path = content_path.replace(filename, "")[:-1]
path_info = gen_path_info(path)
del query_params["path"]
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '', count=len(path_info) + 1)
assert_contains(resp, '%s ' % (root_dir_url, root_dir_sha1[:7]))
for p in path_info:
query_params["path"] = p["path"]
dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '%s ' % (dir_url, p["name"]))
assert_contains(resp, " %s " % filename)
query_string = "sha1_git:" + content["sha1_git"]
url_raw = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
assert_contains(resp, url_raw)
if "path" in query_params:
del query_params["path"]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'Branches (%s) '
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
assert_contains(
resp,
'Releases (%s) '
% (escape(origin_releases_url), len(origin_releases)),
)
assert_contains(resp, '', count=len(origin_branches))
query_params["path"] = content_path
for branch in origin_branches:
- query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
- "browse-origin-content", query_params=query_params
+ "browse-origin-content",
+ query_params={"branch": branch["name"], **query_params},
)
assert_contains(resp, '' % root_dir_branch_url)
assert_contains(resp, '', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
- query_params["release"] = release["name"]
root_dir_release_url = reverse(
- "browse-origin-content", query_params=query_params
+ "browse-origin-content",
+ query_params={"release": release["name"], **query_params},
)
assert_contains(resp, '' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
- swh_cnt_id = get_swh_persistent_id("content", content["sha1_git"])
+ snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
+ head_rev_id = archive_data.snapshot_get_head(snapshot)
+
+ swhid_context = {
+ "origin": origin_info["url"],
+ "visit": get_swh_persistent_id(SNAPSHOT, snapshot["id"]),
+ "anchor": get_swh_persistent_id(REVISION, head_rev_id),
+ "path": f"/{content_path}",
+ }
+
+ swh_cnt_id = get_swh_persistent_id(
+ CONTENT, content["sha1_git"], metadata=swhid_context
+ )
swh_cnt_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_view_title(resp, origin_info["url"], "content")
def _origin_directory_view_test_helper(
client,
+ archive_data,
origin_info,
- origin_visits,
+ origin_visit,
origin_branches,
origin_releases,
root_directory_sha1,
directory_entries,
visit_id=None,
timestamp=None,
path=None,
):
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
if not visit_id:
- visit_id = origin_visits[-1]["visit"]
+ visit_id = origin_visit["visit"]
query_params = {"origin_url": origin_info["url"]}
if timestamp:
query_params["timestamp"] = timestamp
else:
query_params["visit_id"] = visit_id
if path:
query_params["path"] = path
url = reverse("browse-origin-directory", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert_contains(resp, ' ', count=len(dirs))
assert_contains(resp, ' ', count=len(files))
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
query_params["path"] = dir_path
dir_url = reverse("browse-origin-directory", query_params=query_params,)
assert_contains(resp, dir_url)
for f in files:
file_path = f["name"]
if path:
file_path = "%s/%s" % (path, f["name"])
query_params["path"] = file_path
file_url = reverse("browse-origin-content", query_params=query_params)
assert_contains(resp, file_url)
if "path" in query_params:
del query_params["path"]
root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
nb_bc_paths = 1
if path:
nb_bc_paths = len(path.split("/")) + 1
assert_contains(resp, '', count=nb_bc_paths)
assert_contains(
resp, '%s ' % (root_dir_branch_url, root_directory_sha1[:7])
)
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'Branches (%s) '
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp,
'Releases (%s) '
% (escape(origin_releases_url), nb_releases),
)
if path:
query_params["path"] = path
assert_contains(resp, ' ', count=len(origin_branches))
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '' % root_dir_branch_url)
assert_contains(resp, '', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '' % root_dir_release_url)
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
- swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
+ snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
+ head_rev_id = archive_data.snapshot_get_head(snapshot)
+
+ swhid_context = {
+ "origin": origin_info["url"],
+ "visit": get_swh_persistent_id(SNAPSHOT, snapshot["id"]),
+ "anchor": get_swh_persistent_id(REVISION, head_rev_id),
+ "path": f"/{path}" if path else "/",
+ }
+
+ swh_dir_id = get_swh_persistent_id(
+ DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
+ )
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_view_title(resp, origin_info["url"], "directory")
def _origin_branches_test_helper(client, origin_info, origin_snapshot):
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-branches", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
' Branches (%s) ' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, 'Releases (%s) ' % (origin_releases_url, nb_releases)
)
assert_contains(resp, '' % escape(browse_branch_url))
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
query_params={"origin_url": origin_info["url"]},
)
assert_contains(resp, '' % escape(browse_revision_url))
_check_origin_view_title(resp, origin_info["url"], "branches")
def _origin_releases_test_helper(client, origin_info, origin_snapshot):
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-releases", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/releases.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
' Branches (%s) ' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, 'Releases (%s) ' % (origin_releases_url, nb_releases)
)
assert_contains(resp, ' ' % escape(browse_release_url))
assert_contains(resp, '' % escape(browse_revision_url))
_check_origin_view_title(resp, origin_info["url"], "releases")
@given(
    new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
    client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
    """
    When a snapshot contains a branch or a release alias, pagination links
    in the branches / releases view should be displayed.
    """
    # Force pagination by shrinking the page size to half the number of
    # generated revision branches.
    mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)
    snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
    # One branch per generated revision, each with a random 8-letter name.
    for i in range(len(revisions)):
        branch = "".join(random.choices(string.ascii_lowercase, k=8))
        snp_dict["branches"][branch.encode()] = {
            "target_type": "revision",
            "target": hash_to_bytes(revisions[i]),
        }
    # Plus one release, reached through an alias branch.
    release = "".join(random.choices(string.ascii_lowercase, k=8))
    snp_dict["branches"][b"RELEASE_ALIAS"] = {
        "target_type": "alias",
        "target": release.encode(),
    }
    snp_dict["branches"][release.encode()] = {
        "target_type": "release",
        "target": hash_to_bytes(existing_release),
    }
    # Load origin, snapshot and a full visit into the test archive.
    new_origin = archive_data.origin_add([new_origin])[0]
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
    archive_data.origin_visit_update(
        new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
    )
    url = reverse(
        "browse-origin-branches", query_params={"origin_url": new_origin["url"]}
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/branches.html")
    # NOTE(review): the assertion below is truncated extraction residue --
    # the expected HTML snippet is cut off (unterminated string literal) and
    # the trailing lines look like the tail of a ``textwrap.indent(...)``
    # call from another test. Restore the pagination-link assertion from the
    # original test file.
    assert_contains(resp, ' "
    ),
    " " * 6,
    ),
    )
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index 96045de2..544a1e56 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,282 +1,299 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import textwrap

from django.utils.html import escape
from hypothesis import given

from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import origin, revision, unknown_revision, new_origin
@given(revision())
def test_revision_browse(client, archive_data, revision):
    """Check the revision browse view renders the revision metadata, the
    directory/history links, the parent links and the revision/directory
    SWHID badges.

    The block contained unified-diff residue ('+'/'-' prefixed lines);
    this is the reconstructed post-patch code.
    """
    url = reverse("browse-revision", url_args={"sha1_git": revision})

    revision_data = archive_data.revision_get(revision)

    author_name = revision_data["author"]["name"]
    committer_name = revision_data["committer"]["name"]
    dir_id = revision_data["directory"]

    directory_url = reverse("browse-directory", url_args={"sha1_git": dir_id})
    history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})

    resp = client.get(url)

    assert resp.status_code == 200
    assert_template_used(resp, "browse/revision.html")
    assert_contains(resp, author_name)
    assert_contains(resp, committer_name)
    assert_contains(resp, directory_url)
    assert_contains(resp, history_url)

    # Each parent revision must be linked with its abbreviated id.
    for parent in revision_data["parents"]:
        parent_url = reverse("browse-revision", url_args={"sha1_git": parent})
        assert_contains(resp, '%s ' % (parent_url, parent[:7]))

    author_date = revision_data["date"]
    committer_date = revision_data["committer_date"]

    message_lines = revision_data["message"].split("\n")

    assert_contains(resp, format_utc_iso_date(author_date))
    assert_contains(resp, format_utc_iso_date(committer_date))
    assert_contains(resp, escape(message_lines[0]))
    assert_contains(resp, escape("\n".join(message_lines[1:])))

    # Use the REVISION constant rather than a bare "revision" string.
    swh_rev_id = get_swh_persistent_id(REVISION, revision)
    swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
    assert_contains(
        resp,
        textwrap.indent(
            (
                f"Browse archived revision\n"
                f'\n'
                f" {swh_rev_id}\n"
                f" "
            ),
            " " * 4,
        ),
    )

    # The directory SWHID is qualified with the revision as anchor and the
    # root path.
    swhid_context = {"anchor": swh_rev_id, "path": "/"}

    swh_dir_id = get_swh_persistent_id(DIRECTORY, dir_id, metadata=swhid_context)
    swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})

    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
@given(origin())
def test_revision_origin_browse(client, archive_data, origin):
    """Check the revision view in an origin context: origin log link,
    parent links, vault cook buttons and origin-qualified SWHIDs.

    The block contained unified-diff residue ('+'/'-' prefixed lines);
    this is the reconstructed post-patch code.
    """
    snapshot = archive_data.snapshot_get_latest(origin["url"])
    revision = archive_data.snapshot_get_head(snapshot)
    revision_data = archive_data.revision_get(revision)
    dir_id = revision_data["directory"]

    origin_revision_log_url = reverse(
        "browse-origin-log",
        query_params={"origin_url": origin["url"], "revision": revision},
    )

    url = reverse(
        "browse-revision",
        url_args={"sha1_git": revision},
        query_params={"origin_url": origin["url"]},
    )
    resp = client.get(url)

    assert_contains(resp, origin_revision_log_url)

    # Parent links must carry the origin context too.
    for parent in revision_data["parents"]:
        parent_url = reverse(
            "browse-revision",
            url_args={"sha1_git": parent},
            query_params={"origin_url": origin["url"]},
        )
        assert_contains(resp, '%s ' % (parent_url, parent[:7]))

    assert_contains(resp, "vault-cook-directory")
    assert_contains(resp, "vault-cook-revision")

    # SWHIDs are qualified with the origin URL and the visited snapshot.
    swhid_context = {
        "origin": origin["url"],
        "visit": get_swh_persistent_id(SNAPSHOT, snapshot["id"]),
    }

    swh_rev_id = get_swh_persistent_id(REVISION, revision, metadata=swhid_context)
    swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
    assert_contains(resp, swh_rev_id)
    assert_contains(resp, swh_rev_id_url)

    # The directory SWHID additionally anchors on the revision and the root
    # path.
    swhid_context["anchor"] = get_swh_persistent_id(REVISION, revision)
    swhid_context["path"] = "/"

    swh_dir_id = get_swh_persistent_id(DIRECTORY, dir_id, metadata=swhid_context)
    swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)

    assert_contains(resp, "swh-take-new-snapshot")
@given(revision())
def test_revision_log_browse(client, archive_data, revision):
    """Walk up to three pages of the revision log view, checking the
    pagination links, the per-revision metadata and, on the last visited
    page, the revision SWHID badge.

    The block contained unified-diff residue ('+'/'-' prefixed lines);
    this is the reconstructed post-patch code. NOTE(review): several
    expected-HTML snippets below are truncated by extraction -- confirm
    them against the original test file.
    """
    per_page = 10
    revision_log = archive_data.revision_log(revision)

    # The view lists revisions by decreasing committer date.
    revision_log_sorted = sorted(
        revision_log,
        key=lambda rev: -parse_timestamp(rev["committer_date"]).timestamp(),
    )

    url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"per_page": per_page},
    )
    resp = client.get(url)

    next_page_url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"offset": per_page, "per_page": per_page},
    )

    nb_log_entries = per_page
    if len(revision_log_sorted) < per_page:
        nb_log_entries = len(revision_log_sorted)

    assert resp.status_code == 200
    assert_template_used(resp, "browse/revision-log.html")
    # First page: "Newer" is rendered but not a link.
    assert_contains(resp, ' Newer')

    if len(revision_log_sorted) > per_page:
        assert_contains(
            resp, 'Older ' % escape(next_page_url),
        )

    for log in revision_log_sorted[:per_page]:
        revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
        assert_contains(resp, log["id"][:7])
        assert_contains(resp, log["author"]["name"])
        assert_contains(resp, format_utc_iso_date(log["date"]))
        assert_contains(resp, escape(log["message"]))
        assert_contains(resp, format_utc_iso_date(log["committer_date"]))
        assert_contains(resp, revision_url)

    if len(revision_log_sorted) <= per_page:
        return

    # Second page.
    resp = client.get(next_page_url)

    prev_page_url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"per_page": per_page},
    )
    next_page_url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"offset": 2 * per_page, "per_page": per_page},
    )

    nb_log_entries = len(revision_log_sorted) - per_page
    if nb_log_entries > per_page:
        nb_log_entries = per_page

    assert resp.status_code == 200
    assert_template_used(resp, "browse/revision-log.html")
    assert_contains(
        resp, ' Newer' % escape(prev_page_url)
    )

    if len(revision_log_sorted) > 2 * per_page:
        assert_contains(
            resp, 'Older ' % escape(next_page_url),
        )

    if len(revision_log_sorted) <= 2 * per_page:
        return

    # Third page.
    resp = client.get(next_page_url)

    prev_page_url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"offset": per_page, "per_page": per_page},
    )
    next_page_url = reverse(
        "browse-revision-log",
        url_args={"sha1_git": revision},
        query_params={"offset": 3 * per_page, "per_page": per_page},
    )

    nb_log_entries = len(revision_log_sorted) - 2 * per_page
    if nb_log_entries > per_page:
        nb_log_entries = per_page

    assert resp.status_code == 200
    assert_template_used(resp, "browse/revision-log.html")
    assert_contains(
        resp, ' Newer' % escape(prev_page_url)
    )

    if len(revision_log_sorted) > 3 * per_page:
        assert_contains(
            resp, 'Older ' % escape(next_page_url),
        )

    # Use the REVISION constant rather than a bare "revision" string.
    swh_rev_id = get_swh_persistent_id(REVISION, revision)
    swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
    assert_contains(
        resp,
        textwrap.indent(
            (
                f"Browse archived revisions history\n"
                f'\n'
                f" {swh_rev_id}\n"
                f" "
            ),
            " " * 4,
        ),
    )
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
    """Requesting an unknown revision, or a known revision through an
    origin the archive does not hold, must render the error page with a
    404 status and the matching error message."""
    unknown_rev_url = reverse(
        "browse-revision", url_args={"sha1_git": unknown_revision}
    )
    response = client.get(unknown_rev_url)
    assert response.status_code == 404
    assert_template_used(response, "error.html")
    assert_contains(
        response,
        "Revision with sha1_git %s not found" % unknown_revision,
        status_code=404,
    )

    broken_origin_url = reverse(
        "browse-revision",
        url_args={"sha1_git": revision},
        query_params={"origin_url": new_origin.url},
    )
    response = client.get(broken_origin_url)
    assert response.status_code == 404
    assert_template_used(response, "error.html")
    assert_contains(
        response,
        "the origin mentioned in your request appears broken",
        status_code=404,
    )
@given(revision())
def test_revision_uppercase(client, revision):
    """An uppercase revision checksum must redirect (HTTP 302) to the
    canonical lowercase browse URL."""
    uppercase_url = reverse(
        "browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
    )
    response = client.get(uppercase_url)
    assert response.status_code == 302
    canonical_url = reverse("browse-revision", url_args={"sha1_git": revision})
    assert response["location"] == canonical_url