Changeset View
Changeset View
Standalone View
Standalone View
swh/graphql/tests/functional/test_content.py
Show All 10 Lines | |||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_swhid(client, content): | def test_get_content_with_swhid(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($swhid: SWHID!) { | query getContent($swhid: SWHID!) { | ||||
content(swhid: $swhid) { | content(swhid: $swhid) { | ||||
swhid | swhid | ||||
id | id | ||||
checksum { | hashes { | ||||
blake2s256 | blake2s256 | ||||
sha1 | sha1 | ||||
sha1_git | sha1_git | ||||
sha256 | sha256 | ||||
} | } | ||||
length | length | ||||
status | status | ||||
data { | data { | ||||
Show All 11 Lines | query getContent($swhid: SWHID!) { | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) | data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) | ||||
archive_url = "https://archive.softwareheritage.org/api/1/" | archive_url = "https://archive.softwareheritage.org/api/1/" | ||||
response = { | response = { | ||||
"swhid": str(content.swhid()), | "swhid": str(content.swhid()), | ||||
"id": content.sha1_git.hex(), | "id": content.sha1_git.hex(), | ||||
"checksum": { | "hashes": { | ||||
"blake2s256": content.blake2s256.hex(), | "blake2s256": content.blake2s256.hex(), | ||||
"sha1": content.sha1.hex(), | "sha1": content.sha1.hex(), | ||||
"sha1_git": content.sha1_git.hex(), | "sha1_git": content.sha1_git.hex(), | ||||
"sha256": content.sha256.hex(), | "sha256": content.sha256.hex(), | ||||
}, | }, | ||||
"length": content.length, | "length": content.length, | ||||
"status": content.status, | "status": content.status, | ||||
"data": { | "data": { | ||||
"url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", | "url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", | ||||
}, | }, | ||||
"mimeType": None, | "mimeType": None, | ||||
"language": None, | "language": None, | ||||
"license": None, | "license": None, | ||||
} | } | ||||
assert data["content"] == response | assert data["content"] == response | ||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_hash(client, content): | def test_get_content_with_hash(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($checksums: [ContentHash]!) { | query getContent($hashes: [ContentHash]!) { | ||||
contentByHash(checksums: $checksums) { | contentByHashes(hashes: $hashes) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response( | data, _ = utils.get_query_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
checksums=[ | hashes=[ | ||||
f"blake2s256:{content.blake2s256.hex()}", | f"blake2s256:{content.blake2s256.hex()}", | ||||
f"sha1:{content.sha1.hex()}", | f"sha1:{content.sha1.hex()}", | ||||
f"sha1_git:{content.sha1_git.hex()}", | f"sha1_git:{content.sha1_git.hex()}", | ||||
f"sha256:{content.sha256.hex()}", | f"sha256:{content.sha256.hex()}", | ||||
], | ], | ||||
) | ) | ||||
assert data["contentByHash"] == {"swhid": str(content.swhid())} | assert data["contentByHashes"] == {"swhid": str(content.swhid())} | ||||
def test_get_content_with_invalid_swhid(client): | def test_get_content_with_invalid_swhid(client): | ||||
query_str = """ | query_str = """ | ||||
query getContent($swhid: SWHID!) { | query getContent($swhid: SWHID!) { | ||||
content(swhid: $swhid) { | content(swhid: $swhid) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
errors = utils.get_error_response(client, query_str, swhid="invalid") | errors = utils.get_error_response(client, query_str, swhid="invalid") | ||||
# API will throw an error in case of an invalid SWHID | # API will throw an error in case of an invalid SWHID | ||||
assert len(errors) == 1 | assert len(errors) == 1 | ||||
assert "Input error: Invalid SWHID" in errors[0]["message"] | assert "Input error: Invalid SWHID" in errors[0]["message"] | ||||
def test_get_content_with_invalid_hashes(client): | def test_get_content_with_invalid_hashes(client): | ||||
content = get_contents()[0] | content = get_contents()[0] | ||||
query_str = """ | query_str = """ | ||||
query getContent($checksums: [ContentHash]!) { | query getContent($hashes: [ContentHash]!) { | ||||
contentByHash(checksums: $checksums) { | contentByHashes(hashes: $hashes) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
errors = utils.get_error_response( | errors = utils.get_error_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
checksums=[ | hashes=[ | ||||
"invalid", # Only one hash is invalid | "invalid", # Only one hash is invalid | ||||
f"sha1:{content.sha1.hex()}", | f"sha1:{content.sha1.hex()}", | ||||
f"sha1_git:{content.sha1_git.hex()}", | f"sha1_git:{content.sha1_git.hex()}", | ||||
f"sha256:{content.sha256.hex()}", | f"sha256:{content.sha256.hex()}", | ||||
], | ], | ||||
) | ) | ||||
# API will throw an error in case of an invalid content hash | # API will throw an error in case of an invalid content hash | ||||
assert len(errors) == 1 | assert len(errors) == 1 | ||||
assert "Input error: Invalid content checksum" in errors[0]["message"] | assert "Input error: Invalid content hash" in errors[0]["message"] | ||||
def test_get_content_with_invalid_hash_algorithm(client): | def test_get_content_with_invalid_hash_algorithm(client): | ||||
content = get_contents()[0] | content = get_contents()[0] | ||||
query_str = """ | query_str = """ | ||||
query getContent($checksums: [ContentHash]!) { | query getContent($hashes: [ContentHash]!) { | ||||
contentByHash(checksums: $checksums) { | contentByHashes(hashes: $hashes) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, errors = utils.get_query_response( | data, errors = utils.get_query_response( | ||||
client, query_str, checksums=[f"test:{content.sha1.hex()}"] | client, query_str, hashes=[f"test:{content.sha1.hex()}"] | ||||
) | ) | ||||
assert data is None | assert data is None | ||||
assert len(errors) == 1 | assert len(errors) == 1 | ||||
assert "Input error: Invalid hash algorithm" in errors[0]["message"] | assert "Input error: Invalid hash algorithm" in errors[0]["message"] | ||||
def test_get_content_as_target(client): | def test_get_content_as_target(client): | ||||
# SWHID of a test dir with a file entry | # SWHID of a test dir with a file entry | ||||
▲ Show 20 Lines • Show All 42 Lines • Show Last 20 Lines |