', count=len(releases))
def _directory_view_checks(
    client,
    root_directory_sha1,
    directory_entries,
    path=None,
    origin_url=None,
    snapshot_id=None,
):
    """Check the directory browse view for the given directory entries.

    Verifies the HTTP status and template, the link rendered for each
    directory/file entry, the breadcrumb path, the vault cooking link and
    the SWHID displayed on the page.
    """
    # Directories and submodules ("rev" entries) are rendered like folders;
    # regular files are rendered separately.
    dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
    files = [e for e in directory_entries if e["type"] == "file"]
    url_args = {"sha1_git": root_directory_sha1}
    query_params = {"path": path, "origin_url": origin_url, "snapshot": snapshot_id}
    url = reverse("browse-directory", url_args=url_args, query_params=query_params)
    root_dir_url = reverse(
        "browse-directory", url_args={"sha1_git": root_directory_sha1}
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/directory.html")
    # NOTE(review): the markup in the literals below looks stripped (empty
    # '' strings) — confirm against the original template assertions.
    assert_contains(
        resp, '' + root_directory_sha1[:7] + ""
    )
    assert_contains(resp, '', count=len(dirs))
    assert_contains(resp, ' | ', count=len(files))
    for d in dirs:
        if d["type"] == "rev":
            # Submodule entries link to the revision they target.
            dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
        else:
            dir_path = d["name"]
            if path:
                dir_path = "%s/%s" % (path, d["name"])
            dir_url = reverse(
                "browse-directory",
                url_args={"sha1_git": root_directory_sha1},
                query_params={"path": dir_path},
            )
        assert_contains(resp, dir_url)
    for f in files:
        file_path = "%s/%s" % (root_directory_sha1, f["name"])
        if path:
            file_path = "%s/%s/%s" % (root_directory_sha1, path, f["name"])
        query_string = "sha1_git:" + f["target"]
        file_url = reverse(
            "browse-content",
            url_args={"query_string": query_string},
            query_params={"path": file_path},
        )
        assert_contains(resp, file_url)
    path_info = gen_path_info(path)
    assert_contains(resp, '', count=len(path_info) + 1)
    assert_contains(
        resp, '%s' % (root_dir_url, root_directory_sha1[:7])
    )
    for p in path_info:
        dir_url = reverse(
            "browse-directory",
            url_args={"sha1_git": root_directory_sha1},
            query_params={"path": p["path"]},
        )
        assert_contains(resp, '%s' % (dir_url, p["name"]))
    assert_contains(resp, "vault-cook-directory")
    # Resolved leftover diff markers: use the renamed gen_swhid() helper and
    # the browse-swhid endpoint, and drop the redundant duplicate
    # assignments that the unmerged hunks introduced (they were immediately
    # overwritten below).
    swhid_context = {}
    if root_directory_sha1 != directory_entries[0]["dir_id"]:
        swhid_context["anchor"] = gen_swhid(DIRECTORY, root_directory_sha1)
    swhid_context["path"] = f"/{path}/" if path else "/"
    swh_dir_id = gen_swhid(
        DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
    )
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
diff --git a/swh/web/tests/browse/views/test_identifiers.py b/swh/web/tests/browse/views/test_identifiers.py
index f1fb9d49..7ecd5e4e 100644
--- a/swh/web/tests/browse/views/test_identifiers.py
+++ b/swh/web/tests/browse/views/test_identifiers.py
@@ -1,206 +1,200 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random

from hypothesis import given

from swh.model.identifiers import CONTENT, REVISION, SNAPSHOT
# Resolved leftover diff marker: get_swh_persistent_id was renamed gen_swhid.
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import reverse
from swh.web.tests.django_asserts import assert_contains
from swh.web.tests.strategies import (
    content,
    directory,
    origin,
    revision,
    release,
    snapshot,
)

# Common prefix of every version-1 SWHID.
swhid_prefix = "swh:1:"
@given(content())
def test_content_id_browse(client, content):
    """A cnt: SWHID redirects to the matching content browse view."""
    cnt_sha1_git = content["sha1_git"]
    # Resolved leftover diff markers: swhid naming + browse-swhid endpoint.
    swhid = swhid_prefix + "cnt:" + cnt_sha1_git
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    query_string = "sha1_git:" + cnt_sha1_git
    content_browse_url = reverse(
        "browse-content", url_args={"query_string": query_string}
    )
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == content_browse_url
@given(directory())
def test_directory_id_browse(client, directory):
    """A dir: SWHID redirects to the matching directory browse view."""
    swhid = swhid_prefix + "dir:" + directory
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    directory_browse_url = reverse("browse-directory", url_args={"sha1_git": directory})
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == directory_browse_url
@given(revision())
def test_revision_id_browse(client, revision):
    """A rev: SWHID redirects to the revision view, with and without origin."""
    swhid = swhid_prefix + "rev:" + revision
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    revision_browse_url = reverse("browse-revision", url_args={"sha1_git": revision})
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == revision_browse_url
    # Extra query parameters must be forwarded to the redirect target.
    query_params = {"origin_url": "https://github.com/user/repo"}
    url = reverse("browse-swhid", url_args={"swhid": swhid}, query_params=query_params)
    revision_browse_url = reverse(
        "browse-revision", url_args={"sha1_git": revision}, query_params=query_params
    )
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == revision_browse_url
@given(release())
def test_release_id_browse(client, release):
    """A rel: SWHID redirects to the release view, with and without origin."""
    swhid = swhid_prefix + "rel:" + release
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    release_browse_url = reverse("browse-release", url_args={"sha1_git": release})
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == release_browse_url
    # Extra query parameters must be forwarded to the redirect target.
    query_params = {"origin_url": "https://github.com/user/repo"}
    url = reverse("browse-swhid", url_args={"swhid": swhid}, query_params=query_params)
    release_browse_url = reverse(
        "browse-release", url_args={"sha1_git": release}, query_params=query_params
    )
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == release_browse_url
@given(snapshot())
def test_snapshot_id_browse(client, snapshot):
    """A snp: SWHID redirects to the snapshot view, with and without origin."""
    swhid = swhid_prefix + "snp:" + snapshot
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    snapshot_browse_url = reverse("browse-snapshot", url_args={"snapshot_id": snapshot})
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == snapshot_browse_url
    # Extra query parameters must be forwarded to the redirect target.
    query_params = {"origin_url": "https://github.com/user/repo"}
    url = reverse("browse-swhid", url_args={"swhid": swhid}, query_params=query_params)
    release_browse_url = reverse(
        "browse-snapshot", url_args={"snapshot_id": snapshot}, query_params=query_params
    )
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == release_browse_url
@given(release())
def test_bad_id_browse(client, release):
    """An unknown object type in a SWHID is rejected with a 400."""
    swhid = swhid_prefix + "foo:" + release
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    resp = client.get(url)
    assert resp.status_code == 400
@given(content())
def test_content_id_optional_parts_browse(client, content):
    """SWHID qualifiers (lines, origin) map to query params and a fragment."""
    cnt_sha1_git = content["sha1_git"]
    optional_parts = ";lines=4-20;origin=https://github.com/user/repo"
    swhid = swhid_prefix + "cnt:" + cnt_sha1_git + optional_parts
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    query_string = "sha1_git:" + cnt_sha1_git
    content_browse_url = reverse(
        "browse-content",
        url_args={"query_string": query_string},
        query_params={"origin_url": "https://github.com/user/repo"},
    )
    # The lines qualifier becomes a highlight fragment on the target URL.
    content_browse_url += "#L4-L20"
    resp = client.get(url)
    assert resp.status_code == 302
    assert resp["location"] == content_browse_url
@given(release())
def test_origin_id_not_resolvable(client, release):
    """ori: SWHIDs are not browsable and must be rejected with a 400."""
    swhid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb"
    url = reverse("browse-swhid", url_args={"swhid": swhid})
    resp = client.get(url)
    assert resp.status_code == 400
@given(origin())
def test_legacy_swhid_browse(archive_data, client, origin):
    """A legacy SWHID (origin qualifier only) resolves and the page shows
    the fully qualified SWHID (origin + visit + anchor)."""
    snapshot = archive_data.snapshot_get_latest(origin["url"])
    revision = archive_data.snapshot_get_head(snapshot)
    directory = archive_data.revision_get(revision)["directory"]
    directory_content = archive_data.directory_ls(directory)
    directory_file = random.choice(
        [e for e in directory_content if e["type"] == "file"]
    )
    legacy_swhid = gen_swhid(
        CONTENT,
        directory_file["checksums"]["sha1_git"],
        metadata={"origin": origin["url"]},
    )
    url = reverse("browse-swhid", url_args={"swhid": legacy_swhid})
    resp = client.get(url)
    assert resp.status_code == 302
    resp = client.get(resp["location"])
    swhid = gen_swhid(
        CONTENT,
        directory_file["checksums"]["sha1_git"],
        metadata={
            "origin": origin["url"],
            "visit": gen_swhid(SNAPSHOT, snapshot),
            "anchor": gen_swhid(REVISION, revision),
        },
    )
    assert_contains(resp, swhid)
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 4dc91596..dce5954b 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,1315 +1,1313 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
import re
import string

from django.utils.html import escape
from hypothesis import given

from swh.storage.utils import now
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.model.model import (
    Snapshot,
    SnapshotBranch,
    TargetType,
    OriginVisit,
    OriginVisitStatus,
)

from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
# Resolved leftover diff marker: get_swh_persistent_id was renamed gen_swhid.
from swh.web.common.identifiers import gen_swhid
from swh.web.common.utils import (
    reverse,
    gen_path_info,
    format_utc_iso_date,
    parse_timestamp,
)
from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
    origin,
    origin_with_multiple_visits,
    new_origin,
    new_snapshot,
    visit_dates,
    revisions,
    origin_with_releases,
    release as existing_release,
    unknown_revision,
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
    """The visits view renders and links each visit's root directory."""
    url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/origin-visits.html")
    # NOTE(review): the request above is issued twice in the original
    # source; kept as-is to preserve behavior.
    url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/origin-visits.html")
    visits = archive_data.origin_visit_get(origin["url"])
    for v in visits:
        # Each visit should link to the directory view at its date.
        vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ")
        browse_dir_url = reverse(
            "browse-origin-directory",
            query_params={"origin_url": origin["url"], "timestamp": vdate},
        )
        assert_contains(resp, browse_dir_url)
    _check_origin_link(resp, origin["url"])
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
    """Check the origin content view across several visit addressing modes."""
    origin_visits = archive_data.origin_visit_get(origin["url"])

    def _get_archive_data(visit_idx):
        # Gather test data (branches, releases, root directory and one
        # random file) for the visit at the given index.
        snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"])
        head_rev_id = archive_data.snapshot_get_head(snapshot)
        head_rev = archive_data.revision_get(head_rev_id)
        dir_content = archive_data.directory_ls(head_rev["directory"])
        dir_files = [e for e in dir_content if e["type"] == "file"]
        dir_file = random.choice(dir_files)
        branches, releases = process_snapshot_branches(snapshot)
        return {
            "branches": branches,
            "releases": releases,
            "root_dir_sha1": head_rev["directory"],
            "content": get_content(dir_file["checksums"]["sha1"]),
            "visit": origin_visits[visit_idx],
        }

    # Latest visit: addressed implicitly, by ISO timestamp, by unix
    # timestamp and by snapshot id.
    tdata = _get_archive_data(-1)
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[-1],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
    )
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[-1],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
        timestamp=tdata["visit"]["date"],
    )
    visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp()
    visit_unix_ts = int(visit_unix_ts)
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[-1],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
        timestamp=visit_unix_ts,
    )
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[-1],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
        snapshot_id=tdata["visit"]["snapshot"],
    )
    # First visit: addressed by visit id and by snapshot id.
    tdata = _get_archive_data(0)
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[0],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
        visit_id=tdata["visit"]["visit"],
    )
    _origin_content_view_test_helper(
        client,
        archive_data,
        origin,
        origin_visits[0],
        tdata["branches"],
        tdata["releases"],
        tdata["root_dir_sha1"],
        tdata["content"],
        snapshot_id=tdata["visit"]["snapshot"],
    )
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
    """Check the root directory view via every supported addressing mode:
    default, visit id, unix timestamp, ISO date and snapshot id."""
    origin_visits = archive_data.origin_visit_get(origin["url"])
    visit = origin_visits[-1]
    snapshot = archive_data.snapshot_get(visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)
    head_rev = archive_data.revision_get(head_rev_id)
    root_dir_sha1 = head_rev["directory"]
    dir_content = archive_data.directory_ls(root_dir_sha1)
    branches, releases = process_snapshot_branches(snapshot)
    visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
    visit_unix_ts = int(visit_unix_ts)
    # NOTE(review): the five helper calls below appear twice in the
    # original source; kept as-is to preserve behavior.
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        visit_id=visit["visit"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        timestamp=visit_unix_ts,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        timestamp=visit["date"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        snapshot_id=visit["snapshot"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        visit_id=visit["visit"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        timestamp=visit_unix_ts,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        timestamp=visit["date"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        dir_content,
        snapshot_id=visit["snapshot"],
    )
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
    """Check a random sub-directory view via every supported addressing
    mode: default, visit id, unix timestamp, ISO date and snapshot id."""
    origin_visits = archive_data.origin_visit_get(origin["url"])
    visit = origin_visits[-1]
    snapshot = archive_data.snapshot_get(visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)
    head_rev = archive_data.revision_get(head_rev_id)
    root_dir_sha1 = head_rev["directory"]
    subdirs = [
        e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir"
    ]
    branches, releases = process_snapshot_branches(snapshot)
    visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
    visit_unix_ts = int(visit_unix_ts)
    # Nothing to check when the root directory has no sub-directory.
    if len(subdirs) == 0:
        return
    subdir = random.choice(subdirs)
    subdir_content = archive_data.directory_ls(subdir["target"])
    subdir_path = subdir["name"]
    # NOTE(review): the five helper calls below appear twice in the
    # original source; kept as-is to preserve behavior.
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        visit_id=visit["visit"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        timestamp=visit_unix_ts,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        timestamp=visit["date"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        snapshot_id=visit["snapshot"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        visit_id=visit["visit"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        timestamp=visit_unix_ts,
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        timestamp=visit["date"],
    )
    _origin_directory_view_test_helper(
        client,
        archive_data,
        origin,
        visit,
        branches,
        releases,
        root_dir_sha1,
        subdir_content,
        path=subdir_path,
        snapshot_id=visit["snapshot"],
    )
@given(origin())
def test_origin_branches(client, archive_data, origin):
    """Exercise the branches view with and without an explicit snapshot id."""
    visits = archive_data.origin_visit_get(origin["url"])
    last_visit = visits[-1]
    snp = archive_data.snapshot_get(last_visit["snapshot"])
    snp_content = process_snapshot_branches(snp)
    _origin_branches_test_helper(client, origin, snp_content)
    _origin_branches_test_helper(
        client, origin, snp_content, snapshot_id=last_visit["snapshot"]
    )
@given(origin())
def test_origin_releases(client, archive_data, origin):
    """Exercise the releases view with and without an explicit snapshot id."""
    visits = archive_data.origin_visit_get(origin["url"])
    last_visit = visits[-1]
    snp = archive_data.snapshot_get(last_visit["snapshot"])
    snp_content = process_snapshot_branches(snp)
    _origin_releases_test_helper(client, origin, snp_content)
    _origin_releases_test_helper(
        client, origin, snp_content, snapshot_id=last_visit["snapshot"]
    )
@given(
    new_origin(),
    new_snapshot(min_size=4, max_size=4),
    visit_dates(),
    revisions(min_size=3, max_size=3),
)
def test_origin_snapshot_null_branch(
    client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
    """A snapshot with a null (dangling) branch still renders the view."""
    snp_dict = new_snapshot.to_dict()
    archive_data.origin_add([new_origin])
    # Null out the first branch; the others target known revisions.
    for i, branch in enumerate(snp_dict["branches"].keys()):
        if i == 0:
            snp_dict["branches"][branch] = None
        else:
            snp_dict["branches"][branch] = {
                "target_type": "revision",
                "target": hash_to_bytes(revisions[i - 1]),
            }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
    )[0]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="partial",
        snapshot=snp_dict["id"],
    )
    archive_data.origin_visit_status_add([visit_status])
    url = reverse(
        "browse-origin-directory", query_params={"origin_url": new_origin.url}
    )
    rv = client.get(url)
    assert rv.status_code == 200
@given(
    new_origin(),
    new_snapshot(min_size=4, max_size=4),
    visit_dates(),
    revisions(min_size=4, max_size=4),
)
def test_origin_snapshot_invalid_branch(
    client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
    """Requesting a branch name absent from the snapshot yields a 404."""
    snp_dict = new_snapshot.to_dict()
    archive_data.origin_add([new_origin])
    # All branches target known revisions.
    for i, branch in enumerate(snp_dict["branches"].keys()):
        snp_dict["branches"][branch] = {
            "target_type": "revision",
            "target": hash_to_bytes(revisions[i]),
        }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
    )[0]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="full",
        snapshot=snp_dict["id"],
    )
    archive_data.origin_visit_status_add([visit_status])
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": new_origin.url, "branch": "invalid_branch"},
    )
    rv = client.get(url)
    assert rv.status_code == 404
@given(new_origin())
def test_browse_visits_origin_not_found(client, new_origin):
    """Browsing visits of an unarchived origin yields a 404 error page."""
    visits_url = reverse(
        "browse-origin-visits", query_params={"origin_url": new_origin.url}
    )
    resp = client.get(visits_url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    error_message = f"Origin with url {new_origin.url} not found"
    assert_contains(resp, error_message, status_code=404)
@given(origin())
def test_browse_origin_directory_no_visit(client, mocker, origin):
    """The directory view 404s when the origin has no recorded visit."""
    patched_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    patched_visits.return_value = []
    resp = client.get(
        reverse(
            "browse-origin-directory", query_params={"origin_url": origin["url"]}
        )
    )
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "No visit", status_code=404)
    assert patched_visits.called
@given(origin())
def test_browse_origin_directory_unknown_visit(client, mocker, origin):
    """Requesting a visit id the origin does not have yields a 404."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    # Only visit 1 exists; visit 2 is requested below.
    mock_get_origin_visits.return_value = [{"visit": 1}]
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": origin["url"], "visit_id": 2},
    )
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Visit.*not found", resp.content.decode("utf-8"))
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_not_found(client, origin):
    """An invalid directory path inside an origin yields a 404."""
    resp = client.get(
        reverse(
            "browse-origin-directory",
            query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"},
        )
    )
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Directory.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_origin_content_no_visit(client, mocker, origin):
    """The content view 404s when the origin has no recorded visit."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    mock_get_origin_visits.return_value = []
    url = reverse(
        "browse-origin-content",
        query_params={"origin_url": origin["url"], "path": "foo"},
    )
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "No visit", status_code=404)
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_unknown_visit(client, mocker, origin):
    """Requesting content for an unknown visit id yields a 404."""
    mock_get_origin_visits = mocker.patch(
        "swh.web.common.origin_visits.get_origin_visits"
    )
    # Only visit 1 exists; visit 2 is requested below.
    mock_get_origin_visits.return_value = [{"visit": 1}]
    url = reverse(
        "browse-origin-content",
        query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2},
    )
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Visit.*not found", resp.content.decode("utf-8"))
    assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin):
    """Content and directory views render an 'empty snapshot' page (200)
    when the visit produced no snapshot content."""
    mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service")
    mock_get_origin_visit_snapshot = mocker.patch(
        "swh.web.browse.snapshot_context.get_origin_visit_snapshot"
    )
    # Empty branches and releases simulate an empty snapshot.
    mock_get_origin_visit_snapshot.return_value = ([], [])
    mock_snapshot_service.lookup_origin.return_value = origin
    mock_snapshot_service.lookup_snapshot_sizes.return_value = {
        "revision": 0,
        "release": 0,
    }
    for browse_context in ("content", "directory"):
        url = reverse(
            f"browse-origin-{browse_context}",
            query_params={"origin_url": origin["url"], "path": "baz"},
        )
        resp = client.get(url)
        assert resp.status_code == 200
        assert_template_used(resp, f"browse/{browse_context}.html")
        assert re.search("snapshot.*is empty", resp.content.decode("utf-8"))
    assert mock_get_origin_visit_snapshot.called
    assert mock_snapshot_service.lookup_origin.called
    assert mock_snapshot_service.lookup_snapshot_sizes.called
@given(origin())
def test_browse_origin_content_not_found(client, origin):
    """An invalid file path inside an origin yields a 404."""
    resp = client.get(
        reverse(
            "browse-origin-content",
            query_params={"origin_url": origin["url"], "path": "/invalid/file/path"},
        )
    )
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert re.search("Directory entry.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_directory_snapshot_not_found(client, mocker, origin):
    """A NotFoundExc from snapshot resolution surfaces as a 404 page."""
    mock_get_snapshot_context = mocker.patch(
        "swh.web.browse.snapshot_context.get_snapshot_context"
    )
    mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found")
    url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, "error.html")
    assert_contains(resp, "Snapshot not found", status_code=404)
    assert mock_get_snapshot_context.called
@given(origin())
def test_origin_empty_snapshot(client, mocker, origin):
    """An empty snapshot renders the directory view with an 'empty'
    message and no take-a-new-snapshot link."""
    mock_service = mocker.patch("swh.web.browse.snapshot_context.service")
    mock_get_origin_visit_snapshot = mocker.patch(
        "swh.web.browse.snapshot_context.get_origin_visit_snapshot"
    )
    # Empty branches and releases simulate an empty snapshot.
    mock_get_origin_visit_snapshot.return_value = ([], [])
    mock_service.lookup_snapshot_sizes.return_value = {
        "revision": 0,
        "release": 0,
    }
    mock_service.lookup_origin.return_value = origin
    url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/directory.html")
    resp_content = resp.content.decode("utf-8")
    assert re.search("snapshot.*is empty", resp_content)
    assert not re.search("swh-tr-link", resp_content)
    assert mock_get_origin_visit_snapshot.called
    assert mock_service.lookup_snapshot_sizes.called
@given(new_origin())
def test_origin_empty_snapshot_null_revision(client, archive_data, new_origin):
    """A snapshot whose only real branch is null is treated as empty."""
    # HEAD aliases a branch whose target is null.
    snapshot = Snapshot(
        branches={
            b"HEAD": SnapshotBranch(
                target="refs/head/master".encode(), target_type=TargetType.ALIAS,
            ),
            b"refs/head/master": None,
        }
    )
    archive_data.origin_add([new_origin])
    archive_data.snapshot_add([snapshot])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=now(), type="git",)]
    )[0]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="partial",
        snapshot=snapshot.id,
    )
    archive_data.origin_visit_status_add([visit_status])
    url = reverse(
        "browse-origin-directory", query_params={"origin_url": new_origin.url},
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/directory.html")
    resp_content = resp.content.decode("utf-8")
    assert re.search("snapshot.*is empty", resp_content)
    assert not re.search("swh-tr-link", resp_content)
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
    """Browsing an origin by release name shows the release and the fully
    qualified SWHID of the released root directory."""
    snapshot = archive_data.snapshot_get_latest(origin["url"])
    release = [
        b for b in snapshot["branches"].values() if b["target_type"] == "release"
    ][-1]
    release_data = archive_data.release_get(release["target"])
    revision_data = archive_data.revision_get(release_data["target"])
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": origin["url"], "release": release_data["name"]},
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_contains(resp, release_data["name"])
    assert_contains(resp, release["target"])
    # Resolved leftover diff markers: gen_swhid / browse-swhid rename.
    swhid_context = {
        "origin": origin["url"],
        "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
        "anchor": gen_swhid(RELEASE, release_data["id"]),
        "path": "/",
    }
    swh_dir_id = gen_swhid(
        DIRECTORY, revision_data["directory"], metadata=swhid_context
    )
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, origin):
    """Requesting an unknown release name on an origin yields a 404."""
    bad_release = "swh-foo-bar"
    resp = client.get(
        reverse(
            "browse-origin-directory",
            query_params={"origin_url": origin["url"], "release": bad_release},
        )
    )
    assert resp.status_code == 404
    assert re.search(
        f"Release {bad_release}.*not found", resp.content.decode("utf-8")
    )
@given(new_origin(), unknown_revision())
def test_origin_browse_directory_branch_with_non_resolvable_revision(
    client, archive_data, new_origin, unknown_revision
):
    """A branch targeting an unarchived revision renders a 200 page with
    an explanatory message instead of failing."""
    branch_name = "master"
    snapshot = Snapshot(
        branches={
            branch_name.encode(): SnapshotBranch(
                target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION,
            )
        }
    )
    archive_data.origin_add([new_origin])
    archive_data.snapshot_add([snapshot])
    visit = archive_data.origin_visit_add(
        [OriginVisit(origin=new_origin.url, date=now(), type="git",)]
    )[0]
    visit_status = OriginVisitStatus(
        origin=new_origin.url,
        visit=visit.visit,
        date=now(),
        status="partial",
        snapshot=snapshot.id,
    )
    archive_data.origin_visit_status_add([visit_status])
    url = reverse(
        "browse-origin-directory",
        query_params={"origin_url": new_origin.url, "branch": branch_name},
    )
    resp = client.get(url)
    assert resp.status_code == 200
    assert_contains(
        resp, f"Revision {unknown_revision } could not be found in the archive."
    )
@given(origin())
def test_origin_content_no_path(client, origin):
    """The origin content view requires a path query parameter."""
    resp = client.get(
        reverse("browse-origin-content", query_params={"origin_url": origin["url"]})
    )
    assert resp.status_code == 400
    assert_contains(
        resp, "The path of a content must be given as query parameter.", status_code=400
    )
