Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_archive.py
Show All 25 Lines | |||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.web.common import archive | from swh.web.common import archive | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.typing import OriginInfo, PagedResult | from swh.web.common.typing import OriginInfo, PagedResult | ||||
from swh.web.tests.conftest import ctags_json_missing, fossology_missing | from swh.web.tests.conftest import ctags_json_missing, fossology_missing | ||||
from swh.web.tests.data import random_content, random_sha1 | from swh.web.tests.data import random_content, random_sha1 | ||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
ancestor_revisions, | ancestor_revisions, | ||||
content, | |||||
contents, | |||||
contents_with_ctags, | |||||
directory, | directory, | ||||
empty_content, | |||||
empty_directory, | empty_directory, | ||||
invalid_sha1, | invalid_sha1, | ||||
new_origin, | new_origin, | ||||
new_revision, | new_revision, | ||||
non_ancestor_revisions, | non_ancestor_revisions, | ||||
origin, | origin, | ||||
release, | release, | ||||
releases, | releases, | ||||
revision, | revision, | ||||
revision_with_submodules, | revision_with_submodules, | ||||
revisions, | revisions, | ||||
sha256, | sha256, | ||||
snapshot, | snapshot, | ||||
unknown_content, | unknown_content, | ||||
unknown_contents, | unknown_contents, | ||||
unknown_directory, | unknown_directory, | ||||
unknown_release, | unknown_release, | ||||
unknown_revision, | unknown_revision, | ||||
unknown_snapshot, | unknown_snapshot, | ||||
visit_dates, | visit_dates, | ||||
) | ) | ||||
@given(contents()) | |||||
def test_lookup_multiple_hashes_all_present(contents): | def test_lookup_multiple_hashes_all_present(contents): | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in contents: | for cnt in contents: | ||||
input_data.append({"sha1": cnt["sha1"]}) | input_data.append({"sha1": cnt["sha1"]}) | ||||
expected_output.append({"sha1": cnt["sha1"], "found": True}) | expected_output.append({"sha1": cnt["sha1"], "found": True}) | ||||
assert archive.lookup_multiple_hashes(input_data) == expected_output | assert archive.lookup_multiple_hashes(input_data) == expected_output | ||||
@given(contents(), unknown_contents()) | @given(unknown_contents()) | ||||
def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): | def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): | ||||
input_contents = list(itertools.chain(contents, unknown_contents)) | input_contents = list(itertools.chain(contents, unknown_contents)) | ||||
random.shuffle(input_contents) | random.shuffle(input_contents) | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in input_contents: | for cnt in input_contents: | ||||
input_data.append({"sha1": cnt["sha1"]}) | input_data.append({"sha1": cnt["sha1"]}) | ||||
expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents}) | expected_output.append({"sha1": cnt["sha1"], "found": cnt in contents}) | ||||
assert archive.lookup_multiple_hashes(input_data) == expected_output | assert archive.lookup_multiple_hashes(input_data) == expected_output | ||||
def test_lookup_hash_does_not_exist(): | def test_lookup_hash_does_not_exist(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_lookup = archive.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) | actual_lookup = archive.lookup_hash("sha1_git:%s" % unknown_content_["sha1_git"]) | ||||
assert actual_lookup == {"found": None, "algo": "sha1_git"} | assert actual_lookup == {"found": None, "algo": "sha1_git"} | ||||
@given(content()) | |||||
def test_lookup_hash_exist(archive_data, content): | def test_lookup_hash_exist(archive_data, content): | ||||
actual_lookup = archive.lookup_hash("sha1:%s" % content["sha1"]) | actual_lookup = archive.lookup_hash("sha1:%s" % content["sha1"]) | ||||
content_metadata = archive_data.content_get(content["sha1"]) | content_metadata = archive_data.content_get(content["sha1"]) | ||||
assert {"found": content_metadata, "algo": "sha1"} == actual_lookup | assert {"found": content_metadata, "algo": "sha1"} == actual_lookup | ||||
def test_search_hash_does_not_exist(): | def test_search_hash_does_not_exist(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_lookup = archive.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) | actual_lookup = archive.search_hash("sha1_git:%s" % unknown_content_["sha1_git"]) | ||||
assert {"found": False} == actual_lookup | assert {"found": False} == actual_lookup | ||||
@given(content()) | |||||
def test_search_hash_exist(content): | def test_search_hash_exist(content): | ||||
actual_lookup = archive.search_hash("sha1:%s" % content["sha1"]) | actual_lookup = archive.search_hash("sha1:%s" % content["sha1"]) | ||||
assert {"found": True} == actual_lookup | assert {"found": True} == actual_lookup | ||||
@pytest.mark.skipif( | @pytest.mark.skipif( | ||||
ctags_json_missing, reason="requires ctags with json output support" | ctags_json_missing, reason="requires ctags with json output support" | ||||
) | ) | ||||
@given(contents_with_ctags()) | |||||
def test_lookup_content_ctags(indexer_data, contents_with_ctags): | def test_lookup_content_ctags(indexer_data, contents_with_ctags): | ||||
content_sha1 = random.choice(contents_with_ctags["sha1s"]) | content_sha1 = random.choice(contents_with_ctags["sha1s"]) | ||||
indexer_data.content_add_ctags(content_sha1) | indexer_data.content_add_ctags(content_sha1) | ||||
actual_ctags = list(archive.lookup_content_ctags("sha1:%s" % content_sha1)) | actual_ctags = list(archive.lookup_content_ctags("sha1:%s" % content_sha1)) | ||||
expected_data = list(indexer_data.content_get_ctags(content_sha1)) | expected_data = list(indexer_data.content_get_ctags(content_sha1)) | ||||
for ctag in expected_data: | for ctag in expected_data: | ||||
ctag["id"] = content_sha1 | ctag["id"] = content_sha1 | ||||
assert actual_ctags == expected_data | assert actual_ctags == expected_data | ||||
def test_lookup_content_ctags_no_hash(): | def test_lookup_content_ctags_no_hash(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_ctags = list( | actual_ctags = list( | ||||
archive.lookup_content_ctags("sha1:%s" % unknown_content_["sha1"]) | archive.lookup_content_ctags("sha1:%s" % unknown_content_["sha1"]) | ||||
) | ) | ||||
assert actual_ctags == [] | assert actual_ctags == [] | ||||
@given(content()) | |||||
def test_lookup_content_filetype(indexer_data, content): | def test_lookup_content_filetype(indexer_data, content): | ||||
indexer_data.content_add_mimetype(content["sha1"]) | indexer_data.content_add_mimetype(content["sha1"]) | ||||
actual_filetype = archive.lookup_content_filetype(content["sha1"]) | actual_filetype = archive.lookup_content_filetype(content["sha1"]) | ||||
expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) | expected_filetype = indexer_data.content_get_mimetype(content["sha1"]) | ||||
assert actual_filetype == expected_filetype | assert actual_filetype == expected_filetype | ||||
@given(contents_with_ctags()) | |||||
def test_lookup_expression(indexer_data, contents_with_ctags): | def test_lookup_expression(indexer_data, contents_with_ctags): | ||||
per_page = 10 | per_page = 10 | ||||
expected_ctags = [] | expected_ctags = [] | ||||
for content_sha1 in contents_with_ctags["sha1s"]: | for content_sha1 in contents_with_ctags["sha1s"]: | ||||
if len(expected_ctags) == per_page: | if len(expected_ctags) == per_page: | ||||
break | break | ||||
indexer_data.content_add_ctags(content_sha1) | indexer_data.content_add_ctags(content_sha1) | ||||
Show All 19 Lines | def test_lookup_expression_no_result(): | ||||
actual_ctags = list( | actual_ctags = list( | ||||
archive.lookup_expression("barfoo", last_sha1=None, per_page=10) | archive.lookup_expression("barfoo", last_sha1=None, per_page=10) | ||||
) | ) | ||||
assert actual_ctags == expected_ctags | assert actual_ctags == expected_ctags | ||||
@pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") | @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") | ||||
@given(content()) | |||||
def test_lookup_content_license(indexer_data, content): | def test_lookup_content_license(indexer_data, content): | ||||
indexer_data.content_add_license(content["sha1"]) | indexer_data.content_add_license(content["sha1"]) | ||||
actual_license = archive.lookup_content_license(content["sha1"]) | actual_license = archive.lookup_content_license(content["sha1"]) | ||||
expected_license = indexer_data.content_get_license(content["sha1"]) | expected_license = indexer_data.content_get_license(content["sha1"]) | ||||
assert actual_license == expected_license | assert actual_license == expected_license | ||||
▲ Show 20 Lines • Show All 423 Lines • ▼ Show 20 Lines | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_content_raw("sha1:" + unknown_content_["sha1"]) | archive.lookup_content_raw("sha1:" + unknown_content_["sha1"]) | ||||
assert e.match( | assert e.match( | ||||
"Content with %s checksum equals to %s not found!" | "Content with %s checksum equals to %s not found!" | ||||
% ("sha1", unknown_content_["sha1"]) | % ("sha1", unknown_content_["sha1"]) | ||||
) | ) | ||||
@given(content()) | |||||
def test_lookup_content_raw(archive_data, content): | def test_lookup_content_raw(archive_data, content): | ||||
actual_content = archive.lookup_content_raw("sha256:%s" % content["sha256"]) | actual_content = archive.lookup_content_raw("sha256:%s" % content["sha256"]) | ||||
expected_content = archive_data.content_get_data(content["sha1"]) | expected_content = archive_data.content_get_data(content["sha1"]) | ||||
assert actual_content == expected_content | assert actual_content == expected_content | ||||
@given(empty_content()) | def test_lookup_empty_content_raw(empty_content): | ||||
def test_lookup_empty_content_raw(archive_data, empty_content): | |||||
content_raw = archive.lookup_content_raw(f"sha1_git:{empty_content['sha1_git']}") | content_raw = archive.lookup_content_raw(f"sha1_git:{empty_content['sha1_git']}") | ||||
assert content_raw["data"] == b"" | assert content_raw["data"] == b"" | ||||
def test_lookup_content_not_found(): | def test_lookup_content_not_found(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
with pytest.raises(NotFoundExc) as e: | with pytest.raises(NotFoundExc) as e: | ||||
archive.lookup_content("sha1:%s" % unknown_content_["sha1"]) | archive.lookup_content("sha1:%s" % unknown_content_["sha1"]) | ||||
assert e.match( | assert e.match( | ||||
"Content with %s checksum equals to %s not found!" | "Content with %s checksum equals to %s not found!" | ||||
% ("sha1", unknown_content_["sha1"]) | % ("sha1", unknown_content_["sha1"]) | ||||
) | ) | ||||
@given(content()) | |||||
def test_lookup_content_with_sha1(archive_data, content): | def test_lookup_content_with_sha1(archive_data, content): | ||||
actual_content = archive.lookup_content(f"sha1:{content['sha1']}") | actual_content = archive.lookup_content(f"sha1:{content['sha1']}") | ||||
expected_content = archive_data.content_get(content["sha1"]) | expected_content = archive_data.content_get(content["sha1"]) | ||||
assert actual_content == expected_content | assert actual_content == expected_content | ||||
@given(content()) | |||||
def test_lookup_content_with_sha256(archive_data, content): | def test_lookup_content_with_sha256(archive_data, content): | ||||
actual_content = archive.lookup_content(f"sha256:{content['sha256']}") | actual_content = archive.lookup_content(f"sha256:{content['sha256']}") | ||||
expected_content = archive_data.content_get(content["sha1"]) | expected_content = archive_data.content_get(content["sha1"]) | ||||
assert actual_content == expected_content | assert actual_content == expected_content | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | def test_lookup_directory_through_revision_ok_with_data(archive_data, revision): | ||||
) == ( | ) == ( | ||||
revision, | revision, | ||||
archive.lookup_directory_with_revision( | archive.lookup_directory_with_revision( | ||||
revision, dir_entry["name"], with_data=True | revision, dir_entry["name"], with_data=True | ||||
), | ), | ||||
) | ) | ||||
@given(content(), directory(), release(), revision(), snapshot()) | @given(directory(), release(), revision(), snapshot()) | ||||
def test_lookup_known_objects( | def test_lookup_known_objects( | ||||
archive_data, content, directory, release, revision, snapshot | archive_data, content, directory, release, revision, snapshot | ||||
): | ): | ||||
expected = archive_data.content_find(content) | expected = archive_data.content_find(content) | ||||
assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected | assert archive.lookup_object(ObjectType.CONTENT, content["sha1_git"]) == expected | ||||
expected = archive_data.directory_get(directory) | expected = archive_data.directory_get(directory) | ||||
assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected | assert archive.lookup_object(ObjectType.DIRECTORY, directory) == expected | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | assert actual_result == { | ||||
missing_cnt, | missing_cnt, | ||||
missing_dir, | missing_dir, | ||||
missing_rev, | missing_rev, | ||||
missing_rel, | missing_rel, | ||||
missing_snp, | missing_snp, | ||||
} | } | ||||
@given(content(), directory()) | @given(directory()) | ||||
def test_lookup_missing_hashes_some_present(archive_data, content, directory): | def test_lookup_missing_hashes_some_present(content, directory): | ||||
missing_rev = random_sha1() | missing_rev = random_sha1() | ||||
missing_rel = random_sha1() | missing_rel = random_sha1() | ||||
missing_snp = random_sha1() | missing_snp = random_sha1() | ||||
grouped_swhids = { | grouped_swhids = { | ||||
ObjectType.CONTENT: [hash_to_bytes(content["sha1_git"])], | ObjectType.CONTENT: [hash_to_bytes(content["sha1_git"])], | ||||
ObjectType.DIRECTORY: [hash_to_bytes(directory)], | ObjectType.DIRECTORY: [hash_to_bytes(directory)], | ||||
ObjectType.REVISION: [hash_to_bytes(missing_rev)], | ObjectType.REVISION: [hash_to_bytes(missing_rev)], | ||||
▲ Show 20 Lines • Show All 270 Lines • Show Last 20 Lines |