diff --git a/swh/web/common/identifiers.py b/swh/web/common/identifiers.py
index 97f3643c..552e683d 100644
--- a/swh/web/common/identifiers.py
+++ b/swh/web/common/identifiers.py
@@ -1,381 +1,384 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Any, Dict, Iterable, List, Optional
 from urllib.parse import quote, unquote

 from typing_extensions import TypedDict

 from django.http import QueryDict

 from swh.model.exceptions import ValidationError
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.identifiers import (
     CONTENT,
     DIRECTORY,
     RELEASE,
     REVISION,
     SNAPSHOT,
     ObjectType,
     QualifiedSWHID,
 )
 from swh.web.common import archive
 from swh.web.common.exc import BadInputExc
 from swh.web.common.typing import (
     QueryParameters,
     SnapshotContext,
     SWHIDContext,
     SWHIDInfo,
     SWHObjectInfo,
 )
 from swh.web.common.utils import reverse


 def gen_swhid(
     object_type: str,
     object_id: str,
     scheme_version: int = 1,
     metadata: SWHIDContext = {},
 ) -> str:
     """
     Returns the SoftWare Heritage persistent IDentifier for a swh object
     based on:

         * the object type
         * the object id
         * the SWHID scheme version

     Args:
         object_type: the swh object type
             (content/directory/release/revision/snapshot)
         object_id: the swh object id
             (hexadecimal representation of its hash value)
         scheme_version: the scheme version of the SWHIDs

     Returns:
         the SWHID of the object

     Raises:
         BadInputExc: if the provided parameters do not enable to
             generate a valid identifier
     """
     try:
         decoded_object_type = ObjectType[object_type.upper()]
         decoded_object_id = hash_to_bytes(object_id)
         obj_swhid = str(
             QualifiedSWHID(
                 object_type=decoded_object_type,
                 object_id=decoded_object_id,
                 scheme_version=scheme_version,
                 **metadata,
             )
         )
     except (ValidationError, KeyError, ValueError) as e:
         raise BadInputExc("Invalid object (%s) for SWHID. %s" % (object_id, e))
     else:
         return obj_swhid
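As a reference point for the tests further down, a minimal usage sketch of gen_swhid; the hash and origin below are made-up placeholders, not values from this patch:

    # Hypothetical values for illustration only.
    swhid = gen_swhid(
        "content",
        "94a9ed024d3859793618152ea559a168bbcbb5e2",
        metadata={"origin": "https://example.org/repo"},
    )
    # swhid == "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2;origin=https://example.org/repo"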

 class ResolvedSWHID(TypedDict):
     """parsed SWHID with context"""

     swhid_parsed: QualifiedSWHID
     """URL to browse object according to SWHID context"""

     browse_url: Optional[str]


 def resolve_swhid(
     swhid: str, query_params: Optional[QueryParameters] = None
 ) -> ResolvedSWHID:
     """
     Try to resolve a SoftWare Heritage persistent IDentifier into an url for
     browsing the targeted object.

     Args:
         swhid: a SoftWare Heritage persistent IDentifier
         query_params: optional dict filled with
             query parameters to append to the browse url

     Returns:
         a dict with the following keys:

             * **swhid_parsed**: the parsed identifier
             * **browse_url**: the url for browsing the targeted object
     """
     swhid_parsed = get_swhid(swhid)
     object_type = swhid_parsed.object_type
     object_id = swhid_parsed.object_id
     browse_url = None
     url_args = {}
     query_dict = QueryDict("", mutable=True)
     fragment = ""
     process_lines = object_type == ObjectType.CONTENT

     if query_params and len(query_params) > 0:
         for k in sorted(query_params.keys()):
             query_dict[k] = query_params[k]

     if swhid_parsed.origin:
         origin_url = unquote(swhid_parsed.origin)
         origin_url = archive.lookup_origin({"url": origin_url})["url"]
         query_dict["origin_url"] = origin_url

     if swhid_parsed.path and swhid_parsed.path != b"/":
         query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace")
         if swhid_parsed.anchor:
             directory = b""
             if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY:
                 directory = swhid_parsed.anchor.object_id
             elif swhid_parsed.anchor.object_type == ObjectType.REVISION:
                 revision = archive.lookup_revision(
                     hash_to_hex(swhid_parsed.anchor.object_id)
                 )
                 directory = revision["directory"]
             elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
                 release = archive.lookup_release(
                     hash_to_hex(swhid_parsed.anchor.object_id)
                 )
                 if release["target_type"] == REVISION:
                     revision = archive.lookup_revision(release["target"])
                     directory = revision["directory"]
             if object_type == ObjectType.CONTENT:
-                if not swhid_parsed.origin:
-                    # when no origin context, content objects need to have their
-                    # path prefixed by root directory id for proper breadcrumbs display
+                if (
+                    not swhid_parsed.origin
+                    and swhid_parsed.anchor.object_type != ObjectType.REVISION
+                ):
+                    # when no origin or revision context, content objects need to have
+                    # their path prefixed by root directory id for breadcrumbs display
                     query_dict["path"] = hash_to_hex(directory) + query_dict["path"]
                 else:
                     # remove leading slash from SWHID content path
                     query_dict["path"] = query_dict["path"][1:]
             elif object_type == ObjectType.DIRECTORY:
                 object_id = directory
                 # remove leading and trailing slashes from SWHID directory path
                 if query_dict["path"].endswith("/"):
                     query_dict["path"] = query_dict["path"][1:-1]
                 else:
                     query_dict["path"] = query_dict["path"][1:]

     # snapshot context
     if swhid_parsed.visit:
         if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT:
             raise BadInputExc("Visit must be a snapshot SWHID.")
         query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id)

         if swhid_parsed.anchor:
             if (
                 swhid_parsed.anchor.object_type == ObjectType.REVISION
                 and object_type != ObjectType.REVISION
             ):
                 query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id)

             elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
                 release = archive.lookup_release(
                     hash_to_hex(swhid_parsed.anchor.object_id)
                 )
                 if release:
                     query_dict["release"] = release["name"]

     # browsing content or directory without snapshot context
     elif (
         object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY)
         and swhid_parsed.anchor
     ):
         if swhid_parsed.anchor.object_type == ObjectType.REVISION:
             # anchor revision, objects are browsed from its view
             object_type = ObjectType.REVISION
             object_id = swhid_parsed.anchor.object_id
         elif (
             object_type == ObjectType.DIRECTORY
             and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY
         ):
             # a directory is browsed from its root
             object_id = swhid_parsed.anchor.object_id

     if object_type == ObjectType.CONTENT:
         url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}"
     elif object_type in (
         ObjectType.DIRECTORY,
         ObjectType.RELEASE,
         ObjectType.REVISION,
     ):
         url_args["sha1_git"] = hash_to_hex(object_id)
     elif object_type == ObjectType.SNAPSHOT:
         url_args["snapshot_id"] = hash_to_hex(object_id)

     if swhid_parsed.lines and process_lines:
         lines = swhid_parsed.lines
         fragment += "#L" + str(lines[0])
         if lines[1]:
             fragment += "-L" + str(lines[1])

     if url_args:
         browse_url = (
             reverse(
                 f"browse-{object_type.name.lower()}",
                 url_args=url_args,
                 query_params=query_dict,
             )
             + fragment
         )

     return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
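The hunk above means a content or directory SWHID anchored on a revision now resolves to that revision's browse view instead of a bare content/directory view. A hedged sketch of the intended shape (the identifiers are placeholders, and the URL layout is inferred from the reverse() routes used in this patch, not spelled out by it):

    # Placeholder identifiers; not real archive objects.
    resolved = resolve_swhid(
        "swh:1:cnt:<cnt-sha1_git>;anchor=swh:1:rev:<rev-sha1_git>;path=/README"
    )
    # resolved["browse_url"] is expected to point at the revision view,
    # along the lines of /browse/revision/<rev-sha1_git>/?path=README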

 def get_swhid(swhid: str) -> QualifiedSWHID:
     """Check if a SWHID is valid and return it parsed.

     Args:
         swhid: a SoftWare Heritage persistent IDentifier.

     Raises:
         BadInputExc: if the provided SWHID can not be parsed.

     Return:
         A parsed SWHID.
     """
     try:
         swhid_parsed = QualifiedSWHID.from_string(swhid)
     except ValidationError as ve:
         raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
     else:
         return swhid_parsed


 def group_swhids(swhids: Iterable[QualifiedSWHID],) -> Dict[str, List[bytes]]:
     """
     Groups many SoftWare Heritage persistent IDentifiers into a
     dictionary depending on their type.

     Args:
         swhids: an iterable of SoftWare Heritage persistent
             IDentifier objects

     Returns:
         A dictionary with:
             keys: object types
             values: object hashes
     """
     swhids_by_type: Dict[str, List[bytes]] = {
         CONTENT: [],
         DIRECTORY: [],
         REVISION: [],
         RELEASE: [],
         SNAPSHOT: [],
     }

     for obj_swhid in swhids:
         obj_id = obj_swhid.object_id
         obj_type = obj_swhid.object_type
         swhids_by_type[obj_type.name.lower()].append(hash_to_bytes(obj_id))

     return swhids_by_type


 def get_swhids_info(
     swh_objects: Iterable[SWHObjectInfo],
     snapshot_context: Optional[SnapshotContext] = None,
     extra_context: Optional[Dict[str, Any]] = None,
 ) -> List[SWHIDInfo]:
     """
     Returns a list of dict containing info related to SWHIDs of objects.

     Args:
         swh_objects: an iterable of dict describing archived objects
         snapshot_context: optional dict parameter describing the snapshot in
             which the objects have been found
         extra_context: optional dict filled with extra contextual info about
             the objects

     Returns:
         a list of dict containing SWHIDs info
     """
     swhids_info = []
     for swh_object in swh_objects:
         if not swh_object["object_id"]:
             swhids_info.append(
                 SWHIDInfo(
                     object_type=swh_object["object_type"],
                     object_id="",
                     swhid="",
                     swhid_url="",
                     context={},
                     swhid_with_context=None,
                     swhid_with_context_url=None,
                 )
             )
             continue
         object_type = swh_object["object_type"]
         object_id = swh_object["object_id"]
         swhid_context: SWHIDContext = {}
         if snapshot_context:
             if snapshot_context["origin_info"] is not None:
                 swhid_context["origin"] = quote(
                     snapshot_context["origin_info"]["url"], safe="/?:@&"
                 )
             if object_type != SNAPSHOT:
                 swhid_context["visit"] = gen_swhid(
                     SNAPSHOT, snapshot_context["snapshot_id"]
                 )
             if object_type in (CONTENT, DIRECTORY):
                 if snapshot_context["release_id"] is not None:
                     swhid_context["anchor"] = gen_swhid(
                         RELEASE, snapshot_context["release_id"]
                     )
                 elif snapshot_context["revision_id"] is not None:
                     swhid_context["anchor"] = gen_swhid(
                         REVISION, snapshot_context["revision_id"]
                     )
         if object_type in (CONTENT, DIRECTORY):
             if (
                 extra_context
                 and "revision" in extra_context
                 and extra_context["revision"]
                 and "anchor" not in swhid_context
             ):
                 swhid_context["anchor"] = gen_swhid(REVISION, extra_context["revision"])
             elif (
                 extra_context
                 and "root_directory" in extra_context
                 and extra_context["root_directory"]
                 and "anchor" not in swhid_context
                 and (
                     object_type != DIRECTORY
                     or extra_context["root_directory"] != object_id
                 )
             ):
                 swhid_context["anchor"] = gen_swhid(
                     DIRECTORY, extra_context["root_directory"]
                 )
             path = None
             if extra_context and "path" in extra_context:
                 path = extra_context["path"] or "/"
                 if "filename" in extra_context and object_type == CONTENT:
                     path += extra_context["filename"]
             if object_type == DIRECTORY and path == "/":
                 path = None
             if path:
                 swhid_context["path"] = quote(path, safe="/?:@&")

         swhid = gen_swhid(object_type, object_id)
         swhid_url = reverse("browse-swhid", url_args={"swhid": swhid})

         swhid_with_context = None
         swhid_with_context_url = None
         if swhid_context:
             swhid_with_context = gen_swhid(
                 object_type, object_id, metadata=swhid_context
             )
             swhid_with_context_url = reverse(
                 "browse-swhid", url_args={"swhid": swhid_with_context}
             )

         swhids_info.append(
             SWHIDInfo(
                 object_type=object_type,
                 object_id=object_id,
                 swhid=swhid,
                 swhid_url=swhid_url,
                 context=swhid_context,
                 swhid_with_context=swhid_with_context,
                 swhid_with_context_url=swhid_with_context_url,
             )
         )
     return swhids_info
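For reference, a sketch of the dictionary shape group_swhids returns; the keys come from the swh.model.identifiers constants imported above, and the all-zero/all-one hashes are placeholders chosen so the example stays self-checking:

    # Placeholder SWHIDs built from dummy 40-hex-digit hashes.
    swhids = [
        get_swhid("swh:1:cnt:" + "0" * 40),
        get_swhid("swh:1:dir:" + "1" * 40),
    ]
    group_swhids(swhids)
    # {"content": [b"\x00" * 20], "directory": [b"\x11" * 20],
    #  "revision": [], "release": [], "snapshot": []}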
diff --git a/swh/web/tests/common/test_identifiers.py b/swh/web/tests/common/test_identifiers.py
index 3a94259d..1ea3dafb 100644
--- a/swh/web/tests/common/test_identifiers.py
+++ b/swh/web/tests/common/test_identifiers.py
@@ -1,637 +1,726 @@
 # Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random
 from urllib.parse import quote

 from hypothesis import given
 import pytest

 from swh.model.hashutil import hash_to_bytes
 from swh.model.identifiers import (
     CONTENT,
     DIRECTORY,
     RELEASE,
     REVISION,
     SNAPSHOT,
     QualifiedSWHID,
 )
 from swh.model.model import Origin
 from swh.web.browse.snapshot_context import get_snapshot_context
 from swh.web.common.exc import BadInputExc
 from swh.web.common.identifiers import (
     gen_swhid,
     get_swhid,
     get_swhids_info,
     group_swhids,
     resolve_swhid,
 )
 from swh.web.common.typing import SWHObjectInfo
 from swh.web.common.utils import reverse
 from swh.web.tests.data import random_sha1
 from swh.web.tests.strategies import (
     content,
     directory,
+    directory_with_files,
     directory_with_subdirs,
     origin,
     origin_with_multiple_visits,
     release,
     revision,
     snapshot,
 )


 @given(content())
 def test_gen_swhid(content):
     swh_object_type = CONTENT
     sha1_git = content["sha1_git"]

     expected_swhid = "swh:1:cnt:" + sha1_git

     assert gen_swhid(swh_object_type, sha1_git) == expected_swhid

     assert (
         gen_swhid(swh_object_type, sha1_git, metadata={"origin": "test"})
         == expected_swhid + ";origin=test"
     )

     assert (
         gen_swhid(swh_object_type, sha1_git, metadata={"origin": None})
         == expected_swhid
     )

     with pytest.raises(BadInputExc) as e:
         gen_swhid("foo", sha1_git)
     assert e.match("Invalid object")

     with pytest.raises(BadInputExc) as e:
         gen_swhid(swh_object_type, "not a valid id")
     assert e.match("Invalid object")


 @given(content(), directory(), release(), revision(), snapshot())
 def test_resolve_swhid_legacy(content, directory, release, revision, snapshot):
     for obj_type, obj_id in (
         (CONTENT, content["sha1_git"]),
         (DIRECTORY, directory),
         (RELEASE, release),
         (REVISION, revision),
         (SNAPSHOT, snapshot),
     ):
         swhid = gen_swhid(obj_type, obj_id)

         url_args = {}
         if obj_type == CONTENT:
             url_args["query_string"] = f"sha1_git:{obj_id}"
         elif obj_type == SNAPSHOT:
             url_args["snapshot_id"] = obj_id
         else:
             url_args["sha1_git"] = obj_id
         query_params = {"origin_url": "some-origin"}

         browse_url = reverse(
             f"browse-{obj_type}", url_args=url_args, query_params=query_params
         )

         resolved_swhid = resolve_swhid(swhid, query_params)

         assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID)
         assert str(resolved_swhid["swhid_parsed"]) == swhid
         assert resolved_swhid["browse_url"] == browse_url

     with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"):
         resolve_swhid(f"swh:1:ori:{random_sha1()}")


 @given(content(), directory(), release(), revision(), snapshot())
 def test_get_swhid(content, directory, release, revision, snapshot):
     for obj_type, obj_id in (
         (CONTENT, content["sha1_git"]),
         (DIRECTORY, directory),
         (RELEASE, release),
         (REVISION, revision),
         (SNAPSHOT, snapshot),
     ):
         swhid = gen_swhid(obj_type, obj_id)

         swh_parsed_swhid = get_swhid(swhid)

         assert isinstance(swh_parsed_swhid, QualifiedSWHID)
         assert str(swh_parsed_swhid) == swhid

     with pytest.raises(BadInputExc, match="Error when parsing identifier"):
         get_swhid("foo")


 @given(content(), directory(), release(), revision(), snapshot())
 def test_group_swhids(content, directory, release, revision, snapshot):
     swhids = []
     expected = {}
     for obj_type, obj_id in (
         (CONTENT, content["sha1_git"]),
         (DIRECTORY, directory),
         (RELEASE, release),
         (REVISION, revision),
         (SNAPSHOT, snapshot),
     ):
         swhid = gen_swhid(obj_type, obj_id)
         swhid = get_swhid(swhid)
         swhids.append(swhid)
         expected[obj_type] = [hash_to_bytes(obj_id)]

     swhid_groups = group_swhids(swhids)

     assert swhid_groups == expected
swhid["swhid_with_context"] is None dir_content = archive_data.directory_ls(directory) dir_subdirs = [e for e in dir_content if e["type"] == "dir"] dir_subdir = random.choice(dir_subdirs) dir_subdir_path = f'/{dir_subdir["name"]}/' dir_subdir_content = archive_data.directory_ls(dir_subdir["target"]) dir_subdir_files = [e for e in dir_subdir_content if e["type"] == "file"] swh_objects_info = [ SWHObjectInfo(object_type=DIRECTORY, object_id=dir_subdir["target"]) ] extra_context = { "root_directory": directory, "path": dir_subdir_path, } if dir_subdir_files: dir_subdir_file = random.choice(dir_subdir_files) extra_context["filename"] = dir_subdir_file["name"] swh_objects_info.append( SWHObjectInfo( object_type=CONTENT, object_id=dir_subdir_file["checksums"]["sha1_git"] ) ) swhids = get_swhids_info( swh_objects_info, snapshot_context=None, extra_context=extra_context, ) swhid_dir_parsed = get_swhid(swhids[0]["swhid_with_context"]) anchor = gen_swhid(DIRECTORY, directory) assert swhid_dir_parsed.qualifiers() == { "anchor": anchor, "path": dir_subdir_path, } if dir_subdir_files: swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'{dir_subdir_path}{dir_subdir_file["name"]}', } @given(revision()) def test_get_swhids_info_revision_context(archive_data, revision): revision_data = archive_data.revision_get(revision) directory = revision_data["directory"] dir_content = archive_data.directory_ls(directory) dir_entry = random.choice(dir_content) swh_objects = [ SWHObjectInfo(object_type=REVISION, object_id=revision), SWHObjectInfo(object_type=DIRECTORY, object_id=directory), ] extra_context = {"revision": revision, "path": "/"} if dir_entry["type"] == "file": swh_objects.append( SWHObjectInfo( object_type=CONTENT, object_id=dir_entry["checksums"]["sha1_git"] ) ) extra_context["filename"] = dir_entry["name"] swhids = get_swhids_info( swh_objects, snapshot_context=None, extra_context=extra_context, ) assert swhids[0]["context"] == {} swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) anchor = gen_swhid(REVISION, revision) assert swhid_dir_parsed.qualifiers() == { "anchor": anchor, } if dir_entry["type"] == "file": swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"]) assert swhid_cnt_parsed.qualifiers() == { "anchor": anchor, "path": f'/{dir_entry["name"]}', } @given(origin_with_multiple_visits()) def test_get_swhids_info_origin_snapshot_context(archive_data, origin): """ Test SWHIDs with contextual info computation under a variety of origin / snapshot browsing contexts. 
""" visits = archive_data.origin_visit_get(origin["url"]) for visit in visits: snapshot = archive_data.snapshot_get(visit["snapshot"]) snapshot_id = snapshot["id"] branches = { k: v["target"] for k, v in snapshot["branches"].items() if v["target_type"] == "revision" } releases = { k: v["target"] for k, v in snapshot["branches"].items() if v["target_type"] == "release" } head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir = head_rev["directory"] dir_content = archive_data.directory_ls(root_dir) dir_files = [e for e in dir_content if e["type"] == "file"] dir_file = random.choice(dir_files) revision_log = [r["id"] for r in archive_data.revision_log(head_rev_id)] branch_name = random.choice(list(branches)) release = random.choice(list(releases)) release_data = archive_data.release_get(releases[release]) release_name = release_data["name"] revision_id = random.choice(revision_log) for snp_ctx_params, anchor_info in ( ( {"snapshot_id": snapshot_id}, {"anchor_type": REVISION, "anchor_id": head_rev_id}, ), ( {"snapshot_id": snapshot_id, "branch_name": branch_name}, {"anchor_type": REVISION, "anchor_id": branches[branch_name]}, ), ( {"snapshot_id": snapshot_id, "release_name": release_name}, {"anchor_type": RELEASE, "anchor_id": releases[release]}, ), ( {"snapshot_id": snapshot_id, "revision_id": revision_id}, {"anchor_type": REVISION, "anchor_id": revision_id}, ), ( {"origin_url": origin["url"], "snapshot_id": snapshot_id}, {"anchor_type": REVISION, "anchor_id": head_rev_id}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "branch_name": branch_name, }, {"anchor_type": REVISION, "anchor_id": branches[branch_name]}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "release_name": release_name, }, {"anchor_type": RELEASE, "anchor_id": releases[release]}, ), ( { "origin_url": origin["url"], "snapshot_id": snapshot_id, "revision_id": revision_id, }, {"anchor_type": REVISION, "anchor_id": revision_id}, ), ): snapshot_context = get_snapshot_context(**snp_ctx_params) rev_id = head_rev_id if "branch_name" in snp_ctx_params: rev_id = branches[branch_name] elif "release_name" in snp_ctx_params: rev_id = release_data["target"] elif "revision_id" in snp_ctx_params: rev_id = revision_id swh_objects = [ SWHObjectInfo( object_type=CONTENT, object_id=dir_file["checksums"]["sha1_git"] ), SWHObjectInfo(object_type=DIRECTORY, object_id=root_dir), SWHObjectInfo(object_type=REVISION, object_id=rev_id), SWHObjectInfo(object_type=SNAPSHOT, object_id=snapshot_id), ] if "release_name" in snp_ctx_params: swh_objects.append( SWHObjectInfo(object_type=RELEASE, object_id=release_data["id"]) ) swhids = get_swhids_info( swh_objects, snapshot_context, extra_context={"path": "/", "filename": dir_file["name"]}, ) swhid_cnt_parsed = get_swhid(swhids[0]["swhid_with_context"]) swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) swhid_rev_parsed = get_swhid(swhids[2]["swhid_with_context"]) swhid_snp_parsed = get_swhid( swhids[3]["swhid_with_context"] or swhids[3]["swhid"] ) swhid_rel_parsed = None if "release_name" in snp_ctx_params: swhid_rel_parsed = get_swhid(swhids[4]["swhid_with_context"]) anchor = gen_swhid( object_type=anchor_info["anchor_type"], object_id=anchor_info["anchor_id"], ) snapshot_swhid = gen_swhid(object_type=SNAPSHOT, object_id=snapshot_id) expected_cnt_context = { "visit": snapshot_swhid, "anchor": anchor, "path": f'/{dir_file["name"]}', } expected_dir_context = { "visit": snapshot_swhid, "anchor": anchor, } 
             expected_rev_context = {"visit": snapshot_swhid}

             expected_snp_context = {}

             if "origin_url" in snp_ctx_params:
                 expected_cnt_context["origin"] = origin["url"]
                 expected_dir_context["origin"] = origin["url"]
                 expected_rev_context["origin"] = origin["url"]
                 expected_snp_context["origin"] = origin["url"]

             assert swhid_cnt_parsed.qualifiers() == expected_cnt_context
             assert swhid_dir_parsed.qualifiers() == expected_dir_context
             assert swhid_rev_parsed.qualifiers() == expected_rev_context
             assert swhid_snp_parsed.qualifiers() == expected_snp_context

             if "release_name" in snp_ctx_params:
                 assert swhid_rel_parsed.qualifiers() == expected_rev_context


 @given(origin(), directory())
 def test_get_swhids_info_characters_and_url_escaping(archive_data, origin, directory):
     snapshot_context = get_snapshot_context(origin_url=origin["url"])
     snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%"
     path = "/foo;/bar%"

     swhid_info = get_swhids_info(
         [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)],
         snapshot_context=snapshot_context,
         extra_context={"path": path},
     )[0]

     # check special characters in SWHID have been escaped
     assert (
         swhid_info["context"]["origin"] == "http://example.org/?project%3Dabc%3Bdef%25"
     )
     assert swhid_info["context"]["path"] == "/foo%3B/bar%25"

     # check special characters in SWHID URL have been escaped
     parsed_url_swhid = QualifiedSWHID.from_string(
         swhid_info["swhid_with_context_url"][1:]
     )
     assert (
         parsed_url_swhid.qualifiers()["origin"]
         == "http://example.org/%3Fproject%253Dabc%253Bdef%2525"
     )
     assert parsed_url_swhid.qualifiers()["path"] == "/foo%253B/bar%2525"


 @given(origin_with_multiple_visits())
 def test_resolve_swhids_snapshot_context(client, archive_data, origin):
     visits = archive_data.origin_visit_get(origin["url"])
     visit = random.choice(visits)
     snapshot = archive_data.snapshot_get(visit["snapshot"])
     head_rev_id = archive_data.snapshot_get_head(snapshot)

     branch_info = None
     release_info = None
     for branch_name in sorted(snapshot["branches"]):
         target_type = snapshot["branches"][branch_name]["target_type"]
         target = snapshot["branches"][branch_name]["target"]
         if target_type == "revision" and branch_info is None:
             branch_info = {"name": branch_name, "revision": target}
         elif target_type == "release" and release_info is None:
             release_info = {"name": branch_name, "release": target}
         if branch_info and release_info:
             break

     release_info["name"] = archive_data.release_get(release_info["release"])["name"]

     directory = archive_data.revision_get(branch_info["revision"])["directory"]
     directory_content = archive_data.directory_ls(directory)
     directory_subdirs = [e for e in directory_content if e["type"] == "dir"]
     directory_subdir = None
     if directory_subdirs:
         directory_subdir = random.choice(directory_subdirs)
     directory_files = [e for e in directory_content if e["type"] == "file"]
     directory_file = None
     if directory_files:
         directory_file = random.choice(directory_files)
     random_rev_id = random.choice(archive_data.revision_log(head_rev_id))["id"]

     for snp_ctx_params in (
         {},
         {"branch_name": branch_info["name"]},
         {"release_name": release_info["name"]},
         {"revision_id": random_rev_id},
     ):
         snapshot_context = get_snapshot_context(
             snapshot["id"], origin["url"], **snp_ctx_params
         )

         _check_resolved_swhid_browse_url(SNAPSHOT, snapshot["id"], snapshot_context)

         rev = head_rev_id
         if "branch_name" in snp_ctx_params:
             rev = branch_info["revision"]
         if "revision_id" in snp_ctx_params:
             rev = random_rev_id
         _check_resolved_swhid_browse_url(REVISION, rev, snapshot_context)

         _check_resolved_swhid_browse_url(
             DIRECTORY, directory, snapshot_context, path="/"
         )

         if directory_subdir:
             _check_resolved_swhid_browse_url(
                 DIRECTORY,
                 directory_subdir["target"],
                 snapshot_context,
                 path=f"/{directory_subdir['name']}/",
             )

         if directory_file:
             _check_resolved_swhid_browse_url(
                 CONTENT,
                 directory_file["target"],
                 snapshot_context,
                 path=f"/{directory_file['name']}",
             )

             _check_resolved_swhid_browse_url(
                 CONTENT,
                 directory_file["target"],
                 snapshot_context,
                 path=f"/{directory_file['name']}",
                 lines="10",
             )

             _check_resolved_swhid_browse_url(
                 CONTENT,
                 directory_file["target"],
                 snapshot_context,
                 path=f"/{directory_file['name']}",
                 lines="10-20",
             )


 def _check_resolved_swhid_browse_url(
     object_type, object_id, snapshot_context, path=None, lines=None
 ):
     snapshot_id = snapshot_context["snapshot_id"]
     origin_url = None
     if snapshot_context["origin_info"]:
         origin_url = snapshot_context["origin_info"]["url"]

     obj_context = {}
     query_params = {}

     if origin_url:
         obj_context["origin"] = origin_url
         query_params["origin_url"] = origin_url

     obj_context["visit"] = gen_swhid(SNAPSHOT, snapshot_id)
     query_params["snapshot"] = snapshot_id

     if object_type in (CONTENT, DIRECTORY, REVISION):
         if snapshot_context["release"]:
             obj_context["anchor"] = gen_swhid(RELEASE, snapshot_context["release_id"])
             query_params["release"] = snapshot_context["release"]
         else:
             obj_context["anchor"] = gen_swhid(REVISION, snapshot_context["revision_id"])
             if object_type != REVISION:
                 query_params["revision"] = snapshot_context["revision_id"]

     if path:
         obj_context["path"] = path
         if path != "/":
             if object_type == CONTENT:
                 query_params["path"] = path[1:]
             else:
                 query_params["path"] = path[1:-1]

         if object_type == DIRECTORY:
             object_id = snapshot_context["root_directory"]

     if lines:
         obj_context["lines"] = lines

     obj_swhid = gen_swhid(object_type, object_id, metadata=obj_context)

     obj_swhid_resolved = resolve_swhid(obj_swhid)

     url_args = {"sha1_git": object_id}
     if object_type == CONTENT:
         url_args = {"query_string": f"sha1_git:{object_id}"}
     elif object_type == SNAPSHOT:
         url_args = {"snapshot_id": object_id}

     expected_url = reverse(
         f"browse-{object_type}", url_args=url_args, query_params=query_params,
     )
     if lines:
         lines_number = lines.split("-")
         expected_url += f"#L{lines_number[0]}"
         if len(lines_number) > 1:
             expected_url += f"-L{lines_number[1]}"

     assert obj_swhid_resolved["browse_url"] == expected_url


 @given(directory())
 def test_resolve_swhid_with_escaped_chars(directory):
     origin = "http://example.org/?project=abc;"
     origin_swhid_escaped = quote(origin, safe="/?:@&")
     origin_swhid_url_escaped = quote(origin, safe="/:@;")
     swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped})
     resolved_swhid = resolve_swhid(swhid)
     assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped
     assert origin_swhid_url_escaped in resolved_swhid["browse_url"]


 @given(directory_with_subdirs())
 def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory):
     dir_content = archive_data.directory_ls(directory)
     dir_subdirs = [e for e in dir_content if e["type"] == "dir"]
     dir_subdir = random.choice(dir_subdirs)
     dir_subdir_path = dir_subdir["name"]
     anchor = gen_swhid(DIRECTORY, directory)
     swhid = gen_swhid(
         DIRECTORY,
         dir_subdir["target"],
         metadata={"anchor": anchor, "path": "/" + dir_subdir_path},
     )
     resolved_swhid = resolve_swhid(swhid)
     browse_url = reverse(
         "browse-directory",
         url_args={"sha1_git": directory},
         query_params={"path": dir_subdir_path},
     )
     assert resolved_swhid["browse_url"] == browse_url


 @given(directory())
 def test_resolve_swhid_with_malformed_origin_url(archive_data, directory):
     origin_url = "http://example.org/project/abc"
     malformed_origin_url = "http:/example.org/project/abc"
     archive_data.origin_add([Origin(url=origin_url)])
     swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": malformed_origin_url})
     resolved_swhid = resolve_swhid(swhid)
     assert origin_url in resolved_swhid["browse_url"]
+
+
+@given(revision())
+def test_resolve_dir_entry_swhid_with_anchor_revision(archive_data, revision):
+    revision_data = archive_data.revision_get(revision)
+    directory = revision_data["directory"]
+    dir_content = archive_data.directory_ls(directory)
+    dir_entry = random.choice(dir_content)
+
+    rev_swhid = gen_swhid(REVISION, revision)
+
+    if dir_entry["type"] == "rev":
+        return
+
+    if dir_entry["type"] == "file":
+        swhid = gen_swhid(
+            CONTENT,
+            dir_entry["checksums"]["sha1_git"],
+            metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}"},
+        )
+
+    else:
+        swhid = gen_swhid(
+            DIRECTORY,
+            dir_entry["target"],
+            metadata={"anchor": rev_swhid, "path": f"/{dir_entry['name']}/"},
+        )
+
+    browse_url = reverse(
+        "browse-revision",
+        url_args={"sha1_git": revision},
+        query_params={"path": dir_entry["name"]},
+    )
+
+    resolved_swhid = resolve_swhid(swhid)
+
+    assert resolved_swhid["browse_url"] == browse_url
+
+
+@given(directory_with_subdirs())
+def test_resolve_dir_entry_swhid_with_anchor_directory(archive_data, directory):
+    dir_content = archive_data.directory_ls(directory)
+    dir_entry = random.choice(
+        [entry for entry in dir_content if entry["type"] == "dir"]
+    )
+
+    dir_swhid = gen_swhid(DIRECTORY, directory)
+
+    swhid = gen_swhid(
+        DIRECTORY,
+        dir_entry["target"],
+        metadata={"anchor": dir_swhid, "path": f"/{dir_entry['name']}/"},
+    )
+    browse_url = reverse(
+        "browse-directory",
+        url_args={"sha1_git": directory},
+        query_params={"path": f"{dir_entry['name']}"},
+    )
+
+    resolved_swhid = resolve_swhid(swhid)
+
+    assert resolved_swhid["browse_url"] == browse_url
+
+
+@given(directory_with_files())
+def test_resolve_file_entry_swhid_with_anchor_directory(archive_data, directory):
+    dir_content = archive_data.directory_ls(directory)
+    file_entry = random.choice(
+        [entry for entry in dir_content if entry["type"] == "file"]
+    )
+
+    dir_swhid = gen_swhid(DIRECTORY, directory)
+
+    sha1_git = file_entry["checksums"]["sha1_git"]
+    swhid = gen_swhid(
+        CONTENT,
+        sha1_git,
+        metadata={"anchor": dir_swhid, "path": f"/{file_entry['name']}"},
+    )
+    browse_url = reverse(
+        "browse-content",
+        url_args={"query_string": f"sha1_git:{sha1_git}"},
+        query_params={"path": f"{directory}/{file_entry['name']}"},
+    )
+
+    resolved_swhid = resolve_swhid(swhid)
+
+    assert resolved_swhid["browse_url"] == browse_url
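To exercise just the regression tests added above, a pytest invocation along these lines should work (the file path comes from the diff header; the -k expression is illustrative):

    pytest swh/web/tests/common/test_identifiers.py -k "anchor_revision or anchor_directory"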
diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py
index a4e53e33..dff099fa 100644
--- a/swh/web/tests/strategies.py
+++ b/swh/web/tests/strategies.py
@@ -1,633 +1,645 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 from datetime import datetime
 import random

 from hypothesis import assume, settings
 from hypothesis.extra.dateutil import timezones
 from hypothesis.strategies import (
     binary,
     characters,
     composite,
     datetimes,
     just,
     lists,
     sampled_from,
     text,
 )

 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.model.hypothesis_strategies import origins as new_origin_strategy
 from swh.model.hypothesis_strategies import snapshots as new_snapshot
 from swh.model.model import (
     Content,
     Directory,
     Person,
     Revision,
     RevisionType,
     TimestampWithTimezone,
 )
 from swh.storage.algos.revisions_walker import get_revisions_walker
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.web.common.utils import browsers_supported_image_mimes
 from swh.web.tests.data import get_tests_data

 # Module dedicated to the generation of input data for tests through
 # the use of hypothesis.
 # Some of these data are sampled from a test archive created and populated
 # in the swh.web.tests.data module.

 # Set the swh-web hypothesis profile if none has been explicitly set
 hypothesis_default_settings = settings.get_profile("default")
 if repr(settings()) == repr(hypothesis_default_settings):
     settings.load_profile("swh-web")


 # The following strategies exploit the hypothesis capabilities


 def _filter_checksum(cs):
     generated_checksums = get_tests_data()["generated_checksums"]
     if not int.from_bytes(cs, byteorder="little") or cs in generated_checksums:
         return False
     generated_checksums.add(cs)
     return True


 def _known_swh_object(object_type):
     return sampled_from(get_tests_data()[object_type])


 def sha1():
     """
     Hypothesis strategy returning a valid hexadecimal sha1 value.
     """
     return binary(min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex)


 def invalid_sha1():
     """
     Hypothesis strategy returning an invalid sha1 representation.
     """
     return binary(min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex)


 def sha256():
     """
     Hypothesis strategy returning a valid hexadecimal sha256 value.
     """
     return binary(min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex)


 def content():
     """
     Hypothesis strategy returning a random content ingested
     into the test archive.
     """
     return _known_swh_object("contents")


 def contents():
     """
     Hypothesis strategy returning random contents ingested
     into the test archive.
     """
     return lists(content(), min_size=2, max_size=8)


 def empty_content():
     """
     Hypothesis strategy returning the empty content ingested
     into the test archive.
     """
     empty_content = Content.from_data(data=b"").to_dict()
     for algo in DEFAULT_ALGORITHMS:
         empty_content[algo] = hash_to_hex(empty_content[algo])
     return just(empty_content)


 def content_text():
     """
     Hypothesis strategy returning random textual contents ingested
     into the test archive.
     """
     return content().filter(lambda c: c["mimetype"].startswith("text/"))


 def content_text_non_utf8():
     """
     Hypothesis strategy returning random textual contents not encoded
     to UTF-8 ingested into the test archive.
     """
     return content().filter(
         lambda c: c["mimetype"].startswith("text/")
         and c["encoding"] not in ("utf-8", "us-ascii")
     )


 def content_text_no_highlight():
     """
     Hypothesis strategy returning random textual contents with no detected
     programming language to highlight ingested into the test archive.
     """
     return content().filter(
         lambda c: c["mimetype"].startswith("text/")
         and c["hljs_language"] == "nohighlight"
     )


 def content_image_type():
     """
     Hypothesis strategy returning random image contents ingested
     into the test archive.
     """
     return content().filter(lambda c: c["mimetype"] in browsers_supported_image_mimes)


 def content_unsupported_image_type_rendering():
     """
     Hypothesis strategy returning random image contents ingested into
     the test archive that can not be rendered by browsers.
""" return content().filter( lambda c: c["mimetype"].startswith("image/") and c["mimetype"] not in browsers_supported_image_mimes ) def content_utf8_detected_as_binary(): """ Hypothesis strategy returning random textual contents detected as binary by libmagic while they are valid UTF-8 encoded files. """ def utf8_binary_detected(content): if content["encoding"] != "binary": return False try: content["data"].decode("utf-8") except Exception: return False else: return True return content().filter(utf8_binary_detected) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { "blake2S256": blake2s256_hex, "sha1": sha1_hex, "sha1_git": sha1_git_hex, "sha256": sha256_hex, } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: get_tests_data()["storage"].content_get_data(hash_to_bytes(c["sha1"])) is None ) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object("directories") -def directory_with_subdirs(): - """ - Hypothesis strategy returning a random directory containing - sub directories ingested into the test archive. - """ +def _directory_with_entry_type(type_): return directory().filter( lambda d: any( [ - e["type"] == "dir" + e["type"] == type_ for e in list( get_tests_data()["storage"].directory_ls(hash_to_bytes(d)) ) ] ) ) +def directory_with_subdirs(): + """ + Hypothesis strategy returning a random directory containing + sub directories ingested into the test archive. + """ + return _directory_with_entry_type("dir") + + +def directory_with_files(): + """ + Hypothesis strategy returning a random directory containing + at least one regular file + """ + return _directory_with_entry_type("file") + + def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(Directory(entries=()).id.hex()) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ return sha1().filter( lambda s: len( list(get_tests_data()["storage"].directory_missing([hash_to_bytes(s)])) ) > 0 ) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object("origins") def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() storage = tests_data["storage"] for origin in tests_data["origins"]: visit_page = storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: ret.append(origin) return sampled_from(ret) def origin_with_releases(): """ Hypothesis strategy returning a random origin ingested into the test archive. 
""" ret = [] tests_data = get_tests_data() for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): ret.append(origin) return sampled_from(ret) def origin_with_pull_request_branches(): """ Hypothesis strategy returning a random origin with pull request branches ingested into the test archive. """ ret = [] tests_data = get_tests_data() storage = tests_data["storage"] origins = storage.origin_list(limit=1000) for origin in origins.results: snapshot = snapshot_get_latest(storage, origin.url) if any([b"refs/pull/" in b for b in snapshot.branches]): ret.append(origin) return sampled_from(ret) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ return new_origin_strategy().filter( lambda origin: get_tests_data()["storage"].origin_get([origin.url])[0] is None ) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists( new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items())), ) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0), timezones=timezones(), ), min_size=min_size, max_size=max_size, unique=True, ).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object("releases") def releases(min_size=2, max_size=8): """ Hypothesis strategy returning random releases ingested into the test archive. """ return lists(release(), min_size=min_size, max_size=max_size) def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()["storage"].release_get([s])[0] is None ) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object("revisions") def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()["storage"].revision_get([hash_to_bytes(s)])[0] is None ) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw( text( min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255), ) ) email = "%s@company.org" % name return Person( name=name.encode(), email=email.encode(), fullname=("%s <%s>" % (name, email)).encode(), ) @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes( min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0) ).map(lambda d: int(d.timestamp())) ) return { "timestamp": timestamp, "offset": 0, "negative_utc": False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. 
""" return Revision( directory=draw(sha1().map(hash_to_bytes)), author=draw(new_person()), committer=draw(new_person()), message=draw(text(min_size=20, max_size=100).map(lambda t: t.encode())), date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), committer_date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), synthetic=False, type=RevisionType.GIT, ) def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object("snapshots") def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists( new_snapshot(min_size=2, max_size=10, only_objects=True), min_size=min_size, max_size=max_size, ) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: get_tests_data()["storage"].snapshot_get_branches(hash_to_bytes(s)) is None ) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return just( { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } ) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. 
""" # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return just( { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } ) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. """ return just( { "sha1s": [ "0ab37c02043ebff946c1937523f60aadd0844351", "15554cf7608dde6bfefac7e3d525596343a85b6f", "2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd", "30acd0b47fc25e159e27a980102ddb1c4bea0b95", "4f81f05aaea3efb981f9d90144f746d6b682285b", "5153aa4b6e4455a62525bc4de38ed0ff6e7dd682", "59d08bafa6a749110dfb65ba43a61963d5a5bf9f", "7568285b2d7f31ae483ae71617bd3db873deaa2c", "7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4", "8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03", "9b3557f1ab4111c8607a4f2ea3c1e53c6992916c", "9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd", "c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b", "e89e55a12def4cd54d5bff58378a3b5119878eb7", "e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e", "eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5", ], "symbol_name": "ABS", } ) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just( { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } )