Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/identifiers.py
Show All 17 Lines | from swh.model.identifiers import ( | ||||
DIRECTORY, | DIRECTORY, | ||||
ORIGIN, | ORIGIN, | ||||
RELEASE, | RELEASE, | ||||
REVISION, | REVISION, | ||||
SNAPSHOT, | SNAPSHOT, | ||||
PersistentId, | PersistentId, | ||||
) | ) | ||||
from swh.web.common import service | |||||
from swh.web.common.exc import BadInputExc | from swh.web.common.exc import BadInputExc | ||||
from swh.web.common.typing import ( | from swh.web.common.typing import ( | ||||
QueryParameters, | QueryParameters, | ||||
SnapshotContext, | SnapshotContext, | ||||
SWHObjectInfo, | SWHObjectInfo, | ||||
SWHIDInfo, | SWHIDInfo, | ||||
SWHIDContext, | SWHIDContext, | ||||
) | ) | ||||
Show All 33 Lines | ) -> str: | ||||
except ValidationError as e: | except ValidationError as e: | ||||
raise BadInputExc( | raise BadInputExc( | ||||
"Invalid object (%s) for swh persistent id. %s" % (object_id, e) | "Invalid object (%s) for swh persistent id. %s" % (object_id, e) | ||||
) | ) | ||||
else: | else: | ||||
return swh_id | return swh_id | ||||
ResolvedPersistentId = TypedDict( | class ResolvedPersistentId(TypedDict): | ||||
"ResolvedPersistentId", {"swh_id_parsed": PersistentId, "browse_url": Optional[str]} | swh_id_parsed: PersistentId | ||||
ardumont: Maybe add a docstring to explain what is its intended use. | |||||
) | browse_url: Optional[str] | ||||
def resolve_swh_persistent_id( | def resolve_swh_persistent_id( | ||||
swh_id: str, query_params: Optional[QueryParameters] = None | swh_id: str, query_params: Optional[QueryParameters] = None | ||||
) -> ResolvedPersistentId: | ) -> ResolvedPersistentId: | ||||
""" | """ | ||||
Try to resolve a Software Heritage persistent id into an url for | Try to resolve a Software Heritage persistent id into an url for | ||||
browsing the targeted object. | browsing the targeted object. | ||||
Args: | Args: | ||||
swh_id: a Software Heritage persistent identifier | swh_id: a Software Heritage persistent identifier | ||||
query_params: optional dict filled with | query_params: optional dict filled with | ||||
query parameters to append to the browse url | query parameters to append to the browse url | ||||
Returns: | Returns: | ||||
a dict with the following keys: | a dict with the following keys: | ||||
* **swh_id_parsed**: the parsed identifier | * **swh_id_parsed**: the parsed identifier | ||||
* **browse_url**: the url for browsing the targeted object | * **browse_url**: the url for browsing the targeted object | ||||
""" | """ | ||||
swh_id_parsed = get_persistent_identifier(swh_id) | swh_id_parsed = get_persistent_identifier(swh_id) | ||||
object_type = swh_id_parsed.object_type | object_type = swh_id_parsed.object_type | ||||
object_id = swh_id_parsed.object_id | object_id = swh_id_parsed.object_id | ||||
browse_url = None | browse_url = None | ||||
url_args = {} | |||||
query_dict = QueryDict("", mutable=True) | query_dict = QueryDict("", mutable=True) | ||||
fragment = "" | |||||
anchor_swhid_parsed = None | |||||
if query_params and len(query_params) > 0: | if query_params and len(query_params) > 0: | ||||
for k in sorted(query_params.keys()): | for k in sorted(query_params.keys()): | ||||
query_dict[k] = query_params[k] | query_dict[k] = query_params[k] | ||||
if "origin" in swh_id_parsed.metadata: | if "origin" in swh_id_parsed.metadata: | ||||
query_dict["origin_url"] = swh_id_parsed.metadata["origin"] | query_dict["origin_url"] = swh_id_parsed.metadata["origin"] | ||||
if "anchor" in swh_id_parsed.metadata: | |||||
anchor_swhid_parsed = get_persistent_identifier( | |||||
swh_id_parsed.metadata["anchor"] | |||||
) | |||||
if "path" in swh_id_parsed.metadata and swh_id_parsed.metadata["path"] != "/": | |||||
query_dict["path"] = swh_id_parsed.metadata["path"] | |||||
if anchor_swhid_parsed: | |||||
directory = "" | |||||
if anchor_swhid_parsed.object_type == DIRECTORY: | |||||
directory = anchor_swhid_parsed.object_id | |||||
elif anchor_swhid_parsed.object_type == REVISION: | |||||
revision = service.lookup_revision(anchor_swhid_parsed.object_id) | |||||
directory = revision["directory"] | |||||
elif anchor_swhid_parsed.object_type == RELEASE: | |||||
release = service.lookup_release(anchor_swhid_parsed.object_id) | |||||
if release["target_type"] == REVISION: | |||||
revision = service.lookup_revision(release["target"]) | |||||
directory = revision["directory"] | |||||
if object_type == CONTENT: | |||||
# content objects need to have their path prefixed by root | |||||
# directory id for proper breadcrumbs display | |||||
query_dict["path"] = directory + query_dict["path"] | |||||
elif object_type == DIRECTORY: | |||||
object_id = directory | |||||
# remove leading and trailing slashes from SWHID directory path | |||||
query_dict["path"] = query_dict["path"][1:-1] | |||||
# snapshot context | |||||
if "visit" in swh_id_parsed.metadata: | |||||
snp_swhid_parsed = get_persistent_identifier(swh_id_parsed.metadata["visit"]) | |||||
if snp_swhid_parsed.object_type != SNAPSHOT: | |||||
raise BadInputExc("Visit must be a snapshot SWHID.") | |||||
query_dict["snapshot"] = snp_swhid_parsed.object_id | |||||
if anchor_swhid_parsed: | |||||
if anchor_swhid_parsed.object_type == REVISION: | |||||
# check if the anchor revision is the tip of a branch | |||||
branch_name = service.lookup_snapshot_branch_name_from_tip_revision( | |||||
snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id | |||||
) | |||||
if branch_name: | |||||
query_dict["branch"] = branch_name | |||||
elif object_type != REVISION: | |||||
query_dict["revision"] = anchor_swhid_parsed.object_id | |||||
elif anchor_swhid_parsed.object_type == RELEASE: | |||||
release = service.lookup_release(anchor_swhid_parsed.object_id) | |||||
if release: | |||||
query_dict["release"] = release["name"] | |||||
if object_type == REVISION and "release" not in query_dict: | |||||
branch_name = service.lookup_snapshot_branch_name_from_tip_revision( | |||||
snp_swhid_parsed.object_id, object_id | |||||
) | |||||
if branch_name: | |||||
query_dict["branch"] = branch_name | |||||
# browsing content or directory without snapshot context | |||||
elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed: | |||||
if anchor_swhid_parsed.object_type == REVISION: | |||||
# anchor revision, objects are browsed from its view | |||||
object_type = REVISION | |||||
object_id = anchor_swhid_parsed.object_id | |||||
elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY: | |||||
# a directory is browsed from its root | |||||
object_id = anchor_swhid_parsed.object_id | |||||
if object_type == CONTENT: | if object_type == CONTENT: | ||||
query_string = "sha1_git:" + object_id | query_string = "sha1_git:" + object_id | ||||
fragment = "" | |||||
if "lines" in swh_id_parsed.metadata: | if "lines" in swh_id_parsed.metadata: | ||||
lines = swh_id_parsed.metadata["lines"].split("-") | lines = swh_id_parsed.metadata["lines"].split("-") | ||||
fragment += "#L" + lines[0] | fragment += "#L" + lines[0] | ||||
if len(lines) > 1: | if len(lines) > 1: | ||||
fragment += "-L" + lines[1] | fragment += "-L" + lines[1] | ||||
browse_url = ( | url_args["query_string"] = query_string | ||||
reverse( | |||||
"browse-content", | |||||
url_args={"query_string": query_string}, | |||||
query_params=query_dict, | |||||
) | |||||
+ fragment | |||||
) | |||||
elif object_type == DIRECTORY: | elif object_type == DIRECTORY: | ||||
browse_url = reverse( | url_args["sha1_git"] = object_id | ||||
"browse-directory", | |||||
url_args={"sha1_git": object_id}, | |||||
query_params=query_dict, | |||||
) | |||||
elif object_type == RELEASE: | elif object_type == RELEASE: | ||||
browse_url = reverse( | url_args["sha1_git"] = object_id | ||||
"browse-release", url_args={"sha1_git": object_id}, query_params=query_dict | |||||
) | |||||
elif object_type == REVISION: | elif object_type == REVISION: | ||||
browse_url = reverse( | url_args["sha1_git"] = object_id | ||||
"browse-revision", url_args={"sha1_git": object_id}, query_params=query_dict | |||||
) | |||||
elif object_type == SNAPSHOT: | elif object_type == SNAPSHOT: | ||||
browse_url = reverse( | url_args["snapshot_id"] = object_id | ||||
"browse-snapshot", | |||||
url_args={"snapshot_id": object_id}, | |||||
query_params=query_dict, | |||||
) | |||||
elif object_type == ORIGIN: | elif object_type == ORIGIN: | ||||
raise BadInputExc( | raise BadInputExc( | ||||
( | ( | ||||
"Origin PIDs (Persistent Identifiers) are not " | "Origin PIDs (Persistent Identifiers) are not " | ||||
"publicly resolvable because they are for " | "publicly resolvable because they are for " | ||||
"internal usage only" | "internal usage only" | ||||
) | ) | ||||
) | ) | ||||
return {"swh_id_parsed": swh_id_parsed, "browse_url": browse_url} | if url_args: | ||||
browse_url = ( | |||||
reverse( | |||||
f"browse-{object_type}", url_args=url_args, query_params=query_dict, | |||||
) | |||||
+ fragment | |||||
) | |||||
return ResolvedPersistentId(swh_id_parsed=swh_id_parsed, browse_url=browse_url) | |||||
def get_persistent_identifier(persistent_id: str) -> PersistentId: | def get_persistent_identifier(persistent_id: str) -> PersistentId: | ||||
"""Check if a persistent identifier is valid. | """Check if a persistent identifier is valid. | ||||
Args: | Args: | ||||
persistent_id: A string representing a Software Heritage | persistent_id: A string representing a Software Heritage | ||||
persistent identifier. | persistent identifier. | ||||
▲ Show 20 Lines • Show All 161 Lines • Show Last 20 Lines |
Maybe add a docstring to explain what is its intended use.