diff --git a/swh/web/browse/views/revision.py b/swh/web/browse/views/revision.py
--- a/swh/web/browse/views/revision.py
+++ b/swh/web/browse/views/revision.py
@@ -488,6 +488,9 @@
             mimetype = content_display_data["mimetype"]
         if path:
             filename = path_info[-1]["name"]
+            query_params["filename"] = filename
+            filepath = "/".join(pi["name"] for pi in path_info[:-1])
+            extra_context["path"] = f"/{filepath}/" if filepath else "/"
             extra_context["filename"] = filename
 
         top_right_link = {
diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -23,6 +23,7 @@
     PersistentId,
 )
 
+from swh.web.common import service
 from swh.web.common.exc import BadInputExc
 from swh.web.common.typing import (
     QueryParameters,
@@ -72,9 +73,13 @@
     return swh_id
 
 
-ResolvedPersistentId = TypedDict(
-    "ResolvedPersistentId", {"swh_id_parsed": PersistentId, "browse_url": Optional[str]}
-)
+class ResolvedPersistentId(TypedDict):
+    """Result of a SWHID resolving: parsed identifier and URL to browse it"""
+
+    # parsed SWHID with context
+    swh_id_parsed: PersistentId
+    # URL to browse object according to SWHID context
+    browse_url: Optional[str]
 
 
 def resolve_swh_persistent_id(
@@ -99,48 +104,108 @@
     object_type = swh_id_parsed.object_type
     object_id = swh_id_parsed.object_id
     browse_url = None
+    url_args = {}
     query_dict = QueryDict("", mutable=True)
+    fragment = ""
+    anchor_swhid_parsed = None
+
     if query_params and len(query_params) > 0:
         for k in sorted(query_params.keys()):
             query_dict[k] = query_params[k]
+
     if "origin" in swh_id_parsed.metadata:
         query_dict["origin_url"] = swh_id_parsed.metadata["origin"]
+
+    if "anchor" in swh_id_parsed.metadata:
+        anchor_swhid_parsed = get_persistent_identifier(
+            swh_id_parsed.metadata["anchor"]
+        )
+
+    if "path" in swh_id_parsed.metadata and swh_id_parsed.metadata["path"] != "/":
+        query_dict["path"] = swh_id_parsed.metadata["path"]
+        if anchor_swhid_parsed:
+            directory = ""
+            if anchor_swhid_parsed.object_type == DIRECTORY:
+                directory = anchor_swhid_parsed.object_id
+            elif anchor_swhid_parsed.object_type == REVISION:
+                revision = service.lookup_revision(anchor_swhid_parsed.object_id)
+                directory = revision["directory"]
+            elif anchor_swhid_parsed.object_type == RELEASE:
+                release = service.lookup_release(anchor_swhid_parsed.object_id)
+                if release["target_type"] == REVISION:
+                    revision = service.lookup_revision(release["target"])
+                    directory = revision["directory"]
+            if object_type == CONTENT:
+                if "origin" not in swh_id_parsed.metadata:
+                    # when no origin context, content objects need to have their
+                    # path prefixed by root directory id for proper breadcrumbs display
+                    query_dict["path"] = directory + query_dict["path"]
+                else:
+                    # remove leading slash from SWHID content path
+                    query_dict["path"] = query_dict["path"][1:]
+            elif object_type == DIRECTORY:
+                object_id = directory
+                # remove leading and trailing slashes from SWHID directory path
+                query_dict["path"] = query_dict["path"][1:-1]
+
+    # snapshot context
+    if "visit" in swh_id_parsed.metadata:
+
+        snp_swhid_parsed = get_persistent_identifier(swh_id_parsed.metadata["visit"])
+        if snp_swhid_parsed.object_type != SNAPSHOT:
+            raise BadInputExc("Visit must be a snapshot SWHID.")
+        query_dict["snapshot"] = snp_swhid_parsed.object_id
+
+        if anchor_swhid_parsed:
+            if anchor_swhid_parsed.object_type == REVISION:
+                # check if the anchor revision is the tip of a branch
+                branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
+                    snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id
+                )
+                if branch_name:
+                    query_dict["branch"] = branch_name
+                elif object_type != REVISION:
+                    query_dict["revision"] = anchor_swhid_parsed.object_id
+
+            elif anchor_swhid_parsed.object_type == RELEASE:
+                release = service.lookup_release(anchor_swhid_parsed.object_id)
+                if release:
+                    query_dict["release"] = release["name"]
+
+        if object_type == REVISION and "release" not in query_dict:
+            branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
+                snp_swhid_parsed.object_id, object_id
+            )
+            if branch_name:
+                query_dict["branch"] = branch_name
+
+    # browsing content or directory without snapshot context
+    elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed:
+        if anchor_swhid_parsed.object_type == REVISION:
+            # anchor revision, objects are browsed from its view
+            object_type = REVISION
+            object_id = anchor_swhid_parsed.object_id
+        elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY:
+            # a directory is browsed from its root
+            object_id = anchor_swhid_parsed.object_id
+
     if object_type == CONTENT:
         query_string = "sha1_git:" + object_id
