.+)/",
view_name="browse-revision",
checksum_args=["sha1_git"],
)
def revision_browse(request, sha1_git, extra_path=None):
"""
Django view that produces an HTML display of a revision
identified by its id.
The url that points to it is :http:get:`/browse/revision/(sha1_git)/`.
"""
try:
revision = service.lookup_revision(sha1_git)
origin_info = None
snapshot_context = None
origin_url = request.GET.get("origin_url", None)
if not origin_url:
origin_url = request.GET.get("origin", None)
timestamp = request.GET.get("timestamp", None)
visit_id = request.GET.get("visit_id", None)
snapshot_id = request.GET.get("snapshot_id", None)
path = request.GET.get("path", None)
dir_id = None
dirs, files = None, None
content_data = None
if origin_url:
try:
snapshot_context = get_snapshot_context(
origin_url=origin_url, timestamp=timestamp, visit_id=visit_id
)
except NotFoundExc:
raw_rev_url = reverse(
"browse-revision", url_args={"sha1_git": sha1_git}
)
error_message = (
"The Software Heritage archive has a revision "
"with the hash you provided but the origin "
"mentioned in your request appears broken: %s. "
"Please check the URL and try again.\n\n"
"Nevertheless, you can still browse the revision "
"without origin information: %s"
% (gen_link(origin_url), gen_link(raw_rev_url))
)
raise NotFoundExc(error_message)
origin_info = snapshot_context["origin_info"]
snapshot_id = snapshot_context["snapshot_id"]
elif snapshot_id:
snapshot_context = get_snapshot_context(snapshot_id)
if path:
file_info = service.lookup_directory_with_path(revision["directory"], path)
if file_info["type"] == "dir":
dir_id = file_info["target"]
else:
query_string = "sha1_git:" + file_info["target"]
content_data = request_content(query_string, raise_if_unavailable=False)
else:
dir_id = revision["directory"]
if dir_id:
path = "" if path is None else (path + "/")
dirs, files = get_directory_entries(dir_id)
except Exception as exc:
return handle_view_exception(request, exc)
revision_data = {}
revision_data["author"] = "None"
if revision["author"]:
author_link = gen_person_mail_link(revision["author"])
revision_data["author"] = author_link
revision_data["committer"] = "None"
if revision["committer"]:
committer_link = gen_person_mail_link(revision["committer"])
revision_data["committer"] = committer_link
revision_data["committer date"] = format_utc_iso_date(revision["committer_date"])
revision_data["date"] = format_utc_iso_date(revision["date"])
revision_data["directory"] = revision["directory"]
if snapshot_context:
revision_data["snapshot"] = snapshot_id
browse_snapshot_link = gen_snapshot_link(snapshot_id)
revision_data["context-independent snapshot"] = browse_snapshot_link
revision_data["context-independent directory"] = gen_directory_link(
revision["directory"]
)
revision_data["revision"] = sha1_git
revision_data["merge"] = revision["merge"]
revision_data["metadata"] = escape(
json.dumps(
revision["metadata"], sort_keys=True, indent=4, separators=(",", ": ")
)
)
if origin_info:
revision_data["origin url"] = gen_link(origin_info["url"], origin_info["url"])
revision_data["context-independent revision"] = gen_revision_link(sha1_git)
parents = ""
for p in revision["parents"]:
parent_link = gen_revision_link(
p, link_text=None, link_attrs=None, snapshot_context=snapshot_context
)
parents += parent_link + "
"
revision_data["parents"] = mark_safe(parents)
revision_data["synthetic"] = revision["synthetic"]
revision_data["type"] = revision["type"]
message_lines = ["None"]
if revision["message"]:
message_lines = revision["message"].split("\n")
parents = []
for p in revision["parents"]:
parent_url = gen_revision_url(p, snapshot_context)
parents.append({"id": p, "url": parent_url})
path_info = gen_path_info(path)
query_params = {
"snapshot_id": snapshot_id,
- "origin": origin_url,
+ "origin_url": origin_url,
"timestamp": timestamp,
"visit_id": visit_id,
}
breadcrumbs = []
breadcrumbs.append(
{
"name": revision["directory"][:7],
"url": reverse(
"browse-revision",
url_args={"sha1_git": sha1_git},
query_params=query_params,
),
}
)
for pi in path_info:
query_params["path"] = pi["path"]
breadcrumbs.append(
{
"name": pi["name"],
"url": reverse(
"browse-revision",
url_args={"sha1_git": sha1_git},
query_params=query_params,
),
}
)
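# Illustrative note (behavior of gen_path_info as inferred from its use
# above): for path "a/b" it yields entries like
# [{"name": "a", "path": "a"}, {"name": "b", "path": "a/b"}],
# i.e. one breadcrumb link per path component.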
vault_cooking = {
"directory_context": False,
"directory_id": None,
"revision_context": True,
"revision_id": sha1_git,
}
swh_objects = [{"type": "revision", "id": sha1_git}]
content = None
content_size = None
mimetype = None
language = None
readme_name = None
readme_url = None
readme_html = None
readmes = {}
error_code = 200
error_message = ""
error_description = ""
if content_data:
breadcrumbs[-1]["url"] = None
content_size = content_data["length"]
mimetype = content_data["mimetype"]
if content_data["raw_data"]:
content_display_data = prepare_content_for_display(
content_data["raw_data"], content_data["mimetype"], path
)
content = content_display_data["content_data"]
language = content_display_data["language"]
mimetype = content_display_data["mimetype"]
query_params = {}
if path:
filename = path_info[-1]["name"]
query_params["filename"] = path_info[-1]["name"]
revision_data["filename"] = filename
top_right_link = {
"url": reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params=query_params,
),
"icon": swh_object_icons["content"],
"text": "Raw File",
}
swh_objects.append({"type": "content", "id": file_info["target"]})
error_code = content_data["error_code"]
error_message = content_data["error_message"]
error_description = content_data["error_description"]
else:
for d in dirs:
if d["type"] == "rev":
d["url"] = reverse(
"browse-revision", url_args={"sha1_git": d["target"]}
)
else:
query_params["path"] = path + d["name"]
d["url"] = reverse(
"browse-revision",
url_args={"sha1_git": sha1_git},
query_params=query_params,
)
for f in files:
query_params["path"] = path + f["name"]
f["url"] = reverse(
"browse-revision",
url_args={"sha1_git": sha1_git},
query_params=query_params,
)
if f["length"] is not None:
f["length"] = filesizeformat(f["length"])
if f["name"].lower().startswith("readme"):
readmes[f["name"]] = f["checksums"]["sha1"]
readme_name, readme_url, readme_html = get_readme_to_display(readmes)
top_right_link = {
"url": get_revision_log_url(sha1_git, snapshot_context),
"icon": swh_object_icons["revisions history"],
"text": "History",
}
vault_cooking["directory_context"] = True
vault_cooking["directory_id"] = dir_id
swh_objects.append({"type": "directory", "id": dir_id})
diff_revision_url = reverse(
"diff-revision",
url_args={"sha1_git": sha1_git},
query_params={
- "origin": origin_url,
+ "origin_url": origin_url,
"timestamp": timestamp,
"visit_id": visit_id,
},
)
if snapshot_id:
swh_objects.append({"type": "snapshot", "id": snapshot_id})
swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
heading = "Revision - %s - %s" % (
sha1_git[:7],
textwrap.shorten(message_lines[0], width=70),
)
if snapshot_context:
context_found = "snapshot: %s" % snapshot_context["snapshot_id"]
if origin_info:
context_found = "origin: %s" % origin_info["url"]
heading += " - %s" % context_found
return render(
request,
"browse/revision.html",
{
"heading": heading,
"swh_object_id": swh_ids[0]["swh_id"],
"swh_object_name": "Revision",
"swh_object_metadata": revision_data,
"message_header": message_lines[0],
"message_body": "\n".join(message_lines[1:]),
"parents": parents,
"snapshot_context": snapshot_context,
"dirs": dirs,
"files": files,
"content": content,
"content_size": content_size,
"max_content_size": content_display_max_size,
"mimetype": mimetype,
"language": language,
"readme_name": readme_name,
"readme_url": readme_url,
"readme_html": readme_html,
"breadcrumbs": breadcrumbs,
"top_right_link": top_right_link,
"vault_cooking": vault_cooking,
"diff_revision_url": diff_revision_url,
"show_actions_menu": True,
"swh_ids": swh_ids,
"error_code": error_code,
"error_message": error_message,
"error_description": error_description,
},
status=error_code,
)
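# Usage sketch (illustrative, not part of the diff): after this change, links
# to the revision view should carry the origin as "origin_url" instead of
# "origin", e.g.
#
#   from swh.web.common.utils import reverse
#
#   url = reverse(
#       "browse-revision",
#       url_args={"sha1_git": "8068d0075010b590762c6cb5682ed53cb3c13deb"},
#       query_params={"origin_url": "https://github.com/user/repo"},
#   )
#
# The view still accepts the legacy "origin" parameter as a fallback.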
diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
index c0df94da..4db225e6 100644
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -1,197 +1,197 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, Iterable, List, Optional
from typing_extensions import TypedDict
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import (
persistent_identifier,
parse_persistent_identifier,
CONTENT,
DIRECTORY,
ORIGIN,
RELEASE,
REVISION,
SNAPSHOT,
PersistentId,
)
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.common.utils import reverse
def get_swh_persistent_id(
object_type: str, object_id: str, scheme_version: int = 1
) -> str:
"""
Returns the persistent identifier for a swh object based on:
* the object type
* the object id
* the swh identifiers scheme version
Args:
object_type: the swh object type
(content/directory/release/revision/snapshot)
object_id: the swh object id (hexadecimal representation
of its hash value)
scheme_version: the scheme version of the swh
persistent identifiers
Returns:
the swh object persistent identifier
Raises:
BadInputExc: if the provided parameters do not enable to
generate a valid identifier
"""
try:
swh_id = persistent_identifier(object_type, object_id, scheme_version)
except ValidationError as e:
raise BadInputExc(
"Invalid object (%s) for swh persistent id. %s" % (object_id, e)
)
else:
return swh_id
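# Usage sketch (illustrative): the PID scheme is "swh:<version>:<type>:<id>",
# so for a revision one would get, e.g.
#
#   >>> get_swh_persistent_id("revision", "8068d0075010b590762c6cb5682ed53cb3c13deb")
#   'swh:1:rev:8068d0075010b590762c6cb5682ed53cb3c13deb'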
ResolvedPersistentId = TypedDict(
"ResolvedPersistentId", {"swh_id_parsed": PersistentId, "browse_url": Optional[str]}
)
def resolve_swh_persistent_id(
swh_id: str, query_params: Optional[QueryParameters] = None
) -> ResolvedPersistentId:
"""
Try to resolve a Software Heritage persistent id into an url for
browsing the targeted object.
Args:
swh_id: a Software Heritage persistent identifier
query_params: optional dict filled with
query parameters to append to the browse url
Returns:
a dict with the following keys:
* **swh_id_parsed**: the parsed identifier
* **browse_url**: the url for browsing the targeted object
"""
swh_id_parsed = get_persistent_identifier(swh_id)
object_type = swh_id_parsed.object_type
object_id = swh_id_parsed.object_id
browse_url = None
query_dict = QueryDict("", mutable=True)
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
if "origin" in swh_id_parsed.metadata:
- query_dict["origin"] = swh_id_parsed.metadata["origin"]
+ query_dict["origin_url"] = swh_id_parsed.metadata["origin"]
if object_type == CONTENT:
query_string = "sha1_git:" + object_id
fragment = ""
if "lines" in swh_id_parsed.metadata:
lines = swh_id_parsed.metadata["lines"].split("-")
fragment += "#L" + lines[0]
if len(lines) > 1:
fragment += "-L" + lines[1]
browse_url = (
reverse(
"browse-content",
url_args={"query_string": query_string},
query_params=query_dict,
)
+ fragment
)
elif object_type == DIRECTORY:
browse_url = reverse(
"browse-directory",
url_args={"sha1_git": object_id},
query_params=query_dict,
)
elif object_type == RELEASE:
browse_url = reverse(
"browse-release", url_args={"sha1_git": object_id}, query_params=query_dict
)
elif object_type == REVISION:
browse_url = reverse(
"browse-revision", url_args={"sha1_git": object_id}, query_params=query_dict
)
elif object_type == SNAPSHOT:
browse_url = reverse(
"browse-snapshot",
url_args={"snapshot_id": object_id},
query_params=query_dict,
)
elif object_type == ORIGIN:
raise BadInputExc(
(
"Origin PIDs (Persistent Identifiers) are not "
"publicly resolvable because they are for "
"internal usage only"
)
)
return {"swh_id_parsed": swh_id_parsed, "browse_url": browse_url}
def get_persistent_identifier(persistent_id: str) -> PersistentId:
"""Check if a persistent identifier is valid.
Args:
persistent_id: A string representing a Software Heritage
persistent identifier.
Raises:
BadInputExc: if the provided persistent identifier can
not be parsed.
Return:
A persistent identifier object.
"""
try:
pid_object = parse_persistent_identifier(persistent_id)
except ValidationError as ve:
raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
else:
return pid_object
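# Illustrative: an unparsable PID raises BadInputExc, which the API layer
# turns into an HTTP 400 (see test_swh_id_resolve_invalid below), e.g.
#
#   >>> get_persistent_identifier("swh:1:rev:96db9023b8_foo_50d6c108e9a3")
#   Traceback (most recent call last):
#       ...
#   swh.web.common.exc.BadInputExc: Error when parsing identifier: ...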
def group_swh_persistent_identifiers(
persistent_ids: Iterable[PersistentId],
) -> Dict[str, List[bytes]]:
"""
Groups many Software Heritage persistent identifiers into a
dictionary depending on their type.
Args:
persistent_ids: an iterable of Software Heritage persistent
identifier objects
Returns:
A dictionary with:
keys: persistent identifier types
values: persistent identifiers id
"""
pids_by_type: Dict[str, List[bytes]] = {
CONTENT: [],
DIRECTORY: [],
REVISION: [],
RELEASE: [],
SNAPSHOT: [],
}
for pid in persistent_ids:
obj_id = pid.object_id
obj_type = pid.object_type
pids_by_type[obj_type].append(hash_to_bytes(obj_id))
return pids_by_type
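# Usage sketch (illustrative): grouping two parsed PIDs by object type.
#
#   >>> pids = [
#   ...     get_persistent_identifier("swh:1:rev:" + "11" * 20),
#   ...     get_persistent_identifier("swh:1:cnt:" + "22" * 20),
#   ... ]
#   >>> grouped = group_swh_persistent_identifiers(pids)
#   >>> len(grouped[REVISION]), len(grouped[CONTENT])
#   (1, 1)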
diff --git a/swh/web/tests/api/views/test_identifiers.py b/swh/web/tests/api/views/test_identifiers.py
index e297ed9e..585f55ae 100644
--- a/swh/web/tests/api/views/test_identifiers.py
+++ b/swh/web/tests/api/views/test_identifiers.py
@@ -1,188 +1,188 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import given
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import (
content,
directory,
origin,
release,
revision,
snapshot,
unknown_content,
unknown_directory,
unknown_release,
unknown_revision,
unknown_snapshot,
)
@given(origin(), content(), directory(), release(), revision(), snapshot())
def test_swh_id_resolve_success(
api_client, origin, content, directory, release, revision, snapshot
):
for obj_type_short, obj_type, obj_id in (
("cnt", CONTENT, content["sha1_git"]),
("dir", DIRECTORY, directory),
("rel", RELEASE, release),
("rev", REVISION, revision),
("snp", SNAPSHOT, snapshot),
):
swh_id = "swh:1:%s:%s;origin=%s" % (obj_type_short, obj_id, origin["url"])
url = reverse("api-1-resolve-swh-pid", url_args={"swh_id": swh_id})
resp = api_client.get(url)
if obj_type == CONTENT:
url_args = {"query_string": "sha1_git:%s" % obj_id}
elif obj_type == SNAPSHOT:
url_args = {"snapshot_id": obj_id}
else:
url_args = {"sha1_git": obj_id}
browse_rev_url = reverse(
"browse-%s" % obj_type,
url_args=url_args,
- query_params={"origin": origin["url"]},
+ query_params={"origin_url": origin["url"]},
request=resp.wsgi_request,
)
expected_result = {
"browse_url": browse_rev_url,
"metadata": {"origin": origin["url"]},
"namespace": "swh",
"object_id": obj_id,
"object_type": obj_type,
"scheme_version": 1,
}
assert resp.status_code == 200, resp.data
assert resp.data == expected_result
def test_swh_id_resolve_invalid(api_client):
rev_id_invalid = "96db9023b8_foo_50d6c108e9a3"
swh_id = "swh:1:rev:%s" % rev_id_invalid
url = reverse("api-1-resolve-swh-pid", url_args={"swh_id": swh_id})
resp = api_client.get(url)
assert resp.status_code == 400, resp.data
@given(
unknown_content(),
unknown_directory(),
unknown_release(),
unknown_revision(),
unknown_snapshot(),
)
def test_swh_id_resolve_not_found(
api_client,
unknown_content,
unknown_directory,
unknown_release,
unknown_revision,
unknown_snapshot,
):
for obj_type_short, obj_id in (
("cnt", unknown_content["sha1_git"]),
("dir", unknown_directory),
("rel", unknown_release),
("rev", unknown_revision),
("snp", unknown_snapshot),
):
swh_id = "swh:1:%s:%s" % (obj_type_short, obj_id)
url = reverse("api-1-resolve-swh-pid", url_args={"swh_id": swh_id})
resp = api_client.get(url)
assert resp.status_code == 404, resp.data
def test_swh_origin_id_not_resolvable(api_client):
ori_pid = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb"
url = reverse("api-1-resolve-swh-pid", url_args={"swh_id": ori_pid})
resp = api_client.get(url)
assert resp.status_code == 400, resp.data
@given(content(), directory())
def test_api_known_swhpid_some_present(api_client, content, directory):
content_ = "swh:1:cnt:%s" % content["sha1_git"]
directory_ = "swh:1:dir:%s" % directory
unknown_revision_ = "swh:1:rev:%s" % random_sha1()
unknown_release_ = "swh:1:rel:%s" % random_sha1()
unknown_snapshot_ = "swh:1:snp:%s" % random_sha1()
input_pids = [
content_,
directory_,
unknown_revision_,
unknown_release_,
unknown_snapshot_,
]
url = reverse("api-1-known")
resp = api_client.post(
url, data=input_pids, format="json", HTTP_ACCEPT="application/json"
)
assert resp.status_code == 200, resp.data
assert resp["Content-Type"] == "application/json"
assert resp.data == {
content_: {"known": True},
directory_: {"known": True},
unknown_revision_: {"known": False},
unknown_release_: {"known": False},
unknown_snapshot_: {"known": False},
}
def test_api_known_invalid_swhpid(api_client):
invalid_pid_sha1 = ["swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13de;"]
invalid_pid_type = ["swh:1:cnn:8068d0075010b590762c6cb5682ed53cb3c13deb"]
url = reverse("api-1-known")
resp = api_client.post(
url, data=invalid_pid_sha1, format="json", HTTP_ACCEPT="application/json"
)
assert resp.status_code == 400, resp.data
resp2 = api_client.post(
url, data=invalid_pid_type, format="json", HTTP_ACCEPT="application/json"
)
assert resp2.status_code == 400, resp.data
def test_api_known_raises_large_payload_error(api_client):
random_pid = "swh:1:cnt:8068d0075010b590762c6cb5682ed53cb3c13deb"
limit = 10000
err_msg = "The maximum number of PIDs this endpoint can receive is 1000"
pids = [random_pid for i in range(limit)]
url = reverse("api-1-known")
resp = api_client.post(
url, data=pids, format="json", HTTP_ACCEPT="application/json"
)
assert resp.status_code == 413, resp.data
assert resp["Content-Type"] == "application/json"
assert resp.data == {"exception": "LargePayloadExc", "reason": err_msg}
diff --git a/swh/web/tests/browse/views/test_directory.py b/swh/web/tests/browse/views/test_directory.py
index 3b66065e..e38cea56 100644
--- a/swh/web/tests/browse/views/test_directory.py
+++ b/swh/web/tests/browse/views/test_directory.py
@@ -1,146 +1,146 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
from hypothesis import given
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import gen_path_info, reverse
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
directory,
directory_with_subdirs,
invalid_sha1,
unknown_directory,
)
@given(directory())
def test_root_directory_view(client, archive_data, directory):
_directory_view(client, directory, archive_data.directory_ls(directory))
@given(directory_with_subdirs())
def test_sub_directory_view(client, archive_data, directory):
dir_content = archive_data.directory_ls(directory)
subdir = random.choice([e for e in dir_content if e["type"] == "dir"])
subdir_content = archive_data.directory_ls(subdir["target"])
_directory_view(client, directory, subdir_content, subdir["name"])
@given(invalid_sha1(), unknown_directory())
def test_directory_request_errors(client, invalid_sha1, unknown_directory):
dir_url = reverse("browse-directory", url_args={"sha1_git": invalid_sha1})
resp = client.get(dir_url)
assert resp.status_code == 400
assert_template_used(resp, "error.html")
dir_url = reverse("browse-directory", url_args={"sha1_git": unknown_directory})
resp = client.get(dir_url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
@given(directory())
def test_directory_uppercase(client, directory):
url = reverse(
"browse-directory-uppercase-checksum", url_args={"sha1_git": directory.upper()}
)
resp = client.get(url)
assert resp.status_code == 302
redirect_url = reverse("browse-directory", url_args={"sha1_git": directory})
assert resp["location"] == redirect_url
@given(directory())
def test_permalink_box_context(client, tests_data, directory):
origin_url = random.choice(tests_data["origins"])["url"]
url = reverse(
"browse-directory",
url_args={"sha1_git": directory},
- query_params={"origin": origin_url},
+ query_params={"origin_url": origin_url},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(resp, 'id="swh-id-option-origin-directory"')
def _directory_view(client, root_directory_sha1, directory_entries, path=None):
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
url_args = {"sha1_git": root_directory_sha1}
if path:
url_args["path"] = path
url = reverse("browse-directory", url_args=url_args)
root_dir_url = reverse(
"browse-directory", url_args={"sha1_git": root_directory_sha1}
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert_contains(
resp, '<a href="' + root_dir_url + '">' + root_directory_sha1[:7] + "</a>"
)
assert_contains(resp, '<td class="swh-directory">', count=len(dirs))
assert_contains(resp, '<td class="swh-content">', count=len(files))
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1, "path": dir_path},
)
assert_contains(resp, dir_url)
for f in files:
file_path = "%s/%s" % (root_directory_sha1, f["name"])
if path:
file_path = "%s/%s/%s" % (root_directory_sha1, path, f["name"])
query_string = "sha1_git:" + f["target"]
file_url = reverse(
"browse-content",
url_args={"query_string": query_string},
query_params={"path": file_path},
)
assert_contains(resp, file_url)
path_info = gen_path_info(path)
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)
assert_contains(
resp, '<a href="%s">%s</a>' % (root_dir_url, root_directory_sha1[:7])
)
for p in path_info:
dir_url = reverse(
"browse-directory",
url_args={"sha1_git": root_directory_sha1, "path": p["path"]},
)
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))
assert_contains(resp, "vault-cook-directory")
swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
diff --git a/swh/web/tests/browse/views/test_identifiers.py b/swh/web/tests/browse/views/test_identifiers.py
index 0e17766c..16335f4f 100644
--- a/swh/web/tests/browse/views/test_identifiers.py
+++ b/swh/web/tests/browse/views/test_identifiers.py
@@ -1,160 +1,160 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import given
from swh.web.common.utils import reverse
from swh.web.tests.strategies import content, directory, revision, release, snapshot
swh_id_prefix = "swh:1:"
@given(content())
def test_content_id_browse(client, content):
cnt_sha1_git = content["sha1_git"]
swh_id = swh_id_prefix + "cnt:" + cnt_sha1_git
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
query_string = "sha1_git:" + cnt_sha1_git
content_browse_url = reverse(
"browse-content", url_args={"query_string": query_string}
)
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == content_browse_url
@given(directory())
def test_directory_id_browse(client, directory):
swh_id = swh_id_prefix + "dir:" + directory
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
directory_browse_url = reverse("browse-directory", url_args={"sha1_git": directory})
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == directory_browse_url
@given(revision())
def test_revision_id_browse(client, revision):
swh_id = swh_id_prefix + "rev:" + revision
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
revision_browse_url = reverse("browse-revision", url_args={"sha1_git": revision})
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == revision_browse_url
- query_params = {"origin": "https://github.com/user/repo"}
+ query_params = {"origin_url": "https://github.com/user/repo"}
url = reverse(
"browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params
)
revision_browse_url = reverse(
"browse-revision", url_args={"sha1_git": revision}, query_params=query_params
)
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == revision_browse_url
@given(release())
def test_release_id_browse(client, release):
swh_id = swh_id_prefix + "rel:" + release
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
release_browse_url = reverse("browse-release", url_args={"sha1_git": release})
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == release_browse_url
- query_params = {"origin": "https://github.com/user/repo"}
+ query_params = {"origin_url": "https://github.com/user/repo"}
url = reverse(
"browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params
)
release_browse_url = reverse(
"browse-release", url_args={"sha1_git": release}, query_params=query_params
)
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == release_browse_url
@given(snapshot())
def test_snapshot_id_browse(client, snapshot):
swh_id = swh_id_prefix + "snp:" + snapshot
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
snapshot_browse_url = reverse("browse-snapshot", url_args={"snapshot_id": snapshot})
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == snapshot_browse_url
- query_params = {"origin": "https://github.com/user/repo"}
+ query_params = {"origin_url": "https://github.com/user/repo"}
url = reverse(
"browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params
)
release_browse_url = reverse(
"browse-snapshot", url_args={"snapshot_id": snapshot}, query_params=query_params
)
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == release_browse_url
@given(release())
def test_bad_id_browse(client, release):
swh_id = swh_id_prefix + "foo:" + release
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
resp = client.get(url)
assert resp.status_code == 400
@given(content())
def test_content_id_optional_parts_browse(client, content):
cnt_sha1_git = content["sha1_git"]
optional_parts = ";lines=4-20;origin=https://github.com/user/repo"
swh_id = swh_id_prefix + "cnt:" + cnt_sha1_git + optional_parts
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
query_string = "sha1_git:" + cnt_sha1_git
content_browse_url = reverse(
"browse-content",
url_args={"query_string": query_string},
- query_params={"origin": "https://github.com/user/repo"},
+ query_params={"origin_url": "https://github.com/user/repo"},
)
content_browse_url += "#L4-L20"
resp = client.get(url)
assert resp.status_code == 302
assert resp["location"] == content_browse_url
@given(release())
def test_origin_id_not_resolvable(client, release):
swh_id = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb"
url = reverse("browse-swh-id", url_args={"swh_id": swh_id})
resp = client.get(url)
assert resp.status_code == 400
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 20a6a7a6..1d6fd25b 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,1110 +1,1110 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import random
import re
import string
from django.utils.html import escape
from hypothesis import given
from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.web.browse.snapshot_context import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import (
reverse,
gen_path_info,
format_utc_iso_date,
parse_timestamp,
)
from swh.web.config import get_config
from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
origin,
origin_with_multiple_visits,
new_origin,
new_snapshot,
visit_dates,
revisions,
origin_with_releases,
release as existing_release,
unknown_revision,
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
url = reverse("browse-origin-visits", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/origin-visits.html")
visits = archive_data.origin_visit_get(origin["url"])
for v in visits:
vdate = format_utc_iso_date(v["date"], "%Y-%m-%dT%H:%M:%SZ")
browse_dir_url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "timestamp": vdate},
)
assert_contains(resp, browse_dir_url)
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
def _get_archive_data(visit_idx):
snapshot = archive_data.snapshot_get(origin_visits[visit_idx]["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
dir_content = archive_data.directory_ls(head_rev["directory"])
dir_files = [e for e in dir_content if e["type"] == "file"]
dir_file = random.choice(dir_files)
branches, releases = process_snapshot_branches(snapshot)
return {
"branches": branches,
"releases": releases,
"root_dir_sha1": head_rev["directory"],
"content": get_content(dir_file["checksums"]["sha1"]),
"visit": origin_visits[visit_idx],
}
tdata = _get_archive_data(-1)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=tdata["visit"]["date"],
)
visit_unix_ts = parse_timestamp(tdata["visit"]["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
timestamp=visit_unix_ts,
)
tdata = _get_archive_data(0)
_origin_content_view_test_helper(
client,
origin,
origin_visits,
tdata["branches"],
tdata["releases"],
tdata["root_dir_sha1"],
tdata["content"],
visit_id=tdata["visit"]["visit"],
)
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
dir_content = archive_data.directory_ls(root_dir_sha1)
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
_origin_directory_view_test_helper(
client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
dir_content,
timestamp=visit["date"],
)
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
head_rev = archive_data.revision_get(head_rev_id)
root_dir_sha1 = head_rev["directory"]
subdirs = [
e for e in archive_data.directory_ls(root_dir_sha1) if e["type"] == "dir"
]
branches, releases = process_snapshot_branches(snapshot)
visit_unix_ts = parse_timestamp(visit["date"]).timestamp()
visit_unix_ts = int(visit_unix_ts)
if len(subdirs) == 0:
return
subdir = random.choice(subdirs)
subdir_content = archive_data.directory_ls(subdir["target"])
subdir_path = subdir["name"]
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
origin = dict(origin)
del origin["type"]
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
visit_id=visit["visit"],
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit_unix_ts,
)
_origin_directory_view_test_helper(
client,
origin,
origin_visits,
branches,
releases,
root_dir_sha1,
subdir_content,
path=subdir_path,
timestamp=visit["date"],
)
@given(origin())
def test_origin_branches(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_branches_test_helper(client, origin, snapshot_content)
origin = dict(origin)
origin["type"] = None
_origin_branches_test_helper(client, origin, snapshot_content)
@given(origin())
def test_origin_releases(client, archive_data, origin):
origin_visits = archive_data.origin_visit_get(origin["url"])
visit = origin_visits[-1]
snapshot = archive_data.snapshot_get(visit["snapshot"])
snapshot_content = process_snapshot_branches(snapshot)
_origin_releases_test_helper(client, origin, snapshot_content)
origin = dict(origin)
origin["type"] = None
_origin_releases_test_helper(client, origin, snapshot_content)
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=3, max_size=3),
)
def test_origin_snapshot_null_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
snp_dict = new_snapshot.to_dict()
new_origin = archive_data.origin_add([new_origin])[0]
for i, branch in enumerate(snp_dict["branches"].keys()):
if i == 0:
snp_dict["branches"][branch] = None
else:
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i - 1]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="partial", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-directory", query_params={"origin_url": new_origin["url"]}
)
rv = client.get(url)
assert rv.status_code == 200
@given(
new_origin(),
new_snapshot(min_size=4, max_size=4),
visit_dates(),
revisions(min_size=4, max_size=4),
)
def test_origin_snapshot_invalid_branch(
client, archive_data, new_origin, new_snapshot, visit_dates, revisions
):
snp_dict = new_snapshot.to_dict()
new_origin = archive_data.origin_add([new_origin])[0]
for i, branch in enumerate(snp_dict["branches"].keys()):
snp_dict["branches"][branch] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin["url"], "branch": "invalid_branch"},
)
rv = client.get(url)
assert rv.status_code == 404
@given(new_origin())
def test_browse_visits_origin_not_found(client, new_origin):
url = reverse("browse-origin-visits", query_params={"origin_url": new_origin.url})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, f"Origin with url {new_origin.url} not found", status_code=404
)
@given(origin())
def test_browse_origin_directory_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_directory_not_found(client, origin):
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "path": "/invalid/dir/path/"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_origin_content_no_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = []
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "No visit", status_code=404)
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_unknown_visit(client, mocker, origin):
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_visits.get_origin_visits"
)
mock_get_origin_visits.return_value = [{"visit": 1}]
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "foo", "visit_id": 2},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Visit.*not found", resp.content.decode("utf-8"))
assert mock_get_origin_visits.called
@given(origin())
def test_browse_origin_content_directory_empty_snapshot(client, mocker, origin):
mock_snapshot_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_snapshot_service.lookup_origin.return_value = origin
mock_snapshot_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
for browse_context in ("content", "directory"):
url = reverse(
f"browse-origin-{browse_context}",
query_params={"origin_url": origin["url"], "path": "baz"},
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, f"browse/{browse_context}.html")
assert re.search("snapshot.*is empty", resp.content.decode("utf-8"))
assert mock_get_origin_visit_snapshot.called
assert mock_snapshot_service.lookup_origin.called
assert mock_snapshot_service.lookup_snapshot_sizes.called
@given(origin())
def test_browse_origin_content_not_found(client, origin):
url = reverse(
"browse-origin-content",
query_params={"origin_url": origin["url"], "path": "/invalid/file/path"},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert re.search("Directory entry.*not found", resp.content.decode("utf-8"))
@given(origin())
def test_browse_directory_snapshot_not_found(client, mocker, origin):
mock_get_snapshot_context = mocker.patch(
"swh.web.browse.snapshot_context.get_snapshot_context"
)
mock_get_snapshot_context.side_effect = NotFoundExc("Snapshot not found")
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(resp, "Snapshot not found", status_code=404)
assert mock_get_snapshot_context.called
@given(origin())
def test_origin_empty_snapshot(client, mocker, origin):
mock_service = mocker.patch("swh.web.browse.snapshot_context.service")
mock_get_origin_visit_snapshot = mocker.patch(
"swh.web.browse.snapshot_context.get_origin_visit_snapshot"
)
mock_get_origin_visit_snapshot.return_value = ([], [])
mock_service.lookup_snapshot_sizes.return_value = {
"revision": 0,
"release": 0,
}
mock_service.lookup_origin.return_value = origin
url = reverse("browse-origin-directory", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
resp_content = resp.content.decode("utf-8")
assert re.search("snapshot.*is empty", resp_content)
assert not re.search("swh-tr-link", resp_content)
assert mock_get_origin_visit_snapshot.called
assert mock_service.lookup_snapshot_sizes.called
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
# force swh.web.browse.snapshot_context.get_snapshot_content to return only one branch
config = get_config()
snapshot_max_size = int(config["snapshot_content_max_size"])
config["snapshot_content_max_size"] = 1
try:
snapshot = archive_data.snapshot_get_latest(origin["url"])
release = [
b for b in snapshot["branches"].values() if b["target_type"] == "release"
][-1]
release_data = archive_data.release_get(release["target"])
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": release_data["name"]},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(resp, release_data["name"])
assert_contains(resp, release["target"])
finally:
config["snapshot_content_max_size"] = snapshot_max_size
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, origin):
invalid_release_name = "swh-foo-bar"
url = reverse(
"browse-origin-directory",
query_params={"origin_url": origin["url"], "release": invalid_release_name},
)
resp = client.get(url)
assert resp.status_code == 404
assert re.search(
f"Release {invalid_release_name}.*not found", resp.content.decode("utf-8")
)
@given(new_origin(), unknown_revision())
def test_origin_browse_directory_branch_with_non_resolvable_revision(
client, archive_data, new_origin, unknown_revision
):
branch_name = "master"
snapshot = Snapshot(
branches={
branch_name.encode(): SnapshotBranch(
target=hash_to_bytes(unknown_revision), target_type=TargetType.REVISION,
)
}
)
new_origin = archive_data.origin_add([new_origin])[0]
archive_data.snapshot_add([snapshot])
visit = archive_data.origin_visit_add(new_origin["url"], datetime.now(), type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snapshot.id
)
url = reverse(
"browse-origin-directory",
query_params={"origin_url": new_origin["url"], "branch": branch_name},
)
resp = client.get(url)
assert resp.status_code == 200
assert_contains(
resp, f"Revision {unknown_revision } could not be found in the archive."
)
@given(origin())
def test_origin_content_no_path(client, origin):
url = reverse("browse-origin-content", query_params={"origin_url": origin["url"]})
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "The path of a content must be given as query parameter.", status_code=400
)
def test_origin_views_no_url_query_parameter(client):
for browse_context in (
"content",
"directory",
"log",
"branches",
"releases",
"visits",
):
url = reverse(f"browse-origin-{browse_context}")
resp = client.get(url)
assert resp.status_code == 400
assert_contains(
resp, "An origin URL must be provided as query parameter.", status_code=400
)
def _origin_content_view_test_helper(
client,
origin_info,
origin_visits,
origin_branches,
origin_releases,
root_dir_sha1,
content,
visit_id=None,
timestamp=None,
):
content_path = "/".join(content["path"].split("/")[1:])
if not visit_id:
visit_id = origin_visits[-1]["visit"]
query_params = {"origin_url": origin_info["url"], "path": content_path}
if timestamp:
query_params["timestamp"] = timestamp
if visit_id:
query_params["visit_id"] = visit_id
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
assert type(content["data"]) == str
assert_contains(resp, '<code class="%s">' % content["hljs_language"])
assert_contains(resp, escape(content["data"]))
split_path = content_path.split("/")
filename = split_path[-1]
path = content_path.replace(filename, "")[:-1]
path_info = gen_path_info(path)
del query_params["path"]
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<li class="swh-path">', count=len(path_info) + 1)
assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7]))
for p in path_info:
query_params["path"] = p["path"]
dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p["name"]))
assert_contains(resp, "%s" % filename)
query_string = "sha1_git:" + content["sha1_git"]
url_raw = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
assert_contains(resp, url_raw)
if "path" in query_params:
del query_params["path"]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>'
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Releases (%s)</a>'
% (escape(origin_releases_url), len(origin_releases)),
)
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
query_params["path"] = content_path
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-content", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-content", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/content.html")
swh_cnt_id = get_swh_persistent_id("content", content["sha1_git"])
swh_cnt_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
def _origin_directory_view_test_helper(
client,
origin_info,
origin_visits,
origin_branches,
origin_releases,
root_directory_sha1,
directory_entries,
visit_id=None,
timestamp=None,
path=None,
):
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
if not visit_id:
visit_id = origin_visits[-1]["visit"]
query_params = {"origin_url": origin_info["url"]}
if timestamp:
query_params["timestamp"] = timestamp
else:
query_params["visit_id"] = visit_id
if path:
query_params["path"] = path
url = reverse("browse-origin-directory", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert resp.status_code == 200
assert_template_used(resp, "browse/directory.html")
assert_contains(resp, '<td class="swh-directory">', count=len(dirs))
assert_contains(resp, '<td class="swh-content">', count=len(files))
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_timestamp(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
query_params["path"] = dir_path
dir_url = reverse("browse-origin-directory", query_params=query_params,)
assert_contains(resp, dir_url)
for f in files:
file_path = f["name"]
if path:
file_path = "%s/%s" % (path, f["name"])
query_params["path"] = file_path
file_url = reverse("browse-origin-content", query_params=query_params)
assert_contains(resp, file_url)
if "path" in query_params:
del query_params["path"]
root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
nb_bc_paths = 1
if path:
nb_bc_paths = len(path.split("/")) + 1
assert_contains(resp, '<li class="swh-path">', count=nb_bc_paths)
assert_contains(
resp, '<a href="%s">%s</a>' % (root_dir_branch_url, root_directory_sha1[:7])
)
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>'
% (escape(origin_branches_url), len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp,
'<a href="%s">Releases (%s)</a>'
% (escape(origin_releases_url), nb_releases),
)
if path:
query_params["path"] = path
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches))
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '<a href="%s">' % root_dir_release_url)
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_dir_id = get_swh_persistent_id("directory", directory_entries[0]["dir_id"])
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
def _origin_branches_test_helper(client, origin_info, origin_snapshot):
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-branches", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)
)
for branch in origin_branches:
browse_branch_url = reverse(
"browse-origin-directory",
- query_params={"origin": origin_info["url"], "branch": branch["name"]},
+ query_params={"origin_url": origin_info["url"], "branch": branch["name"]},
)
assert_contains(resp, '<a href="%s">' % escape(browse_branch_url))
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
- query_params={"origin": origin_info["url"]},
+ query_params={"origin_url": origin_info["url"]},
)
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
def _origin_releases_test_helper(client, origin_info, origin_snapshot):
query_params = {"origin_url": origin_info["url"]}
url = reverse("browse-origin-releases", query_params=query_params)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/releases.html")
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(
resp,
'Branches (%s)' % (origin_branches_url, len(origin_branches)),
)
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(
resp, 'Releases (%s)' % (origin_releases_url, nb_releases)
)
for release in origin_releases:
browse_release_url = reverse(
"browse-release",
url_args={"sha1_git": release["id"]},
- query_params={"origin": origin_info["url"]},
+ query_params={"origin_url": origin_info["url"]},
)
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": release["target"]},
- query_params={"origin": origin_info["url"]},
+ query_params={"origin_url": origin_info["url"]},
)
assert_contains(resp, '<a href="%s">' % escape(browse_release_url))
assert_contains(resp, '<a href="%s">' % escape(browse_revision_url))
@given(
new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
"""
When a snapshot contains a branch or a release alias, pagination links
in the branches / releases view should be displayed.
"""
mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)
snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
for i in range(len(revisions)):
branch = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][branch.encode()] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
release = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][b"RELEASE_ALIAS"] = {
"target_type": "alias",
"target": release.encode(),
}
snp_dict["branches"][release.encode()] = {
"target_type": "release",
"target": hash_to_bytes(existing_release),
}
new_origin = archive_data.origin_add([new_origin])[0]
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(new_origin["url"], visit_dates[0], type="git")
archive_data.origin_visit_update(
new_origin["url"], visit.visit, status="full", snapshot=snp_dict["id"]
)
url = reverse(
"browse-origin-branches", query_params={"origin_url": new_origin["url"]}
)
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/branches.html")
assert_contains(resp, '<ul class="pagination')
assert_contains(
resp,
"%s%s" % (message_lines[0] or "None", "\n".join(message_lines[1:])),
)
assert_contains(resp, release_id)
assert_contains(resp, release_name)
assert_contains(resp, target_type)
assert_contains(resp, '%s' % (target_url, target))
swh_rel_id = get_swh_persistent_id("release", release_id)
swh_rel_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rel_id})
assert_contains(resp, swh_rel_id)
assert_contains(resp, swh_rel_id_url)
if release_data["target_type"] == "revision":
if origin_info:
directory_url = reverse(
"browse-origin-directory",
query_params={
"origin_url": origin_info["url"],
"release": release_data["name"],
},
)
else:
rev = archive_data.revision_get(release_data["target"])
directory_url = reverse(
"browse-directory", url_args={"sha1_git": rev["directory"]}
)
assert_contains(resp, directory_url)
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index 0353751e..8f8378f3 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,248 +1,248 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.utils.html import escape
from hypothesis import given
from swh.web.common.identifiers import get_swh_persistent_id
from swh.web.common.utils import reverse, format_utc_iso_date, parse_timestamp
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import origin, revision, unknown_revision, new_origin
@given(revision())
def test_revision_browse(client, archive_data, revision):
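    # The context-independent revision view must render author, committer,
    # dates, the directory and history links, and the full commit message.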
url = reverse("browse-revision", url_args={"sha1_git": revision})
revision_data = archive_data.revision_get(revision)
author_name = revision_data["author"]["name"]
committer_name = revision_data["committer"]["name"]
dir_id = revision_data["directory"]
directory_url = reverse("browse-directory", url_args={"sha1_git": dir_id})
history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})
resp = client.get(url)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision.html")
assert_contains(resp, author_name)
assert_contains(resp, committer_name)
assert_contains(resp, directory_url)
assert_contains(resp, history_url)
for parent in revision_data["parents"]:
parent_url = reverse("browse-revision", url_args={"sha1_git": parent})
        assert_contains(resp, '<a href="%s">%s</a>' % (parent_url, parent))
author_date = revision_data["date"]
committer_date = revision_data["committer_date"]
message_lines = revision_data["message"].split("\n")
assert_contains(resp, format_utc_iso_date(author_date))
assert_contains(resp, format_utc_iso_date(committer_date))
assert_contains(resp, escape(message_lines[0]))
assert_contains(resp, escape("\n".join(message_lines[1:])))
@given(origin())
def test_revision_origin_browse(client, archive_data, origin):
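    # Same revision view, but reached with an origin_url query parameter:
    # parent links must keep the origin context.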
snapshot = archive_data.snapshot_get_latest(origin["url"])
revision = archive_data.snapshot_get_head(snapshot)
revision_data = archive_data.revision_get(revision)
dir_id = revision_data["directory"]
origin_revision_log_url = reverse(
"browse-origin-log",
query_params={"origin_url": origin["url"], "revision": revision},
)
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
- query_params={"origin": origin["url"]},
+ query_params={"origin_url": origin["url"]},
)
resp = client.get(url)
assert_contains(resp, origin_revision_log_url)
for parent in revision_data["parents"]:
parent_url = reverse(
"browse-revision",
url_args={"sha1_git": parent},
- query_params={"origin": origin["url"]},
+ query_params={"origin_url": origin["url"]},
)
        assert_contains(resp, '<a href="%s">%s</a>' % (parent_url, parent))
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_rev_id = get_swh_persistent_id("revision", revision)
swh_rev_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swh_dir_id = get_swh_persistent_id("directory", dir_id)
swh_dir_id_url = reverse("browse-swh-id", url_args={"swh_id": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
@given(revision())
def test_revision_log_browse(client, archive_data, revision):
per_page = 10
revision_log = archive_data.revision_log(revision)
revision_log_sorted = sorted(
revision_log,
key=lambda rev: -parse_timestamp(rev["committer_date"]).timestamp(),
)
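    # The log view lists revisions by decreasing committer date, so build the
    # expected ordering the same way before walking the pages.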
url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
resp = client.get(url)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
nb_log_entries = per_page
if len(revision_log_sorted) < per_page:
nb_log_entries = len(revision_log_sorted)
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
    assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
    assert_contains(resp, '<a class="page-link">Newer</a>')
    if len(revision_log_sorted) > per_page:
        assert_contains(
            resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
        )
for log in revision_log_sorted[:per_page]:
revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
assert_contains(resp, log["id"][:7])
assert_contains(resp, log["author"]["name"])
assert_contains(resp, format_utc_iso_date(log["date"]))
assert_contains(resp, escape(log["message"]))
assert_contains(resp, format_utc_iso_date(log["committer_date"]))
assert_contains(resp, revision_url)
if len(revision_log_sorted) <= per_page:
return
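    # First page verified; fetch the second page and check its pagination.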
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 2 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
    assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
    assert_contains(
        resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
    )
    if len(revision_log_sorted) > 2 * per_page:
        assert_contains(
            resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
        )
if len(revision_log_sorted) <= 2 * per_page:
return
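    # Second page verified; fetch the third page and check its pagination.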
resp = client.get(next_page_url)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 3 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - 2 * per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert resp.status_code == 200
assert_template_used(resp, "browse/revision-log.html")
    assert_contains(resp, '<tr class="swh-revision-log-entry', count=nb_log_entries)
    assert_contains(
        resp, '<a class="page-link" href="%s">Newer</a>' % escape(prev_page_url)
    )
    if len(revision_log_sorted) > 3 * per_page:
        assert_contains(
            resp, '<a class="page-link" href="%s">Older</a>' % escape(next_page_url),
        )
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
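    # An unknown revision hash must produce the standard 404 error page.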
url = reverse("browse-revision", url_args={"sha1_git": unknown_revision})
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "Revision with sha1_git %s not found" % unknown_revision, status_code=404
)
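    # A valid revision combined with an unvisited origin must also 404, with
    # the "origin appears broken" message raised by the revision view.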
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
- query_params={"origin": new_origin.url},
+ query_params={"origin_url": new_origin.url},
)
resp = client.get(url)
assert resp.status_code == 404
assert_template_used(resp, "error.html")
assert_contains(
resp, "the origin mentioned in your request" " appears broken", status_code=404
)
@given(revision())
def test_revision_uppercase(client, revision):
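    # Uppercase checksums must redirect to the canonical lowercase URL.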
url = reverse(
"browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
)
resp = client.get(url)
assert resp.status_code == 302
redirect_url = reverse("browse-revision", url_args={"sha1_git": revision})
assert resp["location"] == redirect_url
diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py
index e98f772e..68fadcf6 100644
--- a/swh/web/tests/common/test_identifiers.py
+++ b/swh/web/tests/common/test_identifiers.py
@@ -1,121 +1,121 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import given
import pytest
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import (
CONTENT,
DIRECTORY,
RELEASE,
REVISION,
SNAPSHOT,
PersistentId,
)
from swh.web.common.exc import BadInputExc
from swh.web.common.identifiers import (
get_swh_persistent_id,
resolve_swh_persistent_id,
get_persistent_identifier,
group_swh_persistent_identifiers,
)
from swh.web.common.utils import reverse
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import content, directory, release, revision, snapshot
@given(content())
def test_get_swh_persistent_id(content):
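    # Contents are identified by their sha1_git; an invalid object type or a
    # malformed id must both raise BadInputExc.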
swh_object_type = CONTENT
sha1_git = content["sha1_git"]
expected_swh_id = "swh:1:cnt:" + sha1_git
assert get_swh_persistent_id(swh_object_type, sha1_git) == expected_swh_id
with pytest.raises(BadInputExc) as e:
get_swh_persistent_id("foo", sha1_git)
assert e.match("Invalid object")
with pytest.raises(BadInputExc) as e:
get_swh_persistent_id(swh_object_type, "not a valid id")
assert e.match("Invalid object")
@given(content(), directory(), release(), revision(), snapshot())
def test_resolve_swh_persistent_id(content, directory, release, revision, snapshot):
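    # Each object type maps to a browse URL with its own argument layout:
    # contents take a query string, snapshots a snapshot_id, others a sha1_git.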
for obj_type, obj_id in (
(CONTENT, content["sha1_git"]),
(DIRECTORY, directory),
(RELEASE, release),
(REVISION, revision),
(SNAPSHOT, snapshot),
):
swh_pid = get_swh_persistent_id(obj_type, obj_id)
url_args = {}
if obj_type == CONTENT:
url_args["query_string"] = f"sha1_git:{obj_id}"
elif obj_type == SNAPSHOT:
url_args["snapshot_id"] = obj_id
else:
url_args["sha1_git"] = obj_id
- query_params = {"origin": "some-origin"}
+ query_params = {"origin_url": "some-origin"}
browse_url = reverse(
f"browse-{obj_type}", url_args=url_args, query_params=query_params
)
resolved_pid = resolve_swh_persistent_id(swh_pid, query_params)
assert isinstance(resolved_pid["swh_id_parsed"], PersistentId)
assert str(resolved_pid["swh_id_parsed"]) == swh_pid
assert resolved_pid["browse_url"] == browse_url
with pytest.raises(BadInputExc, match="Origin PIDs"):
resolve_swh_persistent_id(f"swh:1:ori:{random_sha1()}")
@given(content(), directory(), release(), revision(), snapshot())
def test_get_persistent_identifier(content, directory, release, revision, snapshot):
for obj_type, obj_id in (
(CONTENT, content["sha1_git"]),
(DIRECTORY, directory),
(RELEASE, release),
(REVISION, revision),
(SNAPSHOT, snapshot),
):
swh_pid = get_swh_persistent_id(obj_type, obj_id)
swh_parsed_pid = get_persistent_identifier(swh_pid)
assert isinstance(swh_parsed_pid, PersistentId)
assert str(swh_parsed_pid) == swh_pid
with pytest.raises(BadInputExc, match="Error when parsing identifier"):
get_persistent_identifier("foo")
@given(content(), directory(), release(), revision(), snapshot())
def test_group_persistent_identifiers(content, directory, release, revision, snapshot):
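    # Build one persistent id per object type and check that grouping returns
    # them keyed by type with byte-encoded hashes.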
swh_pids = []
expected = {}
for obj_type, obj_id in (
(CONTENT, content["sha1_git"]),
(DIRECTORY, directory),
(RELEASE, release),
(REVISION, revision),
(SNAPSHOT, snapshot),
):
swh_pid = get_swh_persistent_id(obj_type, obj_id)
swh_pid = get_persistent_identifier(swh_pid)
swh_pids.append(swh_pid)
expected[obj_type] = [hash_to_bytes(obj_id)]
pid_groups = group_swh_persistent_identifiers(swh_pids)
assert pid_groups == expected