diff --git a/swh/web/api/views/identifiers.py b/swh/web/api/views/identifiers.py --- a/swh/web/api/views/identifiers.py +++ b/swh/web/api/views/identifiers.py @@ -3,6 +3,7 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.common import archive @@ -53,13 +54,18 @@ # object is present in the archive, NotFoundExc # will be raised otherwise swhid_parsed = swhid_resolved["swhid_parsed"] - object_type = swhid_parsed.object_type - object_id = swhid_parsed.object_id + object_type = swhid_parsed.object_type.name.lower() + object_id = hash_to_hex(swhid_parsed.object_id) archive.lookup_object(object_type, object_id) # id is well-formed and the pointed object exists - swhid_data = swhid_parsed.to_dict() - swhid_data["browse_url"] = request.build_absolute_uri(swhid_resolved["browse_url"]) - return swhid_data + return { + "namespace": swhid_parsed.namespace, + "scheme_version": swhid_parsed.scheme_version, + "object_type": object_type, + "object_id": object_id, + "metadata": swhid_parsed.qualifiers(), + "browse_url": request.build_absolute_uri(swhid_resolved["browse_url"]), + } @api_route(r"/known/", "api-1-known", methods=["POST"]) @@ -103,7 +109,9 @@ # group swhids by their type swhids_by_type = group_swhids(swhids) # search for hashes not present in the storage - missing_hashes = archive.lookup_missing_hashes(swhids_by_type) + missing_hashes = set( + map(hash_to_bytes, archive.lookup_missing_hashes(swhids_by_type)) + ) for swhid in swhids: if swhid.object_id not in missing_hashes: diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py --- a/swh/web/common/identifiers.py +++ b/swh/web/common/identifiers.py @@ -11,18 +11,16 @@ from django.http import QueryDict from swh.model.exceptions import ValidationError -from swh.model.hashutil import hash_to_bytes +from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import ( CONTENT, DIRECTORY, - ORIGIN, RELEASE, REVISION, SNAPSHOT, SWHID, ObjectType, QualifiedSWHID, - parse_swhid, ) from swh.web.common import archive from swh.web.common.exc import BadInputExc @@ -89,7 +87,7 @@ class ResolvedSWHID(TypedDict): """parsed SWHID with context""" - swhid_parsed: SWHID + swhid_parsed: QualifiedSWHID """URL to browse object according to SWHID context""" browse_url: Optional[str] @@ -119,44 +117,44 @@ url_args = {} query_dict = QueryDict("", mutable=True) fragment = "" - anchor_swhid_parsed = None - process_lines = object_type is CONTENT + process_lines = object_type == ObjectType.CONTENT if query_params and len(query_params) > 0: for k in sorted(query_params.keys()): query_dict[k] = query_params[k] - if "origin" in swhid_parsed.metadata: - origin_url = unquote(swhid_parsed.metadata["origin"]) + if swhid_parsed.origin: + origin_url = unquote(swhid_parsed.origin) origin_url = archive.lookup_origin({"url": origin_url})["url"] query_dict["origin_url"] = origin_url - if "anchor" in swhid_parsed.metadata: - anchor_swhid_parsed = get_swhid(swhid_parsed.metadata["anchor"]) - - if "path" in swhid_parsed.metadata and swhid_parsed.metadata["path"] != "/": - query_dict["path"] = unquote(swhid_parsed.metadata["path"]) - if anchor_swhid_parsed: - directory = "" - if anchor_swhid_parsed.object_type == DIRECTORY: - directory = anchor_swhid_parsed.object_id - elif anchor_swhid_parsed.object_type == REVISION: - revision = archive.lookup_revision(anchor_swhid_parsed.object_id) + if swhid_parsed.path and swhid_parsed.path != b"/": + query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace") + if swhid_parsed.anchor: + directory = b"" + if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY: + directory = swhid_parsed.anchor.object_id + elif swhid_parsed.anchor.object_type == ObjectType.REVISION: + revision = archive.lookup_revision( + hash_to_hex(swhid_parsed.anchor.object_id) + ) directory = revision["directory"] - elif anchor_swhid_parsed.object_type == RELEASE: - release = archive.lookup_release(anchor_swhid_parsed.object_id) + elif swhid_parsed.anchor.object_type == ObjectType.RELEASE: + release = archive.lookup_release( + hash_to_hex(swhid_parsed.anchor.object_id) + ) if release["target_type"] == REVISION: revision = archive.lookup_revision(release["target"]) directory = revision["directory"] - if object_type == CONTENT: - if "origin" not in swhid_parsed.metadata: + if object_type == ObjectType.CONTENT: + if not swhid_parsed.origin: # when no origin context, content objects need to have their # path prefixed by root directory id for proper breadcrumbs display - query_dict["path"] = directory + query_dict["path"] + query_dict["path"] = hash_to_hex(directory) + query_dict["path"] else: # remove leading slash from SWHID content path query_dict["path"] = query_dict["path"][1:] - elif object_type == DIRECTORY: + elif object_type == ObjectType.DIRECTORY: object_id = directory # remove leading and trailing slashes from SWHID directory path if query_dict["path"].endswith("/"): @@ -165,74 +163,72 @@ query_dict["path"] = query_dict["path"][1:] # snapshot context - if "visit" in swhid_parsed.metadata: - - snp_swhid_parsed = get_swhid(swhid_parsed.metadata["visit"]) - if snp_swhid_parsed.object_type != SNAPSHOT: + if swhid_parsed.visit: + if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT: raise BadInputExc("Visit must be a snapshot SWHID.") - query_dict["snapshot"] = snp_swhid_parsed.object_id + query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id) - if anchor_swhid_parsed: - if anchor_swhid_parsed.object_type == REVISION: + if swhid_parsed.anchor: + if swhid_parsed.anchor.object_type == ObjectType.REVISION: # check if the anchor revision is the tip of a branch branch_name = archive.lookup_snapshot_branch_name_from_tip_revision( - snp_swhid_parsed.object_id, anchor_swhid_parsed.object_id + hash_to_hex(swhid_parsed.visit.object_id), + hash_to_hex(swhid_parsed.anchor.object_id), ) if branch_name: query_dict["branch"] = branch_name - elif object_type != REVISION: - query_dict["revision"] = anchor_swhid_parsed.object_id + elif object_type != ObjectType.REVISION: + query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id) - elif anchor_swhid_parsed.object_type == RELEASE: - release = archive.lookup_release(anchor_swhid_parsed.object_id) + elif swhid_parsed.anchor.object_type == ObjectType.RELEASE: + release = archive.lookup_release( + hash_to_hex(swhid_parsed.anchor.object_id) + ) if release: query_dict["release"] = release["name"] - if object_type == REVISION and "release" not in query_dict: + if object_type == ObjectType.REVISION and "release" not in query_dict: branch_name = archive.lookup_snapshot_branch_name_from_tip_revision( - snp_swhid_parsed.object_id, object_id + hash_to_hex(swhid_parsed.visit.object_id), hash_to_hex(object_id) ) if branch_name: query_dict["branch"] = branch_name # browsing content or directory without snapshot context - elif object_type in (CONTENT, DIRECTORY) and anchor_swhid_parsed: - if anchor_swhid_parsed.object_type == REVISION: + elif ( + object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY) + and swhid_parsed.anchor + ): + if swhid_parsed.anchor.object_type == ObjectType.REVISION: # anchor revision, objects are browsed from its view - object_type = REVISION - object_id = anchor_swhid_parsed.object_id - elif object_type == DIRECTORY and anchor_swhid_parsed.object_type == DIRECTORY: + object_type = ObjectType.REVISION + object_id = swhid_parsed.anchor.object_id + elif ( + object_type == ObjectType.DIRECTORY + and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY + ): # a directory is browsed from its root - object_id = anchor_swhid_parsed.object_id - - if object_type == CONTENT: - url_args["query_string"] = f"sha1_git:{object_id}" - elif object_type == DIRECTORY: - url_args["sha1_git"] = object_id - elif object_type == RELEASE: - url_args["sha1_git"] = object_id - elif object_type == REVISION: - url_args["sha1_git"] = object_id - elif object_type == SNAPSHOT: - url_args["snapshot_id"] = object_id - elif object_type == ORIGIN: - raise BadInputExc( - ( - "Origin SWHIDs are not publicly resolvable because they are for " - "internal usage only" - ) - ) + object_id = swhid_parsed.anchor.object_id + + if object_type == ObjectType.CONTENT: + url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}" + elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION): + url_args["sha1_git"] = hash_to_hex(object_id) + elif object_type == ObjectType.SNAPSHOT: + url_args["snapshot_id"] = hash_to_hex(object_id) - if "lines" in swhid_parsed.metadata and process_lines: - lines = swhid_parsed.metadata["lines"].split("-") - fragment += "#L" + lines[0] - if len(lines) > 1: - fragment += "-L" + lines[1] + if swhid_parsed.lines and process_lines: + lines = swhid_parsed.lines + fragment += "#L" + str(lines[0]) + if lines[1]: + fragment += "-L" + str(lines[1]) if url_args: browse_url = ( reverse( - f"browse-{object_type}", url_args=url_args, query_params=query_dict, + f"browse-{object_type.name.lower()}", + url_args=url_args, + query_params=query_dict, ) + fragment ) @@ -240,7 +236,7 @@ return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url) -def get_swhid(swhid: str) -> SWHID: +def get_swhid(swhid: str) -> QualifiedSWHID: """Check if a SWHID is valid and return it parsed. Args: @@ -253,14 +249,14 @@ A parsed SWHID. """ try: - swhid_parsed = parse_swhid(swhid) + swhid_parsed = QualifiedSWHID.from_string(swhid) except ValidationError as ve: raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages)) else: return swhid_parsed -def group_swhids(swhids: Iterable[SWHID],) -> Dict[str, List[bytes]]: +def group_swhids(swhids: Iterable[QualifiedSWHID],) -> Dict[str, List[bytes]]: """ Groups many SoftWare Heritage persistent IDentifiers into a dictionary depending on their type. @@ -285,7 +281,7 @@ for obj_swhid in swhids: obj_id = obj_swhid.object_id obj_type = obj_swhid.object_type - swhids_by_type[obj_type].append(hash_to_bytes(obj_id)) + swhids_by_type[obj_type.name.lower()].append(hash_to_bytes(obj_id)) return swhids_by_type diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py --- a/swh/web/tests/common/test_identifiers.py +++ b/swh/web/tests/common/test_identifiers.py @@ -16,8 +16,7 @@ RELEASE, REVISION, SNAPSHOT, - SWHID, - parse_swhid, + QualifiedSWHID, ) from swh.model.model import Origin from swh.web.browse.snapshot_context import get_snapshot_context @@ -98,11 +97,11 @@ resolved_swhid = resolve_swhid(swhid, query_params) - assert isinstance(resolved_swhid["swhid_parsed"], SWHID) + assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID) assert str(resolved_swhid["swhid_parsed"]) == swhid assert resolved_swhid["browse_url"] == browse_url - with pytest.raises(BadInputExc, match="Origin SWHIDs"): + with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"): resolve_swhid(f"swh:1:ori:{random_sha1()}") @@ -118,7 +117,7 @@ swhid = gen_swhid(obj_type, obj_id) swh_parsed_swhid = get_swhid(swhid) - assert isinstance(swh_parsed_swhid, SWHID) + assert isinstance(swh_parsed_swhid, QualifiedSWHID) assert str(swh_parsed_swhid) == swhid with pytest.raises(BadInputExc, match="Error when parsing identifier"): @@ -196,7 +195,7 @@ anchor = gen_swhid(DIRECTORY, directory) - assert swhid_dir_parsed.metadata == { + assert swhid_dir_parsed.qualifiers() == { "anchor": anchor, "path": dir_subdir_path, } @@ -204,7 +203,7 @@ if dir_subdir_files: swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) - assert swhid_cnt_parsed.metadata == { + assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'{dir_subdir_path}{dir_subdir_file["name"]}', } @@ -240,13 +239,13 @@ anchor = gen_swhid(REVISION, revision) - assert swhid_dir_parsed.metadata == { + assert swhid_dir_parsed.qualifiers() == { "anchor": anchor, } if dir_entry["type"] == "file": swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"]) - assert swhid_cnt_parsed.metadata == { + assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'/{dir_entry["name"]}', } @@ -405,13 +404,13 @@ expected_rev_context["origin"] = origin["url"] expected_snp_context["origin"] = origin["url"] - assert swhid_cnt_parsed.metadata == expected_cnt_context - assert swhid_dir_parsed.metadata == expected_dir_context - assert swhid_rev_parsed.metadata == expected_rev_context - assert swhid_snp_parsed.metadata == expected_snp_context + assert swhid_cnt_parsed.qualifiers() == expected_cnt_context + assert swhid_dir_parsed.qualifiers() == expected_dir_context + assert swhid_rev_parsed.qualifiers() == expected_rev_context + assert swhid_snp_parsed.qualifiers() == expected_snp_context if "release_name" in snp_ctx_params: - assert swhid_rel_parsed.metadata == expected_rev_context + assert swhid_rel_parsed.qualifiers() == expected_rev_context @given(origin(), directory()) @@ -433,12 +432,14 @@ assert swhid_info["context"]["path"] == "/foo%3B/bar%25" # check special characters in SWHID URL have been escaped - parsed_url_swhid = parse_swhid(swhid_info["swhid_with_context_url"][1:-1]) + parsed_url_swhid = QualifiedSWHID.from_string( + swhid_info["swhid_with_context_url"][1:-1] + ) assert ( - parsed_url_swhid.metadata["origin"] + parsed_url_swhid.qualifiers()["origin"] == "http://example.org/%3Fproject%253Dabc%253Bdef%2525" ) - assert parsed_url_swhid.metadata["path"] == "/foo%253B/bar%2525" + assert parsed_url_swhid.qualifiers()["path"] == "/foo%253B/bar%2525" @given(origin_with_multiple_visits()) @@ -616,7 +617,7 @@ origin_swhid_url_escaped = quote(origin, safe="/:@;") swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) resolved_swhid = resolve_swhid(swhid) - assert resolved_swhid["swhid_parsed"].metadata["origin"] == origin_swhid_escaped + assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped assert origin_swhid_url_escaped in resolved_swhid["browse_url"]