-        fragment = ""
         if "lines" in swh_id_parsed.metadata:
             lines = swh_id_parsed.metadata["lines"].split("-")
             fragment += "#L" + lines[0]
             if len(lines) > 1:
                 fragment += "-L" + lines[1]
-        browse_url = (
-            reverse(
-                "browse-content",
-                url_args={"query_string": query_string},
-                query_params=query_dict,
-            )
-            + fragment
-        )
+        url_args["query_string"] = query_string
+
     elif object_type == DIRECTORY:
-        browse_url = reverse(
-            "browse-directory",
-            url_args={"sha1_git": object_id},
-            query_params=query_dict,
-        )
+        url_args["sha1_git"] = object_id
     elif object_type == RELEASE:
-        browse_url = reverse(
-            "browse-release", url_args={"sha1_git": object_id}, query_params=query_dict
-        )
+        url_args["sha1_git"] = object_id
     elif object_type == REVISION:
-        browse_url = reverse(
-            "browse-revision", url_args={"sha1_git": object_id}, query_params=query_dict
-        )
+        url_args["sha1_git"] = object_id
     elif object_type == SNAPSHOT:
-        browse_url = reverse(
-            "browse-snapshot",
-            url_args={"snapshot_id": object_id},
-            query_params=query_dict,
-        )
+        url_args["snapshot_id"] = object_id
     elif object_type == ORIGIN:
         raise BadInputExc(
             (
@@ -150,7 +215,15 @@
             )
         )
 
-    return {"swh_id_parsed": swh_id_parsed, "browse_url": browse_url}
+    if url_args:
+        browse_url = (
+            reverse(
+                f"browse-{object_type}", url_args=url_args, query_params=query_dict,
+            )
+            + fragment
+        )
+
+    return ResolvedPersistentId(swh_id_parsed=swh_id_parsed, browse_url=browse_url)
 
 
 def get_persistent_identifier(persistent_id: str) -> PersistentId:
diff --git a/swh/web/common/service.py b/swh/web/common/service.py
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -1023,6 +1023,45 @@
     return converters.from_snapshot(snapshot)
 
 
