diff --git a/swh/web/browse/views/directory.py b/swh/web/browse/views/directory.py index ffb223b8..333f39ae 100644 --- a/swh/web/browse/views/directory.py +++ b/swh/web/browse/views/directory.py @@ -1,272 +1,273 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os import sentry_sdk from django.http import HttpResponse from django.shortcuts import redirect, render from django.template.defaultfilters import filesizeformat from swh.model.identifiers import DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import gen_link, get_directory_entries, get_readme_to_display from swh.web.common import archive from swh.web.common.exc import NotFoundExc, handle_view_exception from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import DirectoryMetadata, SWHObjectInfo from swh.web.common.utils import gen_path_info, reverse, swh_object_icons def _directory_browse(request, sha1_git, path=None): root_sha1_git = sha1_git try: if path: dir_info = archive.lookup_directory_with_path(sha1_git, path) sha1_git = dir_info["target"] dirs, files = get_directory_entries(sha1_git) origin_url = request.GET.get("origin_url") if not origin_url: origin_url = request.GET.get("origin") snapshot_id = request.GET.get("snapshot") snapshot_context = None if origin_url is not None or snapshot_id is not None: try: snapshot_context = get_snapshot_context( snapshot_id=snapshot_id, origin_url=origin_url, branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), path=path, ) except NotFoundExc as e: if str(e).startswith("Origin"): raw_dir_url = reverse( "browse-directory", url_args={"sha1_git": sha1_git} ) error_message = ( "The Software Heritage archive has a directory " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the directory " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_dir_url)) ) raise NotFoundExc(error_message) else: raise e except Exception as exc: return handle_view_exception(request, exc) path_info = gen_path_info(path) query_params = snapshot_context["query_params"] if snapshot_context else {} breadcrumbs = [] breadcrumbs.append( { "name": root_sha1_git[:7], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, - query_params=query_params, + query_params={**query_params, "path": None}, ), } ) + for pi in path_info: breadcrumbs.append( { "name": pi["name"], "url": reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, - query_params={"path": pi["path"], **query_params}, + query_params={**query_params, "path": pi["path"],}, ), } ) path = "" if path is None else (path + "/") for d in dirs: if d["type"] == "rev": d["url"] = reverse( "browse-revision", url_args={"sha1_git": d["target"]}, query_params=query_params, ) else: d["url"] = reverse( "browse-directory", url_args={"sha1_git": root_sha1_git}, - query_params={"path": path + d["name"], **query_params}, + query_params={**query_params, "path": path + d["name"],}, ) sum_file_sizes = 0 readmes = {} for f in files: query_string = "sha1_git:" + f["target"] f["url"] = reverse( "browse-content", url_args={"query_string": query_string}, query_params={ - "path": root_sha1_git + "/" + path + f["name"], **query_params, + "path": root_sha1_git + "/" + path + f["name"], }, ) if f["length"] is not None: sum_file_sizes += f["length"] f["length"] = filesizeformat(f["length"]) if f["name"].lower().startswith("readme"): readmes[f["name"]] = f["checksums"]["sha1"] readme_name, readme_url, readme_html = get_readme_to_display(readmes) sum_file_sizes = filesizeformat(sum_file_sizes) dir_metadata = DirectoryMetadata( object_type=DIRECTORY, object_id=sha1_git, directory=root_sha1_git, nb_files=len(files), nb_dirs=len(dirs), sum_file_sizes=sum_file_sizes, root_directory=root_sha1_git, path=f"/{path}" if path else None, revision=None, revision_found=None, release=None, snapshot=None, ) vault_cooking = { "directory_context": True, "directory_id": sha1_git, "revision_context": False, "revision_id": None, } swh_objects = [SWHObjectInfo(object_type=DIRECTORY, object_id=sha1_git)] if snapshot_context: swh_objects.append( SWHObjectInfo( object_type=REVISION, object_id=snapshot_context["revision_id"] ) ) swh_objects.append( SWHObjectInfo( object_type=SNAPSHOT, object_id=snapshot_context["snapshot_id"] ) ) if snapshot_context["release_id"]: swh_objects.append( SWHObjectInfo( object_type=RELEASE, object_id=snapshot_context["release_id"] ) ) swhids_info = get_swhids_info(swh_objects, snapshot_context, dir_metadata) heading = "Directory - %s" % sha1_git if breadcrumbs: dir_path = "/".join([bc["name"] for bc in breadcrumbs]) + "/" heading += " - %s" % dir_path top_right_link = None if snapshot_context is not None and not snapshot_context["is_empty"]: history_url = reverse( "browse-revision-log", url_args={"sha1_git": snapshot_context["revision_id"]}, query_params=query_params, ) top_right_link = { "url": history_url, "icon": swh_object_icons["revisions history"], "text": "History", } return render( request, "browse/directory.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"], "swh_object_name": "Directory", "swh_object_metadata": dir_metadata, "dirs": dirs, "files": files, "breadcrumbs": breadcrumbs, "top_right_link": top_right_link, "readme_name": readme_name, "readme_url": readme_url, "readme_html": readme_html, "snapshot_context": snapshot_context, "vault_cooking": vault_cooking, "show_actions": True, "swhids_info": swhids_info, }, ) @browse_route( r"directory/(?P[0-9a-f]+)/", view_name="browse-directory", checksum_args=["sha1_git"], ) def directory_browse(request, sha1_git): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is :http:get:`/browse/directory/(sha1_git)/` """ return _directory_browse(request, sha1_git, request.GET.get("path")) @browse_route( r"directory/(?P[0-9a-f]+)/(?P.+)/", view_name="browse-directory-legacy", checksum_args=["sha1_git"], ) def directory_browse_legacy(request, sha1_git, path): """Django view for browsing the content of a directory identified by its sha1_git value. The url that points to it is :http:get:`/browse/directory/(sha1_git)/(path)/` """ return _directory_browse(request, sha1_git, path) @browse_route( r"directory/resolve/content-path/(?P[0-9a-f]+)/", view_name="browse-directory-resolve-content-path", checksum_args=["sha1_git"], ) def _directory_resolve_content_path(request, sha1_git): """ Internal endpoint redirecting to data url for a specific file path relative to a root directory. """ try: path = os.path.normpath(request.GET.get("path")) if not path.startswith("../"): dir_info = archive.lookup_directory_with_path(sha1_git, path) if dir_info["type"] == "file": sha1 = dir_info["checksums"]["sha1"] data_url = reverse( "browse-content-raw", url_args={"query_string": sha1} ) return redirect(data_url) except Exception as exc: sentry_sdk.capture_exception(exc) return HttpResponse(status=404) diff --git a/swh/web/tests/browse/views/test_directory.py b/swh/web/tests/browse/views/test_directory.py index ebaef4fd..46009fe6 100644 --- a/swh/web/tests/browse/views/test_directory.py +++ b/swh/web/tests/browse/views/test_directory.py @@ -1,318 +1,428 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from django.utils.html import escape +from swh.model.from_disk import DentryPerms +from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import DIRECTORY, RELEASE, REVISION, SNAPSHOT +from swh.model.model import ( + Directory, + DirectoryEntry, + Origin, + OriginVisit, + OriginVisitStatus, + Revision, + RevisionType, + Snapshot, + SnapshotBranch, + TargetType, + TimestampWithTimezone, +) +from swh.storage.utils import now from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import gen_path_info, reverse from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( directory, directory_with_subdirs, + empty_directory, invalid_sha1, + new_person, + new_swh_date, origin_with_multiple_visits, unknown_directory, ) @given(directory()) def test_root_directory_view(client, archive_data, directory): _directory_view_checks(client, directory, archive_data.directory_ls(directory)) @given(directory_with_subdirs()) def test_sub_directory_view(client, archive_data, directory): dir_content = archive_data.directory_ls(directory) subdir = random.choice([e for e in dir_content if e["type"] == "dir"]) subdir_content = archive_data.directory_ls(subdir["target"]) _directory_view_checks(client, directory, subdir_content, subdir["name"]) +@given(empty_directory(), new_person(), new_swh_date()) +def test_sub_directory_view_origin_context( + client, archive_data, empty_directory, person, date +): + origin_url = "test_sub_directory_view_origin_context" + subdir = Directory( + entries=( + DirectoryEntry( + name=b"foo", + type="dir", + target=hash_to_bytes(empty_directory), + perms=DentryPerms.directory, + ), + DirectoryEntry( + name=b"bar", + type="dir", + target=hash_to_bytes(empty_directory), + perms=DentryPerms.directory, + ), + ) + ) + + parentdir = Directory( + entries=( + DirectoryEntry( + name=b"baz", type="dir", target=subdir.id, perms=DentryPerms.directory, + ), + ) + ) + archive_data.directory_add([subdir, parentdir]) + + revision = Revision( + directory=parentdir.id, + author=person, + committer=person, + message=b"commit message", + date=TimestampWithTimezone.from_datetime(date), + committer_date=TimestampWithTimezone.from_datetime(date), + synthetic=False, + type=RevisionType.GIT, + ) + archive_data.revision_add([revision]) + + snapshot = Snapshot( + branches={ + b"HEAD": SnapshotBranch( + target="refs/head/master".encode(), target_type=TargetType.ALIAS, + ), + b"refs/head/master": SnapshotBranch( + target=revision.id, target_type=TargetType.REVISION, + ), + } + ) + archive_data.snapshot_add([snapshot]) + + archive_data.origin_add([Origin(url=origin_url)]) + date = now() + visit = OriginVisit(origin=origin_url, date=date, type="git") + visit = archive_data.origin_visit_add([visit])[0] + visit_status = OriginVisitStatus( + origin=origin_url, + visit=visit.visit, + date=date, + status="full", + snapshot=snapshot.id, + ) + archive_data.origin_visit_status_add([visit_status]) + + dir_content = archive_data.directory_ls(hash_to_hex(parentdir.id)) + subdir = dir_content[0] + subdir_content = archive_data.directory_ls(subdir["target"]) + _directory_view_checks( + client, + hash_to_hex(parentdir.id), + subdir_content, + subdir["name"], + origin_url, + hash_to_hex(snapshot.id), + hash_to_hex(revision.id), + ) + + @given(invalid_sha1(), unknown_directory()) def test_directory_request_errors(client, invalid_sha1, unknown_directory): dir_url = reverse("browse-directory", url_args={"sha1_git": invalid_sha1}) resp = client.get(dir_url) assert resp.status_code == 400 assert_template_used(resp, "error.html") dir_url = reverse("browse-directory", url_args={"sha1_git": unknown_directory}) resp = client.get(dir_url) assert resp.status_code == 404 assert_template_used(resp, "error.html") @given(directory()) def test_directory_uppercase(client, directory): url = reverse( "browse-directory-uppercase-checksum", url_args={"sha1_git": directory.upper()} ) resp = client.get(url) assert resp.status_code == 302 redirect_url = reverse("browse-directory", url_args={"sha1_git": directory}) assert resp["location"] == redirect_url @given(directory()) def test_permalink_box_context(client, tests_data, directory): origin_url = random.choice(tests_data["origins"])["url"] url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={"origin_url": origin_url}, ) resp = client.get(url) assert resp.status_code == 200 assert_contains(resp, 'id="swhid-context-option-directory"') @given(origin_with_multiple_visits()) def test_directory_origin_snapshot_branch_browse(client, archive_data, origin): visits = archive_data.origin_visit_get(origin["url"]) visit = random.choice(visits) snapshot = archive_data.snapshot_get(visit["snapshot"]) branches, releases = process_snapshot_branches(snapshot) branch_info = random.choice(branches) directory = archive_data.revision_get(branch_info["revision"])["directory"] directory_content = archive_data.directory_ls(directory) directory_subdir = random.choice( [e for e in directory_content if e["type"] == "dir"] ) url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={ "origin_url": origin["url"], "snapshot": snapshot["id"], "branch": branch_info["name"], "path": directory_subdir["name"], }, ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") _check_origin_snapshot_related_html(resp, origin, snapshot, branches, releases) assert_contains(resp, directory_subdir["name"]) assert_contains(resp, f"Branch: {branch_info['name']}") dir_swhid = gen_swhid( DIRECTORY, directory_subdir["target"], metadata={ "origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot), "anchor": gen_swhid(REVISION, branch_info["revision"]), "path": "/", }, ) assert_contains(resp, dir_swhid) rev_swhid = gen_swhid( REVISION, branch_info["revision"], metadata={"origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot),}, ) assert_contains(resp, rev_swhid) snp_swhid = gen_swhid(SNAPSHOT, snapshot, metadata={"origin": origin["url"],},) assert_contains(resp, snp_swhid) @given(origin_with_multiple_visits()) def test_content_origin_snapshot_release_browse(client, archive_data, origin): visits = archive_data.origin_visit_get(origin["url"]) visit = random.choice(visits) snapshot = archive_data.snapshot_get(visit["snapshot"]) branches, releases = process_snapshot_branches(snapshot) release_info = random.choice(releases) directory = release_info["directory"] directory_content = archive_data.directory_ls(directory) directory_subdir = random.choice( [e for e in directory_content if e["type"] == "dir"] ) url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={ "origin_url": origin["url"], "snapshot": snapshot["id"], "release": release_info["name"], "path": directory_subdir["name"], }, ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") _check_origin_snapshot_related_html(resp, origin, snapshot, branches, releases) assert_contains(resp, directory_subdir["name"]) assert_contains(resp, f"Release: {release_info['name']}") dir_swhid = gen_swhid( DIRECTORY, directory_subdir["target"], metadata={ "origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot), "anchor": gen_swhid(RELEASE, release_info["id"]), "path": "/", }, ) assert_contains(resp, dir_swhid) rev_swhid = gen_swhid( REVISION, release_info["target"], metadata={"origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot),}, ) assert_contains(resp, rev_swhid) rel_swhid = gen_swhid( RELEASE, release_info["id"], metadata={"origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot),}, ) assert_contains(resp, rel_swhid) snp_swhid = gen_swhid(SNAPSHOT, snapshot, metadata={"origin": origin["url"],},) assert_contains(resp, snp_swhid) def _check_origin_snapshot_related_html(resp, origin, snapshot, branches, releases): browse_origin_url = reverse( "browse-origin", query_params={"origin_url": origin["url"]} ) assert_contains(resp, f'href="{browse_origin_url}"') origin_branches_url = reverse( "browse-origin-branches", query_params={"origin_url": origin["url"], "snapshot": snapshot["id"]}, ) assert_contains(resp, f'href="{escape(origin_branches_url)}"') assert_contains(resp, f"Branches ({len(branches)})") origin_releases_url = reverse( "browse-origin-releases", query_params={"origin_url": origin["url"], "snapshot": snapshot["id"]}, ) assert_contains(resp, f'href="{escape(origin_releases_url)}"') assert_contains(resp, f"Releases ({len(releases)})") assert_contains(resp, '
  • ', count=len(branches)) assert_contains(resp, '
  • ', count=len(releases)) def _directory_view_checks( client, root_directory_sha1, directory_entries, path=None, origin_url=None, snapshot_id=None, + revision_id=None, ): dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")] files = [e for e in directory_entries if e["type"] == "file"] url_args = {"sha1_git": root_directory_sha1} - query_params = {"path": path, "origin_url": origin_url, "snapshot": snapshot_id} + query_params = {"origin_url": origin_url, "snapshot": snapshot_id} - url = reverse("browse-directory", url_args=url_args, query_params=query_params) + url = reverse( + "browse-directory", + url_args=url_args, + query_params={**query_params, "path": path}, + ) root_dir_url = reverse( - "browse-directory", url_args={"sha1_git": root_directory_sha1} + "browse-directory", url_args=url_args, query_params=query_params, ) resp = client.get(url) assert resp.status_code == 200 assert_template_used(resp, "browse/directory.html") assert_contains( - resp, '' + root_directory_sha1[:7] + "" + resp, '' + root_directory_sha1[:7] + "", ) assert_contains(resp, '', count=len(dirs)) assert_contains(resp, '', count=len(files)) for d in dirs: if d["type"] == "rev": dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]}) else: dir_path = d["name"] if path: dir_path = "%s/%s" % (path, d["name"]) dir_url = reverse( "browse-directory", url_args={"sha1_git": root_directory_sha1}, - query_params={"path": dir_path}, + query_params={**query_params, "path": dir_path}, ) assert_contains(resp, dir_url) for f in files: file_path = "%s/%s" % (root_directory_sha1, f["name"]) if path: file_path = "%s/%s/%s" % (root_directory_sha1, path, f["name"]) query_string = "sha1_git:" + f["target"] file_url = reverse( "browse-content", url_args={"query_string": query_string}, - query_params={"path": file_path}, + query_params={**query_params, "path": file_path}, ) assert_contains(resp, file_url) path_info = gen_path_info(path) assert_contains(resp, '
  • ', count=len(path_info) + 1) assert_contains( resp, '%s' % (root_dir_url, root_directory_sha1[:7]) ) for p in path_info: dir_url = reverse( "browse-directory", url_args={"sha1_git": root_directory_sha1}, - query_params={"path": p["path"]}, + query_params={**query_params, "path": p["path"]}, ) assert_contains(resp, '%s' % (dir_url, p["name"])) assert_contains(resp, "vault-cook-directory") swh_dir_id = gen_swhid(DIRECTORY, directory_entries[0]["dir_id"]) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) swhid_context = {} + if origin_url: + swhid_context["origin"] = origin_url + if snapshot_id: + swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot_id) if root_directory_sha1 != directory_entries[0]["dir_id"]: swhid_context["anchor"] = gen_swhid(DIRECTORY, root_directory_sha1) - - swhid_context["path"] = f"/{path}/" if path else None - if root_directory_sha1 != directory_entries[0]["dir_id"]: swhid_context["anchor"] = gen_swhid(DIRECTORY, root_directory_sha1) + if revision_id: + swhid_context["anchor"] = gen_swhid(REVISION, revision_id) + swhid_context["path"] = f"/{path}/" if path else None swh_dir_id = gen_swhid( DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context ) swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url)