def gen_swhid(
    object_type: str,
    object_id: str,
    scheme_version: int = 1,
    metadata: Optional[SWHIDContext] = None,
) -> str:
    """
    Returns the SoftWare Heritage persistent IDentifier for a swh object
    based on:

        * the object type
        * the object id
        * the SWHID scheme version
        * optional qualifiers

    Args:
        object_type: the swh object type
            (content/directory/release/revision/snapshot)
        object_id: the swh object id (hexadecimal representation
            of its hash value)
        scheme_version: the scheme version of the SWHIDs
        metadata: optional qualifiers, forwarded as keyword arguments
            to ``QualifiedSWHID``; ``None`` means no qualifiers

    Returns:
        the SWHID of the object

    Raises:
        BadInputExc: if the provided parameters cannot be turned into
            a valid identifier
    """
    # A None default avoids sharing one dict instance across all calls.
    qualifiers = metadata or {}
    try:
        # KeyError: unknown object type name; ValueError: malformed hash
        decoded_object_type = ObjectType[object_type.upper()]
        decoded_object_id = hash_to_bytes(object_id)
        obj_swhid = str(
            QualifiedSWHID(
                object_type=decoded_object_type,
                object_id=decoded_object_id,
                scheme_version=scheme_version,
                **qualifiers,
            )
        )
    except (ValidationError, KeyError, ValueError) as e:
        # Chain the original error so its traceback stays available.
        raise BadInputExc(
            "Invalid object (%s) for SWHID. %s" % (object_id, e)
        ) from e
    else:
        return obj_swhid
ObjectType.CONTENT: if ( not swhid_parsed.origin and swhid_parsed.anchor.object_type != ObjectType.REVISION ): # when no origin or revision context, content objects need to have # their path prefixed by root directory id for breadcrumbs display query_dict["path"] = hash_to_hex(directory) + query_dict["path"] else: # remove leading slash from SWHID content path query_dict["path"] = query_dict["path"][1:] elif object_type == ObjectType.DIRECTORY: object_id = directory # remove leading and trailing slashes from SWHID directory path if query_dict["path"].endswith("/"): query_dict["path"] = query_dict["path"][1:-1] else: query_dict["path"] = query_dict["path"][1:] # snapshot context if swhid_parsed.visit: if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT: raise BadInputExc("Visit must be a snapshot SWHID.") query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id) if swhid_parsed.anchor: if ( swhid_parsed.anchor.object_type == ObjectType.REVISION and object_type != ObjectType.REVISION ): query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id) elif swhid_parsed.anchor.object_type == ObjectType.RELEASE: release = archive.lookup_release( hash_to_hex(swhid_parsed.anchor.object_id) ) if release: query_dict["release"] = release["name"] # browsing content or directory without snapshot context elif ( object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY) and swhid_parsed.anchor ): if swhid_parsed.anchor.object_type == ObjectType.REVISION: # anchor revision, objects are browsed from its view object_type = ObjectType.REVISION object_id = swhid_parsed.anchor.object_id elif ( object_type == ObjectType.DIRECTORY and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY ): # a directory is browsed from its root object_id = swhid_parsed.anchor.object_id if object_type == ObjectType.CONTENT: url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}" elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION): 
def get_swhid(swhid: str) -> QualifiedSWHID:
    """Parse a SWHID string, tolerating upper case in its core part.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier.

    Raises:
        BadInputExc: if the provided SWHID can not be parsed.

    Return:
        A parsed SWHID.
    """
    # Only the core part (everything before the first ";") is lowercased
    # to avoid a parsing error; qualifiers are passed through untouched.
    core, separator, qualifiers = swhid.partition(";")
    normalized = core.lower() + separator + qualifiers
    try:
        return QualifiedSWHID.from_string(normalized)
    except ValidationError as ve:
        raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