+def lookup_snapshot_branch_name_from_tip_revision(
+    snapshot_id: str, revision_id: str
+) -> Optional[str]:
+    """Check if a revision corresponds to the tip of a snapshot branch
+
+    Branches targeting revisions are fetched page by page and each page is
+    searched as soon as it is received, so the lookup stops at the first match.
+
+    Args:
+        snapshot_id: hexadecimal representation of a snapshot id
+        revision_id: hexadecimal representation of a revision id
+
+    Returns:
+        The name of the first found branch or None otherwise
+    """
+    per_page = 10000
+    branches_from = ""
+    while True:
+        # one extra branch is requested: when it is returned, more branches
+        # remain to fetch and its name is the start of the next page
+        snapshot = lookup_snapshot(
+            snapshot_id,
+            target_types=[REVISION],
+            branches_from=branches_from,
+            branches_count=per_page + 1,
+        )
+        branches = snapshot["branches"]
+
+        for branch_name, branch in branches.items():
+            if branch["target"] == revision_id:
+                return branch_name
+
+        if len(branches) <= per_page:
+            # last page reached, it is empty when the snapshot has no branch
+            # targeting a revision
+            return None
+        branches_from = list(branches)[-1]
+
+
 def lookup_revision_through(revision, limit=100):
     """Retrieve a revision from the criterion stored in revision dictionary.
 
diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py
--- a/swh/web/tests/common/test_identifiers.py
+++ b/swh/web/tests/common/test_identifiers.py
@@ -62,7 +62,9 @@
 
 
 @given(content(), directory(), release(), revision(), snapshot())
-def test_resolve_swh_persistent_id(content, directory, release, revision, snapshot):
+def test_resolve_swh_persistent_id_legacy(
+    content, directory, release, revision, snapshot
+):
     for obj_type, obj_id in (
         (CONTENT, content["sha1_git"]),
         (DIRECTORY, directory),
@@ -417,3 +419,152 @@
 
     assert swhid["context"]["origin"] == "http://example.org/?project%3Dabc%3Bdef%25"
     assert swhid["context"]["path"] == "/foo%3B/bar%25"
+
+
+@given(origin_with_multiple_visits())
+def test_resolve_swhids_snapshot_context(client, archive_data, origin):
+    """Check browse URLs resolved from SWHIDs qualified with a snapshot context"""
+    visits = archive_data.origin_visit_get(origin["url"])
+    visit = random.choice(visits)
+    snapshot = archive_data.snapshot_get(visit["snapshot"])
+    head_rev_id = archive_data.snapshot_get_head(snapshot)
+    branch_info = random.choice(
+        [
+            {"name": k, "revision": v["target"]}
+            for k, v in snapshot["branches"].items()
+            if v["target_type"] == "revision"
+        ]
+    )
+    release_info = random.choice(
+        [
+            {"name": k, "release": v["target"]}
+            for k, v in snapshot["branches"].items()
+            if v["target_type"] == "release"
+        ]
+    )
+    release_info["name"] = archive_data.release_get(release_info["release"])["name"]
+
+    directory = archive_data.revision_get(branch_info["revision"])["directory"]
+    directory_content = archive_data.directory_ls(directory)
+    directory_subdir = random.choice(
+        [e for e in directory_content if e["type"] == "dir"]
+    )
+    directory_file = random.choice(
+        [e for e in directory_content if e["type"] == "file"]
+    )
+    random_rev_id = random.choice(archive_data.revision_log(head_rev_id))["id"]
+
+    for snp_ctx_params in (
+        {},
+        {"branch_name": branch_info["name"]},
+        {"release_name": release_info["name"]},
+        {"revision_id": random_rev_id},
+    ):
+        snapshot_context = get_snapshot_context(
+            snapshot["id"], origin["url"], **snp_ctx_params
+        )
+
+        _check_resolved_swhid_browse_url(SNAPSHOT, snapshot["id"], snapshot_context)
+
+        rev = head_rev_id
+        if "branch_name" in snp_ctx_params:
+            rev = branch_info["revision"]
+        if "revision_id" in snp_ctx_params:
+            rev = random_rev_id
+
+        _check_resolved_swhid_browse_url(REVISION, rev, snapshot_context)
+
+        _check_resolved_swhid_browse_url(
+            DIRECTORY, directory, snapshot_context, path="/"
+        )
+
+        _check_resolved_swhid_browse_url(
+            DIRECTORY,
+            directory_subdir["target"],
+            snapshot_context,
+            path=f"/{directory_subdir['name']}/",
+        )
+
+        _check_resolved_swhid_browse_url(
+            CONTENT,
+            directory_file["target"],
+            snapshot_context,
+            path=f"/{directory_file['name']}",
+        )
+
+
+def _check_resolved_swhid_browse_url(
+    object_type, object_id, snapshot_context, path=None
+):
+    """Build the SWHID of an object within a snapshot context, resolve it and
+    check the resulting browse URL against the expected one
+    """
+    snapshot_id = snapshot_context["snapshot_id"]
+    origin_url = None
+    if snapshot_context["origin_info"]:
+        origin_url = snapshot_context["origin_info"]["url"]
+
+    obj_context = {}
+    query_params = {}
+
+    if origin_url:
+        obj_context["origin"] = origin_url
+        query_params["origin_url"] = origin_url
+
+    obj_context["visit"] = get_swh_persistent_id(SNAPSHOT, snapshot_id)
+    query_params["snapshot"] = snapshot_id
+
+    if object_type in (CONTENT, DIRECTORY, REVISION):
+        if snapshot_context["release"]:
+            obj_context["anchor"] = get_swh_persistent_id(
+                RELEASE, snapshot_context["release_id"]
+            )
+            query_params["release"] = snapshot_context["release"]
+        else:
+            obj_context["anchor"] = get_swh_persistent_id(
+                REVISION, snapshot_context["revision_id"]
+            )
+            if (
+                snapshot_context["branch"]
+                and snapshot_context["branch"] != snapshot_context["revision_id"]
+            ):
+                branch = snapshot_context["branch"]
+                if branch == "HEAD":
+                    for b in snapshot_context["branches"]:
+                        if (
+                            b["revision"] == snapshot_context["revision_id"]
+                            and b["name"] != "HEAD"
+                        ):
+                            branch = b["name"]
+                            break
+
+                query_params["branch"] = branch
+            elif object_type != REVISION:
+                query_params["revision"] = snapshot_context["revision_id"]
+
+    if path:
+        obj_context["path"] = path
+        if path != "/":
+            if object_type == CONTENT:
+                query_params["path"] = path[1:]
+            else:
+                query_params["path"] = path[1:-1]
+
+        if object_type == DIRECTORY:
+            object_id = snapshot_context["root_directory"]
+
+    obj_swhid = get_swh_persistent_id(object_type, object_id, metadata=obj_context)
+
+    obj_swhid_resolved = resolve_swh_persistent_id(obj_swhid)
+
+    url_args = {"sha1_git": object_id}
+    if object_type == CONTENT:
+        url_args = {"query_string": f"sha1_git:{object_id}"}
+    elif object_type == SNAPSHOT:
+        url_args = {"snapshot_id": object_id}
+
+    expected_url = reverse(
+        f"browse-{object_type}", url_args=url_args, query_params=query_params,
+    )
+
+    assert obj_swhid_resolved["browse_url"] == expected_url
diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py
--- a/swh/web/tests/common/test_service.py
+++ b/swh/web/tests/common/test_service.py
@@ -948,3 +948,28 @@
     archive_data.origin_add_one(deb_origin)
     origin_info = service.lookup_origin({"url": deb_origin.url[:-1]})
     assert origin_info["url"] == deb_origin.url
+
+
+@given(snapshot())
+def test_lookup_snapshot_branch_name_from_tip_revision(archive_data, snapshot_id):
+    """Each branch tip revision must resolve to a branch targeting it"""
+    snapshot = archive_data.snapshot_get(snapshot_id)
+    branches = [
+        {"name": k, "revision": v["target"]}
+        for k, v in snapshot["branches"].items()
+        if v["target_type"] == "revision"
+    ]
+
+    # check every branch rather than a random one: the test stays reproducible
+    # and does not fail on a snapshot without any branch targeting a revision
+    for branch_info in branches:
+        possible_results = [
+            b["name"] for b in branches if b["revision"] == branch_info["revision"]
+        ]
+
+        assert (
+            service.lookup_snapshot_branch_name_from_tip_revision(
+                snapshot_id, branch_info["revision"]
+            )
+            in possible_results
+        )