def test_origin_views_no_url_query_parameter(client):
    """Every origin browse view rejects requests missing origin_url."""
    contexts = ("content", "directory", "log", "branches", "releases", "visits")
    for browse_context in contexts:
        resp = client.get(reverse(f"browse-origin-{browse_context}"))
        assert resp.status_code == 400
        assert_contains(
            resp, "An origin URL must be provided as query parameter.", status_code=400
        )
def _origin_content_view_test_helper(
    client,
    archive_data,
    origin_info,
    origin_visit,
    origin_branches,
    origin_releases,
    root_dir_sha1,
    content,
    visit_id=None,
    timestamp=None,
    snapshot_id=None,
):
    """Exercise the browse-origin-content view for the given origin/content.

    Checks the rendered template, the displayed content, breadcrumb links,
    the raw-content download link, the branches/releases dropdowns and the
    SWHID metadata embedded in the page.

    Fix: resolved leftover unified-diff markers (gen_swhid / browse-swhid
    rename) that made the function syntactically invalid, keeping the "+"
    side of the diff.
    """
    # content["path"] starts with the root directory name; strip it
    content_path = "/".join(content["path"].split("/")[1:])
    if not visit_id and not snapshot_id:
        visit_id = origin_visit["visit"]
    query_params = {"origin_url": origin_info["url"], "path": content_path}
    if timestamp:
        query_params["timestamp"] = timestamp
    if visit_id:
        query_params["visit_id"] = visit_id
    elif snapshot_id:
        query_params["snapshot"] = snapshot_id
    url = reverse("browse-origin-content", query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/content.html")
    # the view is expected to have decoded content data to text
    assert isinstance(content["data"], str)
    # NOTE(review): several expected HTML fragments below appear stripped to
    # empty strings ('') — presumably mangled markup; confirm against upstream.
    assert_contains(resp, '' % content["hljs_language"])
    assert_contains(resp, escape(content["data"]))
    split_path = content_path.split("/")
    filename = split_path[-1]
    path = content_path.replace(filename, "")[:-1]
    path_info = gen_path_info(path)
    del query_params["path"]
    if timestamp:
        # the view redisplays the visit timestamp in UTC ISO format
        query_params["timestamp"] = format_utc_iso_date(
            parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
        )
    root_dir_url = reverse("browse-origin-directory", query_params=query_params)
    # one breadcrumb per intermediate directory, plus the root entry
    assert_contains(resp, '', count=len(path_info) + 1)
    assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7]))
    for p in path_info:
        query_params["path"] = p["path"]
        dir_url = reverse("browse-origin-directory", query_params=query_params)
        assert_contains(resp, '%s' % (dir_url, p["name"]))
    assert_contains(resp, "%s" % filename)
    query_string = "sha1_git:" + content["sha1_git"]
    url_raw = reverse(
        "browse-content-raw",
        url_args={"query_string": query_string},
        query_params={"filename": filename},
    )
    assert_contains(resp, url_raw)
    if "path" in query_params:
        del query_params["path"]
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({len(origin_branches)})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_releases_url)}">')
    assert_contains(resp, f"Releases ({len(origin_releases)})")
    assert_contains(resp, '', count=len(origin_branches))
    query_params["path"] = content_path
    for branch in origin_branches:
        root_dir_branch_url = reverse(
            "browse-origin-content",
            query_params={"branch": branch["name"], **query_params},
        )
        assert_contains(resp, '' % root_dir_branch_url)
    assert_contains(resp, '', count=len(origin_releases))
    query_params["branch"] = None
    for release in origin_releases:
        root_dir_release_url = reverse(
            "browse-origin-content",
            query_params={"release": release["name"], **query_params},
        )
        assert_contains(resp, '' % root_dir_release_url)
    url = reverse("browse-origin-content", query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/content.html")
    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)
    # SWHID qualifiers: visited snapshot, anchor revision, content path
    swhid_context = {
        "origin": origin_info["url"],
        "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
        "anchor": gen_swhid(REVISION, head_rev_id),
        "path": f"/{content_path}",
    }
    swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
    swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
    assert_contains(resp, swh_cnt_id)
    assert_contains(resp, swh_cnt_id_url)
    assert_contains(resp, "swh-take-new-snapshot")
    _check_origin_link(resp, origin_info["url"])
