Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_identifiers.py
Show All 10 Lines | |||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
CONTENT, | CONTENT, | ||||
DIRECTORY, | DIRECTORY, | ||||
RELEASE, | RELEASE, | ||||
REVISION, | REVISION, | ||||
SNAPSHOT, | SNAPSHOT, | ||||
SWHID, | QualifiedSWHID, | ||||
parse_swhid, | |||||
) | ) | ||||
from swh.model.model import Origin | from swh.model.model import Origin | ||||
from swh.web.browse.snapshot_context import get_snapshot_context | from swh.web.browse.snapshot_context import get_snapshot_context | ||||
from swh.web.common.exc import BadInputExc | from swh.web.common.exc import BadInputExc | ||||
from swh.web.common.identifiers import ( | from swh.web.common.identifiers import ( | ||||
gen_swhid, | gen_swhid, | ||||
get_swhid, | get_swhid, | ||||
get_swhids_info, | get_swhids_info, | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | ): | ||||
url_args["sha1_git"] = obj_id | url_args["sha1_git"] = obj_id | ||||
query_params = {"origin_url": "some-origin"} | query_params = {"origin_url": "some-origin"} | ||||
browse_url = reverse( | browse_url = reverse( | ||||
f"browse-{obj_type}", url_args=url_args, query_params=query_params | f"browse-{obj_type}", url_args=url_args, query_params=query_params | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid, query_params) | resolved_swhid = resolve_swhid(swhid, query_params) | ||||
assert isinstance(resolved_swhid["swhid_parsed"], SWHID) | assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID) | ||||
assert str(resolved_swhid["swhid_parsed"]) == swhid | assert str(resolved_swhid["swhid_parsed"]) == swhid | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url | ||||
with pytest.raises(BadInputExc, match="Origin SWHIDs"): | with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"): | ||||
resolve_swhid(f"swh:1:ori:{random_sha1()}") | resolve_swhid(f"swh:1:ori:{random_sha1()}") | ||||
@given(content(), directory(), release(), revision(), snapshot()) | @given(content(), directory(), release(), revision(), snapshot()) | ||||
def test_get_swhid(content, directory, release, revision, snapshot): | def test_get_swhid(content, directory, release, revision, snapshot): | ||||
for obj_type, obj_id in ( | for obj_type, obj_id in ( | ||||
(CONTENT, content["sha1_git"]), | (CONTENT, content["sha1_git"]), | ||||
(DIRECTORY, directory), | (DIRECTORY, directory), | ||||
(RELEASE, release), | (RELEASE, release), | ||||
(REVISION, revision), | (REVISION, revision), | ||||
(SNAPSHOT, snapshot), | (SNAPSHOT, snapshot), | ||||
): | ): | ||||
swhid = gen_swhid(obj_type, obj_id) | swhid = gen_swhid(obj_type, obj_id) | ||||
swh_parsed_swhid = get_swhid(swhid) | swh_parsed_swhid = get_swhid(swhid) | ||||
assert isinstance(swh_parsed_swhid, SWHID) | assert isinstance(swh_parsed_swhid, QualifiedSWHID) | ||||
assert str(swh_parsed_swhid) == swhid | assert str(swh_parsed_swhid) == swhid | ||||
with pytest.raises(BadInputExc, match="Error when parsing identifier"): | with pytest.raises(BadInputExc, match="Error when parsing identifier"): | ||||
get_swhid("foo") | get_swhid("foo") | ||||
@given(content(), directory(), release(), revision(), snapshot()) | @given(content(), directory(), release(), revision(), snapshot()) | ||||
def test_group_swhids(content, directory, release, revision, snapshot): | def test_group_swhids(content, directory, release, revision, snapshot): | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def test_get_swhids_info_directory_context(archive_data, directory): | ||||
swhids = get_swhids_info( | swhids = get_swhids_info( | ||||
swh_objects_info, snapshot_context=None, extra_context=extra_context, | swh_objects_info, snapshot_context=None, extra_context=extra_context, | ||||
) | ) | ||||
swhid_dir_parsed = get_swhid(swhids[0]["swhid_with_context"]) | swhid_dir_parsed = get_swhid(swhids[0]["swhid_with_context"]) | ||||
anchor = gen_swhid(DIRECTORY, directory) | anchor = gen_swhid(DIRECTORY, directory) | ||||
assert swhid_dir_parsed.metadata == { | assert swhid_dir_parsed.qualifiers() == { | ||||
"anchor": anchor, | "anchor": anchor, | ||||
"path": dir_subdir_path, | "path": dir_subdir_path, | ||||
} | } | ||||
if dir_subdir_files: | if dir_subdir_files: | ||||
swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) | swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) | ||||
assert swhid_cnt_parsed.metadata == { | assert swhid_cnt_parsed.qualifiers() == { | ||||
"anchor": anchor, | "anchor": anchor, | ||||
"path": f'{dir_subdir_path}{dir_subdir_file["name"]}', | "path": f'{dir_subdir_path}{dir_subdir_file["name"]}', | ||||
} | } | ||||
@given(revision()) | @given(revision()) | ||||
def test_get_swhids_info_revision_context(archive_data, revision): | def test_get_swhids_info_revision_context(archive_data, revision): | ||||
revision_data = archive_data.revision_get(revision) | revision_data = archive_data.revision_get(revision) | ||||
Show All 19 Lines | swhids = get_swhids_info( | ||||
swh_objects, snapshot_context=None, extra_context=extra_context, | swh_objects, snapshot_context=None, extra_context=extra_context, | ||||
) | ) | ||||
assert swhids[0]["context"] == {} | assert swhids[0]["context"] == {} | ||||
swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) | swhid_dir_parsed = get_swhid(swhids[1]["swhid_with_context"]) | ||||
anchor = gen_swhid(REVISION, revision) | anchor = gen_swhid(REVISION, revision) | ||||
assert swhid_dir_parsed.metadata == { | assert swhid_dir_parsed.qualifiers() == { | ||||
"anchor": anchor, | "anchor": anchor, | ||||
} | } | ||||
if dir_entry["type"] == "file": | if dir_entry["type"] == "file": | ||||
swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"]) | swhid_cnt_parsed = get_swhid(swhids[2]["swhid_with_context"]) | ||||
assert swhid_cnt_parsed.metadata == { | assert swhid_cnt_parsed.qualifiers() == { | ||||
"anchor": anchor, | "anchor": anchor, | ||||
"path": f'/{dir_entry["name"]}', | "path": f'/{dir_entry["name"]}', | ||||
} | } | ||||
@given(origin_with_multiple_visits()) | @given(origin_with_multiple_visits()) | ||||
def test_get_swhids_info_origin_snapshot_context(archive_data, origin): | def test_get_swhids_info_origin_snapshot_context(archive_data, origin): | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 142 Lines • ▼ Show 20 Lines | for visit in visits: | ||||
expected_snp_context = {} | expected_snp_context = {} | ||||
if "origin_url" in snp_ctx_params: | if "origin_url" in snp_ctx_params: | ||||
expected_cnt_context["origin"] = origin["url"] | expected_cnt_context["origin"] = origin["url"] | ||||
expected_dir_context["origin"] = origin["url"] | expected_dir_context["origin"] = origin["url"] | ||||
expected_rev_context["origin"] = origin["url"] | expected_rev_context["origin"] = origin["url"] | ||||
expected_snp_context["origin"] = origin["url"] | expected_snp_context["origin"] = origin["url"] | ||||
assert swhid_cnt_parsed.metadata == expected_cnt_context | assert swhid_cnt_parsed.qualifiers() == expected_cnt_context | ||||
assert swhid_dir_parsed.metadata == expected_dir_context | assert swhid_dir_parsed.qualifiers() == expected_dir_context | ||||
assert swhid_rev_parsed.metadata == expected_rev_context | assert swhid_rev_parsed.qualifiers() == expected_rev_context | ||||
assert swhid_snp_parsed.metadata == expected_snp_context | assert swhid_snp_parsed.qualifiers() == expected_snp_context | ||||
if "release_name" in snp_ctx_params: | if "release_name" in snp_ctx_params: | ||||
assert swhid_rel_parsed.metadata == expected_rev_context | assert swhid_rel_parsed.qualifiers() == expected_rev_context | ||||
@given(origin(), directory()) | @given(origin(), directory()) | ||||
def test_get_swhids_info_characters_and_url_escaping(archive_data, origin, directory): | def test_get_swhids_info_characters_and_url_escaping(archive_data, origin, directory): | ||||
snapshot_context = get_snapshot_context(origin_url=origin["url"]) | snapshot_context = get_snapshot_context(origin_url=origin["url"]) | ||||
snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%" | snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%" | ||||
path = "/foo;/bar%" | path = "/foo;/bar%" | ||||
swhid_info = get_swhids_info( | swhid_info = get_swhids_info( | ||||
[SWHObjectInfo(object_type=DIRECTORY, object_id=directory)], | [SWHObjectInfo(object_type=DIRECTORY, object_id=directory)], | ||||
snapshot_context=snapshot_context, | snapshot_context=snapshot_context, | ||||
extra_context={"path": path}, | extra_context={"path": path}, | ||||
)[0] | )[0] | ||||
# check special characters in SWHID have been escaped | # check special characters in SWHID have been escaped | ||||
assert ( | assert ( | ||||
swhid_info["context"]["origin"] == "http://example.org/?project%3Dabc%3Bdef%25" | swhid_info["context"]["origin"] == "http://example.org/?project%3Dabc%3Bdef%25" | ||||
) | ) | ||||
assert swhid_info["context"]["path"] == "/foo%3B/bar%25" | assert swhid_info["context"]["path"] == "/foo%3B/bar%25" | ||||
# check special characters in SWHID URL have been escaped | # check special characters in SWHID URL have been escaped | ||||
parsed_url_swhid = parse_swhid(swhid_info["swhid_with_context_url"][1:-1]) | parsed_url_swhid = QualifiedSWHID.from_string( | ||||
swhid_info["swhid_with_context_url"][1:-1] | |||||
) | |||||
assert ( | assert ( | ||||
parsed_url_swhid.metadata["origin"] | parsed_url_swhid.qualifiers()["origin"] | ||||
== "http://example.org/%3Fproject%253Dabc%253Bdef%2525" | == "http://example.org/%3Fproject%253Dabc%253Bdef%2525" | ||||
) | ) | ||||
assert parsed_url_swhid.metadata["path"] == "/foo%253B/bar%2525" | assert parsed_url_swhid.qualifiers()["path"] == "/foo%253B/bar%2525" | ||||
@given(origin_with_multiple_visits()) | @given(origin_with_multiple_visits()) | ||||
def test_resolve_swhids_snapshot_context(client, archive_data, origin): | def test_resolve_swhids_snapshot_context(client, archive_data, origin): | ||||
visits = archive_data.origin_visit_get(origin["url"]) | visits = archive_data.origin_visit_get(origin["url"]) | ||||
visit = random.choice(visits) | visit = random.choice(visits) | ||||
snapshot = archive_data.snapshot_get(visit["snapshot"]) | snapshot = archive_data.snapshot_get(visit["snapshot"]) | ||||
head_rev_id = archive_data.snapshot_get_head(snapshot) | head_rev_id = archive_data.snapshot_get_head(snapshot) | ||||
▲ Show 20 Lines • Show All 161 Lines • ▼ Show 20 Lines | |||||
@given(directory()) | @given(directory()) | ||||
def test_resolve_swhid_with_escaped_chars(directory): | def test_resolve_swhid_with_escaped_chars(directory): | ||||
origin = "http://example.org/?project=abc;" | origin = "http://example.org/?project=abc;" | ||||
origin_swhid_escaped = quote(origin, safe="/?:@&") | origin_swhid_escaped = quote(origin, safe="/?:@&") | ||||
origin_swhid_url_escaped = quote(origin, safe="/:@;") | origin_swhid_url_escaped = quote(origin, safe="/:@;") | ||||
swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) | swhid = gen_swhid(DIRECTORY, directory, metadata={"origin": origin_swhid_escaped}) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
assert resolved_swhid["swhid_parsed"].metadata["origin"] == origin_swhid_escaped | assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped | ||||
assert origin_swhid_url_escaped in resolved_swhid["browse_url"] | assert origin_swhid_url_escaped in resolved_swhid["browse_url"] | ||||
@given(directory_with_subdirs()) | @given(directory_with_subdirs()) | ||||
def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory): | def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory): | ||||
dir_content = archive_data.directory_ls(directory) | dir_content = archive_data.directory_ls(directory) | ||||
dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | ||||
dir_subdir = random.choice(dir_subdirs) | dir_subdir = random.choice(dir_subdirs) | ||||
Show All 24 Lines |