diff --git a/swh/graphql/resolvers/release.py b/swh/graphql/resolvers/release.py index c766823..b678ef0 100644 --- a/swh/graphql/resolvers/release.py +++ b/swh/graphql/resolvers/release.py @@ -1,52 +1,52 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Union from .base_node import BaseSWHNode from .snapshot_branch import BaseSnapshotBranchNode class BaseReleaseNode(BaseSWHNode): """ Base resolver for all the release nodes """ def _get_release_by_id(self, release_id): - return (self.archive.get_releases([release_id]) or None)[0] + return self.archive.get_releases([release_id])[0] @property def target_hash(self): return self._node.target @property def targetType(self): # To support the schema naming convention return self._node.target_type.value def is_type_of(self): # is_type_of is required only when resolving a UNION type # This is for ariadne to return the right type return "Release" class ReleaseNode(BaseReleaseNode): """ Node resolver for a release requested directly with its SWHID """ def _get_node_data(self): return self._get_release_by_id(self.kwargs.get("swhid").object_id) class TargetReleaseNode(BaseReleaseNode): """ Node resolver for a release requested as a target """ obj: Union[BaseSnapshotBranchNode, BaseReleaseNode] def _get_node_data(self): # self.obj.target_hash is the requested release id return self._get_release_by_id(self.obj.target_hash) diff --git a/swh/graphql/resolvers/revision.py b/swh/graphql/resolvers/revision.py index ce8b0ab..4165df3 100644 --- a/swh/graphql/resolvers/revision.py +++ b/swh/graphql/resolvers/revision.py @@ -1,109 +1,109 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Union from swh.graphql.utils import utils from swh.model.model import Revision from swh.model.swhids import CoreSWHID, ObjectType from swh.storage.interface import PagedResult from .base_connection import BaseConnection from .base_node import BaseSWHNode from .release import BaseReleaseNode from .snapshot_branch import BaseSnapshotBranchNode class BaseRevisionNode(BaseSWHNode): """ Base resolver for all the revision nodes """ def _get_revision_by_id(self, revision_id): - return (self.archive.get_revisions([revision_id]) or None)[0] + return self.archive.get_revisions([revision_id])[0] @property def parent_swhids(self): # for ParentRevisionConnection resolver return [ CoreSWHID(object_type=ObjectType.REVISION, object_id=parent_id) for parent_id in self._node.parents ] @property def directory_hash(self): # for RevisionDirectoryNode resolver return self._node.directory @property def type(self): return self._node.type.value def is_type_of(self): # is_type_of is required only when resolving a UNION type # This is for ariadne to return the right type return "Revision" class RevisionNode(BaseRevisionNode): """ Node resolver for a revision requested directly with its SWHID """ def _get_node_data(self): return self._get_revision_by_id(self.kwargs.get("swhid").object_id) class TargetRevisionNode(BaseRevisionNode): """ Node resolver for a revision requested as a target """ obj: Union[BaseSnapshotBranchNode, BaseReleaseNode] def _get_node_data(self): # self.obj.target_hash is the requested revision id return self._get_revision_by_id(self.obj.target_hash) class ParentRevisionConnection(BaseConnection): """ Connection resolver for parent revisions in a revision """ obj: BaseRevisionNode _node_class = BaseRevisionNode def _get_paged_result(self) -> PagedResult: # self.obj is the current(child) revision # self.obj.parent_swhids is the list of parent SWHIDs # FIXME, using dummy(local) pagination, move pagination to backend # To remove localpagination, just drop the paginated call # STORAGE-TODO (pagination) parents = self.archive.get_revisions( [x.object_id for x in self.obj.parent_swhids] ) return utils.paginated(parents, self._get_first_arg(), self._get_after_arg()) class LogRevisionConnection(BaseConnection): """ Connection resolver for the log (list of revisions) in a revision """ obj: BaseRevisionNode _node_class = BaseRevisionNode def _get_paged_result(self) -> PagedResult: log = self.archive.get_revision_log([self.obj.swhid.object_id]) # Storage is returning a list of dicts instead of model objects # Following loop is to reverse that operation # STORAGE-TODO; remove to_dict from storage.revision_log log = [Revision.from_dict(rev) for rev in log] # FIXME, using dummy(local) pagination, move pagination to backend # To remove localpagination, just drop the paginated call # STORAGE-TODO (pagination) return utils.paginated(log, self._get_first_arg(), self._get_after_arg()) diff --git a/swh/graphql/tests/functional/test_content.py b/swh/graphql/tests/functional/test_content.py index 3a42d26..2c5c703 100644 --- a/swh/graphql/tests/functional/test_content.py +++ b/swh/graphql/tests/functional/test_content.py @@ -1,163 +1,175 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils from ..data import get_contents @pytest.mark.parametrize("content", get_contents()) def test_get_content_with_swhid(client, content): query_str = """ { content(swhid: "%s") { swhid checksum { blake2s256 sha1 sha1_git sha256 } length status data { url } fileType { encoding } language { lang } license { licenses } } } """ data, _ = utils.get_query_response(client, query_str % content.swhid()) archive_url = "https://archive.softwareheritage.org/api/1/" response = { "swhid": str(content.swhid()), "checksum": { "blake2s256": content.blake2s256.hex(), "sha1": content.sha1.hex(), "sha1_git": content.sha1_git.hex(), "sha256": content.sha256.hex(), }, "length": content.length, "status": content.status, "data": { "url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", }, "fileType": None, "language": None, "license": None, } assert data["content"] == response @pytest.mark.parametrize("content", get_contents()) def test_get_content_with_hash(client, content): query_str = """ { contentByHash(checksums: ["blake2s256:%s", "sha1:%s", "sha1_git:%s", "sha256:%s"]) { swhid } } """ data, _ = utils.get_query_response( client, query_str % ( content.blake2s256.hex(), content.sha1.hex(), content.sha1_git.hex(), content.sha256.hex(), ), ) assert data["contentByHash"] == {"swhid": str(content.swhid())} def test_get_content_with_invalid_swhid(client): query_str = """ { content(swhid: "swh:1:cnt:invalid") { swhid } } """ errors = utils.get_error_response(client, query_str) # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_content_with_invalid_hashes(client): content = get_contents()[0] query_str = """ { contentByHash(checksums: ["blake2s256:%s", "sha1:%s", "sha1_git:%s", "sha256:%s"]) { swhid } } """ errors = utils.get_error_response( client, query_str % ( "invalid", # Only one hash is invalid content.sha1.hex(), content.sha1_git.hex(), content.sha256.hex(), ), ) # API will throw an error in case of an invalid content hash assert len(errors) == 1 assert "Input error: Invalid content checksum" in errors[0]["message"] def test_get_content_with_invalid_hash_algorithm(client): content = get_contents()[0] query_str = """ { contentByHash(checksums: ["test:%s"]) { swhid } } """ errors = utils.get_error_response(client, query_str % content.sha1.hex()) assert len(errors) == 1 assert "Input error: Invalid hash algorithm" in errors[0]["message"] def test_get_content_as_target(client): # SWHID of a test dir with a file entry directory_swhid = "swh:1:dir:87b339104f7dc2a8163dec988445e3987995545f" query_str = """ { directory(swhid: "%s") { swhid entries(first: 2) { nodes { targetType target { ...on Content { swhid length } } } } } } """ data, _ = utils.get_query_response(client, query_str % directory_swhid) content_obj = data["directory"]["entries"]["nodes"][1]["target"] assert content_obj == { "length": 4, "swhid": "swh:1:cnt:86bc6b377e9d25f9d26777a4a28d08e63e7c5779", } + + +def test_get_content_with_unknown_swhid(client): + unknown_sha1 = "1" * 40 + query_str = """ + { + content(swhid: "swh:1:cnt:%s") { + swhid + } + } + """ + utils.assert_missing_object(client, query_str % unknown_sha1, "content") diff --git a/swh/graphql/tests/functional/test_directory.py b/swh/graphql/tests/functional/test_directory.py index a07481b..388e4fe 100644 --- a/swh/graphql/tests/functional/test_directory.py +++ b/swh/graphql/tests/functional/test_directory.py @@ -1,74 +1,86 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils from ..data import get_directories @pytest.mark.parametrize("directory", get_directories()) def test_get_directory(client, directory): query_str = """ { directory(swhid: "%s") { swhid } } """ data, _ = utils.get_query_response(client, query_str % directory.swhid()) assert data["directory"] == {"swhid": str(directory.swhid())} def test_get_directory_with_invalid_swhid(client): query_str = """ { directory(swhid: "swh:1:dir:invalid") { swhid } } """ errors = utils.get_error_response(client, query_str) # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_revision_directory(client): query_str = """ { revision(swhid: "swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c") { swhid directory { swhid } } } """ data, _ = utils.get_query_response(client, query_str) assert data["revision"]["directory"] == { "swhid": "swh:1:dir:0101010101010101010101010101010101010101" } def test_get_target_directory(client): # TargetDirectoryNode is returned from snapshotbranch, release # and directory entry nodes. Release node is used for testing here query_str = """ { release(swhid: "swh:1:rel:ee4d20e80af850cc0f417d25dc5073792c5010d2") { swhid target { ...on Directory { swhid } } } } """ data, _ = utils.get_query_response(client, query_str) assert data["release"]["target"] == { "swhid": "swh:1:dir:0505050505050505050505050505050505050505" } + + +def test_get_directory_with_unknown_swhid(client): + unknown_sha1 = "1" * 40 + query_str = """ + { + directory(swhid: "swh:1:dir:%s") { + swhid + } + } + """ + utils.assert_missing_object(client, query_str % unknown_sha1, "directory") diff --git a/swh/graphql/tests/functional/test_release_node.py b/swh/graphql/tests/functional/test_release_node.py index fe31b4c..98f5b38 100644 --- a/swh/graphql/tests/functional/test_release_node.py +++ b/swh/graphql/tests/functional/test_release_node.py @@ -1,165 +1,177 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import pytest from swh.model.model import ObjectType from . import utils from ..data import ( get_contents, get_directories, get_releases, get_releases_with_target, get_revisions, ) @pytest.mark.parametrize("release", get_releases()) def test_get_release(client, release): query_str = ( """ { release(swhid: "%s") { swhid name { text base64 } message { text } author { email { text } name { text } fullname { text } } date targetType } } """ % release.swhid() ) data, _ = utils.get_query_response(client, query_str) assert data["release"] == { "swhid": str(release.swhid()), "name": { "text": release.name.decode(), "base64": base64.b64encode(release.name).decode("ascii"), }, "message": {"text": release.message.decode()}, "author": { "email": {"text": release.author.email.decode()}, "name": {"text": release.author.name.decode()}, "fullname": {"text": release.author.fullname.decode()}, } if release.author else None, "date": release.date.to_datetime().isoformat() if release.date else None, "targetType": release.target_type.value, } def test_get_release_with_invalid_swhid(client): query_str = """ { - content(swhid: "swh:1:rel:invalid") { + release(swhid: "swh:1:rel:invalid") { swhid } } """ errors = utils.get_error_response(client, query_str) # API will throw an error in case of an invalid SWHID assert len(errors) == 1 @pytest.mark.parametrize("release_with_target", get_releases_with_target()) def test_get_release_targets(client, release_with_target): query_str = """ { release(swhid: "%s") { targetType target { ...on Revision { swhid } ...on Release { swhid } ...on Directory { swhid } ...on Content { swhid } } } } """ data, _ = utils.get_query_response(client, query_str % release_with_target.swhid()) if release_with_target.target_type == ObjectType.REVISION: target_swhid = get_revisions()[0].swhid() elif release_with_target.target_type == ObjectType.RELEASE: target_swhid = get_releases()[0].swhid() elif release_with_target.target_type == ObjectType.DIRECTORY: target_swhid = get_directories()[0].swhid() elif release_with_target.target_type == ObjectType.CONTENT: target_swhid = get_contents()[0].swhid() assert data["release"] == { "targetType": release_with_target.target_type.value, "target": {"swhid": str(target_swhid)}, } def test_get_release_target_unknown(client): # Clinet can request all the possible options if the target type # is unknown. The data under the right type will be returned # The target is of type Revision in this case # ie: both swhid and message will be available in the response swhid = get_releases_with_target()[0].swhid() query_str = """ { release(swhid: "%s") { targetType target { ...on Revision { swhid message { text } } ...on Release { swhid } ...on Directory { swhid } ...on Content { swhid } } } } """ data, _ = utils.get_query_response(client, query_str % swhid) assert data["release"] == { "target": { "message": {"text": "hello"}, "swhid": str(get_revisions()[0].swhid()), }, "targetType": "revision", } + + +def test_get_release_with_unknown_swhid(client): + unknown_sha1 = "1" * 40 + query_str = """ + { + release(swhid: "swh:1:rel:%s") { + swhid + } + } + """ + utils.assert_missing_object(client, query_str % unknown_sha1, "release") diff --git a/swh/graphql/tests/functional/test_revision.py b/swh/graphql/tests/functional/test_revision.py index ce8b7c4..8e9eb0b 100644 --- a/swh/graphql/tests/functional/test_revision.py +++ b/swh/graphql/tests/functional/test_revision.py @@ -1,160 +1,172 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.model.swhids import CoreSWHID from . import utils from ..data import get_revisions, get_revisions_with_parents @pytest.mark.parametrize("revision", get_revisions()) def test_get_revision(client, revision): query_str = """ { revision(swhid: "%s") { swhid message { text } author { fullname { text } name { text } email { text } } committer { fullname { text } name { text } email { text } } date type directory { swhid } } } """ data, _ = utils.get_query_response(client, query_str % revision.swhid()) assert data["revision"] == { "swhid": str(revision.swhid()), "message": {"text": revision.message.decode()}, "author": { "fullname": {"text": revision.author.fullname.decode()}, "name": {"text": revision.author.name.decode()}, "email": {"text": revision.author.email.decode()}, }, "committer": { "fullname": {"text": revision.committer.fullname.decode()}, "name": {"text": revision.committer.name.decode()}, "email": {"text": revision.committer.email.decode()}, }, "date": revision.date.to_datetime().isoformat(), "type": revision.type.value, "directory": { "swhid": str(CoreSWHID(object_id=revision.directory, object_type="dir")) }, } def test_get_revision_with_invalid_swhid(client): query_str = """ { revision(swhid: "swh:1:cnt:invalid") { swhid } } """ errors = utils.get_error_response(client, query_str) # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_revision_as_target(client): # SWHID of a snapshot with revision as target snapshot_swhid = "swh:1:snp:9e78d7105c5e0f886487511e2a92377b4ee4c32a" query_str = """ { snapshot(swhid: "%s") { branches(first: 1, types: [revision]) { nodes { targetType target { ...on Revision { swhid } } } } } } """ data, _ = utils.get_query_response(client, query_str % snapshot_swhid) revision_obj = data["snapshot"]["branches"]["nodes"][0]["target"] assert revision_obj == { "swhid": "swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c" } def test_get_revision_log(client): revision_swhid = get_revisions_with_parents()[0].swhid() query_str = """ { revision(swhid: "%s") { swhid revisionLog(first: 3) { nodes { swhid } } } } """ data, _ = utils.get_query_response(client, query_str % revision_swhid) assert data["revision"]["revisionLog"] == { "nodes": [ {"swhid": str(revision_swhid)}, {"swhid": str(get_revisions()[0].swhid())}, {"swhid": str(get_revisions()[1].swhid())}, ] } def test_get_revision_parents(client): revision_swhid = get_revisions_with_parents()[0].swhid() query_str = """ { revision(swhid: "%s") { swhid parents { nodes { swhid } } } } """ data, _ = utils.get_query_response(client, query_str % revision_swhid) assert data["revision"]["parents"] == { "nodes": [ {"swhid": str(get_revisions()[0].swhid())}, {"swhid": str(get_revisions()[1].swhid())}, ] } + + +def test_get_revision_with_unknown_swhid(client): + unknown_sha1 = "1" * 40 + query_str = """ + { + revision(swhid: "swh:1:rev:%s") { + swhid + } + } + """ + utils.assert_missing_object(client, query_str % unknown_sha1, "revision") diff --git a/swh/graphql/tests/functional/utils.py b/swh/graphql/tests/functional/utils.py index 340500c..9fba30b 100644 --- a/swh/graphql/tests/functional/utils.py +++ b/swh/graphql/tests/functional/utils.py @@ -1,36 +1,37 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Dict, Tuple def get_response(client, query_str: str): return client.post("/", json={"query": query_str}) def get_query_response(client, query_str: str) -> Tuple[Dict, Dict]: response = get_response(client, query_str) assert response.status_code == 200, response.data result = json.loads(response.data) return result.get("data"), result.get("errors") def assert_missing_object(client, query_str: str, obj_type: str) -> None: data, errors = get_query_response(client, query_str) assert data[obj_type] is None assert len(errors) == 1 assert errors[0]["message"] == "Object error: Requested object is not available" + assert errors[0]["path"] == [obj_type] def get_error_response(client, query_str: str, error_code: int = 400) -> Dict: response = get_response(client, query_str) assert response.status_code == error_code return json.loads(response.data)["errors"] def get_query_params_from_args(**args) -> str: # build a GraphQL query parameters string from arguments return ",".join([f"{key}: {val}" for (key, val) in args.items()])