def _origin_directory_view_test_helper(
    client,
    archive_data,
    origin_info,
    origin_visit,
    origin_branches,
    origin_releases,
    root_directory_sha1,
    directory_entries,
    visit_id=None,
    timestamp=None,
    snapshot_id=None,
    path=None,
):
    """Exercise the browse-origin-directory view for the given directory.

    Checks the rendered template, the per-entry directory/file links,
    breadcrumbs, branches/releases dropdowns, vault cooking buttons and the
    SWHID metadata embedded in the page.

    Fixes: resolved leftover unified-diff markers (gen_swhid / browse-swhid
    rename, keeping the "+" side) and removed a duplicated status/template
    assertion pair.
    """
    # submodule entries have type "rev" and are listed with the directories
    dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
    files = [e for e in directory_entries if e["type"] == "file"]
    if not visit_id and not snapshot_id:
        visit_id = origin_visit["visit"]
    query_params = {"origin_url": origin_info["url"]}
    if timestamp:
        query_params["timestamp"] = timestamp
    elif visit_id:
        query_params["visit_id"] = visit_id
    else:
        query_params["snapshot"] = snapshot_id
    if path:
        query_params["path"] = path
    url = reverse("browse-origin-directory", query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/directory.html")
    # NOTE(review): the expected HTML fragments below appear stripped to
    # empty strings ('') — presumably mangled markup; confirm against upstream.
    assert_contains(resp, '', count=len(dirs))
    assert_contains(resp, ' | ', count=len(files))
    if timestamp:
        # the view redisplays the visit timestamp in UTC ISO format
        query_params["timestamp"] = format_utc_iso_date(
            parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
        )
    for d in dirs:
        if d["type"] == "rev":
            # submodule: links straight to the target revision
            dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
        else:
            dir_path = d["name"]
            if path:
                dir_path = "%s/%s" % (path, d["name"])
            query_params["path"] = dir_path
            dir_url = reverse("browse-origin-directory", query_params=query_params,)
        assert_contains(resp, dir_url)
    for f in files:
        file_path = f["name"]
        if path:
            file_path = "%s/%s" % (path, f["name"])
        query_params["path"] = file_path
        file_url = reverse("browse-origin-content", query_params=query_params)
        assert_contains(resp, file_url)
    if "path" in query_params:
        del query_params["path"]
    root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
    # one breadcrumb for the root plus one per path component
    nb_bc_paths = 1
    if path:
        nb_bc_paths = len(path.split("/")) + 1
    assert_contains(resp, '', count=nb_bc_paths)
    assert_contains(
        resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7])
    )
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({len(origin_branches)})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}"')
        assert_contains(resp, f"Releases ({nb_releases})")
    if path:
        query_params["path"] = path
    assert_contains(resp, '', count=len(origin_branches))
    for branch in origin_branches:
        query_params["branch"] = branch["name"]
        root_dir_branch_url = reverse(
            "browse-origin-directory", query_params=query_params
        )
        assert_contains(resp, '' % root_dir_branch_url)
    assert_contains(resp, '', count=len(origin_releases))
    query_params["branch"] = None
    for release in origin_releases:
        query_params["release"] = release["name"]
        root_dir_release_url = reverse(
            "browse-origin-directory", query_params=query_params
        )
        assert_contains(resp, 'href="%s"' % root_dir_release_url)
    assert_contains(resp, "vault-cook-directory")
    assert_contains(resp, "vault-cook-revision")
    snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
    head_rev_id = archive_data.snapshot_get_head(snapshot)
    # SWHID qualifiers: visited snapshot, anchor revision, directory path
    swhid_context = {
        "origin": origin_info["url"],
        "visit": gen_swhid(SNAPSHOT, snapshot["id"]),
        "anchor": gen_swhid(REVISION, head_rev_id),
        "path": f"/{path}" if path else "/",
    }
    swh_dir_id = gen_swhid(
        DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
    )
    swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
    assert_contains(resp, "swh-take-new-snapshot")
    _check_origin_link(resp, origin_info["url"])