def get_swhids_info(
    swh_objects: Iterable[SWHObjectInfo],
    snapshot_context: Optional[SnapshotContext] = None,
    extra_context: Optional[Dict[str, Any]] = None,
) -> List[SWHIDInfo]:
    """
    Build, for each object, its core SWHID, its SWHID with qualifiers
    (when any qualifier applies) and the matching ``browse-swhid`` URLs.

    Args:
        swh_objects: an iterable of dict describing archived objects
        snapshot_context: optional dict parameter describing the snapshot in
            which the objects have been found
        extra_context: optional dict filled with extra contextual info about
            the objects

    Returns:
        a list of dict containing SWHIDs info
    """
    tree_types = (CONTENT, DIRECTORY)
    infos = []

    for swh_object in swh_objects:
        object_id = swh_object["object_id"]
        object_type = swh_object["object_type"]

        if not object_id:
            # no identifier to compute: emit an empty placeholder entry
            infos.append(
                SWHIDInfo(
                    object_type=object_type,
                    object_id="",
                    swhid="",
                    swhid_url="",
                    context={},
                    swhid_with_context=None,
                    swhid_with_context_url=None,
                )
            )
            continue

        qualifiers: SWHIDContext = {}
        in_tree = object_type in tree_types

        if snapshot_context:
            origin_info = snapshot_context["origin_info"]
            if origin_info is not None:
                qualifiers["origin"] = quote(origin_info["url"], safe="/?:@&")
            if object_type != SNAPSHOT:
                qualifiers["visit"] = gen_swhid(
                    SNAPSHOT, snapshot_context["snapshot_id"]
                )
            if in_tree:
                # a release anchor takes precedence over a revision anchor
                if snapshot_context["release_id"] is not None:
                    qualifiers["anchor"] = gen_swhid(
                        RELEASE, snapshot_context["release_id"]
                    )
                elif snapshot_context["revision_id"] is not None:
                    qualifiers["anchor"] = gen_swhid(
                        REVISION, snapshot_context["revision_id"]
                    )

        if in_tree:
            # extra context only provides an anchor when the snapshot
            # context did not already set one
            if extra_context and "anchor" not in qualifiers:
                revision = extra_context.get("revision")
                root_directory = extra_context.get("root_directory")
                if revision:
                    qualifiers["anchor"] = gen_swhid(REVISION, revision)
                elif root_directory and (
                    object_type != DIRECTORY or root_directory != object_id
                ):
                    # a directory is never anchored to itself
                    qualifiers["anchor"] = gen_swhid(DIRECTORY, root_directory)

            path = None
            if extra_context and "path" in extra_context:
                path = extra_context["path"] or "/"
                if object_type == CONTENT and "filename" in extra_context:
                    path += extra_context["filename"]
            if object_type == DIRECTORY and path == "/":
                # path qualifier is dropped for a root directory
                path = None
            if path:
                qualifiers["path"] = quote(path, safe="/?:@&")

        swhid = gen_swhid(object_type, object_id)
        swhid_url = reverse("browse-swhid", url_args={"swhid": swhid})

        if qualifiers:
            swhid_with_context = gen_swhid(
                object_type, object_id, metadata=qualifiers
            )
            swhid_with_context_url = reverse(
                "browse-swhid", url_args={"swhid": swhid_with_context}
            )
        else:
            swhid_with_context = None
            swhid_with_context_url = None

        infos.append(
            SWHIDInfo(
                object_type=object_type,
                object_id=object_id,
                swhid=swhid,
                swhid_url=swhid_url,
                context=qualifiers,
                swhid_with_context=swhid_with_context,
                swhid_with_context_url=swhid_with_context_url,
            )
        )

    return infos
@given(content())
def test_content_id_browse(client, content):
    """Browsing a content SWHID, in lower or upper case, redirects to
    the content browse view."""
    sha1_git = content["sha1_git"]
    lower_swhid = gen_swhid(CONTENT, sha1_git)

    # the redirect target does not depend on the case of the SWHID
    expected_location = reverse(
        "browse-content", url_args={"query_string": "sha1_git:" + sha1_git}
    )

    for candidate in (lower_swhid, lower_swhid.upper()):
        swhid_url = reverse("browse-swhid", url_args={"swhid": candidate})
        resp = check_html_get_response(client, swhid_url, status_code=302)
        assert resp["location"] == expected_location
