Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696311
D5170.id18489.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
D5170.id18489.diff
View Options
diff --git a/swh/web/api/views/identifiers.py b/swh/web/api/views/identifiers.py
--- a/swh/web/api/views/identifiers.py
+++ b/swh/web/api/views/identifiers.py
@@ -3,6 +3,7 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.web.api.apidoc import api_doc, format_docstring
from swh.web.api.apiurls import api_route
from swh.web.common import archive
@@ -53,13 +54,18 @@
# object is present in the archive, NotFoundExc
# will be raised otherwise
swhid_parsed = swhid_resolved["swhid_parsed"]
- object_type = swhid_parsed.object_type
- object_id = swhid_parsed.object_id
+ object_type = swhid_parsed.object_type.name.lower()
+ object_id = hash_to_hex(swhid_parsed.object_id)
archive.lookup_object(object_type, object_id)
# id is well-formed and the pointed object exists
- swhid_data = swhid_parsed.to_dict()
- swhid_data["browse_url"] = request.build_absolute_uri(swhid_resolved["browse_url"])
- return swhid_data
+ return {
+ "namespace": swhid_parsed.namespace,
+ "scheme_version": swhid_parsed.scheme_version,
+ "object_type": object_type,
+ "object_id": object_id,
+ "metadata": swhid_parsed.qualifiers(),
+ "browse_url": request.build_absolute_uri(swhid_resolved["browse_url"]),
+ }
@api_route(r"/known/", "api-1-known", methods=["POST"])
@@ -103,7 +109,9 @@
# group swhids by their type
swhids_by_type = group_swhids(swhids)
# search for hashes not present in the storage
- missing_hashes = archive.lookup_missing_hashes(swhids_by_type)
+ missing_hashes = set(
+ map(hash_to_bytes, archive.lookup_missing_hashes(swhids_by_type))
+ )
for swhid in swhids:
if swhid.object_id not in missing_hashes:
diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -11,18 +11,16 @@
from django.http import QueryDict
from swh.model.exceptions import ValidationError
-from swh.model.hashutil import hash_to_bytes
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.identifiers import (
CONTENT,
DIRECTORY,
- ORIGIN,
RELEASE,
REVISION,
SNAPSHOT,
SWHID,
ObjectType,
QualifiedSWHID,
- parse_swhid,
)
from swh.web.common import archive
from swh.web.common.exc import BadInputExc
@@ -89,7 +87,7 @@
class ResolvedSWHID(TypedDict):
"""parsed SWHID with context"""
- swhid_parsed: SWHID
+ swhid_parsed: QualifiedSWHID
"""URL to browse object according to SWHID context"""
browse_url: Optional[str]
@@ -119,44 +117,44 @@
url_args = {}
query_dict = QueryDict("", mutable=True)
fragment = ""
- anchor_swhid_parsed = None
- process_lines = object_type is CONTENT
+ process_lines = object_type == ObjectType.CONTENT
if query_params and len(query_params) > 0:
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
- if "origin" in swhid_parsed.metadata:
- origin_url = unquote(swhid_parsed.metadata["origin"])
+ if swhid_parsed.origin:
+ origin_url = unquote(swhid_parsed.origin)
origin_url = archive.lookup_origin({"url": origin_url})["url"]
query_dict["origin_url"] = origin_url
- if "anchor" in swhid_parsed.metadata:
- anchor_swhid_parsed = get_swhid(swhid_parsed.metadata["anchor"])
-
- if "path" in swhid_parsed.metadata and swhid_parsed.metadata["path"] != "/":
- query_dict["path"] = unquote(swhid_parsed.metadata["path"])
- if anchor_swhid_parsed:
- directory = ""
- if anchor_swhid_parsed.object_type == DIRECTORY:
- directory = anchor_swhid_parsed.object_id
- elif anchor_swhid_parsed.object_type == REVISION:
- revision = archive.lookup_revision(anchor_swhid_parsed.object_id)
+ if swhid_parsed.path and swhid_parsed.path != b"/":
+ query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace")
+ if swhid_parsed.anchor:
+ directory = b""
+ if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY:
+ directory = swhid_parsed.anchor.object_id
+ elif swhid_parsed.anchor.object_type == ObjectType.REVISION:
+ revision = archive.lookup_revision(
+ hash_to_hex(swhid_parsed.anchor.object_id)
+ )
directory = revision["directory"]
- elif anchor_swhid_parsed.object_type == RELEASE:
- release = archive.lookup_release(anchor_swhid_parsed.object_id)
+ elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
+ release = archive.lookup_release(
+ hash_to_hex(swhid_parsed.anchor.object_id)
+ )
if release["target_type"] == REVISION:
revision = archive.lookup_revision(release["target"])
directory = revision["directory"]
- if object_type == CONTENT:
- if "origin" not in swhid_parsed.metadata:
+ if object_type == ObjectType.CONTENT:
+ if not swhid_parsed.origin:
# when no origin context, content objects need to have their
# path prefixed by root directory id for proper breadcrumbs display
- query_dict["path"] = directory + query_dict["path"]
+ query_dict["path"] = hash_to_hex(directory) + query_dict["path"]
else:
# remove leading slash from SWHID content path
query_dict["path"] = query_dict["path"][1:]
- elif object_type == DIRECTORY:
+ elif object_type == ObjectType.DIRECTORY:
object_id = directory
# remove leading and trailing slashes from SWHID directory path
if query_dict["path"].endswith("/"):
@@ -165,74 +163,72 @@
query_dict["path"] = query_dict["path"][1:]
# snapshot context
- if "visit" in swhid_parsed.metadata:
-
- snp_swhid_parsed = get_swhid(swhid_parsed.metadata["visit"])
- if snp_swhid_parsed.object_type != SNAPSHOT:
+ if swhid_parsed.visit:
+ if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT:
raise BadInputExc("Visit must be a snapshot SWHID.")
- query_dict["snapshot"] = snp_swhid_parsed.object_id
+ query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id)
- if anchor_swhid_parsed:
- if anchor_swhid_parsed.object_type == REVISION:
+ if swhid_parsed.anchor:
+ if swhid_parsed.anchor.object_type == ObjectType.REVISION:
# check if the anchor revision is the tip of a branch
branch_name = archive.lookup_snapshot_branch_name_from_tip_revision(
- snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id
+ hash_to_hex(swhid_parsed.visit.object_id),
+ hash_to_hex(swhid_parsed.anchor.object_id),
)
if branch_name:
query_dict["branch"] = branch_name
- elif object_type != REVISION:
- query_dict["revision"] = anchor_swhid_parsed.object_id
+ elif object_type != ObjectType.REVISION:
+ query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id)
- elif anchor_swhid_parsed.object_type == RELEASE:
- release = archive.lookup_release(anchor_swhid_parsed.object_id)
+ elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
+ release = archive.lookup_release(
+ hash_to_hex(swhid_parsed.anchor.object_id)
+ )
if release:
query_dict["release"] = release["name"]
- if object_type == REVISION and "release" not in query_dict:
+ if object_type == ObjectType.REVISION and "release" not in query_dict:
branch_name = archive.lookup_snapshot_branch_name_from_tip_revision(
- snp_swhid_parsed.object_id, object_id
+ hash_to_hex(swhid_parsed.visit.object_id), hash_to_hex(object_id)
)
if branch_name:
query_dict["branch"] = branch_name
# browsing content or directory without snapshot context
- elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed:
- if anchor_swhid_parsed.object_type == REVISION:
+ elif (
+ object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY)
+ and swhid_parsed.anchor
+ ):
+ if swhid_parsed.anchor.object_type == ObjectType.REVISION:
# anchor revision, objects are browsed from its view
- object_type = REVISION
- object_id = anchor_swhid_parsed.object_id
- elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY:
+ object_type = ObjectType.REVISION
+ object_id = swhid_parsed.anchor.object_id
+ elif (
+ object_type == ObjectType.DIRECTORY
+ and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY
+ ):
# a directory is browsed from its root
- object_id = anchor_swhid_parsed.object_id
-
- if object_type == CONTENT:
- url_args["query_string"] = f"sha1_git:{object_id}"
- elif object_type == DIRECTORY:
- url_args["sha1_git"] = object_id
- elif object_type == RELEASE:
- url_args["sha1_git"] = object_id
- elif object_type == REVISION:
- url_args["sha1_git"] = object_id
- elif object_type == SNAPSHOT:
- url_args["snapshot_id"] = object_id
- elif object_type == ORIGIN:
- raise BadInputExc(
- (
- "Origin SWHIDs are not publicly resolvable because they are for "
- "internal usage only"
- )
- )
+ object_id = swhid_parsed.anchor.object_id
+
+ if object_type == ObjectType.CONTENT:
+ url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}"
+ elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION):
+ url_args["sha1_git"] = hash_to_hex(object_id)
+ elif object_type == ObjectType.SNAPSHOT:
+ url_args["snapshot_id"] = hash_to_hex(object_id)
- if "lines" in swhid_parsed.metadata and process_lines:
- lines = swhid_parsed.metadata["lines"].split("-")
- fragment += "#L" + lines[0]
- if len(lines) > 1:
- fragment += "-L" + lines[1]
+ if swhid_parsed.lines and process_lines:
+ lines = swhid_parsed.lines
+ fragment += "#L" + str(lines[0])
+ if lines[1]:
+ fragment += "-L" + str(lines[1])
if url_args:
browse_url = (
reverse(
- f"browse-{object_type}", url_args=url_args, query_params=query_dict,
+ f"browse-{object_type.name.lower()}",
+ url_args=url_args,
+ query_params=query_dict,
)
+ fragment
)
@@ -240,7 +236,7 @@
return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
-def get_swhid(swhid: str) -> SWHID:
+def get_swhid(swhid: str) -> QualifiedSWHID:
"""Check if a SWHID is valid and return it parsed.
Args:
@@ -253,14 +249,14 @@
A parsed SWHID.
"""
try:
- swhid_parsed = parse_swhid(swhid)
+ swhid_parsed = QualifiedSWHID.from_string(swhid)
except ValidationError as ve:
raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
else:
return swhid_parsed
-def group_swhids(swhids: Iterable[SWHID],) -> Dict[str, List[bytes]]:
+def group_swhids(swhids: Iterable[QualifiedSWHID],) -> Dict[str, List[bytes]]:
"""
Groups many SoftWare Heritage persistent IDentifiers into a
dictionary depending on their type.
@@ -285,7 +281,7 @@
for obj_swhid in swhids:
obj_id = obj_swhid.object_id
obj_type = obj_swhid.object_type
- swhids_by_type[obj_type].append(hash_to_bytes(obj_id))
+ swhids_by_type[obj_type.name.lower()].append(hash_to_bytes(obj_id))
return swhids_by_type
diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py
--- a/swh/web/tests/common/test_identifiers.py
+++ b/swh/web/tests/common/test_identifiers.py
@@ -16,8 +16,7 @@
RELEASE,
REVISION,
SNAPSHOT,
- SWHID,
- parse_swhid,
+ QualifiedSWHID,
)
from swh.model.model import Origin
from swh.web.browse.snapshot_context import get_snapshot_context
@@ -98,11 +97,11 @@
resolved_swhid = resolve_swhid(swhid, query_params)
- assert isinstance(resolved_swhid["swhid_parsed"], SWHID)
+ assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID)
assert str(resolved_swhid["swhid_parsed"]) == swhid
assert resolved_swhid["browse_url"] == browse_url
- with pytest.raises(BadInputExc, match="Origin SWHIDs"):
+ with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"):
resolve_swhid(f"swh:1:ori:{random_sha1()}")
@@ -118,7 +117,7 @@
swhid = gen_swhid(obj_type, obj_id)
swh_parsed_swhid = get_swhid(swhid)
- assert isinstance(swh_parsed_swhid, SWHID)
+ assert isinstance(swh_parsed_swhid, QualifiedSWHID)
assert str(swh_parsed_swhid) == swhid
with pytest.raises(BadInputExc, match="Error when parsing identifier"):
@@ -196,7 +195,7 @@
anchor = gen_swhid(DIRECTORY, directory)
- assert swhid_dir_parsed.metadata == {
+ assert swhid_dir_parsed.qualifiers() == {
"anchor": anchor,
"path": dir_subdir_path,
}
@@ -204,7 +203,7 @@
if dir_subdir_files:
swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"])
- assert swhid_cnt_parsed.metadata == {
+ assert swhid_cnt_parsed.qualifiers() == {
"anchor": anchor,
"path": f'{dir_subdir_path}{dir_subdir_file["name"]}',
}
@@ -240,13 +239,13 @@
anchor = gen_swhid(REVISION, revision)
- assert swhid_dir_parsed.metadata == {
+ assert swhid_dir_parsed.qualifiers() == {
"anchor": anchor,
}
if dir_entry["type"] == "file":
swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"])
- assert swhid_cnt_parsed.metadata == {
+ assert swhid_cnt_parsed.qualifiers() == {
"anchor": anchor,
"path": f'/{dir_entry["name"]}',
}
@@ -405,13 +404,13 @@
expected_rev_context["origin"] = origin["url"]
expected_snp_context["origin"] = origin["url"]
- assert swhid_cnt_parsed.metadata == expected_cnt_context
- assert swhid_dir_parsed.metadata == expected_dir_context
- assert swhid_rev_parsed.metadata == expected_rev_context
- assert swhid_snp_parsed.metadata == expected_snp_context
+ assert swhid_cnt_parsed.qualifiers() == expected_cnt_context
+ assert swhid_dir_parsed.qualifiers() == expected_dir_context
+ assert swhid_rev_parsed.qualifiers() == expected_rev_context
+ assert swhid_snp_parsed.qualifiers() == expected_snp_context
if "release_name" in snp_ctx_params:
- assert swhid_rel_parsed.metadata == expected_rev_context
+ assert swhid_rel_parsed.qualifiers() == expected_rev_context
@given(origin(), directory())
@@ -433,12 +432,14 @@
assert swhid_info["context"]["path"] == "/foo%3B/bar%25"
# check special characters in SWHID URL have been escaped
- parsed_url_swhid = parse_swhid(swhid_info["swhid_with_context_url"][1:-1])
+ parsed_url_swhid = QualifiedSWHID.from_string(
+ swhid_info["swhid_with_context_url"][1:-1]
+ )
assert (
- parsed_url_swhid.metadata["origin"]
+ parsed_url_swhid.qualifiers()["origin"]
== "http://example.org/%3Fproject%253Dabc%253Bdef%2525"
)
- assert parsed_url_swhid.metadata["path"] == "/foo%253B/bar%2525"
+ assert parsed_url_swhid.qualifiers()["path"] == "/foo%253B/bar%2525"
@given(origin_with_multiple_visits())
@@ -616,7 +617,7 @@
origin_swhid_url_escaped = quote(origin, safe="/:@;")
swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped})
resolved_swhid = resolve_swhid(swhid)
- assert resolved_swhid["swhid_parsed"].metadata["origin"] == origin_swhid_escaped
+ assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped
assert origin_swhid_url_escaped in resolved_swhid["browse_url"]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 7:49 PM (1 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227023
Attached To
D5170: identifiers.get_swhid: return QualifiedSWHID instead of the deprecated SWHID class
Event Timeline
Log In to Comment