diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py index 4902a52c..11ca7d84 100644 --- a/swh/web/browse/views/content.py +++ b/swh/web/browse/views/content.py @@ -1,387 +1,388 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import difflib import json from distutils.util import strtobool from django.http import HttpResponse from django.shortcuts import render from django.template.defaultfilters import filesizeformat import sentry_sdk from swh.model.hashutil import hash_to_hex from swh.model.identifiers import CONTENT from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( request_content, prepare_content_for_display, content_display_max_size, gen_link, gen_directory_link, ) from swh.web.common import query, service, highlightjs from swh.web.common.exc import NotFoundExc, handle_view_exception from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import ContentMetadata, SWHObjectInfo from swh.web.common.utils import reverse, gen_path_info, swh_object_icons @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/raw/", view_name="browse-content-raw", checksum_args=["query_string"], ) def content_raw(request, query_string): """Django view that produces a raw display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/raw/` """ try: re_encode = bool(strtobool(request.GET.get("re_encode", "false"))) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, max_size=None, re_encode=re_encode) except Exception as exc: return handle_view_exception(request, exc) filename = request.GET.get("filename", None) if not filename: filename = "%s_%s" % (algo, checksum) if ( content_data["mimetype"].startswith("text/") or content_data["mimetype"] == "inode/x-empty" ): response = HttpResponse(content_data["raw_data"], content_type="text/plain") response["Content-disposition"] = "filename=%s" % filename else: response = HttpResponse( content_data["raw_data"], content_type="application/octet-stream" ) response["Content-disposition"] = "attachment; filename=%s" % filename return response _auto_diff_size_limit = 20000 @browse_route( r"content/(?P.*)/diff/(?P.*)", view_name="diff-contents", ) def _contents_diff(request, from_query_string, to_query_string): """ Browse endpoint used to compute unified diffs between two contents. Diffs are generated only if the two contents are textual. By default, diffs whose size are greater than 20 kB will not be generated. To force the generation of large diffs, the 'force' boolean query parameter must be used. Args: request: input django http request from_query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value identifying the first content to_query_string: same as above for identifying the second content Returns: A JSON object containing the unified diff. """ diff_data = {} content_from = None content_to = None content_from_size = 0 content_to_size = 0 content_from_lines = [] content_to_lines = [] force = request.GET.get("force", "false") path = request.GET.get("path", None) language = "nohighlight" force = bool(strtobool(force)) if from_query_string == to_query_string: diff_str = "File renamed without changes" else: try: text_diff = True if from_query_string: content_from = request_content(from_query_string, max_size=None) content_from_display_data = prepare_content_for_display( content_from["raw_data"], content_from["mimetype"], path ) language = content_from_display_data["language"] content_from_size = content_from["length"] if not ( content_from["mimetype"].startswith("text/") or content_from["mimetype"] == "inode/x-empty" ): text_diff = False if text_diff and to_query_string: content_to = request_content(to_query_string, max_size=None) content_to_display_data = prepare_content_for_display( content_to["raw_data"], content_to["mimetype"], path ) language = content_to_display_data["language"] content_to_size = content_to["length"] if not ( content_to["mimetype"].startswith("text/") or content_to["mimetype"] == "inode/x-empty" ): text_diff = False diff_size = abs(content_to_size - content_from_size) if not text_diff: diff_str = "Diffs are not generated for non textual content" language = "nohighlight" elif not force and diff_size > _auto_diff_size_limit: diff_str = "Large diffs are not automatically computed" language = "nohighlight" else: if content_from: content_from_lines = ( content_from["raw_data"].decode("utf-8").splitlines(True) ) if content_from_lines and content_from_lines[-1][-1] != "\n": content_from_lines[-1] += "[swh-no-nl-marker]\n" if content_to: content_to_lines = ( content_to["raw_data"].decode("utf-8").splitlines(True) ) if content_to_lines and content_to_lines[-1][-1] != "\n": content_to_lines[-1] += "[swh-no-nl-marker]\n" diff_lines = difflib.unified_diff(content_from_lines, content_to_lines) diff_str = "".join(list(diff_lines)[2:]) except Exception as exc: sentry_sdk.capture_exception(exc) diff_str = str(exc) diff_data["diff_str"] = diff_str diff_data["language"] = language diff_data_json = json.dumps(diff_data, separators=(",", ": ")) return HttpResponse(diff_data_json, content_type="application/json") @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/", view_name="browse-content", checksum_args=["query_string"], ) def content_display(request, query_string): """Django view that produces an HTML display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/` """ try: algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, raise_if_unavailable=False) origin_url = request.GET.get("origin_url") selected_language = request.GET.get("language") if not origin_url: origin_url = request.GET.get("origin") snapshot_id = request.GET.get("snapshot") path = request.GET.get("path") snapshot_context = None if origin_url is not None or snapshot_id is not None: try: snapshot_context = get_snapshot_context( origin_url=origin_url, snapshot_id=snapshot_id, branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), path=path, browse_context=CONTENT, ) except NotFoundExc as e: if str(e).startswith("Origin"): raw_cnt_url = reverse( "browse-content", url_args={"query_string": query_string} ) error_message = ( "The Software Heritage archive has a content " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the content " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_cnt_url)) ) raise NotFoundExc(error_message) else: raise e except Exception as exc: return handle_view_exception(request, exc) content = None language = None mimetype = None if content_data["raw_data"] is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and "text/" in mimetype: available_languages = highlightjs.get_supported_languages() filename = None path_info = None directory_id = None directory_url = None root_dir = None if snapshot_context: root_dir = snapshot_context.get("root_directory") query_params = snapshot_context["query_params"] if snapshot_context else {} breadcrumbs = [] if path: split_path = path.split("/") root_dir = root_dir or split_path[0] filename = split_path[-1] if root_dir != path: path = path.replace(root_dir + "/", "") path = path[: -len(filename)] path_info = gen_path_info(path) query_params.pop("path", None) dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": root_dir[:7], "url": dir_url}) for pi in path_info: query_params["path"] = pi["path"] dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": pi["name"], "url": dir_url}) breadcrumbs.append({"name": filename, "url": None}) if path and root_dir != path: try: dir_info = service.lookup_directory_with_path(root_dir, path) directory_id = dir_info["target"] except Exception as exc: return handle_view_exception(request, exc) elif root_dir != path: directory_id = root_dir else: root_dir = None if directory_id: directory_url = gen_directory_link(directory_id) query_params = {"filename": filename} content_checksums = content_data["checksums"] content_url = reverse( "browse-content", url_args={"query_string": f'sha1_git:{content_checksums["sha1_git"]}'}, ) content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params=query_params, ) content_metadata = ContentMetadata( object_type=CONTENT, object_id=content_checksums["sha1_git"], sha1=content_checksums["sha1"], sha1_git=content_checksums["sha1_git"], sha256=content_checksums["sha256"], blake2s256=content_checksums["blake2s256"], content_url=content_url, mimetype=content_data["mimetype"], encoding=content_data["encoding"], size=filesizeformat(content_data["length"]), language=content_data["language"], licenses=content_data["licenses"], root_directory=root_dir, - path=f"/{path}" if path else "/", + path=f"/{path}" if path else "", filename=filename or "", directory=directory_id, directory_url=directory_url, revision=None, release=None, snapshot=None, origin_url=origin_url, ) swhids_info = get_swhids_info( [SWHObjectInfo(object_type=CONTENT, object_id=content_checksums["sha1_git"])], + snapshot_context, extra_context=content_metadata, ) heading = "Content - %s" % content_checksums["sha1_git"] if breadcrumbs: content_path = "/".join([bc["name"] for bc in breadcrumbs]) heading += " - %s" % content_path return render( request, "browse/content.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"], "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content, "content_size": content_data["length"], "max_content_size": content_display_max_size, "filename": filename, "encoding": content_data["encoding"], "mimetype": mimetype, "language": language, "available_languages": available_languages, "breadcrumbs": breadcrumbs, "top_right_link": { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", }, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions_menu": True, "swhids_info": swhids_info, "error_code": content_data["error_code"], "error_message": content_data["error_message"], "error_description": content_data["error_description"], }, status=content_data["error_code"], ) diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py index 95b0dee1..156478b9 100644 --- a/swh/web/common/identifiers.py +++ b/swh/web/common/identifiers.py @@ -1,322 +1,322 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import quote from typing import Any, Dict, Iterable, List, Optional from typing_extensions import TypedDict from django.http import QueryDict from swh.model.exceptions import ValidationError from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ( persistent_identifier, parse_persistent_identifier, CONTENT, DIRECTORY, ORIGIN, RELEASE, REVISION, SNAPSHOT, PersistentId, ) from swh.web.common.exc import BadInputExc from swh.web.common.typing import ( QueryParameters, SnapshotContext, SWHObjectInfo, SWHIDInfo, SWHIDContext, ) from swh.web.common.utils import reverse def get_swh_persistent_id( object_type: str, object_id: str, scheme_version: int = 1, metadata: SWHIDContext = {}, ) -> str: """ Returns the persistent identifier for a swh object based on: * the object type * the object id * the swh identifiers scheme version Args: object_type: the swh object type (content/directory/release/revision/snapshot) object_id: the swh object id (hexadecimal representation of its hash value) scheme_version: the scheme version of the swh persistent identifiers Returns: the swh object persistent identifier Raises: BadInputExc: if the provided parameters do not enable to generate a valid identifier """ try: swh_id = persistent_identifier(object_type, object_id, scheme_version, metadata) except ValidationError as e: raise BadInputExc( "Invalid object (%s) for swh persistent id. %s" % (object_id, e) ) else: return swh_id ResolvedPersistentId = TypedDict( "ResolvedPersistentId", {"swh_id_parsed": PersistentId, "browse_url": Optional[str]} ) def resolve_swh_persistent_id( swh_id: str, query_params: Optional[QueryParameters] = None ) -> ResolvedPersistentId: """ Try to resolve a Software Heritage persistent id into an url for browsing the targeted object. Args: swh_id: a Software Heritage persistent identifier query_params: optional dict filled with query parameters to append to the browse url Returns: a dict with the following keys: * **swh_id_parsed**: the parsed identifier * **browse_url**: the url for browsing the targeted object """ swh_id_parsed = get_persistent_identifier(swh_id) object_type = swh_id_parsed.object_type object_id = swh_id_parsed.object_id browse_url = None query_dict = QueryDict("", mutable=True) if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] if "origin" in swh_id_parsed.metadata: query_dict["origin_url"] = swh_id_parsed.metadata["origin"] if object_type == CONTENT: query_string = "sha1_git:" + object_id fragment = "" if "lines" in swh_id_parsed.metadata: lines = swh_id_parsed.metadata["lines"].split("-") fragment += "#L" + lines[0] if len(lines) > 1: fragment += "-L" + lines[1] browse_url = ( reverse( "browse-content", url_args={"query_string": query_string}, query_params=query_dict, ) + fragment ) elif object_type == DIRECTORY: browse_url = reverse( "browse-directory", url_args={"sha1_git": object_id}, query_params=query_dict, ) elif object_type == RELEASE: browse_url = reverse( "browse-release", url_args={"sha1_git": object_id}, query_params=query_dict ) elif object_type == REVISION: browse_url = reverse( "browse-revision", url_args={"sha1_git": object_id}, query_params=query_dict ) elif object_type == SNAPSHOT: browse_url = reverse( "browse-snapshot", url_args={"snapshot_id": object_id}, query_params=query_dict, ) elif object_type == ORIGIN: raise BadInputExc( ( "Origin PIDs (Persistent Identifiers) are not " "publicly resolvable because they are for " "internal usage only" ) ) return {"swh_id_parsed": swh_id_parsed, "browse_url": browse_url} def get_persistent_identifier(persistent_id: str) -> PersistentId: """Check if a persistent identifier is valid. Args: persistent_id: A string representing a Software Heritage persistent identifier. Raises: BadInputExc: if the provided persistent identifier can not be parsed. Return: A persistent identifier object. """ try: pid_object = parse_persistent_identifier(persistent_id) except ValidationError as ve: raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages)) else: return pid_object def group_swh_persistent_identifiers( persistent_ids: Iterable[PersistentId], ) -> Dict[str, List[bytes]]: """ Groups many Software Heritage persistent identifiers into a dictionary depending on their type. Args: persistent_ids: an iterable of Software Heritage persistent identifier objects Returns: A dictionary with: keys: persistent identifier types values: persistent identifiers id """ pids_by_type: Dict[str, List[bytes]] = { CONTENT: [], DIRECTORY: [], REVISION: [], RELEASE: [], SNAPSHOT: [], } for pid in persistent_ids: obj_id = pid.object_id obj_type = pid.object_type pids_by_type[obj_type].append(hash_to_bytes(obj_id)) return pids_by_type def get_swhids_info( swh_objects: Iterable[SWHObjectInfo], snapshot_context: Optional[SnapshotContext] = None, extra_context: Optional[Dict[str, Any]] = None, ) -> List[SWHIDInfo]: """ Returns a list of dict containing info related to persistent identifiers of swh objects. Args: swh_objects: an iterable of dict describing archived objects snapshot_context: optional dict parameter describing the snapshot in which the objects have been found extra_context: optional dict filled with extra contextual info about the objects Returns: a list of dict containing persistent identifiers info """ swhids_info = [] for swh_object in swh_objects: if not swh_object["object_id"]: swhids_info.append( SWHIDInfo( object_type=swh_object["object_type"], object_id="", swhid="", swhid_url="", context={}, swhid_with_context=None, swhid_with_context_url=None, ) ) continue object_type = swh_object["object_type"] object_id = swh_object["object_id"] swhid_context: SWHIDContext = {} if snapshot_context: if snapshot_context["origin_info"] is not None: swhid_context["origin"] = quote( snapshot_context["origin_info"]["url"], safe="/?:@&" ) if object_type != SNAPSHOT: swhid_context["visit"] = get_swh_persistent_id( SNAPSHOT, snapshot_context["snapshot_id"] ) if object_type in (CONTENT, DIRECTORY): if snapshot_context["release_id"] is not None: swhid_context["anchor"] = get_swh_persistent_id( RELEASE, snapshot_context["release_id"] ) elif snapshot_context["revision_id"] is not None: swhid_context["anchor"] = get_swh_persistent_id( REVISION, snapshot_context["revision_id"] ) if object_type in (CONTENT, DIRECTORY): if ( extra_context and "revision" in extra_context and extra_context["revision"] and "anchor" not in swhid_context ): swhid_context["anchor"] = get_swh_persistent_id( REVISION, extra_context["revision"] ) elif ( extra_context and "root_directory" in extra_context and extra_context["root_directory"] and "anchor" not in swhid_context and ( object_type != DIRECTORY or extra_context["root_directory"] != object_id ) ): swhid_context["anchor"] = get_swh_persistent_id( DIRECTORY, extra_context["root_directory"] ) path = None if extra_context and "path" in extra_context: - path = extra_context["path"] + path = extra_context["path"] or "/" if "filename" in extra_context and object_type == CONTENT: path += extra_context["filename"] if path: swhid_context["path"] = quote(path, safe="/?:@&") swhid = get_swh_persistent_id(object_type, object_id) swhid_url = reverse("browse-swh-id", url_args={"swh_id": swhid}) swhid_with_context = None swhid_with_context_url = None if swhid_context: swhid_with_context = get_swh_persistent_id( object_type, object_id, metadata=swhid_context ) swhid_with_context_url = reverse( "browse-swh-id", url_args={"swh_id": swhid_with_context} ) swhids_info.append( SWHIDInfo( object_type=object_type, object_id=object_id, swhid=swhid, swhid_url=swhid_url, context=swhid_context, swhid_with_context=swhid_with_context, swhid_with_context_url=swhid_with_context_url, ) ) return swhids_info diff --git a/swh/web/tests/browse/views/test_identifiers.py b/swh/web/tests/browse/views/test_identifiers.py index 16335f4f..f1fb9d49 100644 --- a/swh/web/tests/browse/views/test_identifiers.py +++ b/swh/web/tests/browse/views/test_identifiers.py @@ -1,160 +1,206 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import random + from hypothesis import given +from swh.model.identifiers import CONTENT, REVISION, SNAPSHOT +from swh.web.common.identifiers import get_swh_persistent_id from swh.web.common.utils import reverse -from swh.web.tests.strategies import content, directory, revision, release, snapshot +from swh.web.tests.django_asserts import assert_contains +from swh.web.tests.strategies import ( + content, + directory, + origin, + revision, + release, + snapshot, +) swh_id_prefix = "swh:1:" @given(content()) def test_content_id_browse(client, content): cnt_sha1_git = content["sha1_git"] swh_id = swh_id_prefix + "cnt:" + cnt_sha1_git url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) query_string = "sha1_git:" + cnt_sha1_git content_browse_url = reverse( "browse-content", url_args={"query_string": query_string} ) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == content_browse_url @given(directory()) def test_directory_id_browse(client, directory): swh_id = swh_id_prefix + "dir:" + directory url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) directory_browse_url = reverse("browse-directory", url_args={"sha1_git": directory}) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == directory_browse_url @given(revision()) def test_revision_id_browse(client, revision): swh_id = swh_id_prefix + "rev:" + revision url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) revision_browse_url = reverse("browse-revision", url_args={"sha1_git": revision}) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == revision_browse_url query_params = {"origin_url": "https://github.com/user/repo"} url = reverse( "browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params ) revision_browse_url = reverse( "browse-revision", url_args={"sha1_git": revision}, query_params=query_params ) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == revision_browse_url @given(release()) def test_release_id_browse(client, release): swh_id = swh_id_prefix + "rel:" + release url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) release_browse_url = reverse("browse-release", url_args={"sha1_git": release}) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == release_browse_url query_params = {"origin_url": "https://github.com/user/repo"} url = reverse( "browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params ) release_browse_url = reverse( "browse-release", url_args={"sha1_git": release}, query_params=query_params ) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == release_browse_url @given(snapshot()) def test_snapshot_id_browse(client, snapshot): swh_id = swh_id_prefix + "snp:" + snapshot url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) snapshot_browse_url = reverse("browse-snapshot", url_args={"snapshot_id": snapshot}) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == snapshot_browse_url query_params = {"origin_url": "https://github.com/user/repo"} url = reverse( "browse-swh-id", url_args={"swh_id": swh_id}, query_params=query_params ) release_browse_url = reverse( "browse-snapshot", url_args={"snapshot_id": snapshot}, query_params=query_params ) resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == release_browse_url @given(release()) def test_bad_id_browse(client, release): swh_id = swh_id_prefix + "foo:" + release url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) resp = client.get(url) assert resp.status_code == 400 @given(content()) def test_content_id_optional_parts_browse(client, content): cnt_sha1_git = content["sha1_git"] optional_parts = ";lines=4-20;origin=https://github.com/user/repo" swh_id = swh_id_prefix + "cnt:" + cnt_sha1_git + optional_parts url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) query_string = "sha1_git:" + cnt_sha1_git content_browse_url = reverse( "browse-content", url_args={"query_string": query_string}, query_params={"origin_url": "https://github.com/user/repo"}, ) content_browse_url += "#L4-L20" resp = client.get(url) assert resp.status_code == 302 assert resp["location"] == content_browse_url @given(release()) def test_origin_id_not_resolvable(client, release): swh_id = "swh:1:ori:8068d0075010b590762c6cb5682ed53cb3c13deb" url = reverse("browse-swh-id", url_args={"swh_id": swh_id}) resp = client.get(url) assert resp.status_code == 400 + + +@given(origin()) +def test_legacy_swhid_browse(archive_data, client, origin): + snapshot = archive_data.snapshot_get_latest(origin["url"]) + revision = archive_data.snapshot_get_head(snapshot) + directory = archive_data.revision_get(revision)["directory"] + directory_content = archive_data.directory_ls(directory) + directory_file = random.choice( + [e for e in directory_content if e["type"] == "file"] + ) + legacy_swhid = get_swh_persistent_id( + CONTENT, + directory_file["checksums"]["sha1_git"], + metadata={"origin": origin["url"]}, + ) + + url = reverse("browse-swh-id", url_args={"swh_id": legacy_swhid}) + resp = client.get(url) + assert resp.status_code == 302 + + resp = client.get(resp["location"]) + + swhid = get_swh_persistent_id( + CONTENT, + directory_file["checksums"]["sha1_git"], + metadata={ + "origin": origin["url"], + "visit": get_swh_persistent_id(SNAPSHOT, snapshot), + "anchor": get_swh_persistent_id(REVISION, revision), + }, + ) + + assert_contains(resp, swhid)