Changeset View
Changeset View
Standalone View
Standalone View
swh/graphql/tests/functional/test_content.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import pytest | import pytest | ||||
from . import utils | from . import utils | ||||
from ..data import get_contents | from ..data import get_contents | ||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_swhid(client, content): | def test_get_content_with_hashes(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($swhid: SWHID!) { | query getContentByHashes($sha1: String!, $sha256: String!, | ||||
content(swhid: $swhid) { | $sha1_git: String!, $blake2s256: String!) { | ||||
contentByHashes(sha1: $sha1, sha256: $sha256, sha1_git: $sha1_git, | |||||
blake2s256: $blake2s256) { | |||||
swhid | swhid | ||||
id | id | ||||
hashes { | hashes { | ||||
blake2s256 | blake2s256 | ||||
sha1 | sha1 | ||||
sha1_git | sha1_git | ||||
sha256 | sha256 | ||||
} | } | ||||
Show All 9 Lines | query getContentByHashes($sha1: String!, $sha256: String!, | ||||
lang | lang | ||||
} | } | ||||
license { | license { | ||||
licenses | licenses | ||||
} | } | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) | data, _ = utils.get_query_response( | ||||
client, | |||||
query_str, | |||||
blake2s256=content.blake2s256.hex(), | |||||
sha1=content.sha1.hex(), | |||||
sha1_git=content.sha1_git.hex(), | |||||
sha256=content.sha256.hex(), | |||||
) | |||||
archive_url = "https://archive.softwareheritage.org/api/1/" | archive_url = "https://archive.softwareheritage.org/api/1/" | ||||
response = { | response = { | ||||
"swhid": str(content.swhid()), | "swhid": str(content.swhid()), | ||||
"id": content.sha1_git.hex(), | "id": content.sha1_git.hex(), | ||||
"hashes": { | "hashes": { | ||||
"blake2s256": content.blake2s256.hex(), | "blake2s256": content.blake2s256.hex(), | ||||
"sha1": content.sha1.hex(), | "sha1": content.sha1.hex(), | ||||
"sha1_git": content.sha1_git.hex(), | "sha1_git": content.sha1_git.hex(), | ||||
"sha256": content.sha256.hex(), | "sha256": content.sha256.hex(), | ||||
}, | }, | ||||
"length": content.length, | "length": content.length, | ||||
"status": content.status, | "status": content.status, | ||||
"data": { | "data": { | ||||
"url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", | "url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", | ||||
}, | }, | ||||
"mimeType": None, | "mimeType": None, | ||||
"language": None, | "language": None, | ||||
"license": None, | "license": None, | ||||
} | } | ||||
assert data["content"] == response | assert data["contentByHashes"] == response | ||||
def test_get_content_with_invalid_swhid(client): | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_contents_with_swhid(client, content): | |||||
query_str = """ | query_str = """ | ||||
query getContent($swhid: SWHID!) { | query getContents($swhid: SWHID!) { | ||||
content(swhid: $swhid) { | contentsBySWHID(swhid: $swhid) { | ||||
swhid | |||||
} | |||||
} | |||||
""" | |||||
data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) | |||||
assert data["contentsBySWHID"] == [{"swhid": str(content.swhid())}] | |||||
def test_get_contents_with_invalid_swhid(client): | |||||
query_str = """ | |||||
query getContents($swhid: SWHID!) { | |||||
contentsBySWHID(swhid: $swhid) { | |||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
errors = utils.get_error_response(client, query_str, swhid="invalid") | errors = utils.get_error_response(client, query_str, swhid="invalid") | ||||
# API will throw an error in case of an invalid SWHID | # API will throw an error in case of an invalid SWHID | ||||
assert len(errors) == 1 | assert len(errors) == 1 | ||||
assert "Input error: Invalid SWHID" in errors[0]["message"] | assert "Input error: Invalid SWHID" in errors[0]["message"] | ||||
def test_get_contents_with_missing_swhid(client): | |||||
missing_sha1 = "1" * 40 | |||||
query_str = """ | |||||
query getContents($swhid: SWHID!) { | |||||
contentsBySWHID(swhid: $swhid) { | |||||
swhid | |||||
} | |||||
} | |||||
""" | |||||
data, _ = utils.get_query_response( | |||||
client, query_str, swhid=f"swh:1:cnt:{missing_sha1}" | |||||
) | |||||
assert data["contentsBySWHID"] == [] | |||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_hash(client, content): | def test_get_contents_with_hashes(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | query getContents($sha1: String, $sha1_git: String, $sha256: String, | ||||
contentByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | $blake2s256: String) { | ||||
contentsByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | |||||
blake2s256: $blake2s256) { | blake2s256: $blake2s256) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response( | data, _ = utils.get_query_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
sha1=content.sha1.hex(), | sha1=content.sha1.hex(), | ||||
sha1_git=content.sha1_git.hex(), | sha1_git=content.sha1_git.hex(), | ||||
sha256=content.sha256.hex(), | sha256=content.sha256.hex(), | ||||
blake2s256=content.blake2s256.hex(), | blake2s256=content.blake2s256.hex(), | ||||
) | ) | ||||
assert data["contentByHashes"] == {"swhid": str(content.swhid())} | assert data["contentsByHashes"] == [{"swhid": str(content.swhid())}] | ||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_single_hash(client, content): | def test_get_content_with_single_hash(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | query getContents($sha1: String, $sha1_git: String, $sha256: String, | ||||
contentByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | $blake2s256: String) { | ||||
contentsByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | |||||
blake2s256: $blake2s256) { | blake2s256: $blake2s256) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response( | data, _ = utils.get_query_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
sha1=content.sha1.hex(), | sha1=content.sha1.hex(), | ||||
) | ) | ||||
assert data["contentByHashes"] == {"swhid": str(content.swhid())} | assert data["contentsByHashes"] == [{"swhid": str(content.swhid())}] | ||||
data, _ = utils.get_query_response( | |||||
client, | |||||
query_str, | |||||
blake2s256=content.blake2s256.hex(), | |||||
) | |||||
assert data["contentsByHashes"] == [{"swhid": str(content.swhid())}] | |||||
@pytest.mark.parametrize("content", get_contents()) | @pytest.mark.parametrize("content", get_contents()) | ||||
def test_get_content_with_one_non_matching_hash(client, content): | def test_get_contents_with_one_non_matching_hash(client, content): | ||||
query_str = """ | query_str = """ | ||||
query getContent($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | query getContents($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | ||||
contentByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | contentsByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | ||||
blake2s256: $blake2s256) { | blake2s256: $blake2s256) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
utils.assert_missing_object( | data, _ = utils.get_query_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
obj_type="contentByHashes", | obj_type="contentsByHashes", | ||||
sha1=content.sha1.hex(), | sha1=content.sha1.hex(), | ||||
sha1_git="a" * 20, # hash is valid, but not matching the object | sha1_git="a" * 20, # hash is valid, but not matching the object | ||||
) | ) | ||||
assert data["contentsByHashes"] == [] | |||||
def test_get_content_with_invalid_hashes(client): | def test_get_content_with_invalid_hashes(client): | ||||
content = get_contents()[0] | content = get_contents()[0] | ||||
query_str = """ | query_str = """ | ||||
query getContent($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | query getContents($sha1: String, $sha1_git: String, $sha256: String, | ||||
contentByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | $blake2s256: String) { | ||||
contentsByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | |||||
blake2s256: $blake2s256) { | blake2s256: $blake2s256) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
errors = utils.get_error_response( | errors = utils.get_error_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
sha1="invalid", # Only one hash is invalid | sha1="invalid", # Only one hash is invalid | ||||
sha1_git=content.sha1_git.hex(), | sha1_git=content.sha1_git.hex(), | ||||
sha256=content.sha256.hex(), | sha256=content.sha256.hex(), | ||||
) | ) | ||||
# API will throw an error in case of an invalid content hash | # API will throw an error in case of an invalid content hash | ||||
assert len(errors) == 1 | assert len(errors) == 1 | ||||
assert "Input error: Invalid content hash" in errors[0]["message"] | assert "Input error: Invalid content hash" in errors[0]["message"] | ||||
def test_get_content_with_no_hashes(client): | def test_get_content_with_no_hashes(client): | ||||
query_str = """ | query_str = """ | ||||
query getContent($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | query getContents($sha1: String, $sha1_git: String, $sha256: String, $blake2s256: String) { | ||||
contentByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | contentsByHashes(sha1: $sha1, sha1_git: $sha1_git, sha256: $sha256, | ||||
blake2s256: $blake2s256) { | blake2s256: $blake2s256) { | ||||
swhid | swhid | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
errors = utils.get_error_response( | errors = utils.get_error_response( | ||||
client, | client, | ||||
query_str, | query_str, | ||||
Show All 27 Lines | def test_get_content_as_target(client): | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response(client, query_str, swhid=directory_swhid) | data, _ = utils.get_query_response(client, query_str, swhid=directory_swhid) | ||||
content_obj = data["directory"]["entries"]["nodes"][1]["target"] | content_obj = data["directory"]["entries"]["nodes"][1]["target"] | ||||
assert content_obj == { | assert content_obj == { | ||||
"length": 4, | "length": 4, | ||||
"swhid": "swh:1:cnt:86bc6b377e9d25f9d26777a4a28d08e63e7c5779", | "swhid": "swh:1:cnt:86bc6b377e9d25f9d26777a4a28d08e63e7c5779", | ||||
} | } | ||||
def test_get_content_with_unknown_swhid(client): | |||||
unknown_sha1 = "1" * 40 | |||||
query_str = """ | |||||
query getDirectory($swhid: SWHID!) { | |||||
content(swhid: $swhid) { | |||||
swhid | |||||
} | |||||
} | |||||
""" | |||||
utils.assert_missing_object( | |||||
client, | |||||
query_str, | |||||
obj_type="content", | |||||
swhid=f"swh:1:cnt:{unknown_sha1}", | |||||
) |