Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_archive.py
Show All 24 Lines | |||||
) | ) | ||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.web.common import archive | from swh.web.common import archive | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.typing import OriginInfo, PagedResult | from swh.web.common.typing import OriginInfo, PagedResult | ||||
from swh.web.tests.conftest import ctags_json_missing, fossology_missing | from swh.web.tests.conftest import ctags_json_missing, fossology_missing | ||||
from swh.web.tests.data import random_content, random_sha1 | from swh.web.tests.data import random_content, random_sha1 | ||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
ancestor_revisions, | |||||
invalid_sha1, | invalid_sha1, | ||||
new_origin, | new_origin, | ||||
new_revision, | new_revision, | ||||
non_ancestor_revisions, | |||||
revision, | |||||
revision_with_submodules, | |||||
revisions, | |||||
sha256, | sha256, | ||||
snapshot, | snapshot, | ||||
unknown_content, | unknown_content, | ||||
unknown_contents, | unknown_contents, | ||||
unknown_directory, | unknown_directory, | ||||
unknown_release, | unknown_release, | ||||
unknown_revision, | unknown_revision, | ||||
unknown_snapshot, | unknown_snapshot, | ||||
▲ Show 20 Lines • Show All 237 Lines • ▼ Show 20 Lines | |||||
def test_lookup_release(archive_data, release): | def test_lookup_release(archive_data, release): | ||||
actual_release = archive.lookup_release(release) | actual_release = archive.lookup_release(release) | ||||
assert actual_release == archive_data.release_get(release) | assert actual_release == archive_data.release_get(release) | ||||
@given(revision(), invalid_sha1(), sha256()) | @given(invalid_sha1(), sha256()) | ||||
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): | def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = invalid_sha1 | sha1_git = invalid_sha1 | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Invalid checksum query string") | assert e.match("Invalid checksum query string") | ||||
sha1_git = sha256 | sha1_git = sha256 | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Only sha1_git is supported") | assert e.match("Only sha1_git is supported") | ||||
@given(revision(), unknown_revision()) | @given(unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | ||||
revision, unknown_revision | revision, unknown_revision | ||||
): | ): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = unknown_revision | sha1_git = unknown_revision | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Revision %s not found" % sha1_git) | assert e.match("Revision %s not found" % sha1_git) | ||||
@given(revision(), unknown_revision()) | @given(unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | ||||
revision, unknown_revision | revision, unknown_revision | ||||
): | ): | ||||
sha1_git_root = unknown_revision | sha1_git_root = unknown_revision | ||||
sha1_git = revision | sha1_git = revision | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Revision root %s not found" % sha1_git_root) | assert e.match("Revision root %s not found" % sha1_git_root) | ||||
@given(ancestor_revisions()) | |||||
def test_lookup_revision_with_context(archive_data, ancestor_revisions): | def test_lookup_revision_with_context(archive_data, ancestor_revisions): | ||||
sha1_git = ancestor_revisions["sha1_git"] | sha1_git = ancestor_revisions["sha1_git"] | ||||
root_sha1_git = ancestor_revisions["sha1_git_root"] | root_sha1_git = ancestor_revisions["sha1_git_root"] | ||||
for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}): | for sha1_git_root in (root_sha1_git, {"id": hash_to_bytes(root_sha1_git)}): | ||||
actual_revision = archive.lookup_revision_with_context(sha1_git_root, sha1_git) | actual_revision = archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
children = [] | children = [] | ||||
for rev in archive_data.revision_log(root_sha1_git): | for rev in archive_data.revision_log(root_sha1_git): | ||||
for p_rev in rev["parents"]: | for p_rev in rev["parents"]: | ||||
p_rev_hex = hash_to_hex(p_rev) | p_rev_hex = hash_to_hex(p_rev) | ||||
if p_rev_hex == sha1_git: | if p_rev_hex == sha1_git: | ||||
children.append(rev["id"]) | children.append(rev["id"]) | ||||
expected_revision = archive_data.revision_get(sha1_git) | expected_revision = archive_data.revision_get(sha1_git) | ||||
expected_revision["children"] = children | expected_revision["children"] = children | ||||
assert actual_revision == expected_revision | assert actual_revision == expected_revision | ||||
@given(non_ancestor_revisions()) | |||||
def test_lookup_revision_with_context_ko(non_ancestor_revisions): | def test_lookup_revision_with_context_ko(non_ancestor_revisions): | ||||
sha1_git = non_ancestor_revisions["sha1_git"] | sha1_git = non_ancestor_revisions["sha1_git"] | ||||
root_sha1_git = non_ancestor_revisions["sha1_git_root"] | root_sha1_git = non_ancestor_revisions["sha1_git_root"] | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_with_context(root_sha1_git, sha1_git) | archive.lookup_revision_with_context(root_sha1_git, sha1_git) | ||||
assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git)) | assert e.match("Revision %s is not an ancestor of %s" % (sha1_git, root_sha1_git)) | ||||
Show All 35 Lines | def test_lookup_directory_with_revision_unknown_content(archive_data, new_revision): | ||||
archive_data.directory_add([dir]) | archive_data.directory_add([dir]) | ||||
archive_data.revision_add([new_revision]) | archive_data.revision_add([new_revision]) | ||||
new_revision_id = hash_to_hex(new_revision.id) | new_revision_id = hash_to_hex(new_revision.id) | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_directory_with_revision(new_revision_id, dir_path) | archive.lookup_directory_with_revision(new_revision_id, dir_path) | ||||
assert e.match("Content not found for revision %s" % new_revision_id) | assert e.match("Content not found for revision %s" % new_revision_id) | ||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): | def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): | ||||
invalid_path = "path/to/something/unknown" | invalid_path = "path/to/something/unknown" | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_directory_with_revision(revision, invalid_path) | archive.lookup_directory_with_revision(revision, invalid_path) | ||||
assert e.match("Directory or File") | assert e.match("Directory or File") | ||||
assert e.match(invalid_path) | assert e.match(invalid_path) | ||||
assert e.match("revision %s" % revision) | assert e.match("revision %s" % revision) | ||||
assert e.match("not found") | assert e.match("not found") | ||||
@given(revision_with_submodules()) | |||||
def test_lookup_directory_with_revision_submodules( | def test_lookup_directory_with_revision_submodules( | ||||
archive_data, revision_with_submodules | archive_data, revision_with_submodules | ||||
): | ): | ||||
rev_sha1_git = revision_with_submodules["rev_sha1_git"] | rev_sha1_git = revision_with_submodules["rev_sha1_git"] | ||||
rev_dir_path = revision_with_submodules["rev_dir_rev_path"] | rev_dir_path = revision_with_submodules["rev_dir_rev_path"] | ||||
actual_data = archive.lookup_directory_with_revision(rev_sha1_git, rev_dir_path) | actual_data = archive.lookup_directory_with_revision(rev_sha1_git, rev_dir_path) | ||||
revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"]) | revision = archive_data.revision_get(revision_with_submodules["rev_sha1_git"]) | ||||
directory = archive_data.directory_ls(revision["directory"]) | directory = archive_data.directory_ls(revision["directory"]) | ||||
rev_entry = next(e for e in directory if e["name"] == rev_dir_path) | rev_entry = next(e for e in directory if e["name"] == rev_dir_path) | ||||
expected_data = { | expected_data = { | ||||
"content": archive_data.revision_get(rev_entry["target"]), | "content": archive_data.revision_get(rev_entry["target"]), | ||||
"path": rev_dir_path, | "path": rev_dir_path, | ||||
"revision": rev_sha1_git, | "revision": rev_sha1_git, | ||||
"type": "rev", | "type": "rev", | ||||
} | } | ||||
assert actual_data == expected_data | assert actual_data == expected_data | ||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_without_path(archive_data, revision): | def test_lookup_directory_with_revision_without_path(archive_data, revision): | ||||
actual_directory_entries = archive.lookup_directory_with_revision(revision) | actual_directory_entries = archive.lookup_directory_with_revision(revision) | ||||
revision_data = archive_data.revision_get(revision) | revision_data = archive_data.revision_get(revision) | ||||
expected_directory_entries = archive_data.directory_ls(revision_data["directory"]) | expected_directory_entries = archive_data.directory_ls(revision_data["directory"]) | ||||
assert actual_directory_entries["type"] == "dir" | assert actual_directory_entries["type"] == "dir" | ||||
assert actual_directory_entries["content"] == expected_directory_entries | assert actual_directory_entries["content"] == expected_directory_entries | ||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_with_path(archive_data, revision): | def test_lookup_directory_with_revision_with_path(archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | rev_data = archive_data.revision_get(revision) | ||||
dir_entries = [ | dir_entries = [ | ||||
e | e | ||||
for e in archive_data.directory_ls(rev_data["directory"]) | for e in archive_data.directory_ls(rev_data["directory"]) | ||||
if e["type"] in ("file", "dir") | if e["type"] in ("file", "dir") | ||||
] | ] | ||||
expected_dir_entry = random.choice(dir_entries) | expected_dir_entry = random.choice(dir_entries) | ||||
Show All 9 Lines | if actual_dir_entry["type"] == "file": | ||||
del actual_dir_entry["content"]["checksums"]["blake2s256"] | del actual_dir_entry["content"]["checksums"]["blake2s256"] | ||||
for key in ("checksums", "status", "length"): | for key in ("checksums", "status", "length"): | ||||
assert actual_dir_entry["content"][key] == expected_dir_entry[key] | assert actual_dir_entry["content"][key] == expected_dir_entry[key] | ||||
else: | else: | ||||
sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"]) | sub_dir_entries = archive_data.directory_ls(expected_dir_entry["target"]) | ||||
assert actual_dir_entry["content"] == sub_dir_entries | assert actual_dir_entry["content"] == sub_dir_entries | ||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_with_path_to_file_and_data( | def test_lookup_directory_with_revision_with_path_to_file_and_data( | ||||
archive_data, revision | archive_data, revision | ||||
): | ): | ||||
rev_data = archive_data.revision_get(revision) | rev_data = archive_data.revision_get(revision) | ||||
dir_entries = [ | dir_entries = [ | ||||
e | e | ||||
for e in archive_data.directory_ls(rev_data["directory"]) | for e in archive_data.directory_ls(rev_data["directory"]) | ||||
if e["type"] == "file" | if e["type"] == "file" | ||||
Show All 11 Lines | ): | ||||
assert actual_dir_entry["revision"] == revision | assert actual_dir_entry["revision"] == revision | ||||
assert actual_dir_entry["path"] == expected_dir_entry["name"] | assert actual_dir_entry["path"] == expected_dir_entry["name"] | ||||
del actual_dir_entry["content"]["checksums"]["blake2s256"] | del actual_dir_entry["content"]["checksums"]["blake2s256"] | ||||
for key in ("checksums", "status", "length"): | for key in ("checksums", "status", "length"): | ||||
assert actual_dir_entry["content"][key] == expected_dir_entry[key] | assert actual_dir_entry["content"][key] == expected_dir_entry[key] | ||||
assert actual_dir_entry["content"]["data"] == expected_data["data"] | assert actual_dir_entry["content"]["data"] == expected_data["data"] | ||||
@given(revision()) | |||||
def test_lookup_revision(archive_data, revision): | def test_lookup_revision(archive_data, revision): | ||||
actual_revision = archive.lookup_revision(revision) | actual_revision = archive.lookup_revision(revision) | ||||
assert actual_revision == archive_data.revision_get(revision) | assert actual_revision == archive_data.revision_get(revision) | ||||
@given(new_revision()) | @given(new_revision()) | ||||
def test_lookup_revision_invalid_msg(archive_data, new_revision): | def test_lookup_revision_invalid_msg(archive_data, new_revision): | ||||
new_revision = new_revision.to_dict() | new_revision = new_revision.to_dict() | ||||
Show All 18 Lines | def test_lookup_revision_msg_no_rev(): | ||||
unknown_revision_ = random_sha1() | unknown_revision_ = random_sha1() | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_message(unknown_revision_) | archive.lookup_revision_message(unknown_revision_) | ||||
assert e.match("Revision with sha1_git %s not found." % unknown_revision_) | assert e.match("Revision with sha1_git %s not found." % unknown_revision_) | ||||
@given(revisions()) | |||||
def test_lookup_revision_multiple(archive_data, revisions): | def test_lookup_revision_multiple(archive_data, revisions): | ||||
actual_revisions = list(archive.lookup_revision_multiple(revisions)) | actual_revisions = list(archive.lookup_revision_multiple(revisions)) | ||||
expected_revisions = [] | expected_revisions = [] | ||||
for rev in revisions: | for rev in revisions: | ||||
expected_revisions.append(archive_data.revision_get(rev)) | expected_revisions.append(archive_data.revision_get(rev)) | ||||
assert actual_revisions == expected_revisions | assert actual_revisions == expected_revisions | ||||
def test_lookup_revision_multiple_none_found(): | def test_lookup_revision_multiple_none_found(): | ||||
unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] | unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] | ||||
actual_revisions = list(archive.lookup_revision_multiple(unknown_revisions_)) | actual_revisions = list(archive.lookup_revision_multiple(unknown_revisions_)) | ||||
assert actual_revisions == [None] * len(unknown_revisions_) | assert actual_revisions == [None] * len(unknown_revisions_) | ||||
@given(revision()) | |||||
def test_lookup_revision_log(archive_data, revision): | def test_lookup_revision_log(archive_data, revision): | ||||
actual_revision_log = list(archive.lookup_revision_log(revision, limit=25)) | actual_revision_log = list(archive.lookup_revision_log(revision, limit=25)) | ||||
expected_revision_log = archive_data.revision_log(revision, limit=25) | expected_revision_log = archive_data.revision_log(revision, limit=25) | ||||
assert actual_revision_log == expected_revision_log | assert actual_revision_log == expected_revision_log | ||||
def _get_origin_branches(archive_data, origin): | def _get_origin_branches(archive_data, origin): | ||||
▲ Show 20 Lines • Show All 119 Lines • ▼ Show 20 Lines | def test_lookup_revision_by(archive_data, origin): | ||||
actual_revision = archive.lookup_revision_by(origin["url"], branch_name) | actual_revision = archive.lookup_revision_by(origin["url"], branch_name) | ||||
expected_revision = archive_data.revision_get(branches[branch_name]["target"]) | expected_revision = archive_data.revision_get(branches[branch_name]["target"]) | ||||
assert actual_revision == expected_revision | assert actual_revision == expected_revision | ||||
@given(revision()) | |||||
def test_lookup_revision_with_context_by_ko(origin, revision): | def test_lookup_revision_with_context_by_ko(origin, revision): | ||||
with pytest.raises(NotFoundExc): | with pytest.raises(NotFoundExc): | ||||
archive.lookup_revision_with_context_by( | archive.lookup_revision_with_context_by( | ||||
origin["url"], "invalid-branch-name", None, revision | origin["url"], "invalid-branch-name", None, revision | ||||
) | ) | ||||
def test_lookup_revision_with_context_by(archive_data, origin): | def test_lookup_revision_with_context_by(archive_data, origin): | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def test_lookup_revision_through_with_revision_by(archive_data, origin): | ||||
branches = _get_origin_branches(archive_data, origin) | branches = _get_origin_branches(archive_data, origin) | ||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
assert archive.lookup_revision_through( | assert archive.lookup_revision_through( | ||||
{"origin_url": origin["url"], "branch_name": branch_name, "ts": None,} | {"origin_url": origin["url"], "branch_name": branch_name, "ts": None,} | ||||
) == archive.lookup_revision_by(origin["url"], branch_name, None) | ) == archive.lookup_revision_by(origin["url"], branch_name, None) | ||||
@given(ancestor_revisions()) | |||||
def test_lookup_revision_through_with_context(ancestor_revisions): | def test_lookup_revision_through_with_context(ancestor_revisions): | ||||
sha1_git = ancestor_revisions["sha1_git"] | sha1_git = ancestor_revisions["sha1_git"] | ||||
sha1_git_root = ancestor_revisions["sha1_git_root"] | sha1_git_root = ancestor_revisions["sha1_git_root"] | ||||
assert archive.lookup_revision_through( | assert archive.lookup_revision_through( | ||||
{"sha1_git_root": sha1_git_root, "sha1_git": sha1_git,} | {"sha1_git_root": sha1_git_root, "sha1_git": sha1_git,} | ||||
) == archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ) == archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
@given(revision()) | |||||
def test_lookup_revision_through_with_revision(revision): | def test_lookup_revision_through_with_revision(revision): | ||||
assert archive.lookup_revision_through( | assert archive.lookup_revision_through( | ||||
{"sha1_git": revision} | {"sha1_git": revision} | ||||
) == archive.lookup_revision(revision) | ) == archive.lookup_revision(revision) | ||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ko_not_found(revision): | def test_lookup_directory_through_revision_ko_not_found(revision): | ||||
with pytest.raises(NotFoundExc): | with pytest.raises(NotFoundExc): | ||||
archive.lookup_directory_through_revision( | archive.lookup_directory_through_revision( | ||||
{"sha1_git": revision}, "some/invalid/path" | {"sha1_git": revision}, "some/invalid/path" | ||||
) | ) | ||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ok(archive_data, revision): | def test_lookup_directory_through_revision_ok(archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | rev_data = archive_data.revision_get(revision) | ||||
dir_entries = [ | dir_entries = [ | ||||
e | e | ||||
for e in archive_data.directory_ls(rev_data["directory"]) | for e in archive_data.directory_ls(rev_data["directory"]) | ||||
if e["type"] == "file" | if e["type"] == "file" | ||||
] | ] | ||||
dir_entry = random.choice(dir_entries) | dir_entry = random.choice(dir_entries) | ||||
assert archive.lookup_directory_through_revision( | assert archive.lookup_directory_through_revision( | ||||
{"sha1_git": revision}, dir_entry["name"] | {"sha1_git": revision}, dir_entry["name"] | ||||
) == (revision, archive.lookup_directory_with_revision(revision, dir_entry["name"])) | ) == (revision, archive.lookup_directory_with_revision(revision, dir_entry["name"])) | ||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ok_with_data(archive_data, revision): | def test_lookup_directory_through_revision_ok_with_data(archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | rev_data = archive_data.revision_get(revision) | ||||
dir_entries = [ | dir_entries = [ | ||||
e | e | ||||
for e in archive_data.directory_ls(rev_data["directory"]) | for e in archive_data.directory_ls(rev_data["directory"]) | ||||
if e["type"] == "file" | if e["type"] == "file" | ||||
] | ] | ||||
dir_entry = random.choice(dir_entries) | dir_entry = random.choice(dir_entries) | ||||
assert archive.lookup_directory_through_revision( | assert archive.lookup_directory_through_revision( | ||||
{"sha1_git": revision}, dir_entry["name"], with_data=True | {"sha1_git": revision}, dir_entry["name"], with_data=True | ||||
) == ( | ) == ( | ||||
revision, | revision, | ||||
archive.lookup_directory_with_revision( | archive.lookup_directory_with_revision( | ||||
revision, dir_entry["name"], with_data=True | revision, dir_entry["name"], with_data=True | ||||
), | ), | ||||
) | ) | ||||
@given(revision(), snapshot()) | @given(snapshot()) | ||||
def test_lookup_known_objects( | def test_lookup_known_objects( | ||||
archive_data, content, directory, release, revision, snapshot | archive_data, content, directory, release, revision, snapshot | ||||
): | ): | ||||
expected = archive_data.content_find(content) | expected = archive_data.content_find(content) | ||||
assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected | assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected | ||||
expected = archive_data.directory_get(directory) | expected = archive_data.directory_get(directory) | ||||
assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected | assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected | ||||
▲ Show 20 Lines • Show All 188 Lines • ▼ Show 20 Lines | def test_lookup_snapshot_sizes(archive_data, snapshot): | ||||
for branch_name, branch_info in branches.items(): | for branch_name, branch_info in branches.items(): | ||||
if branch_info is not None: | if branch_info is not None: | ||||
expected_sizes[branch_info["target_type"]] += 1 | expected_sizes[branch_info["target_type"]] += 1 | ||||
assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes | assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes | ||||
@given(revision()) | |||||
def test_lookup_snapshot_sizes_with_filtering(archive_data, revision): | def test_lookup_snapshot_sizes_with_filtering(archive_data, revision): | ||||
rev_id = hash_to_bytes(revision) | rev_id = hash_to_bytes(revision) | ||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
branches={ | branches={ | ||||
b"refs/heads/master": SnapshotBranch( | b"refs/heads/master": SnapshotBranch( | ||||
target=rev_id, target_type=TargetType.REVISION, | target=rev_id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"refs/heads/incoming": SnapshotBranch( | b"refs/heads/incoming": SnapshotBranch( | ||||
Show All 22 Lines | |||||
@given(snapshot()) | @given(snapshot()) | ||||
def test_lookup_snapshot_alias(snapshot): | def test_lookup_snapshot_alias(snapshot): | ||||
resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD") | resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD") | ||||
assert resolved_alias is not None | assert resolved_alias is not None | ||||
assert resolved_alias["target_type"] == "revision" | assert resolved_alias["target_type"] == "revision" | ||||
assert resolved_alias["target"] is not None | assert resolved_alias["target"] is not None | ||||
@given(revision()) | |||||
def test_lookup_snapshot_missing(revision): | def test_lookup_snapshot_missing(revision): | ||||
with pytest.raises(NotFoundExc): | with pytest.raises(NotFoundExc): | ||||
archive.lookup_snapshot(revision) | archive.lookup_snapshot(revision) | ||||
@given(revision()) | |||||
def test_lookup_snapshot_empty_branch_list(archive_data, revision): | def test_lookup_snapshot_empty_branch_list(archive_data, revision): | ||||
rev_id = hash_to_bytes(revision) | rev_id = hash_to_bytes(revision) | ||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
branches={ | branches={ | ||||
b"refs/heads/master": SnapshotBranch( | b"refs/heads/master": SnapshotBranch( | ||||
target=rev_id, target_type=TargetType.REVISION, | target=rev_id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
archive_data.snapshot_add([snapshot]) | archive_data.snapshot_add([snapshot]) | ||||
# FIXME; This test will change once the inconsistency in storage is fixed | # FIXME; This test will change once the inconsistency in storage is fixed | ||||
# postgres backend returns None in case of a missing branch whereas the | # postgres backend returns None in case of a missing branch whereas the | ||||
# in-memory implementation (used in tests) returns a data structure; | # in-memory implementation (used in tests) returns a data structure; | ||||
# hence the inconsistency | # hence the inconsistency | ||||
branches = archive.lookup_snapshot( | branches = archive.lookup_snapshot( | ||||
hash_to_hex(snapshot.id), branch_name_include_substring="non-existing", | hash_to_hex(snapshot.id), branch_name_include_substring="non-existing", | ||||
)["branches"] | )["branches"] | ||||
assert not branches | assert not branches | ||||
@given(revision()) | |||||
def test_lookup_snapshot_branch_names_filtering(archive_data, revision): | def test_lookup_snapshot_branch_names_filtering(archive_data, revision): | ||||
rev_id = hash_to_bytes(revision) | rev_id = hash_to_bytes(revision) | ||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
branches={ | branches={ | ||||
b"refs/heads/master": SnapshotBranch( | b"refs/heads/master": SnapshotBranch( | ||||
target=rev_id, target_type=TargetType.REVISION, | target=rev_id, target_type=TargetType.REVISION, | ||||
), | ), | ||||
b"refs/heads/incoming": SnapshotBranch( | b"refs/heads/incoming": SnapshotBranch( | ||||
Show All 28 Lines | ): | ||||
assert len(branches) == nb_results | assert len(branches) == nb_results | ||||
for branch_name in branches: | for branch_name in branches: | ||||
if include_pattern: | if include_pattern: | ||||
assert include_pattern in branch_name | assert include_pattern in branch_name | ||||
if exclude_prefix: | if exclude_prefix: | ||||
assert not branch_name.startswith(exclude_prefix) | assert not branch_name.startswith(exclude_prefix) | ||||
@given(revision()) | |||||
def test_lookup_snapshot_branch_names_filtering_paginated( | def test_lookup_snapshot_branch_names_filtering_paginated( | ||||
archive_data, directory, revision | archive_data, directory, revision | ||||
): | ): | ||||
pattern = "foo" | pattern = "foo" | ||||
nb_branches_by_target_type = 10 | nb_branches_by_target_type = 10 | ||||
branches = {} | branches = {} | ||||
for i in range(nb_branches_by_target_type): | for i in range(nb_branches_by_target_type): | ||||
branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch( | branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch( | ||||
▲ Show 20 Lines • Show All 55 Lines • Show Last 20 Lines |