url_args={"swhid": swhid}) - revision_browse_url = reverse("browse-revision", url_args={"sha1_git": revision}) + for swhid_ in (swhid, swhid.upper()): + url = reverse("browse-swhid", url_args={"swhid": swhid_}) - resp = check_html_get_response(client, url, status_code=302) - assert resp["location"] == revision_browse_url + revision_browse_url = reverse( + "browse-revision", url_args={"sha1_git": revision} + ) - query_params = {"origin_url": "https://github.com/user/repo"} - url = reverse("browse-swhid", url_args={"swhid": swhid}, query_params=query_params) + resp = check_html_get_response(client, url, status_code=302) + assert resp["location"] == revision_browse_url - revision_browse_url = reverse( - "browse-revision", url_args={"sha1_git": revision}, query_params=query_params - ) + query_params = {"origin_url": "https://github.com/user/repo"} + url = reverse( + "browse-swhid", url_args={"swhid": swhid_}, query_params=query_params + ) - resp = check_html_get_response(client, url, status_code=302) - assert resp["location"] == revision_browse_url + revision_browse_url = reverse( + "browse-revision", + url_args={"sha1_git": revision}, + query_params=query_params, + ) + + resp = check_html_get_response(client, url, status_code=302) + assert resp["location"] == revision_browse_url @given(release()) def test_release_id_browse(client, release): swhid = gen_swhid(RELEASE, release) - url = reverse("browse-swhid", url_args={"swhid": swhid}) - release_browse_url = reverse("browse-release", url_args={"sha1_git": release}) + for swhid_ in (swhid, swhid.upper()): + url = reverse("browse-swhid", url_args={"swhid": swhid_}) - resp = check_html_get_response(client, url, status_code=302) - assert resp["location"] == release_browse_url + release_browse_url = reverse("browse-release", url_args={"sha1_git": release}) - query_params = {"origin_url": "https://github.com/user/repo"} + resp = check_html_get_response(client, url, status_code=302) + assert resp["location"] == 
@given(snapshot())
def test_snapshot_id_browse(client, snapshot):
    """Browsing a snapshot SWHID, in lower or upper case, redirects to
    the snapshot browse view, with and without extra query parameters."""
    swhid = gen_swhid(SNAPSHOT, snapshot)

    for swhid_ in (swhid, swhid.upper()):
        url = reverse("browse-swhid", url_args={"swhid": swhid_})

        snapshot_browse_url = reverse(
            "browse-snapshot", url_args={"snapshot_id": snapshot}
        )

        resp = check_html_get_response(client, url, status_code=302)
        assert resp["location"] == snapshot_browse_url

        # query parameters given to the SWHID URL are kept in the redirect
        query_params = {"origin_url": "https://github.com/user/repo"}

        url = reverse(
            "browse-swhid", url_args={"swhid": swhid_}, query_params=query_params
        )

        # named after what it holds: a snapshot URL, not a release URL
        snapshot_browse_url = reverse(
            "browse-snapshot",
            url_args={"snapshot_id": snapshot},
            query_params=query_params,
        )

        resp = check_html_get_response(client, url, status_code=302)
        assert resp["location"] == snapshot_browse_url
template_used="browse/content.html" ) swhid = gen_swhid( CONTENT, directory_file["checksums"]["sha1_git"], metadata={ "origin": origin["url"], "visit": gen_swhid(SNAPSHOT, snapshot["id"]), "anchor": gen_swhid(REVISION, revision), }, ) assert_contains(resp, swhid) # also check legacy SWHID URL with trailing slash url = reverse("browse-swhid-legacy", url_args={"swhid": swhid}) resp = check_html_get_response(client, url, status_code=302) resp = check_html_get_response( client, resp["location"], status_code=200, template_used="browse/content.html" ) assert_contains(resp, swhid) @given(directory()) def test_browse_swhid_special_characters_escaping(client, archive_data, directory): origin = "http://example.org/?project=abc;" archive_data.origin_add([Origin(url=origin)]) origin_swhid_escaped = quote(origin, safe="/?:@&") origin_swhid_url_escaped = quote(origin, safe="/:@;") swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) url = reverse("browse-swhid", url_args={"swhid": swhid}) resp = check_html_get_response(client, url, status_code=302) assert origin_swhid_url_escaped in resp["location"] diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py index 1ea3dafb..96858aff 100644 --- a/swh/web/tests/common/test_identifiers.py +++ b/swh/web/tests/common/test_identifiers.py @@ -1,726 +1,739 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from urllib.parse import quote from hypothesis import given import pytest from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ( CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, QualifiedSWHID, ) from swh.model.model import Origin from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.common.exc import 
@given(content())
def test_gen_swhid(content):
    """gen_swhid builds core and qualified SWHIDs and rejects bad input."""
    object_type = CONTENT
    object_id = content["sha1_git"]
    core_swhid = "swh:1:cnt:" + object_id

    plain = gen_swhid(object_type, object_id)
    assert plain == expected_or(core_swhid) if False else plain == core_swhid

    with_origin = gen_swhid(object_type, object_id, metadata={"origin": "test"})
    assert with_origin == core_swhid + ";origin=test"

    # a qualifier set to None is left out of the generated SWHID
    with_none_origin = gen_swhid(object_type, object_id, metadata={"origin": None})
    assert with_none_origin == core_swhid

    with pytest.raises(BadInputExc, match="Invalid object"):
        gen_swhid("foo", object_id)

    with pytest.raises(BadInputExc, match="Invalid object"):
        gen_swhid(object_type, "not a valid id")
