diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py index c1cd9079..c7f3781f 100644 --- a/swh/web/common/identifiers.py +++ b/swh/web/common/identifiers.py @@ -1,393 +1,398 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List, Optional, cast +from typing import Any, Dict, Iterable, List, Optional from urllib.parse import quote, unquote from typing_extensions import TypedDict from django.http import QueryDict from swh.model.exceptions import ValidationError from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ( CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT, SWHID, + ObjectType, + QualifiedSWHID, parse_swhid, - swhid, ) from swh.web.common import archive from swh.web.common.exc import BadInputExc from swh.web.common.typing import ( QueryParameters, SnapshotContext, SWHIDContext, SWHIDInfo, SWHObjectInfo, ) from swh.web.common.utils import reverse def gen_swhid( object_type: str, object_id: str, scheme_version: int = 1, metadata: SWHIDContext = {}, ) -> str: """ Returns the SoftWare Heritage persistent IDentifier for a swh object based on: * the object type * the object id * the SWHID scheme version Args: object_type: the swh object type (content/directory/release/revision/snapshot) object_id: the swh object id (hexadecimal representation of its hash value) scheme_version: the scheme version of the SWHIDs Returns: the SWHID of the object Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: - obj_swhid = swhid( - object_type, - object_id, - scheme_version, - cast(Dict[str, Any], {k: v for k, v in metadata.items() if v is not None}), + decoded_object_type = ObjectType[object_type.upper()] + decoded_object_id = hash_to_bytes(object_id) + obj_swhid = str( + QualifiedSWHID( + object_type=decoded_object_type, + object_id=decoded_object_id, + scheme_version=scheme_version, + **metadata, + ) ) - except ValidationError as e: + except (ValidationError, KeyError, ValueError) as e: raise BadInputExc("Invalid object (%s) for SWHID. %s" % (object_id, e)) else: return obj_swhid class ResolvedSWHID(TypedDict): """parsed SWHID with context""" swhid_parsed: SWHID """URL to browse object according to SWHID context""" browse_url: Optional[str] def resolve_swhid( swhid: str, query_params: Optional[QueryParameters] = None ) -> ResolvedSWHID: """ Try to resolve a SoftWare Heritage persistent IDentifier into an url for browsing the targeted object. Args: swhid: a SoftWare Heritage persistent IDentifier query_params: optional dict filled with query parameters to append to the browse url Returns: a dict with the following keys: * **swhid_parsed**: the parsed identifier * **browse_url**: the url for browsing the targeted object """ swhid_parsed = get_swhid(swhid) object_type = swhid_parsed.object_type object_id = swhid_parsed.object_id browse_url = None url_args = {} query_dict = QueryDict("", mutable=True) fragment = "" anchor_swhid_parsed = None process_lines = object_type is CONTENT if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if "origin" in swhid_parsed.metadata: origin_url = unquote(swhid_parsed.metadata["origin"]) origin_url = archive.lookup_origin({"url": origin_url})["url"] query_dict["origin_url"] = origin_url if "anchor" in swhid_parsed.metadata: anchor_swhid_parsed = get_swhid(swhid_parsed.metadata["anchor"]) if "path" in swhid_parsed.metadata and swhid_parsed.metadata["path"] != "/": query_dict["path"] = unquote(swhid_parsed.metadata["path"]) if anchor_swhid_parsed: directory = "" if anchor_swhid_parsed.object_type == DIRECTORY: directory = anchor_swhid_parsed.object_id elif anchor_swhid_parsed.object_type == REVISION: revision = archive.lookup_revision(anchor_swhid_parsed.object_id) directory = revision["directory"] elif anchor_swhid_parsed.object_type == RELEASE: release = archive.lookup_release(anchor_swhid_parsed.object_id) if release["target_type"] == REVISION: revision = archive.lookup_revision(release["target"]) directory = revision["directory"] if object_type == CONTENT: if "origin" not in swhid_parsed.metadata: # when no origin context, content objects need to have their # path prefixed by root directory id for proper breadcrumbs display query_dict["path"] = directory + query_dict["path"] else: # remove leading slash from SWHID content path query_dict["path"] = query_dict["path"][1:] elif object_type == DIRECTORY: object_id = directory # remove leading and trailing slashes from SWHID directory path if query_dict["path"].endswith("/"): query_dict["path"] = query_dict["path"][1:-1] else: query_dict["path"] = query_dict["path"][1:] # snapshot context if "visit" in swhid_parsed.metadata: snp_swhid_parsed = get_swhid(swhid_parsed.metadata["visit"]) if snp_swhid_parsed.object_type != SNAPSHOT: raise BadInputExc("Visit must be a snapshot SWHID.") query_dict["snapshot"] = snp_swhid_parsed.object_id if anchor_swhid_parsed: if anchor_swhid_parsed.object_type == REVISION: # check if the anchor revision is the tip of a branch branch_name = archive.lookup_snapshot_branch_name_from_tip_revision( snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id ) if branch_name: query_dict["branch"] = branch_name elif object_type != REVISION: query_dict["revision"] = anchor_swhid_parsed.object_id elif anchor_swhid_parsed.object_type == RELEASE: release = archive.lookup_release(anchor_swhid_parsed.object_id) if release: query_dict["release"] = release["name"] if object_type == REVISION and "release" not in query_dict: branch_name = archive.lookup_snapshot_branch_name_from_tip_revision( snp_swhid_parsed.object_id, object_id ) if branch_name: query_dict["branch"] = branch_name # browsing content or directory without snapshot context elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed: if anchor_swhid_parsed.object_type == REVISION: # anchor revision, objects are browsed from its view object_type = REVISION object_id = anchor_swhid_parsed.object_id elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY: # a directory is browsed from its root object_id = anchor_swhid_parsed.object_id if object_type == CONTENT: url_args["query_string"] = f"sha1_git:{object_id}" elif object_type == DIRECTORY: url_args["sha1_git"] = object_id elif object_type == RELEASE: url_args["sha1_git"] = object_id elif object_type == REVISION: url_args["sha1_git"] = object_id elif object_type == SNAPSHOT: url_args["snapshot_id"] = object_id elif object_type == ORIGIN: raise BadInputExc( ( "Origin SWHIDs are not publicly resolvable because they are for " "internal usage only" ) ) if "lines" in swhid_parsed.metadata and process_lines: lines = swhid_parsed.metadata["lines"].split("-") fragment += "#L" + lines[0] if len(lines) > 1: fragment += "-L" + lines[1] if url_args: browse_url = ( reverse( f"browse-{object_type}", url_args=url_args, query_params=query_dict, ) + fragment ) return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url) def get_swhid(swhid: str) -> SWHID: """Check if a SWHID is valid and return it parsed. Args: swhid: a SoftWare Heritage persistent IDentifier. Raises: BadInputExc: if the provided SWHID can not be parsed. Return: A parsed SWHID. """ try: swhid_parsed = parse_swhid(swhid) except ValidationError as ve: raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages)) else: return swhid_parsed def group_swhids(swhids: Iterable[SWHID],) -> Dict[str, List[bytes]]: """ Groups many SoftWare Heritage persistent IDentifiers into a dictionary depending on their type. Args: swhids: an iterable of SoftWare Heritage persistent IDentifier objects Returns: A dictionary with: keys: object types values: object hashes """ swhids_by_type: Dict[str, List[bytes]] = { CONTENT: [], DIRECTORY: [], REVISION: [], RELEASE: [], SNAPSHOT: [], } for obj_swhid in swhids: obj_id = obj_swhid.object_id obj_type = obj_swhid.object_type swhids_by_type[obj_type].append(hash_to_bytes(obj_id)) return swhids_by_type def get_swhids_info( swh_objects: Iterable[SWHObjectInfo], snapshot_context: Optional[SnapshotContext] = None, extra_context: Optional[Dict[str, Any]] = None, ) -> List[SWHIDInfo]: """ Returns a list of dict containing info related to SWHIDs of objects. Args: swh_objects: an iterable of dict describing archived objects snapshot_context: optional dict parameter describing the snapshot in which the objects have been found extra_context: optional dict filled with extra contextual info about the objects Returns: a list of dict containing SWHIDs info """ swhids_info = [] for swh_object in swh_objects: if not swh_object["object_id"]: swhids_info.append( SWHIDInfo( object_type=swh_object["object_type"], object_id="", swhid="", swhid_url="", context={}, swhid_with_context=None, swhid_with_context_url=None, ) ) continue object_type = swh_object["object_type"] object_id = swh_object["object_id"] swhid_context: SWHIDContext = {} if snapshot_context: if snapshot_context["origin_info"] is not None: swhid_context["origin"] = quote( snapshot_context["origin_info"]["url"], safe="/?:@&" ) if object_type != SNAPSHOT: swhid_context["visit"] = gen_swhid( SNAPSHOT, snapshot_context["snapshot_id"] ) if object_type in (CONTENT, DIRECTORY): if snapshot_context["release_id"] is not None: swhid_context["anchor"] = gen_swhid( RELEASE, snapshot_context["release_id"] ) elif snapshot_context["revision_id"] is not None: swhid_context["anchor"] = gen_swhid( REVISION, snapshot_context["revision_id"] ) if object_type in (CONTENT, DIRECTORY): if ( extra_context and "revision" in extra_context and extra_context["revision"] and "anchor" not in swhid_context ): swhid_context["anchor"] = gen_swhid(REVISION, extra_context["revision"]) elif ( extra_context and "root_directory" in extra_context and extra_context["root_directory"] and "anchor" not in swhid_context and ( object_type != DIRECTORY or extra_context["root_directory"] != object_id ) ): swhid_context["anchor"] = gen_swhid( DIRECTORY, extra_context["root_directory"] ) path = None if extra_context and "path" in extra_context: path = extra_context["path"] or "/" if "filename" in extra_context and object_type == CONTENT: path += extra_context["filename"] if object_type == DIRECTORY and path == "/": path = None if path: swhid_context["path"] = quote(path, safe="/?:@&") swhid = gen_swhid(object_type, object_id) swhid_url = reverse("browse-swhid", url_args={"swhid": swhid}) swhid_with_context = None swhid_with_context_url = None if swhid_context: swhid_with_context = gen_swhid( object_type, object_id, metadata=swhid_context ) swhid_with_context_url = reverse( "browse-swhid", url_args={"swhid": swhid_with_context} ) swhids_info.append( SWHIDInfo( object_type=object_type, object_id=object_id, swhid=swhid, swhid_url=swhid_url, context=swhid_context, swhid_with_context=swhid_with_context, swhid_with_context_url=swhid_with_context_url, ) ) return swhids_info diff --git a/swh/web/tests/browse/views/test_content.py b/swh/web/tests/browse/views/test_content.py index eb5475d7..231a15f5 100644 --- a/swh/web/tests/browse/views/test_content.py +++ b/swh/web/tests/browse/views/test_content.py @@ -1,599 +1,612 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from django.utils.html import escape from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT from swh.web.browse.snapshot_context import process_snapshot_branches from swh.web.browse.utils import ( _re_encode_content, get_mimetype_and_encoding_for_content, prepare_content_for_display, ) from swh.web.common.exc import NotFoundExc from swh.web.common.identifiers import gen_swhid from swh.web.common.utils import gen_path_info, reverse from swh.web.tests.django_asserts import assert_contains, assert_not_contains from swh.web.tests.strategies import ( content, content_image_type, content_text, content_text_no_highlight, content_text_non_utf8, content_unsupported_image_type_rendering, content_utf8_detected_as_binary, invalid_sha1, origin_with_multiple_visits, unknown_content, ) from swh.web.tests.utils import check_html_get_response, check_http_get_response @given(content_text()) def test_content_view_text(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse( "browse-content", url_args={"query_string": content["sha1"]}, query_params={"path": content["path"]}, ) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content) mimetype = content_display["mimetype"] if mimetype.startswith("text/"): assert_contains(resp, '' % content_display["language"]) assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_not_contains(resp, "swh-metadata-popover") @given(content_text_no_highlight()) def test_content_view_text_no_highlight(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse("browse-content", url_args={"query_string": content["sha1"]}) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content) assert_contains(resp, '') assert_contains(resp, escape(content_display["content_data"])) assert_contains(resp, url_raw) swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) @given(content_text_non_utf8()) def test_content_view_no_utf8_text(client, archive_data, content): sha1_git = content["sha1_git"] url = reverse("browse-content", url_args={"query_string": content["sha1"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content) swh_cnt_id = gen_swhid(CONTENT, sha1_git) swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id}) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, escape(content_display["content_data"])) @given(content_image_type()) def test_content_view_image(client, archive_data, content): url = reverse("browse-content", url_args={"query_string": content["sha1"]}) url_raw = reverse("browse-content-raw", url_args={"query_string": content["sha1"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) content_display = _process_content_for_display(archive_data, content) mimetype = content_display["mimetype"] content_data = content_display["content_data"] assert_contains(resp, '' % (mimetype, content_data)) assert_contains(resp, url_raw) @given(content_unsupported_image_type_rendering()) def test_content_view_image_no_rendering(client, archive_data, content): url = reverse("browse-content", url_args={"query_string": content["sha1"]}) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) mimetype = content["mimetype"] encoding = content["encoding"] assert_contains( resp, ( f"Content with mime type {mimetype} and encoding {encoding} " "cannot be displayed." ), ) @given(content_text()) def test_content_view_text_with_path(client, archive_data, content): path = content["path"] url = reverse( "browse-content", url_args={"query_string": content["sha1"]}, query_params={"path": path}, ) resp = check_html_get_response( client, url, status_code=200, template_used="browse/content.html" ) assert_contains(resp, '