Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_archive.py
Show All 23 Lines | from swh.model.model import ( | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.web.common import archive | from swh.web.common import archive | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.typing import OriginInfo, PagedResult | from swh.web.common.typing import OriginInfo, PagedResult | ||||
from swh.web.tests.conftest import ctags_json_missing, fossology_missing | from swh.web.tests.conftest import ctags_json_missing, fossology_missing | ||||
from swh.web.tests.data import random_content, random_sha1 | from swh.web.tests.data import random_content, random_sha1 | ||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import new_origin, new_revision, visit_dates | ||||
invalid_sha1, | |||||
new_origin, | |||||
new_revision, | |||||
sha256, | |||||
unknown_content, | |||||
unknown_contents, | |||||
unknown_directory, | |||||
unknown_release, | |||||
unknown_revision, | |||||
unknown_snapshot, | |||||
visit_dates, | |||||
) | |||||
def test_lookup_multiple_hashes_all_present(contents): | def test_lookup_multiple_hashes_all_present(contents): | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in contents: | for cnt in contents: | ||||
input_data.append({"sha1": cnt["sha1"]}) | input_data.append({"sha1": cnt["sha1"]}) | ||||
expected_output.append({"sha1": cnt["sha1"], "found": True}) | expected_output.append({"sha1": cnt["sha1"], "found": True}) | ||||
assert archive.lookup_multiple_hashes(input_data) == expected_output | assert archive.lookup_multiple_hashes(input_data) == expected_output | ||||
@given(unknown_contents()) | |||||
def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): | def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): | ||||
input_contents = list(itertools.chain(contents, unknown_contents)) | input_contents = list(itertools.chain(contents, unknown_contents)) | ||||
random.shuffle(input_contents) | random.shuffle(input_contents) | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in input_contents: | for cnt in input_contents: | ||||
input_data.append({"sha1": cnt["sha1"]}) | input_data.append({"sha1": cnt["sha1"]}) | ||||
▲ Show 20 Lines • Show All 158 Lines • ▼ Show 20 Lines | |||||
def test_lookup_origin(archive_data, new_origin): | def test_lookup_origin(archive_data, new_origin): | ||||
archive_data.origin_add([new_origin]) | archive_data.origin_add([new_origin]) | ||||
actual_origin = archive.lookup_origin({"url": new_origin.url}) | actual_origin = archive.lookup_origin({"url": new_origin.url}) | ||||
expected_origin = archive_data.origin_get([new_origin.url])[0] | expected_origin = archive_data.origin_get([new_origin.url])[0] | ||||
assert actual_origin == expected_origin | assert actual_origin == expected_origin | ||||
@given(invalid_sha1()) | |||||
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): | def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_release(invalid_sha1) | archive.lookup_release(invalid_sha1) | ||||
assert e.match("Invalid checksum") | assert e.match("Invalid checksum") | ||||
@given(sha256()) | |||||
def test_lookup_release_ko_id_checksum_too_long(sha256): | def test_lookup_release_ko_id_checksum_too_long(sha256): | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_release(sha256) | archive.lookup_release(sha256) | ||||
assert e.match("Only sha1_git is supported.") | assert e.match("Only sha1_git is supported.") | ||||
def test_lookup_release_multiple(archive_data, releases): | def test_lookup_release_multiple(archive_data, releases): | ||||
actual_releases = list(archive.lookup_release_multiple(releases)) | actual_releases = list(archive.lookup_release_multiple(releases)) | ||||
Show All 32 Lines | |||||
def test_lookup_release(archive_data, release): | def test_lookup_release(archive_data, release): | ||||
actual_release = archive.lookup_release(release) | actual_release = archive.lookup_release(release) | ||||
assert actual_release == archive_data.release_get(release) | assert actual_release == archive_data.release_get(release) | ||||
@given(invalid_sha1(), sha256()) | |||||
def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): | def test_lookup_revision_with_context_ko_not_a_sha1(revision, invalid_sha1, sha256): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = invalid_sha1 | sha1_git = invalid_sha1 | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Invalid checksum query string") | assert e.match("Invalid checksum query string") | ||||
sha1_git = sha256 | sha1_git = sha256 | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Only sha1_git is supported") | assert e.match("Only sha1_git is supported") | ||||
@given(unknown_revision()) | |||||
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | ||||
revision, unknown_revision | revision, unknown_revision | ||||
): | ): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = unknown_revision | sha1_git = unknown_revision | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Revision %s not found" % sha1_git) | assert e.match("Revision %s not found" % sha1_git) | ||||
@given(unknown_revision()) | |||||
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | ||||
revision, unknown_revision | revision, unknown_revision | ||||
): | ): | ||||
sha1_git_root = unknown_revision | sha1_git_root = unknown_revision | ||||
sha1_git = revision | sha1_git = revision | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_revision_with_context(sha1_git_root, sha1_git) | archive.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
assert e.match("Revision root %s not found" % sha1_git_root) | assert e.match("Revision root %s not found" % sha1_git_root) | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | |||||
@given(new_revision()) | @given(new_revision()) | ||||
def test_lookup_revision_invalid_msg(archive_data, new_revision): | def test_lookup_revision_invalid_msg(archive_data, new_revision): | ||||
new_revision = new_revision.to_dict() | new_revision = new_revision.to_dict() | ||||
new_revision["message"] = b"elegant fix for bug \xff" | new_revision["message"] = b"elegant fix for bug \xff" | ||||
archive_data.revision_add([Revision.from_dict(new_revision)]) | archive_data.revision_add([Revision.from_dict(new_revision)]) | ||||
revision = archive.lookup_revision(hash_to_hex(new_revision["id"])) | revision = archive.lookup_revision(hash_to_hex(new_revision["id"])) | ||||
assert revision["message"] == "elegant fix for bug \\xff" | assert revision["message"] == "elegant fix for bug \\xff" | ||||
assert revision["decoding_failures"] == ["message"] | assert "message" in revision["decoding_failures"] | ||||
@given(new_revision()) | @given(new_revision()) | ||||
def test_lookup_revision_msg_ok(archive_data, new_revision): | def test_lookup_revision_msg_ok(archive_data, new_revision): | ||||
archive_data.revision_add([new_revision]) | archive_data.revision_add([new_revision]) | ||||
revision_message = archive.lookup_revision_message(hash_to_hex(new_revision.id)) | revision_message = archive.lookup_revision_message(hash_to_hex(new_revision.id)) | ||||
▲ Show 20 Lines • Show All 297 Lines • ▼ Show 20 Lines | ): | ||||
expected = archive_data.revision_get(revision) | expected = archive_data.revision_get(revision) | ||||
assert archive.lookup_object(ObjectType.REVISION, revision) == expected | assert archive.lookup_object(ObjectType.REVISION, revision) == expected | ||||
expected = {**archive_data.snapshot_get(snapshot), "next_branch": None} | expected = {**archive_data.snapshot_get(snapshot), "next_branch": None} | ||||
assert archive.lookup_object(ObjectType.SNAPSHOT, snapshot) == expected | assert archive.lookup_object(ObjectType.SNAPSHOT, snapshot) == expected | ||||
@given( | |||||
unknown_content(), | |||||
unknown_directory(), | |||||
unknown_release(), | |||||
unknown_revision(), | |||||
unknown_snapshot(), | |||||
) | |||||
def test_lookup_unknown_objects( | def test_lookup_unknown_objects( | ||||
unknown_content, | unknown_content, | ||||
unknown_directory, | unknown_directory, | ||||
unknown_release, | unknown_release, | ||||
unknown_revision, | unknown_revision, | ||||
unknown_snapshot, | unknown_snapshot, | ||||
): | ): | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
Show All 12 Lines | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_object(ObjectType.REVISION, unknown_revision) | archive.lookup_object(ObjectType.REVISION, unknown_revision) | ||||
assert e.match(r"Revision.*not found") | assert e.match(r"Revision.*not found") | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_object(ObjectType.SNAPSHOT, unknown_snapshot) | archive.lookup_object(ObjectType.SNAPSHOT, unknown_snapshot) | ||||
assert e.match(r"Snapshot.*not found") | assert e.match(r"Snapshot.*not found") | ||||
@given(invalid_sha1()) | |||||
def test_lookup_invalid_objects(invalid_sha1): | def test_lookup_invalid_objects(invalid_sha1): | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_object(ObjectType.CONTENT, invalid_sha1) | archive.lookup_object(ObjectType.CONTENT, invalid_sha1) | ||||
assert e.match("Invalid hash") | assert e.match("Invalid hash") | ||||
with pytest.raises(BadInputExc) as e: | with pytest.raises(BadInputExc) as e: | ||||
archive.lookup_object(ObjectType.DIRECTORY, invalid_sha1) | archive.lookup_object(ObjectType.DIRECTORY, invalid_sha1) | ||||
▲ Show 20 Lines • Show All 310 Lines • Show Last 20 Lines |