def _origin_branches_test_helper(
    client, origin_info, origin_snapshot, snapshot_id=None
):
    """Exercise the browse-origin-branches view for the given origin snapshot.

    Fix: ``browse_branch_url`` and ``browse_revision_url`` were referenced
    without ever being defined — the enclosing loop over ``origin_branches``
    was missing and has been reconstructed.
    """
    query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
    url = reverse("browse-origin-branches", query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/branches.html")
    origin_branches = origin_snapshot[0]
    origin_releases = origin_snapshot[1]
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({len(origin_branches)})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}">')
        assert_contains(resp, f"Releases ({nb_releases})")
    # NOTE(review): loop reconstructed from the dangling per-branch
    # assertions; the expected HTML fragments look stripped to empty
    # strings — confirm both the loop body and the fragments upstream.
    for branch in origin_branches:
        browse_branch_url = reverse(
            "browse-origin-directory",
            query_params={"branch": branch["name"], **query_params},
        )
        assert_contains(resp, '' % escape(browse_branch_url))
        browse_revision_url = reverse(
            "browse-revision",
            url_args={"sha1_git": branch["revision"]},
            query_params=query_params,
        )
        assert_contains(resp, '' % escape(browse_revision_url))
    _check_origin_link(resp, origin_info["url"])
def _origin_releases_test_helper(
    client, origin_info, origin_snapshot, snapshot_id=None
):
    """Exercise the browse-origin-releases view for the given origin snapshot.

    Fix: ``browse_release_url`` and ``browse_revision_url`` were referenced
    without ever being defined — the enclosing loop over ``origin_releases``
    was missing and has been reconstructed.
    """
    query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
    url = reverse("browse-origin-releases", query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, "browse/releases.html")
    origin_branches = origin_snapshot[0]
    origin_releases = origin_snapshot[1]
    origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
    assert_contains(resp, f'href="{escape(origin_branches_url)}"')
    assert_contains(resp, f"Branches ({len(origin_branches)})")
    origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
    nb_releases = len(origin_releases)
    if nb_releases > 0:
        assert_contains(resp, f'href="{escape(origin_releases_url)}"')
        assert_contains(resp, f"Releases ({nb_releases})")
    # NOTE(review): loop reconstructed from the dangling per-release
    # assertions; the expected HTML fragments look stripped to empty
    # strings — confirm both the loop body and the fragments upstream.
    for release in origin_releases:
        browse_release_url = reverse(
            "browse-release",
            url_args={"sha1_git": release["id"]},
            query_params=query_params,
        )
        browse_revision_url = reverse(
            "browse-revision",
            url_args={"sha1_git": release["target"]},
            query_params=query_params,
        )
        assert_contains(resp, '' % escape(browse_release_url))
        assert_contains(resp, '' % escape(browse_revision_url))
    _check_origin_link(resp, origin_info["url"])
@given(
new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
"""
When a snapshot contains a branch or a release alias, pagination links
in the branches / releases view should be displayed.
"""
mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)
snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
for i in range(len(revisions)):
branch = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][branch.encode()] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
release = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][b"RELEASE_ALIAS"] = {
"target_type": "alias",
"target": release.encode(),
}
snp_dict["branches"][release.encode()] = {
"target_type": "release",
"target": hash_to_bytes(existing_release),
}
archive_data.origin_add([new_origin])
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="full",
snapshot=snp_dict["id"],
)
archive_data.origin_visit_status_add([visit_status])
url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
assert_contains(resp, ' | |