Page MenuHomeSoftware Heritage

identifiers.py
No OneTemporary

identifiers.py

# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterable, List, Optional
from urllib.parse import quote, unquote
from typing_extensions import TypedDict
from django.http import QueryDict
from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.swhids import ObjectType, QualifiedSWHID
from swh.web.common import archive
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import (
QueryParameters,
SnapshotContext,
SWHIDContext,
SWHIDInfo,
SWHObjectInfo,
)
from swh.web.common.utils import reverse
def parse_object_type(object_type: str) -> ObjectType:
try:
return ObjectType[object_type.upper()]
except KeyError:
valid_types = ", ".join(variant.name.lower() for variant in ObjectType)
raise BadInputExc(
f"Invalid swh object type! Valid types are {valid_types}; not {object_type}"
)
def gen_swhid(
object_type: ObjectType,
object_id: str,
scheme_version: int = 1,
metadata: SWHIDContext = {},
) -> str:
"""
Returns the SoftWare Heritage persistent IDentifier for a swh object based on:
* the object type
* the object id
* the SWHID scheme version
Args:
object_type: the swh object type
(content/directory/release/revision/snapshot)
object_id: the swh object id (hexadecimal representation
of its hash value)
scheme_version: the scheme version of the SWHIDs
Returns:
the SWHID of the object
Raises:
BadInputExc: if the provided parameters do not enable to
generate a valid identifier
"""
try:
decoded_object_id = hash_to_bytes(object_id)
obj_swhid = str(
QualifiedSWHID(
object_type=object_type,
object_id=decoded_object_id,
scheme_version=scheme_version,
**metadata,
)
)
except (ValidationError, KeyError, ValueError) as e:
raise BadInputExc("Invalid object (%s) for SWHID. %s" % (object_id, e))
else:
return obj_swhid
class ResolvedSWHID(TypedDict):
"""parsed SWHID with context"""
swhid_parsed: QualifiedSWHID
"""URL to browse object according to SWHID context"""
browse_url: Optional[str]
def resolve_swhid(
swhid: str, query_params: Optional[QueryParameters] = None
) -> ResolvedSWHID:
"""
Try to resolve a SoftWare Heritage persistent IDentifier into an url for
browsing the targeted object.
Args:
swhid: a SoftWare Heritage persistent IDentifier
query_params: optional dict filled with
query parameters to append to the browse url
Returns:
a dict with the following keys:
* **swhid_parsed**: the parsed identifier
* **browse_url**: the url for browsing the targeted object
"""
swhid_parsed = get_swhid(swhid)
object_type = swhid_parsed.object_type
object_id = swhid_parsed.object_id
browse_url = None
url_args = {}
query_dict = QueryDict("", mutable=True)
fragment = ""
process_lines = object_type == ObjectType.CONTENT
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
if swhid_parsed.origin:
origin_url = unquote(swhid_parsed.origin)
origin_url = archive.lookup_origin({"url": origin_url})["url"]
query_dict["origin_url"] = origin_url
if swhid_parsed.path and swhid_parsed.path != b"/":
query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace")
if swhid_parsed.anchor:
directory = b""
if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY:
directory = swhid_parsed.anchor.object_id
elif swhid_parsed.anchor.object_type == ObjectType.REVISION:
revision = archive.lookup_revision(
hash_to_hex(swhid_parsed.anchor.object_id)
)
directory = revision["directory"]
elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
release = archive.lookup_release(
hash_to_hex(swhid_parsed.anchor.object_id)
)
if release["target_type"] == ObjectType.REVISION.name.lower():
revision = archive.lookup_revision(release["target"])
directory = revision["directory"]
if object_type == ObjectType.CONTENT:
if (
not swhid_parsed.origin
and swhid_parsed.anchor.object_type != ObjectType.REVISION
):
# when no origin or revision context, content objects need to have
# their path prefixed by root directory id for breadcrumbs display
query_dict["path"] = hash_to_hex(directory) + query_dict["path"]
else:
# remove leading slash from SWHID content path
query_dict["path"] = query_dict["path"][1:]
elif object_type == ObjectType.DIRECTORY:
object_id = directory
# remove leading and trailing slashes from SWHID directory path
if query_dict["path"].endswith("/"):
query_dict["path"] = query_dict["path"][1:-1]
else:
query_dict["path"] = query_dict["path"][1:]
# snapshot context
if swhid_parsed.visit:
if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT:
raise BadInputExc("Visit must be a snapshot SWHID.")
query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id)
if swhid_parsed.anchor:
if (
swhid_parsed.anchor.object_type == ObjectType.REVISION
and object_type != ObjectType.REVISION
):
query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id)
elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
release = archive.lookup_release(
hash_to_hex(swhid_parsed.anchor.object_id)
)
if release:
query_dict["release"] = release["name"]
# browsing content or directory without snapshot context
elif (
object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY)
and swhid_parsed.anchor
):
if swhid_parsed.anchor.object_type == ObjectType.REVISION:
# anchor revision, objects are browsed from its view
object_type = ObjectType.REVISION
object_id = swhid_parsed.anchor.object_id
elif (
object_type == ObjectType.DIRECTORY
and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY
):
# a directory is browsed from its root
object_id = swhid_parsed.anchor.object_id
if object_type == ObjectType.CONTENT:
url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}"
elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION):
url_args["sha1_git"] = hash_to_hex(object_id)
elif object_type == ObjectType.SNAPSHOT:
url_args["snapshot_id"] = hash_to_hex(object_id)
if swhid_parsed.lines and process_lines:
lines = swhid_parsed.lines
fragment += "#L" + str(lines[0])
if lines[1]:
fragment += "-L" + str(lines[1])
if url_args:
browse_url = (
reverse(
f"browse-{object_type.name.lower()}",
url_args=url_args,
query_params=query_dict,
)
+ fragment
)
return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
def get_swhid(swhid: str) -> QualifiedSWHID:
"""Check if a SWHID is valid and return it parsed.
Args:
swhid: a SoftWare Heritage persistent IDentifier.
Raises:
BadInputExc: if the provided SWHID can not be parsed.
Return:
A parsed SWHID.
"""
try:
# ensure core part of SWHID is in lower case to avoid parsing error
(core, sep, qualifiers) = swhid.partition(";")
core = core.lower()
return QualifiedSWHID.from_string(core + sep + qualifiers)
except ValidationError as ve:
raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
def group_swhids(
swhids: Iterable[QualifiedSWHID],
) -> Dict[ObjectType, List[bytes]]:
"""
Groups many SoftWare Heritage persistent IDentifiers into a
dictionary depending on their type.
Args:
swhids: an iterable of SoftWare Heritage persistent
IDentifier objects
Returns:
A dictionary with:
keys: object types
values: object hashes
"""
swhids_by_type: Dict[ObjectType, List[bytes]] = {
ObjectType.CONTENT: [],
ObjectType.DIRECTORY: [],
ObjectType.REVISION: [],
ObjectType.RELEASE: [],
ObjectType.SNAPSHOT: [],
}
for obj_swhid in swhids:
obj_id = obj_swhid.object_id
obj_type = obj_swhid.object_type
swhids_by_type[obj_type].append(hash_to_bytes(obj_id))
return swhids_by_type
def get_swhids_info(
swh_objects: Iterable[SWHObjectInfo],
snapshot_context: Optional[SnapshotContext] = None,
extra_context: Optional[Dict[str, Any]] = None,
) -> List[SWHIDInfo]:
"""
Returns a list of dict containing info related to SWHIDs of objects.
Args:
swh_objects: an iterable of dict describing archived objects
snapshot_context: optional dict parameter describing the snapshot in
which the objects have been found
extra_context: optional dict filled with extra contextual info about
the objects
Returns:
a list of dict containing SWHIDs info
"""
swhids_info = []
for swh_object in swh_objects:
if not swh_object["object_id"]:
swhids_info.append(
SWHIDInfo(
object_type=swh_object["object_type"],
object_id="",
swhid="",
swhid_url="",
context={},
swhid_with_context=None,
swhid_with_context_url=None,
)
)
continue
object_type = swh_object["object_type"]
object_id = swh_object["object_id"]
swhid_context: SWHIDContext = {}
if snapshot_context:
if snapshot_context["origin_info"] is not None:
swhid_context["origin"] = quote(
snapshot_context["origin_info"]["url"], safe="/?:@&"
)
if object_type != ObjectType.SNAPSHOT:
swhid_context["visit"] = gen_swhid(
ObjectType.SNAPSHOT, snapshot_context["snapshot_id"]
)
if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
if snapshot_context["release_id"] is not None:
swhid_context["anchor"] = gen_swhid(
ObjectType.RELEASE, snapshot_context["release_id"]
)
elif snapshot_context["revision_id"] is not None:
swhid_context["anchor"] = gen_swhid(
ObjectType.REVISION, snapshot_context["revision_id"]
)
if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
if (
extra_context
and "revision" in extra_context
and extra_context["revision"]
and "anchor" not in swhid_context
):
swhid_context["anchor"] = gen_swhid(
ObjectType.REVISION, extra_context["revision"]
)
elif (
extra_context
and "root_directory" in extra_context
and extra_context["root_directory"]
and "anchor" not in swhid_context
and (
object_type != ObjectType.DIRECTORY
or extra_context["root_directory"] != object_id
)
):
swhid_context["anchor"] = gen_swhid(
ObjectType.DIRECTORY, extra_context["root_directory"]
)
path = None
if extra_context and "path" in extra_context:
path = extra_context["path"] or "/"
if "filename" in extra_context and object_type == ObjectType.CONTENT:
path += extra_context["filename"]
if object_type == ObjectType.DIRECTORY and path == "/":
path = None
if path:
swhid_context["path"] = quote(path, safe="/?:@&")
swhid = gen_swhid(object_type, object_id)
swhid_url = reverse("browse-swhid", url_args={"swhid": swhid})
swhid_with_context = None
swhid_with_context_url = None
if swhid_context:
swhid_with_context = gen_swhid(
object_type, object_id, metadata=swhid_context
)
swhid_with_context_url = reverse(
"browse-swhid", url_args={"swhid": swhid_with_context}
)
swhids_info.append(
SWHIDInfo(
object_type=object_type,
object_id=object_id,
swhid=swhid,
swhid_url=swhid_url,
context=swhid_context,
swhid_with_context=swhid_with_context,
swhid_with_context_url=swhid_with_context_url,
)
)
return swhids_info

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 11:44 AM (5 d, 5 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3364107

Event Timeline