@given(content(), directory(), release(), revision(), snapshot())
def test_get_swhid(content, directory, release, revision, snapshot):
    """get_swhid parses lower and upper case SWHIDs of every object type
    and rejects malformed input."""
    objects = [
        (CONTENT, content["sha1_git"]),
        (DIRECTORY, directory),
        (RELEASE, release),
        (REVISION, revision),
        (SNAPSHOT, snapshot),
    ]

    for obj_type, obj_id in objects:
        swhid = gen_swhid(obj_type, obj_id)
        expected = swhid.lower()

        for candidate in (swhid, swhid.upper()):
            parsed = get_swhid(candidate)
            assert isinstance(parsed, QualifiedSWHID)
            # the parsed identifier is expected in lower case
            assert str(parsed) == expected

    with pytest.raises(BadInputExc, match="Error when parsing identifier"):
        get_swhid("foo")
extra_context={"path": "/"}, )[0] assert swhid["swhid_with_context"] is None dir_content = archive_data.directory_ls(directory) dir_subdirs = [e for e in dir_content if e["type"] == "dir"] dir_subdir = random.choice(dir_subdirs) dir_subdir_path = f'/{dir_subdir["name"]}/' dir_subdir_content = archive_data.directory_ls(dir_subdir["target"]) dir_subdir_files = [e for e in dir_subdir_content if e["type"] == "file"] swh_objects_info = [ SWHObjectInfo(object_type=DIRECTORY, object_id=dir_subdir["target"]) ] extra_context = { "root_directory": directory, "path": dir_subdir_path, } if dir_subdir_files: dir_subdir_file = random.choice(dir_subdir_files) extra_context["filename"] = dir_subdir_file["name"] swh_objects_info.append( SWHObjectInfo( object_type=CONTENT, object_id=dir_subdir_file["checksums"]["sha1_git"] ) ) swhids = get_swhids_info( swh_objects_info, snapshot_context=None, extra_context=extra_context, ) - swhid_dir_parsed = get_swhid(swhids[0]["swhid_with_context"]) + swhid_lower = swhids[0]["swhid_with_context"] + swhid_upper = swhid_lower.replace(swhids[0]["swhid"], swhids[0]["swhid"].upper()) - anchor = gen_swhid(DIRECTORY, directory) + for swhid in (swhid_lower, swhid_upper): + swhid_dir_parsed = get_swhid(swhid) - assert swhid_dir_parsed.qualifiers() == { - "anchor": anchor, - "path": dir_subdir_path, - } + anchor = gen_swhid(DIRECTORY, directory) + + assert swhid_dir_parsed.qualifiers() == { + "anchor": anchor, + "path": dir_subdir_path, + } if dir_subdir_files: swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'{dir_subdir_path}{dir_subdir_file["name"]}', } @given(revision()) def test_get_swhids_info_revision_context(archive_data, revision): revision_data = archive_data.revision_get(revision) directory = revision_data["directory"] dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) swh_objects = [ SWHObjectInfo(object_type=REVISION, 
object_id=revision), SWHObjectInfo(object_type=DIRECTORY, object_id=directory), ] extra_context = {"revision": revision, "path": "/"} if dir_entry["type"] == "file": swh_objects.append( SWHObjectInfo( object_type=CONTENT, object_id=dir_entry["checksums"]["sha1_git"] ) ) extra_context["filename"] = dir_entry["name"] swhids = get_swhids_info( swh_objects, snapshot_context=None, extra_context=extra_context, ) assert swhids[0]["context"] == {} - swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) - anchor = gen_swhid(REVISION, revision) + swhid_lower = swhids[1]["swhid_with_context"] + swhid_upper = swhid_lower.replace(swhids[1]["swhid"], swhids[1]["swhid"].upper()) - assert swhid_dir_parsed.qualifiers() == { - "anchor": anchor, - } + for swhid in (swhid_lower, swhid_upper): + swhid_dir_parsed = get_swhid(swhid) + + anchor = gen_swhid(REVISION, revision) + + assert swhid_dir_parsed.qualifiers() == { + "anchor": anchor, + } if dir_entry["type"] == "file": swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"]) assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'/{dir_entry["name"]}', } @given(origin_with_multiple_visits()) def test_get_swhids_info_origin_snapshot_context(archive_data, origin): """ Test SWHIDs with contextual info computation under a variety of origin / snapshot browsing contexts. 
""" visits = archive_data.origin_visit_get(origin["url"]) for visit in visits: snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_id = snapshot["id"] branches = { k: v["target"] for k, v in snapshot["branches"].items() if v["target_type"] == "revision" } releases = { k: v["target"] for k, v in snapshot["branches"].items() if v["target_type"] == "release" } head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir = head_rev["directory"] dir_content = archive_data.directory_ls(root_dir) dir_files = [e for e in dir_content if e["type"] == "file"] dir_file = random.choice(dir_files) revision_log = [r["id"] for r in archive_data.revision_log(head_rev_id)] branch_name = random.choice(list(branches)) release = random.choice(list(releases)) release_data = archive_data.release_get(releases[release]) release_name = release_data["name"] revision_id = random.choice(revision_log) for snp_ctx_params, anchor_info in ( ( {"snapshot_id": snapshot_id}, {"anchor_type": REVISION, "anchor_id": head_rev_id}, ), ( {"snapshot_id": snapshot_id, "branch_name": branch_name}, {"anchor_type": REVISION, "anchor_id": branches[branch_name]}, ), ( {"snapshot_id": snapshot_id, "release_name": release_name}, {"anchor_type": RELEASE, "anchor_id": releases[release]}, ), ( {"snapshot_id": snapshot_id, "revision_id": revision_id}, {"anchor_type": REVISION, "anchor_id": revision_id}, ), ( {"origin_url": origin["url"], "snapshot_id": snapshot_id}, {"anchor_type": REVISION, "anchor_id": head_rev_id}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "branch_name": branch_name, }, {"anchor_type": REVISION, "anchor_id": branches[branch_name]}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "release_name": release_name, }, {"anchor_type": RELEASE, "anchor_id": releases[release]}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "revision_id": revision_id, }, {"anchor_type": REVISION, 
"anchor_id": revision_id}, ), ): snapshot_context = get_snapshot_context(**snp_ctx_params) rev_id = head_rev_id if "branch_name" in snp_ctx_params: rev_id = branches[branch_name] elif "release_name" in snp_ctx_params: rev_id = release_data["target"] elif "revision_id" in snp_ctx_params: rev_id = revision_id swh_objects = [ SWHObjectInfo( object_type=CONTENT, object_id=dir_file["checksums"]["sha1_git"] ), SWHObjectInfo(object_type=DIRECTORY, object_id=root_dir), SWHObjectInfo(object_type=REVISION, object_id=rev_id), SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id), ] if "release_name" in snp_ctx_params: swh_objects.append( SWHObjectInfo(object_type=RELEASE, object_id=release_data["id"]) ) swhids = get_swhids_info( swh_objects, snapshot_context, extra_context={"path": "/", "filename": dir_file["name"]}, ) swhid_cnt_parsed = get_swhid(swhids[0]["swhid_with_context"]) swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) swhid_rev_parsed = get_swhid(swhids[2]["swhid_with_context"]) swhid_snp_parsed = get_swhid( swhids[3]["swhid_with_context"] or swhids[3]["swhid"] ) swhid_rel_parsed = None if "release_name" in snp_ctx_params: swhid_rel_parsed = get_swhid(swhids[4]["swhid_with_context"]) anchor = gen_swhid( object_type=anchor_info["anchor_type"], object_id=anchor_info["anchor_id"], ) snapshot_swhid = gen_swhid(object_type=SNAPSHOT, object_id=snapshot_id) expected_cnt_context = { "visit": snapshot_swhid, "anchor": anchor, "path": f'/{dir_file["name"]}', } expected_dir_context = { "visit": snapshot_swhid, "anchor": anchor, } expected_rev_context = {"visit": snapshot_swhid} expected_snp_context = {} if "origin_url" in snp_ctx_params: expected_cnt_context["origin"] = origin["url"] expected_dir_context["origin"] = origin["url"] expected_rev_context["origin"] = origin["url"] expected_snp_context["origin"] = origin["url"] assert swhid_cnt_parsed.qualifiers() == expected_cnt_context assert swhid_dir_parsed.qualifiers() == expected_dir_context assert 
swhid_rev_parsed.qualifiers() == expected_rev_context assert swhid_snp_parsed.qualifiers() == expected_snp_context if "release_name" in snp_ctx_params: assert swhid_rel_parsed.qualifiers() == expected_rev_context @given(origin(), directory()) def test_get_swhids_info_characters_and_url_escaping(archive_data, origin, directory): snapshot_context = get_snapshot_context(origin_url=origin["url"]) snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%" path = "/foo;/bar%" swhid_info = get_swhids_info( [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)], snapshot_context=snapshot_context, extra_context={"path": path}, )[0] # check special characters in SWHID have been escaped assert ( swhid_info["context"]["origin"] == "http://example.org/?project%3Dabc%3Bdef%25" ) assert swhid_info["context"]["path"] == "/foo%3B/bar%25" # check special characters in SWHID URL have been escaped parsed_url_swhid = QualifiedSWHID.from_string( swhid_info["swhid_with_context_url"][1:] ) assert ( parsed_url_swhid.qualifiers()["origin"] == "http://example.org/%3Fproject%253Dabc%253Bdef%2525" ) assert parsed_url_swhid.qualifiers()["path"] == "/foo%253B/bar%2525" @given(origin_with_multiple_visits()) def test_resolve_swhids_snapshot_context(client, archive_data, origin): visits = archive_data.origin_visit_get(origin["url"]) visit = random.choice(visits) snapshot = archive_data.snapshot_get(visit["snapshot"]) head_rev_id = archive_data.snapshot_get_head(snapshot) branch_info = None release_info = None for branch_name in sorted(snapshot["branches"]): target_type = snapshot["branches"][branch_name]["target_type"] target = snapshot["branches"][branch_name]["target"] if target_type == "revision" and branch_info is None: branch_info = {"name": branch_name, "revision": target} elif target_type == "release" and release_info is None: release_info = {"name": branch_name, "release": target} if branch_info and release_info: break release_info["name"] = 
archive_data.release_get(release_info["release"])["name"] directory = archive_data.revision_get(branch_info["revision"])["directory"] directory_content = archive_data.directory_ls(directory) directory_subdirs = [e for e in directory_content if e["type"] == "dir"] directory_subdir = None if directory_subdirs: directory_subdir = random.choice(directory_subdirs) directory_files = [e for e in directory_content if e["type"] == "file"] directory_file = None if directory_files: directory_file = random.choice(directory_files) random_rev_id = random.choice(archive_data.revision_log(head_rev_id))["id"] for snp_ctx_params in ( {}, {"branch_name": branch_info["name"]}, {"release_name": release_info["name"]}, {"revision_id": random_rev_id}, ): snapshot_context = get_snapshot_context( snapshot["id"], origin["url"], **snp_ctx_params ) _check_resolved_swhid_browse_url(SNAPSHOT, snapshot["id"], snapshot_context) rev = head_rev_id if "branch_name" in snp_ctx_params: rev = branch_info["revision"] if "revision_id" in snp_ctx_params: rev = random_rev_id _check_resolved_swhid_browse_url(REVISION, rev, snapshot_context) _check_resolved_swhid_browse_url( DIRECTORY, directory, snapshot_context, path="/" ) if directory_subdir: _check_resolved_swhid_browse_url( DIRECTORY, directory_subdir["target"], snapshot_context, path=f"/{directory_subdir['name']}/", ) if directory_file: _check_resolved_swhid_browse_url( CONTENT, directory_file["target"], snapshot_context, path=f"/{directory_file['name']}", ) _check_resolved_swhid_browse_url( CONTENT, directory_file["target"], snapshot_context, path=f"/{directory_file['name']}", lines="10", ) _check_resolved_swhid_browse_url( CONTENT, directory_file["target"], snapshot_context, path=f"/{directory_file['name']}", lines="10-20", ) def _check_resolved_swhid_browse_url( object_type, object_id, snapshot_context, path=None, lines=None ): snapshot_id = snapshot_context["snapshot_id"] origin_url = None if snapshot_context["origin_info"]: origin_url = 
snapshot_context["origin_info"]["url"] obj_context = {} query_params = {} if origin_url: obj_context["origin"] = origin_url query_params["origin_url"] = origin_url obj_context["visit"] = gen_swhid(SNAPSHOT, snapshot_id) query_params["snapshot"] = snapshot_id if object_type in (CONTENT, DIRECTORY, REVISION): if snapshot_context["release"]: obj_context["anchor"] = gen_swhid(RELEASE, snapshot_context["release_id"]) query_params["release"] = snapshot_context["release"] else: obj_context["anchor"] = gen_swhid(REVISION, snapshot_context["revision_id"]) if object_type != REVISION: query_params["revision"] = snapshot_context["revision_id"] if path: obj_context["path"] = path if path != "/": if object_type == CONTENT: query_params["path"] = path[1:] else: query_params["path"] = path[1:-1] if object_type == DIRECTORY: object_id = snapshot_context["root_directory"] if lines: obj_context["lines"] = lines - obj_swhid = gen_swhid(object_type, object_id, metadata=obj_context) + obj_core_swhid = gen_swhid(object_type, object_id) + obj_swhid_lower = gen_swhid(object_type, object_id, metadata=obj_context) + obj_swhid_upper = obj_swhid_lower.replace(obj_core_swhid, obj_core_swhid.upper(), 1) - obj_swhid_resolved = resolve_swhid(obj_swhid) + for obj_swhid in (obj_swhid_lower, obj_swhid_upper): + obj_swhid_resolved = resolve_swhid(obj_swhid) - url_args = {"sha1_git": object_id} - if object_type == CONTENT: - url_args = {"query_string": f"sha1_git:{object_id}"} - elif object_type == SNAPSHOT: - url_args = {"snapshot_id": object_id} + url_args = {"sha1_git": object_id} + if object_type == CONTENT: + url_args = {"query_string": f"sha1_git:{object_id}"} + elif object_type == SNAPSHOT: + url_args = {"snapshot_id": object_id} - expected_url = reverse( - f"browse-{object_type}", url_args=url_args, query_params=query_params, - ) - if lines: - lines_number = lines.split("-") - expected_url += f"#L{lines_number[0]}" - if len(lines_number) > 1: - expected_url += f"-L{lines_number[1]}" + 
expected_url = reverse( + f"browse-{object_type}", url_args=url_args, query_params=query_params, + ) + if lines: + lines_number = lines.split("-") + expected_url += f"#L{lines_number[0]}" + if len(lines_number) > 1: + expected_url += f"-L{lines_number[1]}" - assert obj_swhid_resolved["browse_url"] == expected_url + assert obj_swhid_resolved["browse_url"] == expected_url @given(directory()) def test_resolve_swhid_with_escaped_chars(directory): origin = "http://example.org/?project=abc;" origin_swhid_escaped = quote(origin, safe="/?:@&") origin_swhid_url_escaped = quote(origin, safe="/:@;") swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped assert origin_swhid_url_escaped in resolved_swhid["browse_url"] @given(directory_with_subdirs()) def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_subdirs = [e for e in dir_content if e["type"] == "dir"] dir_subdir = random.choice(dir_subdirs) dir_subdir_path = dir_subdir["name"] anchor = gen_swhid(DIRECTORY, directory) swhid = gen_swhid( DIRECTORY, dir_subdir["target"], metadata={"anchor": anchor, "path": "/" + dir_subdir_path}, ) resolved_swhid = resolve_swhid(swhid) browse_url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={"path": dir_subdir_path}, ) assert resolved_swhid["browse_url"] == browse_url @given(directory()) def test_resolve_swhid_with_malformed_origin_url(archive_data, directory): origin_url = "http://example.org/project/abc" malformed_origin_url = "http:/example.org/project/abc" archive_data.origin_add([Origin(url=origin_url)]) swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": malformed_origin_url}) resolved_swhid = resolve_swhid(swhid) assert origin_url in resolved_swhid["browse_url"] @given(revision()) def 
test_resolve_dir_entry_swhid_with_anchor_revision(archive_data, revision): revision_data = archive_data.revision_get(revision) directory = revision_data["directory"] dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) rev_swhid = gen_swhid(REVISION, revision) if dir_entry["type"] == "rev": return if dir_entry["type"] == "file": swhid = gen_swhid( CONTENT, dir_entry["checksums"]["sha1_git"], metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}"}, ) else: swhid = gen_swhid( DIRECTORY, dir_entry["target"], metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}/"}, ) browse_url = reverse( "browse-revision", url_args={"sha1_git": revision}, query_params={"path": dir_entry["name"]}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url @given(directory_with_subdirs()) def test_resolve_dir_entry_swhid_with_anchor_directory(archive_data, directory): dir_content = archive_data.directory_ls(directory) dir_entry = random.choice( [entry for entry in dir_content if entry["type"] == "dir"] ) dir_swhid = gen_swhid(DIRECTORY, directory) swhid = gen_swhid( DIRECTORY, dir_entry["target"], metadata={"anchor": dir_swhid, "path": f"/{dir_entry['name']}/"}, ) browse_url = reverse( "browse-directory", url_args={"sha1_git": directory}, query_params={"path": f"{dir_entry['name']}"}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url @given(directory_with_files()) def test_resolve_file_entry_swhid_with_anchor_directory(archive_data, directory): dir_content = archive_data.directory_ls(directory) file_entry = random.choice( [entry for entry in dir_content if entry["type"] == "file"] ) dir_swhid = gen_swhid(DIRECTORY, directory) sha1_git = file_entry["checksums"]["sha1_git"] swhid = gen_swhid( CONTENT, sha1_git, metadata={"anchor": dir_swhid, "path": f"/{file_entry['name']}"}, ) browse_url = reverse( "browse-content", url_args={"query_string": 
f"sha1_git:{sha1_git}"}, query_params={"path": f"{directory}/{file_entry['name']}"}, ) resolved_swhid = resolve_swhid(swhid) assert resolved_swhid["browse_url"] == browse_url diff --git a/swh/web/urls.py b/swh/web/urls.py index 72109b39..fbd4a02d 100644 --- a/swh/web/urls.py +++ b/swh/web/urls.py @@ -1,82 +1,82 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django_js_reverse.views import urls_js from django.conf import settings from django.conf.urls import ( handler400, handler403, handler404, handler500, include, url, ) from django.contrib.auth.views import LogoutView from django.contrib.staticfiles.views import serve from django.shortcuts import render from django.views.generic.base import RedirectView from swh.web.browse.identifiers import swhid_browse from swh.web.common.exc import ( swh_handle400, swh_handle403, swh_handle404, swh_handle500, ) from swh.web.config import get_config swh_web_config = get_config() favicon_view = RedirectView.as_view( url="/static/img/icons/swh-logo-32x32.png", permanent=True ) def _default_view(request): return render(request, "homepage.html") urlpatterns = [ url(r"^admin/", include("swh.web.admin.urls")), url(r"^favicon\.ico$", favicon_view), url(r"^api/", include("swh.web.api.urls")), url(r"^browse/", include("swh.web.browse.urls")), url(r"^$", _default_view, name="swh-web-homepage"), url(r"^jsreverse/$", urls_js, name="js_reverse"), # keep legacy SWHID resolving URL with trailing slash for backward compatibility url( - r"^(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$", + r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)/$", swhid_browse, name="browse-swhid-legacy", ), url( - r"^(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)$", + r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)$", swhid_browse, name="browse-swhid", ), url(r"^", 
include("swh.web.misc.urls")), url(r"^", include("swh.web.auth.views")), url(r"^logout/$", LogoutView.as_view(template_name="logout.html"), name="logout"), ] # allow to serve assets through django staticfiles # even if settings.DEBUG is False def insecure_serve(request, path, **kwargs): return serve(request, path, insecure=True, **kwargs) # enable to serve compressed assets through django development server if swh_web_config["serve_assets"]: static_pattern = r"^%s(?P.*)$" % settings.STATIC_URL[1:] urlpatterns.append(url(static_pattern, insecure_serve)) handler400 = swh_handle400 # noqa handler403 = swh_handle403 # noqa handler404 = swh_handle404 # noqa handler500 = swh_handle500 # noqa