Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_identifiers.py
Show All 20 Lines | from swh.web.common.identifiers import ( | ||||
group_swhids, | group_swhids, | ||||
parse_object_type, | parse_object_type, | ||||
resolve_swhid, | resolve_swhid, | ||||
) | ) | ||||
from swh.web.common.typing import SWHObjectInfo | from swh.web.common.typing import SWHObjectInfo | ||||
from swh.web.common.utils import reverse | from swh.web.common.utils import reverse | ||||
from swh.web.tests.data import random_sha1 | from swh.web.tests.data import random_sha1 | ||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
directory, | |||||
directory_with_files, | |||||
directory_with_subdirs, | |||||
origin, | origin, | ||||
origin_with_multiple_visits, | origin_with_multiple_visits, | ||||
release, | release, | ||||
revision, | revision, | ||||
snapshot, | snapshot, | ||||
) | ) | ||||
Show All 27 Lines | def test_parse_object_type(): | ||||
assert parse_object_type("release") == ObjectType.RELEASE | assert parse_object_type("release") == ObjectType.RELEASE | ||||
assert parse_object_type("snapshot") == ObjectType.SNAPSHOT | assert parse_object_type("snapshot") == ObjectType.SNAPSHOT | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
parse_object_type("foo") | parse_object_type("foo") | ||||
assert e.match("Invalid swh object type") | assert e.match("Invalid swh object type") | ||||
@given(directory(), release(), revision(), snapshot()) | @given(release(), revision(), snapshot()) | ||||
def test_resolve_swhid_legacy(content, directory, release, revision, snapshot): | def test_resolve_swhid_legacy(content, directory, release, revision, snapshot): | ||||
for obj_type, obj_id in ( | for obj_type, obj_id in ( | ||||
(ObjectType.CONTENT, content["sha1_git"]), | (ObjectType.CONTENT, content["sha1_git"]), | ||||
(ObjectType.DIRECTORY, directory), | (ObjectType.DIRECTORY, directory), | ||||
(ObjectType.RELEASE, release), | (ObjectType.RELEASE, release), | ||||
(ObjectType.REVISION, revision), | (ObjectType.REVISION, revision), | ||||
(ObjectType.SNAPSHOT, snapshot), | (ObjectType.SNAPSHOT, snapshot), | ||||
): | ): | ||||
Show All 20 Lines | ): | ||||
assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID) | assert isinstance(resolved_swhid["swhid_parsed"], QualifiedSWHID) | ||||
assert str(resolved_swhid["swhid_parsed"]) == swhid | assert str(resolved_swhid["swhid_parsed"]) == swhid | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url | ||||
with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"): | with pytest.raises(BadInputExc, match="'ori' is not a valid ObjectType"): | ||||
resolve_swhid(f"swh:1:ori:{random_sha1()}") | resolve_swhid(f"swh:1:ori:{random_sha1()}") | ||||
@given(directory(), release(), revision(), snapshot()) | @given(release(), revision(), snapshot()) | ||||
def test_get_swhid(content, directory, release, revision, snapshot): | def test_get_swhid(content, directory, release, revision, snapshot): | ||||
for obj_type, obj_id in ( | for obj_type, obj_id in ( | ||||
(ObjectType.CONTENT, content["sha1_git"]), | (ObjectType.CONTENT, content["sha1_git"]), | ||||
(ObjectType.DIRECTORY, directory), | (ObjectType.DIRECTORY, directory), | ||||
(ObjectType.RELEASE, release), | (ObjectType.RELEASE, release), | ||||
(ObjectType.REVISION, revision), | (ObjectType.REVISION, revision), | ||||
(ObjectType.SNAPSHOT, snapshot), | (ObjectType.SNAPSHOT, snapshot), | ||||
): | ): | ||||
swhid = gen_swhid(obj_type, obj_id) | swhid = gen_swhid(obj_type, obj_id) | ||||
for swhid_ in (swhid, swhid.upper()): | for swhid_ in (swhid, swhid.upper()): | ||||
swh_parsed_swhid = get_swhid(swhid_) | swh_parsed_swhid = get_swhid(swhid_) | ||||
assert isinstance(swh_parsed_swhid, QualifiedSWHID) | assert isinstance(swh_parsed_swhid, QualifiedSWHID) | ||||
assert str(swh_parsed_swhid) == swhid.lower() | assert str(swh_parsed_swhid) == swhid.lower() | ||||
with pytest.raises(BadInputExc, match="Error when parsing identifier"): | with pytest.raises(BadInputExc, match="Error when parsing identifier"): | ||||
get_swhid("foo") | get_swhid("foo") | ||||
@given(directory(), release(), revision(), snapshot()) | @given(release(), revision(), snapshot()) | ||||
def test_group_swhids(content, directory, release, revision, snapshot): | def test_group_swhids(content, directory, release, revision, snapshot): | ||||
swhids = [] | swhids = [] | ||||
expected = {} | expected = {} | ||||
for obj_type, obj_id in ( | for obj_type, obj_id in ( | ||||
(ObjectType.CONTENT, content["sha1_git"]), | (ObjectType.CONTENT, content["sha1_git"]), | ||||
(ObjectType.DIRECTORY, directory), | (ObjectType.DIRECTORY, directory), | ||||
(ObjectType.RELEASE, release), | (ObjectType.RELEASE, release), | ||||
(ObjectType.REVISION, revision), | (ObjectType.REVISION, revision), | ||||
(ObjectType.SNAPSHOT, snapshot), | (ObjectType.SNAPSHOT, snapshot), | ||||
): | ): | ||||
swhid = gen_swhid(obj_type, obj_id) | swhid = gen_swhid(obj_type, obj_id) | ||||
swhid = get_swhid(swhid) | swhid = get_swhid(swhid) | ||||
swhids.append(swhid) | swhids.append(swhid) | ||||
expected[obj_type] = [hash_to_bytes(obj_id)] | expected[obj_type] = [hash_to_bytes(obj_id)] | ||||
swhid_groups = group_swhids(swhids) | swhid_groups = group_swhids(swhids) | ||||
assert swhid_groups == expected | assert swhid_groups == expected | ||||
@given(directory_with_subdirs()) | def test_get_swhids_info_directory_context(archive_data, directory_with_subdirs): | ||||
def test_get_swhids_info_directory_context(archive_data, directory): | |||||
swhid = get_swhids_info( | swhid = get_swhids_info( | ||||
[SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)], | [ | ||||
SWHObjectInfo( | |||||
object_type=ObjectType.DIRECTORY, object_id=directory_with_subdirs | |||||
) | |||||
], | |||||
snapshot_context=None, | snapshot_context=None, | ||||
)[0] | )[0] | ||||
assert swhid["swhid_with_context"] is None | assert swhid["swhid_with_context"] is None | ||||
# path qualifier should be discarded for a root directory | # path qualifier should be discarded for a root directory | ||||
swhid = get_swhids_info( | swhid = get_swhids_info( | ||||
[SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)], | [ | ||||
SWHObjectInfo( | |||||
object_type=ObjectType.DIRECTORY, object_id=directory_with_subdirs | |||||
) | |||||
], | |||||
snapshot_context=None, | snapshot_context=None, | ||||
extra_context={"path": "/"}, | extra_context={"path": "/"}, | ||||
)[0] | )[0] | ||||
assert swhid["swhid_with_context"] is None | assert swhid["swhid_with_context"] is None | ||||
dir_content = archive_data.directory_ls(directory) | dir_content = archive_data.directory_ls(directory_with_subdirs) | ||||
dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | ||||
dir_subdir = random.choice(dir_subdirs) | dir_subdir = random.choice(dir_subdirs) | ||||
dir_subdir_path = f'/{dir_subdir["name"]}/' | dir_subdir_path = f'/{dir_subdir["name"]}/' | ||||
dir_subdir_content = archive_data.directory_ls(dir_subdir["target"]) | dir_subdir_content = archive_data.directory_ls(dir_subdir["target"]) | ||||
dir_subdir_files = [e for e in dir_subdir_content if e["type"] == "file"] | dir_subdir_files = [e for e in dir_subdir_content if e["type"] == "file"] | ||||
swh_objects_info = [ | swh_objects_info = [ | ||||
SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=dir_subdir["target"]) | SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=dir_subdir["target"]) | ||||
] | ] | ||||
extra_context = { | extra_context = { | ||||
"root_directory": directory, | "root_directory": directory_with_subdirs, | ||||
"path": dir_subdir_path, | "path": dir_subdir_path, | ||||
} | } | ||||
if dir_subdir_files: | if dir_subdir_files: | ||||
dir_subdir_file = random.choice(dir_subdir_files) | dir_subdir_file = random.choice(dir_subdir_files) | ||||
extra_context["filename"] = dir_subdir_file["name"] | extra_context["filename"] = dir_subdir_file["name"] | ||||
swh_objects_info.append( | swh_objects_info.append( | ||||
SWHObjectInfo( | SWHObjectInfo( | ||||
object_type=ObjectType.CONTENT, | object_type=ObjectType.CONTENT, | ||||
object_id=dir_subdir_file["checksums"]["sha1_git"], | object_id=dir_subdir_file["checksums"]["sha1_git"], | ||||
) | ) | ||||
) | ) | ||||
swhids = get_swhids_info( | swhids = get_swhids_info( | ||||
swh_objects_info, snapshot_context=None, extra_context=extra_context, | swh_objects_info, snapshot_context=None, extra_context=extra_context, | ||||
) | ) | ||||
swhid_lower = swhids[0]["swhid_with_context"] | swhid_lower = swhids[0]["swhid_with_context"] | ||||
swhid_upper = swhid_lower.replace(swhids[0]["swhid"], swhids[0]["swhid"].upper()) | swhid_upper = swhid_lower.replace(swhids[0]["swhid"], swhids[0]["swhid"].upper()) | ||||
for swhid in (swhid_lower, swhid_upper): | for swhid in (swhid_lower, swhid_upper): | ||||
swhid_dir_parsed = get_swhid(swhid) | swhid_dir_parsed = get_swhid(swhid) | ||||
anchor = gen_swhid(ObjectType.DIRECTORY, directory) | anchor = gen_swhid(ObjectType.DIRECTORY, directory_with_subdirs) | ||||
assert swhid_dir_parsed.qualifiers() == { | assert swhid_dir_parsed.qualifiers() == { | ||||
"anchor": anchor, | "anchor": anchor, | ||||
"path": dir_subdir_path, | "path": dir_subdir_path, | ||||
} | } | ||||
if dir_subdir_files: | if dir_subdir_files: | ||||
swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) | swhid_cnt_parsed = get_swhid(swhids[1]["swhid_with_context"]) | ||||
▲ Show 20 Lines • Show All 220 Lines • ▼ Show 20 Lines | for visit in visits: | ||||
assert swhid_dir_parsed.qualifiers() == expected_dir_context | assert swhid_dir_parsed.qualifiers() == expected_dir_context | ||||
assert swhid_rev_parsed.qualifiers() == expected_rev_context | assert swhid_rev_parsed.qualifiers() == expected_rev_context | ||||
assert swhid_snp_parsed.qualifiers() == expected_snp_context | assert swhid_snp_parsed.qualifiers() == expected_snp_context | ||||
if "release_name" in snp_ctx_params: | if "release_name" in snp_ctx_params: | ||||
assert swhid_rel_parsed.qualifiers() == expected_rev_context | assert swhid_rel_parsed.qualifiers() == expected_rev_context | ||||
@given(origin(), directory()) | @given(origin()) | ||||
def test_get_swhids_info_characters_and_url_escaping(archive_data, origin, directory): | def test_get_swhids_info_characters_and_url_escaping(archive_data, directory, origin): | ||||
snapshot_context = get_snapshot_context(origin_url=origin["url"]) | snapshot_context = get_snapshot_context(origin_url=origin["url"]) | ||||
snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%" | snapshot_context["origin_info"]["url"] = "http://example.org/?project=abc;def%" | ||||
path = "/foo;/bar%" | path = "/foo;/bar%" | ||||
swhid_info = get_swhids_info( | swhid_info = get_swhids_info( | ||||
[SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)], | [SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory)], | ||||
snapshot_context=snapshot_context, | snapshot_context=snapshot_context, | ||||
extra_context={"path": path}, | extra_context={"path": path}, | ||||
▲ Show 20 Lines • Show All 175 Lines • ▼ Show 20 Lines | for obj_swhid in (obj_swhid_lower, obj_swhid_upper): | ||||
lines_number = lines.split("-") | lines_number = lines.split("-") | ||||
expected_url += f"#L{lines_number[0]}" | expected_url += f"#L{lines_number[0]}" | ||||
if len(lines_number) > 1: | if len(lines_number) > 1: | ||||
expected_url += f"-L{lines_number[1]}" | expected_url += f"-L{lines_number[1]}" | ||||
assert obj_swhid_resolved["browse_url"] == expected_url | assert obj_swhid_resolved["browse_url"] == expected_url | ||||
@given(directory()) | |||||
def test_resolve_swhid_with_escaped_chars(archive_data, directory): | def test_resolve_swhid_with_escaped_chars(archive_data, directory): | ||||
origin_url = "http://example.org/?project=abc;" | origin_url = "http://example.org/?project=abc;" | ||||
archive_data.origin_add([Origin(url=origin_url)]) | archive_data.origin_add([Origin(url=origin_url)]) | ||||
origin_swhid_escaped = quote(origin_url, safe="/?:@&") | origin_swhid_escaped = quote(origin_url, safe="/?:@&") | ||||
origin_swhid_url_escaped = quote(origin_url, safe="/:@;") | origin_swhid_url_escaped = quote(origin_url, safe="/:@;") | ||||
swhid = gen_swhid( | swhid = gen_swhid( | ||||
ObjectType.DIRECTORY, directory, metadata={"origin": origin_swhid_escaped} | ObjectType.DIRECTORY, directory, metadata={"origin": origin_swhid_escaped} | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped | assert resolved_swhid["swhid_parsed"].origin == origin_swhid_escaped | ||||
assert origin_swhid_url_escaped in resolved_swhid["browse_url"] | assert origin_swhid_url_escaped in resolved_swhid["browse_url"] | ||||
@given(directory_with_subdirs()) | def test_resolve_directory_swhid_path_without_trailing_slash( | ||||
def test_resolve_directory_swhid_path_without_trailing_slash(archive_data, directory): | archive_data, directory_with_subdirs | ||||
dir_content = archive_data.directory_ls(directory) | ): | ||||
dir_content = archive_data.directory_ls(directory_with_subdirs) | |||||
dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | dir_subdirs = [e for e in dir_content if e["type"] == "dir"] | ||||
dir_subdir = random.choice(dir_subdirs) | dir_subdir = random.choice(dir_subdirs) | ||||
dir_subdir_path = dir_subdir["name"] | dir_subdir_path = dir_subdir["name"] | ||||
anchor = gen_swhid(ObjectType.DIRECTORY, directory) | anchor = gen_swhid(ObjectType.DIRECTORY, directory_with_subdirs) | ||||
swhid = gen_swhid( | swhid = gen_swhid( | ||||
ObjectType.DIRECTORY, | ObjectType.DIRECTORY, | ||||
dir_subdir["target"], | dir_subdir["target"], | ||||
metadata={"anchor": anchor, "path": "/" + dir_subdir_path}, | metadata={"anchor": anchor, "path": "/" + dir_subdir_path}, | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
browse_url = reverse( | browse_url = reverse( | ||||
"browse-directory", | "browse-directory", | ||||
url_args={"sha1_git": directory}, | url_args={"sha1_git": directory_with_subdirs}, | ||||
query_params={"path": dir_subdir_path}, | query_params={"path": dir_subdir_path}, | ||||
) | ) | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url | ||||
@given(directory()) | |||||
def test_resolve_swhid_with_malformed_origin_url(archive_data, directory): | def test_resolve_swhid_with_malformed_origin_url(archive_data, directory): | ||||
origin_url = "http://example.org/project/abc" | origin_url = "http://example.org/project/abc" | ||||
malformed_origin_url = "http:/example.org/project/abc" | malformed_origin_url = "http:/example.org/project/abc" | ||||
archive_data.origin_add([Origin(url=origin_url)]) | archive_data.origin_add([Origin(url=origin_url)]) | ||||
swhid = gen_swhid( | swhid = gen_swhid( | ||||
ObjectType.DIRECTORY, directory, metadata={"origin": malformed_origin_url} | ObjectType.DIRECTORY, directory, metadata={"origin": malformed_origin_url} | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
Show All 32 Lines | browse_url = reverse( | ||||
query_params={"path": dir_entry["name"]}, | query_params={"path": dir_entry["name"]}, | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url | ||||
@given(directory_with_subdirs()) | def test_resolve_dir_entry_swhid_with_anchor_directory( | ||||
def test_resolve_dir_entry_swhid_with_anchor_directory(archive_data, directory): | archive_data, directory_with_subdirs | ||||
dir_content = archive_data.directory_ls(directory) | ): | ||||
dir_content = archive_data.directory_ls(directory_with_subdirs) | |||||
dir_entry = random.choice( | dir_entry = random.choice( | ||||
[entry for entry in dir_content if entry["type"] == "dir"] | [entry for entry in dir_content if entry["type"] == "dir"] | ||||
) | ) | ||||
dir_swhid = gen_swhid(ObjectType.DIRECTORY, directory) | dir_swhid = gen_swhid(ObjectType.DIRECTORY, directory_with_subdirs) | ||||
swhid = gen_swhid( | swhid = gen_swhid( | ||||
ObjectType.DIRECTORY, | ObjectType.DIRECTORY, | ||||
dir_entry["target"], | dir_entry["target"], | ||||
metadata={"anchor": dir_swhid, "path": f"/{dir_entry['name']}/"}, | metadata={"anchor": dir_swhid, "path": f"/{dir_entry['name']}/"}, | ||||
) | ) | ||||
browse_url = reverse( | browse_url = reverse( | ||||
"browse-directory", | "browse-directory", | ||||
url_args={"sha1_git": directory}, | url_args={"sha1_git": directory_with_subdirs}, | ||||
query_params={"path": f"{dir_entry['name']}"}, | query_params={"path": f"{dir_entry['name']}"}, | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url | ||||
@given(directory_with_files()) | def test_resolve_file_entry_swhid_with_anchor_directory( | ||||
def test_resolve_file_entry_swhid_with_anchor_directory(archive_data, directory): | archive_data, directory_with_files | ||||
dir_content = archive_data.directory_ls(directory) | ): | ||||
dir_content = archive_data.directory_ls(directory_with_files) | |||||
file_entry = random.choice( | file_entry = random.choice( | ||||
[entry for entry in dir_content if entry["type"] == "file"] | [entry for entry in dir_content if entry["type"] == "file"] | ||||
) | ) | ||||
dir_swhid = gen_swhid(ObjectType.DIRECTORY, directory) | dir_swhid = gen_swhid(ObjectType.DIRECTORY, directory_with_files) | ||||
sha1_git = file_entry["checksums"]["sha1_git"] | sha1_git = file_entry["checksums"]["sha1_git"] | ||||
swhid = gen_swhid( | swhid = gen_swhid( | ||||
ObjectType.CONTENT, | ObjectType.CONTENT, | ||||
sha1_git, | sha1_git, | ||||
metadata={"anchor": dir_swhid, "path": f"/{file_entry['name']}"}, | metadata={"anchor": dir_swhid, "path": f"/{file_entry['name']}"}, | ||||
) | ) | ||||
browse_url = reverse( | browse_url = reverse( | ||||
"browse-content", | "browse-content", | ||||
url_args={"query_string": f"sha1_git:{sha1_git}"}, | url_args={"query_string": f"sha1_git:{sha1_git}"}, | ||||
query_params={"path": f"{directory}/{file_entry['name']}"}, | query_params={"path": f"{directory_with_files}/{file_entry['name']}"}, | ||||
) | ) | ||||
resolved_swhid = resolve_swhid(swhid) | resolved_swhid = resolve_swhid(swhid) | ||||
assert resolved_swhid["browse_url"] == browse_url | assert resolved_swhid["browse_url"] == browse_url |