Page MenuHomeSoftware Heritage

identifiers.py
No OneTemporary

identifiers.py

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from urllib.parse import quote
from typing import cast, Any, Dict, Iterable, List, Optional
from typing_extensions import TypedDict
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import (
swhid,
parse_swhid,
CONTENT,
DIRECTORY,
ORIGIN,
RELEASE,
REVISION,
SNAPSHOT,
SWHID,
)
from swh.web.common import service
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import (
QueryParameters,
SnapshotContext,
SWHObjectInfo,
SWHIDInfo,
SWHIDContext,
)
from swh.web.common.utils import reverse
def gen_swhid(
    object_type: str,
    object_id: str,
    scheme_version: int = 1,
    metadata: Optional[SWHIDContext] = None,
) -> str:
    """
    Returns the SoftWare Heritage persistent IDentifier for a swh object based on:

        * the object type
        * the object id
        * the SWHID scheme version
        * optional contextual metadata

    Args:
        object_type: the swh object type
            (content/directory/release/revision/snapshot)
        object_id: the swh object id (hexadecimal representation
            of its hash value)
        scheme_version: the scheme version of the SWHIDs
        metadata: optional contextual qualifiers passed through to
            ``swh.model.identifiers.swhid``; when omitted an empty
            mapping is used

    Returns:
        the SWHID of the object

    Raises:
        BadInputExc: if the provided parameters do not enable to
            generate a valid identifier
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    if metadata is None:
        metadata = {}
    try:
        obj_swhid = swhid(
            object_type, object_id, scheme_version, cast(Dict[str, Any], metadata)
        )
    except ValidationError as e:
        # Chain the validation error so the root cause stays visible in tracebacks.
        raise BadInputExc(
            "Invalid object (%s) for SWHID. %s" % (object_id, e)
        ) from e
    else:
        return obj_swhid
class ResolvedSWHID(TypedDict):
    """Result of resolving a SWHID: the parsed identifier and its browse URL."""

    # parsed SWHID with context
    swhid_parsed: SWHID
    # URL to browse object according to SWHID context (may be None)
    browse_url: Optional[str]
def resolve_swhid(
    swhid: str, query_params: Optional[QueryParameters] = None
) -> ResolvedSWHID:
    """
    Try to resolve a SoftWare Heritage persistent IDentifier into an url for
    browsing the targeted object.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier
        query_params: optional dict filled with
            query parameters to append to the browse url

    Returns:
        a dict with the following keys:

            * **swhid_parsed**: the parsed identifier
            * **browse_url**: the url for browsing the targeted object,
              or None when the object type matches none of the handled types

    Raises:
        BadInputExc: if the SWHID (or its anchor / visit qualifier) can not
            be parsed, if the visit qualifier is not a snapshot SWHID, or
            if the SWHID targets an origin
    """
    swhid_parsed = get_swhid(swhid)
    object_type = swhid_parsed.object_type
    object_id = swhid_parsed.object_id
    browse_url = None
    url_args = {}
    query_dict = QueryDict("", mutable=True)
    fragment = ""
    anchor_swhid_parsed = None
    # Recorded up front because object_type may be rewritten to REVISION below;
    # the lines fragment only applies when the SWHID itself targets a content.
    # Value equality is used: object types are plain strings.
    process_lines = object_type == CONTENT

    if query_params:
        # copy caller-provided parameters in a deterministic (sorted) order
        for k in sorted(query_params.keys()):
            query_dict[k] = query_params[k]

    if "origin" in swhid_parsed.metadata:
        query_dict["origin_url"] = swhid_parsed.metadata["origin"]

    if "anchor" in swhid_parsed.metadata:
        anchor_swhid_parsed = get_swhid(swhid_parsed.metadata["anchor"])

    if "path" in swhid_parsed.metadata and swhid_parsed.metadata["path"] != "/":
        query_dict["path"] = swhid_parsed.metadata["path"]
        if anchor_swhid_parsed:
            # find the root directory the path is relative to
            directory = ""
            if anchor_swhid_parsed.object_type == DIRECTORY:
                directory = anchor_swhid_parsed.object_id
            elif anchor_swhid_parsed.object_type == REVISION:
                revision = service.lookup_revision(anchor_swhid_parsed.object_id)
                directory = revision["directory"]
            elif anchor_swhid_parsed.object_type == RELEASE:
                release = service.lookup_release(anchor_swhid_parsed.object_id)
                if release["target_type"] == REVISION:
                    revision = service.lookup_revision(release["target"])
                    directory = revision["directory"]
            if object_type == CONTENT:
                if "origin" not in swhid_parsed.metadata:
                    # when no origin context, content objects need to have their
                    # path prefixed by root directory id for proper breadcrumbs display
                    query_dict["path"] = directory + query_dict["path"]
                else:
                    # remove leading slash from SWHID content path
                    query_dict["path"] = query_dict["path"][1:]
            elif object_type == DIRECTORY:
                object_id = directory
                # remove leading and trailing slashes from SWHID directory path
                query_dict["path"] = query_dict["path"][1:-1]

    # snapshot context
    if "visit" in swhid_parsed.metadata:
        snp_swhid_parsed = get_swhid(swhid_parsed.metadata["visit"])
        if snp_swhid_parsed.object_type != SNAPSHOT:
            raise BadInputExc("Visit must be a snapshot SWHID.")
        query_dict["snapshot"] = snp_swhid_parsed.object_id

        if anchor_swhid_parsed:
            if anchor_swhid_parsed.object_type == REVISION:
                # check if the anchor revision is the tip of a branch
                branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
                    snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id
                )
                if branch_name:
                    query_dict["branch"] = branch_name
                elif object_type != REVISION:
                    query_dict["revision"] = anchor_swhid_parsed.object_id
            elif anchor_swhid_parsed.object_type == RELEASE:
                release = service.lookup_release(anchor_swhid_parsed.object_id)
                if release:
                    query_dict["release"] = release["name"]

        if object_type == REVISION and "release" not in query_dict:
            # a revision SWHID may itself be the tip of a branch of the snapshot
            branch_name = service.lookup_snapshot_branch_name_from_tip_revision(
                snp_swhid_parsed.object_id, object_id
            )
            if branch_name:
                query_dict["branch"] = branch_name

    # browsing content or directory without snapshot context
    elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed:
        if anchor_swhid_parsed.object_type == REVISION:
            # anchor revision, objects are browsed from its view
            object_type = REVISION
            object_id = anchor_swhid_parsed.object_id
        elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY:
            # a directory is browsed from its root
            object_id = anchor_swhid_parsed.object_id

    # arguments of the "browse-<object_type>" view
    if object_type == CONTENT:
        url_args["query_string"] = f"sha1_git:{object_id}"
    elif object_type == DIRECTORY:
        url_args["sha1_git"] = object_id
    elif object_type == RELEASE:
        url_args["sha1_git"] = object_id
    elif object_type == REVISION:
        url_args["sha1_git"] = object_id
    elif object_type == SNAPSHOT:
        url_args["snapshot_id"] = object_id
    elif object_type == ORIGIN:
        raise BadInputExc(
            (
                "Origin SWHIDs are not publicly resolvable because they are for "
                "internal usage only"
            )
        )

    if "lines" in swhid_parsed.metadata and process_lines:
        # "N" or "N-M" becomes the "#LN" or "#LN-LM" url fragment
        lines = swhid_parsed.metadata["lines"].split("-")
        fragment += "#L" + lines[0]
        if len(lines) > 1:
            fragment += "-L" + lines[1]

    if url_args:
        browse_url = (
            reverse(
                f"browse-{object_type}", url_args=url_args, query_params=query_dict,
            )
            + fragment
        )

    return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
def get_swhid(swhid: str) -> SWHID:
    """Check if a SWHID is valid and return it parsed.

    Args:
        swhid: a SoftWare Heritage persistent IDentifier.

    Raises:
        BadInputExc: if the provided SWHID can not be parsed; the
            underlying ValidationError is attached as its cause.

    Return:
        A parsed SWHID.
    """
    try:
        swhid_parsed = parse_swhid(swhid)
    except ValidationError as ve:
        # Chain the validation error so the root cause stays visible in tracebacks.
        raise BadInputExc(
            "Error when parsing identifier: %s" % " ".join(ve.messages)
        ) from ve
    else:
        return swhid_parsed
def group_swhids(swhids: Iterable[SWHID]) -> Dict[str, List[bytes]]:
    """
    Bucket parsed SoftWare Heritage persistent IDentifiers by object type.

    Args:
        swhids: an iterable of parsed SWHID objects

    Returns:
        A dictionary whose keys are the content, directory, revision,
        release and snapshot object types (always present, possibly with
        an empty list) and whose values are the object hashes, as bytes,
        of the identifiers of that type in input order.

    Raises:
        KeyError: if an identifier has an object type that is not one of
            the five keys above.
    """
    grouped: Dict[str, List[bytes]] = {
        kind: [] for kind in (CONTENT, DIRECTORY, REVISION, RELEASE, SNAPSHOT)
    }
    for parsed in swhids:
        grouped[parsed.object_type].append(hash_to_bytes(parsed.object_id))
    return grouped
def get_swhids_info(
    swh_objects: Iterable[SWHObjectInfo],
    snapshot_context: Optional[SnapshotContext] = None,
    extra_context: Optional[Dict[str, Any]] = None,
) -> List[SWHIDInfo]:
    """
    Build SWHID information (bare and contextual identifiers with their
    browse urls) for a sequence of archived objects.

    Args:
        swh_objects: an iterable of dict describing archived objects
            (each provides ``object_type`` and ``object_id``)
        snapshot_context: optional dict describing the snapshot in which
            the objects have been found; used to fill the origin, visit
            and anchor qualifiers
        extra_context: optional dict with extra contextual info about the
            objects; the ``revision``, ``root_directory``, ``path`` and
            ``filename`` keys are read

    Returns:
        a list of dict containing SWHIDs info, one per input object;
        objects without an id yield an entry with empty identifiers
    """
    extra: Dict[str, Any] = extra_context or {}
    results = []

    for obj in swh_objects:
        kind = obj["object_type"]
        oid = obj["object_id"]

        if not oid:
            # nothing to identify: emit a placeholder entry and move on
            results.append(
                SWHIDInfo(
                    object_type=kind,
                    object_id="",
                    swhid="",
                    swhid_url="",
                    context={},
                    swhid_with_context=None,
                    swhid_with_context_url=None,
                )
            )
            continue

        qualifiers: SWHIDContext = {}
        in_tree = kind in (CONTENT, DIRECTORY)

        if snapshot_context:
            origin_info = snapshot_context["origin_info"]
            if origin_info is not None:
                qualifiers["origin"] = quote(origin_info["url"], safe="/?:@&")
            if kind != SNAPSHOT:
                qualifiers["visit"] = gen_swhid(
                    SNAPSHOT, snapshot_context["snapshot_id"]
                )
            if in_tree:
                # a release anchor takes precedence over a revision one
                if snapshot_context["release_id"] is not None:
                    qualifiers["anchor"] = gen_swhid(
                        RELEASE, snapshot_context["release_id"]
                    )
                elif snapshot_context["revision_id"] is not None:
                    qualifiers["anchor"] = gen_swhid(
                        REVISION, snapshot_context["revision_id"]
                    )

        if in_tree:
            if "anchor" not in qualifiers:
                # fall back on the extra context when the snapshot context
                # did not provide an anchor
                root_directory = extra.get("root_directory")
                if extra.get("revision"):
                    qualifiers["anchor"] = gen_swhid(REVISION, extra["revision"])
                elif root_directory and (
                    kind != DIRECTORY or root_directory != oid
                ):
                    # a directory is not anchored to itself
                    qualifiers["anchor"] = gen_swhid(DIRECTORY, root_directory)

            path = None
            if "path" in extra:
                path = extra["path"] or "/"
                if kind == CONTENT and "filename" in extra:
                    path += extra["filename"]
            if path:
                qualifiers["path"] = quote(path, safe="/?:@&")

        core_swhid = gen_swhid(kind, oid)
        core_url = reverse("browse-swhid", url_args={"swhid": core_swhid})

        qualified_swhid = None
        qualified_url = None
        if qualifiers:
            qualified_swhid = gen_swhid(kind, oid, metadata=qualifiers)
            qualified_url = reverse(
                "browse-swhid", url_args={"swhid": qualified_swhid}
            )

        results.append(
            SWHIDInfo(
                object_type=kind,
                object_id=oid,
                swhid=core_swhid,
                swhid_url=core_url,
                context=qualifiers,
                swhid_with_context=qualified_swhid,
                swhid_with_context_url=qualified_url,
            )
        )

    return results

File Metadata

Mime Type
text/x-python
Expires
Jul 4 2025, 6:34 PM (5 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3393258

Event Timeline