diff --git a/swh/graphql/tests/functional/test_branch_connection.py b/swh/graphql/tests/functional/test_branch_connection.py index ee35bf0..7a0c3a4 100644 --- a/swh/graphql/tests/functional/test_branch_connection.py +++ b/swh/graphql/tests/functional/test_branch_connection.py @@ -1,155 +1,152 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils -def get_branches(client, swhid: str, first: int, **args) -> tuple: - args["first"] = first - params = utils.get_query_params_from_args(**args) +def get_branches(client, **kwargs) -> tuple: query_str = """ - { - snapshot(swhid: "%s") { - branches(%s) { + query getSnapshot($swhid: SWHID!, $first: Int!, $after: String, $types: [BranchTargetType], + $nameInclude: String, $excludePrefix: String) { + snapshot(swhid: $swhid) { + branches(first: $first, after: $after, types: $types, nameInclude: $nameInclude, + nameExcludePrefix: $excludePrefix ) { pageInfo { endCursor } edges { cursor } nodes { targetType name { text } target { __typename ...on Branch { name { text } } ...on Revision { swhid } ...on Release { swhid } ...on Content { swhid } ...on Directory { swhid } ...on Snapshot { swhid } } } } } } - """ % ( - swhid, - params, - ) - return utils.get_query_response(client, query_str) + """ + return utils.get_query_response(client, query_str, **kwargs) def test_get_data(client): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, errors = get_branches(client, swhid, 10, types="[revision]") + data, errors = get_branches(client, swhid=swhid, first=10, types=["revision"]) assert len(data["snapshot"]["branches"]["nodes"]) == 1 # filter 'type' will return a single revision object and is used to assert data node = data["snapshot"]["branches"]["nodes"][0] assert node == { "name": {"text": "target/revision"}, "target": { "__typename": "Revision", "swhid": "swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c", }, "targetType": "revision", } def test_get_branches_with_alias(client): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, 10, types="[alias]") + data, _ = get_branches(client, swhid=swhid, first=10, types=["alias"]) node = data["snapshot"]["branches"]["nodes"][0] assert node == { "name": {"text": "target/alias"}, "target": {"__typename": "Branch", "name": {"text": "target/revision"}}, "targetType": "alias", } @pytest.mark.parametrize( "filter_type, count, target_type, swhid_pattern", [ ("revision", 1, "Revision", "swh:1:rev"), ("release", 1, "Release", "swh:1:rel"), ("directory", 1, "Directory", "swh:1:dir"), ("content", 0, "Content", "swh:1:cnt"), ("snapshot", 1, "Snapshot", "swh:1:snp"), ], ) def test_get_type_filter(client, filter_type, count, target_type, swhid_pattern): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, 10, types=f"[{filter_type}]") + data, _ = get_branches(client, swhid=swhid, first=10, types=[filter_type]) assert len(data["snapshot"]["branches"]["nodes"]) == count for node in data["snapshot"]["branches"]["nodes"]: assert node["target"]["__typename"] == target_type assert node["target"]["swhid"].startswith(swhid_pattern) @pytest.mark.parametrize( "filter_types, count", [ - ("revision, release", 2), - ("revision, snapshot, release", 3), + (["revision", "release"], 2), + (["revision", "snapshot", "release"], 3), ], ) def test_get_type_filter_multiple(client, filter_types, count): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, 10, types=f"[{filter_types}]") + data, _ = get_branches(client, swhid=swhid, first=10, types=filter_types) assert len(data["snapshot"]["branches"]["nodes"]) == count @pytest.mark.parametrize("name", ["rel", "rev", "non-exist"]) def test_get_name_include_filter(client, name): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, 10, nameInclude=f'"{name}"') + data, _ = get_branches(client, swhid=swhid, first=10, nameInclude=name) for node in data["snapshot"]["branches"]["nodes"]: assert name in node["name"]["text"] @pytest.mark.parametrize("name", ["target", "target/dir"]) def test_get_name_exclude_prefix_filter(client, name): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, 10, nameExcludePrefix=f'"{name}"') + data, _ = get_branches(client, swhid=swhid, first=10, excludePrefix=name) for node in data["snapshot"]["branches"]["nodes"]: assert not node["name"]["text"].startswith(name) @pytest.mark.parametrize("count", [1, 2]) def test_get_first_arg(client, count): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - data, _ = get_branches(client, swhid, first=count) + data, _ = get_branches(client, swhid=swhid, first=count) assert len(data["snapshot"]["branches"]["nodes"]) == count def test_get_after_arg(client): swhid = "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" - first_data, _ = get_branches(client, swhid, first=1) + first_data, _ = get_branches(client, swhid=swhid, first=1) end_cursor = first_data["snapshot"]["branches"]["pageInfo"]["endCursor"] node_name = first_data["snapshot"]["branches"]["nodes"][0]["name"]["text"] - second_data, _ = get_branches(client, swhid, first=3, after=f'"{end_cursor}"') + second_data, _ = get_branches(client, swhid=swhid, first=3, after=end_cursor) branches = second_data["snapshot"]["branches"] assert len(branches["nodes"]) == 3 assert branches["edges"][0]["cursor"] == end_cursor for node in branches["nodes"]: assert node["name"]["text"] > node_name diff --git a/swh/graphql/tests/functional/test_content.py b/swh/graphql/tests/functional/test_content.py index 2c5c703..fae6d77 100644 --- a/swh/graphql/tests/functional/test_content.py +++ b/swh/graphql/tests/functional/test_content.py @@ -1,175 +1,183 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils from ..data import get_contents @pytest.mark.parametrize("content", get_contents()) def test_get_content_with_swhid(client, content): query_str = """ - { - content(swhid: "%s") { + query getContent($swhid: SWHID!) { + content(swhid: $swhid) { swhid checksum { blake2s256 sha1 sha1_git sha256 } length status data { url } fileType { encoding } language { lang } license { licenses } } } """ - data, _ = utils.get_query_response(client, query_str % content.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) archive_url = "https://archive.softwareheritage.org/api/1/" response = { "swhid": str(content.swhid()), "checksum": { "blake2s256": content.blake2s256.hex(), "sha1": content.sha1.hex(), "sha1_git": content.sha1_git.hex(), "sha256": content.sha256.hex(), }, "length": content.length, "status": content.status, "data": { "url": f"{archive_url}content/sha1:{content.sha1.hex()}/raw/", }, "fileType": None, "language": None, "license": None, } assert data["content"] == response @pytest.mark.parametrize("content", get_contents()) def test_get_content_with_hash(client, content): query_str = """ - { - contentByHash(checksums: ["blake2s256:%s", "sha1:%s", "sha1_git:%s", "sha256:%s"]) { + query getContent($checksums: [ContentHash]!) { + contentByHash(checksums: $checksums) { swhid } } """ data, _ = utils.get_query_response( client, - query_str - % ( - content.blake2s256.hex(), - content.sha1.hex(), - content.sha1_git.hex(), - content.sha256.hex(), - ), + query_str, + checksums=[ + f"blake2s256:{content.blake2s256.hex()}", + f"sha1:{content.sha1.hex()}", + f"sha1_git:{content.sha1_git.hex()}", + f"sha256:{content.sha256.hex()}", + ], ) assert data["contentByHash"] == {"swhid": str(content.swhid())} def test_get_content_with_invalid_swhid(client): query_str = """ - { - content(swhid: "swh:1:cnt:invalid") { + query getContent($swhid: SWHID!) { + content(swhid: $swhid) { swhid } } """ - errors = utils.get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="invalid") # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_content_with_invalid_hashes(client): content = get_contents()[0] query_str = """ - { - contentByHash(checksums: ["blake2s256:%s", "sha1:%s", "sha1_git:%s", "sha256:%s"]) { + query getContent($checksums: [ContentHash]!) { + contentByHash(checksums: $checksums) { swhid } } """ errors = utils.get_error_response( client, - query_str - % ( + query_str, + checksums=[ "invalid", # Only one hash is invalid - content.sha1.hex(), - content.sha1_git.hex(), - content.sha256.hex(), - ), + f"sha1:{content.sha1.hex()}", + f"sha1_git:{content.sha1_git.hex()}", + f"sha256:{content.sha256.hex()}", + ], ) # API will throw an error in case of an invalid content hash assert len(errors) == 1 assert "Input error: Invalid content checksum" in errors[0]["message"] def test_get_content_with_invalid_hash_algorithm(client): content = get_contents()[0] query_str = """ - { - contentByHash(checksums: ["test:%s"]) { + query getContent($checksums: [ContentHash]!) { + contentByHash(checksums: $checksums) { swhid } } """ - errors = utils.get_error_response(client, query_str % content.sha1.hex()) + data, errors = utils.get_query_response( + client, query_str, checksums=[f"test:{content.sha1.hex()}"] + ) + assert data is None assert len(errors) == 1 assert "Input error: Invalid hash algorithm" in errors[0]["message"] def test_get_content_as_target(client): # SWHID of a test dir with a file entry directory_swhid = "swh:1:dir:87b339104f7dc2a8163dec988445e3987995545f" query_str = """ - { - directory(swhid: "%s") { + query getDirectory($swhid: SWHID!) { + directory(swhid: $swhid) { swhid entries(first: 2) { nodes { targetType target { ...on Content { swhid length } } } } } } """ - data, _ = utils.get_query_response(client, query_str % directory_swhid) + data, _ = utils.get_query_response(client, query_str, swhid=directory_swhid) content_obj = data["directory"]["entries"]["nodes"][1]["target"] assert content_obj == { "length": 4, "swhid": "swh:1:cnt:86bc6b377e9d25f9d26777a4a28d08e63e7c5779", } def test_get_content_with_unknown_swhid(client): unknown_sha1 = "1" * 40 query_str = """ - { - content(swhid: "swh:1:cnt:%s") { + query getDirectory($swhid: SWHID!) { + content(swhid: $swhid) { swhid } } """ - utils.assert_missing_object(client, query_str % unknown_sha1, "content") + utils.assert_missing_object( + client, + query_str, + obj_type="content", + swhid=f"swh:1:cnt:{unknown_sha1}", + ) diff --git a/swh/graphql/tests/functional/test_directory.py b/swh/graphql/tests/functional/test_directory.py index 388e4fe..ef36358 100644 --- a/swh/graphql/tests/functional/test_directory.py +++ b/swh/graphql/tests/functional/test_directory.py @@ -1,86 +1,99 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils from ..data import get_directories @pytest.mark.parametrize("directory", get_directories()) def test_get_directory(client, directory): query_str = """ - { - directory(swhid: "%s") { + query getDirectory($swhid: SWHID!) { + directory(swhid: $swhid) { swhid } } """ - data, _ = utils.get_query_response(client, query_str % directory.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(directory.swhid())) assert data["directory"] == {"swhid": str(directory.swhid())} def test_get_directory_with_invalid_swhid(client): query_str = """ - { - directory(swhid: "swh:1:dir:invalid") { + query getDirectory($swhid: SWHID!) { + directory(swhid: $swhid) { swhid } } """ - errors = utils.get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="swh:1:dir:invalid") # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_revision_directory(client): query_str = """ - { - revision(swhid: "swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid directory { swhid } } } """ - data, _ = utils.get_query_response(client, query_str) + data, _ = utils.get_query_response( + client, + query_str, + swhid="swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c", + ) assert data["revision"]["directory"] == { "swhid": "swh:1:dir:0101010101010101010101010101010101010101" } def test_get_target_directory(client): # TargetDirectoryNode is returned from snapshotbranch, release # and directory entry nodes. Release node is used for testing here query_str = """ - { - release(swhid: "swh:1:rel:ee4d20e80af850cc0f417d25dc5073792c5010d2") { + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { swhid target { ...on Directory { swhid } } } } """ - data, _ = utils.get_query_response(client, query_str) + data, _ = utils.get_query_response( + client, + query_str, + swhid="swh:1:rel:ee4d20e80af850cc0f417d25dc5073792c5010d2", + ) assert data["release"]["target"] == { "swhid": "swh:1:dir:0505050505050505050505050505050505050505" } def test_get_directory_with_unknown_swhid(client): unknown_sha1 = "1" * 40 query_str = """ - { - directory(swhid: "swh:1:dir:%s") { + query getDirectory($swhid: SWHID!) { + directory(swhid: $swhid) { swhid } } """ - utils.assert_missing_object(client, query_str % unknown_sha1, "directory") + utils.assert_missing_object( + client, + query_str, + obj_type="directory", + swhid=f"swh:1:dir:{unknown_sha1}", + ) diff --git a/swh/graphql/tests/functional/test_directory_entry.py b/swh/graphql/tests/functional/test_directory_entry.py index 650282c..02235c1 100644 --- a/swh/graphql/tests/functional/test_directory_entry.py +++ b/swh/graphql/tests/functional/test_directory_entry.py @@ -1,153 +1,156 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.graphql import server from swh.model.swhids import CoreSWHID, ObjectType from . import utils from ..data import get_directories, get_directories_with_nested_path def get_target_type(target_type): mapping = {"file": "content", "dir": "directory", "rev": "revision"} return mapping.get(target_type) def test_get_directory_entry_missing_path(client): directory = get_directories()[0] path = "missing" query_str = """ - { - directoryEntry(directorySwhid: "%s", path: "%s") { + query getDirEntry($swhid: SWHID!, $path: String!) { + directoryEntry(directorySwhid: $swhid, path: $path) { name { text } targetType target { ...on Content { swhid } } } } - """ % ( - directory.swhid(), - path, + """ + utils.assert_missing_object( + client, + query_str, + "directoryEntry", + swhid=str(directory.swhid()), + path=path, ) - utils.assert_missing_object(client, query_str, "directoryEntry") @pytest.mark.parametrize( "directory", get_directories() + get_directories_with_nested_path() ) def test_get_directory_entry(client, directory): storage = server.get_storage() query_str = """ - { - directoryEntry(directorySwhid: "%s", path: "%s") { + query getDirEntry($swhid: SWHID!, $path: String!) { + directoryEntry(directorySwhid: $swhid, path: $path) { name { text } targetType target { ...on Content { swhid } ...on Directory { swhid } ...on Revision { swhid } } } } """ for entry in storage.directory_ls(directory.id, recursive=True): - query = query_str % ( - directory.swhid(), - entry["name"].decode(), - ) data, _ = utils.get_query_response( client, - query, + query_str, + swhid=str(directory.swhid()), + path=entry["name"].decode(), ) swhid = None if entry["type"] == "file" and entry["sha1_git"] is not None: swhid = CoreSWHID( object_type=ObjectType.CONTENT, object_id=entry["sha1_git"] ) elif entry["type"] == "dir" and entry["target"] is not None: swhid = CoreSWHID( object_type=ObjectType.DIRECTORY, object_id=entry["target"] ) elif entry["type"] == "rev" and entry["target"] is not None: swhid = CoreSWHID( object_type=ObjectType.REVISION, object_id=entry["target"] ) assert data["directoryEntry"] == { "name": {"text": entry["name"].decode()}, "target": {"swhid": str(swhid)} if swhid else None, "targetType": get_target_type(entry["type"]), } @pytest.mark.parametrize("directory", get_directories()) def test_get_directory_entry_connection(client, directory): query_str = """ - { - directory(swhid: "%s") { + query getDirectory($swhid: SWHID!) { + directory(swhid: $swhid) { swhid entries { nodes { targetType name { text } } } } } """ - data, _ = utils.get_query_response(client, query_str % directory.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(directory.swhid())) directory_entries = data["directory"]["entries"]["nodes"] assert len(directory_entries) == len(directory.entries) output = [ {"name": {"text": de.name.decode()}, "targetType": get_target_type(de.type)} for de in directory.entries ] for each_entry in output: assert each_entry in directory_entries @pytest.mark.parametrize("directory", get_directories()) def test_directory_entry_connection_filter_by_name(client, directory): storage = server.get_storage() for dir_entry in storage.directory_ls(directory.id): name_include = dir_entry["name"][:-1].decode() query_str = """ - { - directory(swhid: "%s") { + query getDirectory($swhid: SWHID!, $nameInclude: String) { + directory(swhid: $swhid) { swhid - entries(nameInclude: "%s") { + entries(nameInclude: $nameInclude) { nodes { targetType name { text } } } } } - """ % ( - directory.swhid(), - name_include, + """ + data, _ = utils.get_query_response( + client, + query_str, + swhid=str(directory.swhid()), + nameInclude=name_include, ) - data, _ = utils.get_query_response(client, query_str) for entry in data["directory"]["entries"]["nodes"]: assert name_include in entry["name"]["text"] assert entry["targetType"] == get_target_type(dir_entry["type"]) diff --git a/swh/graphql/tests/functional/test_origin_connection.py b/swh/graphql/tests/functional/test_origin_connection.py index 1af79fc..267a01d 100644 --- a/swh/graphql/tests/functional/test_origin_connection.py +++ b/swh/graphql/tests/functional/test_origin_connection.py @@ -1,49 +1,53 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Optional + from . import utils from ..data import get_origins -def get_origins_from_api(client, first: int, **args) -> tuple: - args["first"] = first - params = utils.get_query_params_from_args(**args) +def get_origins_from_api( + client, first: int, urlPattern: Optional[str] = None, **args +) -> tuple: query_str = """ - { - origins(%s) { + query getOrigins($first: Int!, $urlPattern: String) { + origins(first: $first, urlPattern: $urlPattern) { nodes { url } pageInfo { hasNextPage endCursor } } } - """ % ( - params, + """ + return utils.get_query_response( + client, query_str, first=first, urlPattern=urlPattern ) - return utils.get_query_response(client, query_str) def test_get(client, storage): - data, _ = get_origins_from_api(client, 10) + data, _ = get_origins_from_api(client, first=10) assert len(data["origins"]["nodes"]) == len(get_origins()) def test_get_filter_by_pattern(client): - data, _ = get_origins_from_api(client, 10, urlPattern='"somewhere.org/den"') + data, _ = get_origins_from_api(client, first=10, urlPattern='"somewhere.org/den"') assert len(data["origins"]["nodes"]) == 1 def test_get_filter_by_non_existing_pattern(client): - data, _ = get_origins_from_api(client, 10, urlPattern='"somewhere.org/den/test/"') + data, _ = get_origins_from_api( + client, first=10, urlPattern='"somewhere.org/den/test/"' + ) assert len(data["origins"]["nodes"]) == 0 def test_basic_pagination(client): data, _ = get_origins_from_api(client, first=len(get_origins())) assert len(data["origins"]["nodes"]) == len(get_origins()) assert data["origins"]["pageInfo"] == {"hasNextPage": False, "endCursor": None} diff --git a/swh/graphql/tests/functional/test_origin_node.py b/swh/graphql/tests/functional/test_origin_node.py index 792dee3..db17c09 100644 --- a/swh/graphql/tests/functional/test_origin_node.py +++ b/swh/graphql/tests/functional/test_origin_node.py @@ -1,111 +1,122 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from . import utils from ..data import get_origins -from .utils import assert_missing_object, get_query_response def test_invalid_get(client): query_str = """ - { + query getOrigin { origin(url: "http://example.com/non-existing") { url } } """ - assert_missing_object(client, query_str, "origin") + utils.assert_missing_object(client, query_str, "origin") @pytest.mark.parametrize("origin", get_origins()) def test_get(client, storage, origin): - query_str = ( - """ - { - origin(url: "%s") { + query_str = """ + query getOrigin($url: String!) { + origin(url: $url) { url id visits(first: 10) { nodes { id } } latestVisit { visitId } snapshots(first: 2) { nodes { id } } } } """ - % origin.url - ) - response, _ = get_query_response(client, query_str) + response, _ = utils.get_query_response(client, query_str, url=origin.url) data_origin = response["origin"] storage_origin = storage.origin_get([origin.url])[0] visits_and_statuses = storage.origin_visit_get_with_statuses(origin.url).results assert data_origin["url"] == storage_origin.url assert data_origin["id"] == storage_origin.id.hex() assert len(data_origin["visits"]["nodes"]) == len(visits_and_statuses) assert data_origin["latestVisit"]["visitId"] == visits_and_statuses[-1].visit.visit snapshots = storage.origin_snapshot_get_all(origin.url) assert len(data_origin["snapshots"]["nodes"]) == len(snapshots) def test_latest_visit_type_filter(client): query_str = """ - { - origin(url: "%s") { - latestVisit(visitType: "%s") { + query getOrigin($url: String!, $visitType: String!) { + origin(url: $url) { + latestVisit(visitType: $visitType) { visitId } } } """ - data, _ = get_query_response(client, query_str % (get_origins()[0].url, "git")) + data, _ = utils.get_query_response( + client, query_str, url=get_origins()[0].url, visitType="git" + ) assert data["origin"] == {"latestVisit": {"visitId": 3}} - data, _ = get_query_response(client, query_str % (get_origins()[0].url, "hg")) + data, _ = utils.get_query_response( + client, query_str, url=get_origins()[0].url, visitType="hg" + ) assert data["origin"] == {"latestVisit": None} def test_latest_visit_require_snapshot_filter(client): query_str = """ - { - origin(url: "%s") { - latestVisit(requireSnapshot: %s) { + query getOrigin($url: String!, $requireSnapshot: Boolean!) { + origin(url: $url) { + latestVisit(requireSnapshot: $requireSnapshot) { visitId } } } """ - data, _ = get_query_response(client, query_str % (get_origins()[1].url, "true")) + data, _ = utils.get_query_response( + client, + query_str, + url=get_origins()[1].url, + requireSnapshot=True, + ) assert data["origin"] == {"latestVisit": {"visitId": 2}} def test_latest_visit_allowed_statuses_filter(client): query_str = """ - { - origin(url: "%s") { - latestVisit(allowedStatuses: [partial]) { + query getOrigin($url: String!, $allowedStatuses: [VisitStatusState!]!) { + origin(url: $url) { + latestVisit(allowedStatuses: $allowedStatuses) { visitId statuses { nodes { status } } } } } """ - data, _ = get_query_response(client, query_str % (get_origins()[1].url)) + data, _ = utils.get_query_response( + client, + query_str, + url=get_origins()[1].url, + allowedStatuses=["partial"], + ) assert data["origin"] == { "latestVisit": {"statuses": {"nodes": [{"status": "partial"}]}, "visitId": 2} } diff --git a/swh/graphql/tests/functional/test_pagination.py b/swh/graphql/tests/functional/test_pagination.py index 2370d49..aaf4ac3 100644 --- a/swh/graphql/tests/functional/test_pagination.py +++ b/swh/graphql/tests/functional/test_pagination.py @@ -1,109 +1,103 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from . import utils from ..data import get_origins -from .utils import get_query_response # Using Origin object to run functional tests for pagination -def get_origin_nodes(client, first=1, after=""): +def get_origin_nodes(client, first, after=""): query_str = """ - { - origins(first: %s, %s) { + query getOrigins($first: Int!, $after: String) { + origins(first: $first, after: $after) { nodes { id } pageInfo { hasNextPage endCursor } } } - """ % ( - first, - after, - ) - return get_query_response(client, query_str) + """ + return utils.get_query_response(client, query_str, first=first, after=after) def test_pagination(client): # requesting the max number of nodes available # endCursor must be None - data, _ = get_origin_nodes(client, len(get_origins())) + data, _ = get_origin_nodes(client, first=len(get_origins())) assert len(data["origins"]["nodes"]) == len(get_origins()) assert data["origins"]["pageInfo"] == {"hasNextPage": False, "endCursor": None} def test_first_arg(client): - data, _ = get_origin_nodes(client, 1) + data, _ = get_origin_nodes(client, first=1) assert len(data["origins"]["nodes"]) == 1 assert data["origins"]["pageInfo"]["hasNextPage"] is True def test_invalid_first_arg(client): - data, errors = get_origin_nodes(client, -1) + data, errors = get_origin_nodes(client, first=-1) assert data["origins"] is None assert (len(errors)) == 2 # one error for origins and anotehr one for pageInfo assert ( errors[0]["message"] == "Pagination error: Value for argument 'first' is invalid; it must be between 0 and 1000" # noqa: B950 ) def test_too_big_first_arg(client): - data, errors = get_origin_nodes(client, 1001) # max page size is 1000 + data, errors = get_origin_nodes(client, first=1001) # max page size is 1000 assert data["origins"] is None assert (len(errors)) == 2 assert ( errors[0]["message"] == "Pagination error: Value for argument 'first' is invalid; it must be between 0 and 1000" # noqa: B950 ) def test_after_arg(client): - first_data, _ = get_origin_nodes(client) + first_data, _ = get_origin_nodes(client, first=1) end_cursor = first_data["origins"]["pageInfo"]["endCursor"] # get again with endcursor as the after argument - data, _ = get_origin_nodes(client, 1, f'after: "{end_cursor}"') + data, _ = get_origin_nodes(client, first=1, after=end_cursor) assert len(data["origins"]["nodes"]) == 1 assert data["origins"]["pageInfo"] == {"hasNextPage": False, "endCursor": None} def test_invalid_after_arg(client): - data, errors = get_origin_nodes(client, 1, 'after: "invalid"') + data, errors = get_origin_nodes(client, first=1, after="invalid") assert data["origins"] is None assert (len(errors)) == 2 assert ( errors[0]["message"] == "Pagination error: Invalid value for argument 'after'" ) def test_edge_cursor(client): - origins = get_origin_nodes(client)[0]["origins"] + origins = get_origin_nodes(client, first=1)[0]["origins"] # end cursor here must be the item cursor for the second item end_cursor = origins["pageInfo"]["endCursor"] - query_str = ( - """ - { - origins(first: 1, after: "%s") { + query_str = """ + query getOrigins($first: Int!, $after: String) { + origins(first: $first, after: $after) { edges { cursor node { id } } nodes { id } } } """ - % end_cursor - ) - data, _ = get_query_response(client, query_str) + data, _ = utils.get_query_response(client, query_str, first=1, after=end_cursor) origins = data["origins"] assert [edge["node"] for edge in origins["edges"]] == origins["nodes"] assert origins["edges"][0]["cursor"] == end_cursor diff --git a/swh/graphql/tests/functional/test_release_node.py b/swh/graphql/tests/functional/test_release_node.py index 98f5b38..36532ee 100644 --- a/swh/graphql/tests/functional/test_release_node.py +++ b/swh/graphql/tests/functional/test_release_node.py @@ -1,177 +1,182 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import pytest from swh.model.model import ObjectType from . import utils from ..data import ( get_contents, get_directories, get_releases, get_releases_with_target, get_revisions, ) @pytest.mark.parametrize("release", get_releases()) def test_get_release(client, release): - query_str = ( - """ - { - release(swhid: "%s") { + query_str = """ + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { swhid name { text base64 } message { text } author { email { text } name { text } fullname { text } } date targetType } } """ - % release.swhid() - ) - data, _ = utils.get_query_response(client, query_str) + data, _ = utils.get_query_response(client, query_str, swhid=str(release.swhid())) assert data["release"] == { "swhid": str(release.swhid()), "name": { "text": release.name.decode(), "base64": base64.b64encode(release.name).decode("ascii"), }, "message": {"text": release.message.decode()}, "author": { "email": {"text": release.author.email.decode()}, "name": {"text": release.author.name.decode()}, "fullname": {"text": release.author.fullname.decode()}, } if release.author else None, "date": release.date.to_datetime().isoformat() if release.date else None, "targetType": release.target_type.value, } def test_get_release_with_invalid_swhid(client): query_str = """ - { - release(swhid: "swh:1:rel:invalid") { + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { swhid } } """ - errors = utils.get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="swh:1:rel:invalid") # API will throw an error in case of an invalid SWHID assert len(errors) == 1 + assert "Expected type 'SWHID'. Input error: Invalid SWHID" in errors[0]["message"] @pytest.mark.parametrize("release_with_target", get_releases_with_target()) def test_get_release_targets(client, release_with_target): query_str = """ - { - release(swhid: "%s") { + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { targetType target { ...on Revision { swhid } ...on Release { swhid } ...on Directory { swhid } ...on Content { swhid } } } } """ - data, _ = utils.get_query_response(client, query_str % release_with_target.swhid()) + data, _ = utils.get_query_response( + client, query_str, swhid=str(release_with_target.swhid()) + ) if release_with_target.target_type == ObjectType.REVISION: target_swhid = get_revisions()[0].swhid() elif release_with_target.target_type == ObjectType.RELEASE: target_swhid = get_releases()[0].swhid() elif release_with_target.target_type == ObjectType.DIRECTORY: target_swhid = get_directories()[0].swhid() elif release_with_target.target_type == ObjectType.CONTENT: target_swhid = get_contents()[0].swhid() assert data["release"] == { "targetType": release_with_target.target_type.value, "target": {"swhid": str(target_swhid)}, } def test_get_release_target_unknown(client): # Clinet can request all the possible options if the target type # is unknown. The data under the right type will be returned # The target is of type Revision in this case # ie: both swhid and message will be available in the response swhid = get_releases_with_target()[0].swhid() query_str = """ - { - release(swhid: "%s") { + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { targetType target { ...on Revision { swhid message { text } } ...on Release { swhid } ...on Directory { swhid } ...on Content { swhid } } } } """ - data, _ = utils.get_query_response(client, query_str % swhid) + data, _ = utils.get_query_response(client, query_str, swhid=str(swhid)) assert data["release"] == { "target": { "message": {"text": "hello"}, "swhid": str(get_revisions()[0].swhid()), }, "targetType": "revision", } def test_get_release_with_unknown_swhid(client): unknown_sha1 = "1" * 40 query_str = """ - { - release(swhid: "swh:1:rel:%s") { + query getRelease($swhid: SWHID!) { + release(swhid: $swhid) { swhid } } """ - utils.assert_missing_object(client, query_str % unknown_sha1, "release") + utils.assert_missing_object( + client, + query_str, + obj_type="release", + swhid=f"swh:1:rel:{unknown_sha1}", + ) diff --git a/swh/graphql/tests/functional/test_revision.py b/swh/graphql/tests/functional/test_revision.py index 8e9eb0b..b8ce5c5 100644 --- a/swh/graphql/tests/functional/test_revision.py +++ b/swh/graphql/tests/functional/test_revision.py @@ -1,172 +1,177 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.model.swhids import CoreSWHID from . import utils from ..data import get_revisions, get_revisions_with_parents @pytest.mark.parametrize("revision", get_revisions()) def test_get_revision(client, revision): query_str = """ - { - revision(swhid: "%s") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid message { text } author { fullname { text } name { text } email { text } } committer { fullname { text } name { text } email { text } } date type directory { swhid } } } """ - data, _ = utils.get_query_response(client, query_str % revision.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(revision.swhid())) assert data["revision"] == { "swhid": str(revision.swhid()), "message": {"text": revision.message.decode()}, "author": { "fullname": {"text": revision.author.fullname.decode()}, "name": {"text": revision.author.name.decode()}, "email": {"text": revision.author.email.decode()}, }, "committer": { "fullname": {"text": revision.committer.fullname.decode()}, "name": {"text": revision.committer.name.decode()}, "email": {"text": revision.committer.email.decode()}, }, "date": revision.date.to_datetime().isoformat(), "type": revision.type.value, "directory": { "swhid": str(CoreSWHID(object_id=revision.directory, object_type="dir")) }, } def test_get_revision_with_invalid_swhid(client): query_str = """ - { - revision(swhid: "swh:1:cnt:invalid") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid } } """ - errors = utils.get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="swh:1:cnt:invalid") # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] def test_get_revision_as_target(client): # SWHID of a snapshot with revision as target snapshot_swhid = "swh:1:snp:9e78d7105c5e0f886487511e2a92377b4ee4c32a" query_str = """ - { - snapshot(swhid: "%s") { + query getSnapshot($swhid: SWHID!) { + snapshot(swhid: $swhid) { branches(first: 1, types: [revision]) { nodes { targetType target { ...on Revision { swhid } } } } } } """ - data, _ = utils.get_query_response(client, query_str % snapshot_swhid) + data, _ = utils.get_query_response(client, query_str, swhid=snapshot_swhid) revision_obj = data["snapshot"]["branches"]["nodes"][0]["target"] assert revision_obj == { "swhid": "swh:1:rev:66c7c1cd9673275037140f2abff7b7b11fc9439c" } def test_get_revision_log(client): revision_swhid = get_revisions_with_parents()[0].swhid() query_str = """ - { - revision(swhid: "%s") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid revisionLog(first: 3) { nodes { swhid } } } } """ - data, _ = utils.get_query_response(client, query_str % revision_swhid) + data, _ = utils.get_query_response(client, query_str, swhid=str(revision_swhid)) assert data["revision"]["revisionLog"] == { "nodes": [ {"swhid": str(revision_swhid)}, {"swhid": str(get_revisions()[0].swhid())}, {"swhid": str(get_revisions()[1].swhid())}, ] } def test_get_revision_parents(client): revision_swhid = get_revisions_with_parents()[0].swhid() query_str = """ - { - revision(swhid: "%s") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid parents { nodes { swhid } } } } """ - data, _ = utils.get_query_response(client, query_str % revision_swhid) + data, _ = utils.get_query_response(client, query_str, swhid=str(revision_swhid)) assert data["revision"]["parents"] == { "nodes": [ {"swhid": str(get_revisions()[0].swhid())}, {"swhid": str(get_revisions()[1].swhid())}, ] } def test_get_revision_with_unknown_swhid(client): unknown_sha1 = "1" * 40 query_str = """ - { - revision(swhid: "swh:1:rev:%s") { + query getRevision($swhid: SWHID!) { + revision(swhid: $swhid) { swhid } } """ - utils.assert_missing_object(client, query_str % unknown_sha1, "revision") + utils.assert_missing_object( + client, + query_str, + obj_type="revision", + swhid=f"swh:1:rev:{unknown_sha1}", + ) diff --git a/swh/graphql/tests/functional/test_search.py b/swh/graphql/tests/functional/test_search.py index 9d1596f..e119753 100644 --- a/swh/graphql/tests/functional/test_search.py +++ b/swh/graphql/tests/functional/test_search.py @@ -1,64 +1,64 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from . import utils def test_search_origins(client): query_str = """ - { - search(query: "fox", first: 1) { + query doSearch($query: String!, $first: Int!) { + search(query: $query, first: $first) { nodes { targetType target { ...on Origin { url latestVisit { date } } } } pageInfo { hasNextPage endCursor } } } """ - data, _ = utils.get_query_response(client, query_str) + data, _ = utils.get_query_response(client, query_str, query="fox", first=1) assert len(data["search"]["nodes"]) == 1 assert data == { "search": { "nodes": [ { "target": { "url": "https://somewhere.org/den/fox", "latestVisit": {"date": "2018-11-27T17:20:39+00:00"}, }, "targetType": "origin", } ], "pageInfo": {"endCursor": "MQ==", "hasNextPage": True}, } } def test_search_missing_url(client): query_str = """ - { - search(query: "missing-fox", first: 1) { + query doSearch($query: String!, $first: Int!) { + search(query: $query, first: $first) { nodes { targetType } pageInfo { hasNextPage endCursor } } } """ - data, _ = utils.get_query_response(client, query_str) + data, _ = utils.get_query_response(client, query_str, query="missing-fox", first=1) assert len(data["search"]["nodes"]) == 0 diff --git a/swh/graphql/tests/functional/test_snapshot_node.py b/swh/graphql/tests/functional/test_snapshot_node.py index f91f570..89dc944 100644 --- a/swh/graphql/tests/functional/test_snapshot_node.py +++ b/swh/graphql/tests/functional/test_snapshot_node.py @@ -1,57 +1,62 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from . import utils from ..data import get_snapshots -from .utils import assert_missing_object, get_error_response, get_query_response @pytest.mark.parametrize("snapshot", get_snapshots()) def test_get_snapshot(client, snapshot): query_str = """ - { - snapshot(swhid: "%s") { + query getSnapshot($swhid: SWHID!) { + snapshot(swhid: $swhid) { id swhid branches(first:5) { nodes { targetType name { text } } } } } """ - data, _ = get_query_response(client, query_str % snapshot.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(snapshot.swhid())) assert data["snapshot"]["swhid"] == str(snapshot.swhid()) assert data["snapshot"]["id"] == snapshot.id.hex() assert len(data["snapshot"]["branches"]["nodes"]) == len(snapshot.branches) def test_get_snapshot_missing_swhid(client): query_str = """ - { - snapshot(swhid: "swh:1:snp:0949d7a8c96347dba09be8d79085b8207f345412") { + query getSnapshot($swhid: SWHID!) { + snapshot(swhid: $swhid) { swhid } } """ - assert_missing_object(client, query_str, "snapshot") + utils.assert_missing_object( + client, + query_str, + obj_type="snapshot", + swhid="swh:1:snp:0949d7a8c96347dba09be8d79085b8207f345412", + ) def test_get_snapshot_invalid_swhid(client): query_str = """ - { - snapshot(swhid: "swh:1:snp:invalid") { + query getSnapshot($swhid: SWHID!) { + snapshot(swhid: $swhid) { swhid } } """ - errors = get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="swh:1:snp:invalid") assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] diff --git a/swh/graphql/tests/functional/test_swhid_resolve.py b/swh/graphql/tests/functional/test_swhid_resolve.py index bd63746..5a6efde 100644 --- a/swh/graphql/tests/functional/test_swhid_resolve.py +++ b/swh/graphql/tests/functional/test_swhid_resolve.py @@ -1,221 +1,221 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from . import utils from ..data import ( get_contents, get_directories, get_releases, get_revisions, get_snapshots, ) def test_invalid_swhid(client): query_str = """ - { - resolveSwhid(swhid: "swh:1:dir:dae0d245988b472abd30a4f968b919d0019b6c7") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType } } } """ - errors = utils.get_error_response(client, query_str) + errors = utils.get_error_response(client, query_str, swhid="swh:1:dir:invalid") # API will throw an error in case of an invalid SWHID assert len(errors) == 1 assert "Input error: Invalid SWHID" in errors[0]["message"] @pytest.mark.parametrize( "swhid", [ "swh:1:rel:0949d7a8c96347dba09be8d79085b8207f345412", "swh:1:rev:0949d7a8c96347dba09be8d79085b8207f345412", "swh:1:dir:0949d7a8c96347dba09be8d79085b8207f345412", "swh:1:cnt:0949d7a8c96347dba09be8d79085b8207f345412", "swh:1:snp:0949d7a8c96347dba09be8d79085b8207f345412", ], ) def test_missing_swhid(client, swhid): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType } } } """ - data, _ = utils.get_query_response(client, query_str % swhid) + data, _ = utils.get_query_response(client, query_str, swhid=swhid) # API will return an empty list in case of a valid, non existing SWHID assert data == {"resolveSwhid": {"nodes": []}} @pytest.mark.parametrize("snapshot", get_snapshots()) def test_snapshot_swhid_resolve(client, snapshot): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType target { __typename ... on Snapshot { swhid } } } } } """ - data, _ = utils.get_query_response(client, query_str % snapshot.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(snapshot.swhid())) assert data == { "resolveSwhid": { "nodes": [ { "target": { "__typename": "Snapshot", "swhid": str(snapshot.swhid()), }, "targetType": "snapshot", } ] } } @pytest.mark.parametrize("revision", get_revisions()) def test_revision_swhid_resolve(client, revision): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType target { __typename ... on Revision { swhid } } } } } """ - data, _ = utils.get_query_response(client, query_str % revision.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(revision.swhid())) assert data == { "resolveSwhid": { "nodes": [ { "target": { "__typename": "Revision", "swhid": str(revision.swhid()), }, "targetType": "revision", } ] } } @pytest.mark.parametrize("release", get_releases()) def test_release_swhid_resolve(client, release): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType target { __typename ... on Release { swhid } } } } } """ - data, _ = utils.get_query_response(client, query_str % release.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(release.swhid())) assert data == { "resolveSwhid": { "nodes": [ { "target": { "__typename": "Release", "swhid": str(release.swhid()), }, "targetType": "release", } ] } } @pytest.mark.parametrize("directory", get_directories()) def test_directory_swhid_resolve(client, directory): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType target { __typename ... on Directory { swhid } } } } } """ - data, _ = utils.get_query_response(client, query_str % directory.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(directory.swhid())) assert data == { "resolveSwhid": { "nodes": [ { "target": { "__typename": "Directory", "swhid": str(directory.swhid()), }, "targetType": "directory", } ] } } @pytest.mark.parametrize("content", get_contents()) def test_content_swhid_resolve(client, content): query_str = """ - { - resolveSwhid(swhid: "%s") { + query resolve($swhid: SWHID!) { + resolveSwhid(swhid: $swhid) { nodes { targetType target { __typename ... on Content { swhid } } } } } """ - data, _ = utils.get_query_response(client, query_str % content.swhid()) + data, _ = utils.get_query_response(client, query_str, swhid=str(content.swhid())) assert data == { "resolveSwhid": { "nodes": [ { "target": { "__typename": "Content", "swhid": str(content.swhid()), }, "targetType": "content", } ] } } diff --git a/swh/graphql/tests/functional/test_visit_node.py b/swh/graphql/tests/functional/test_visit_node.py index d7a0a57..bca6733 100644 --- a/swh/graphql/tests/functional/test_visit_node.py +++ b/swh/graphql/tests/functional/test_visit_node.py @@ -1,164 +1,163 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from . import utils from ..data import get_origins -from .utils import assert_missing_object, get_query_response @pytest.mark.parametrize("origin", get_origins()) def test_get_visit(client, storage, origin): query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { visitId date type latestStatus { status date type snapshot { swhid } } statuses { nodes { status } } } } """ visits_and_statuses = storage.origin_visit_get_with_statuses(origin.url).results for vws in visits_and_statuses: visit = vws.visit statuses = vws.statuses - data, _ = get_query_response(client, query_str % (origin.url, visit.visit)) + data, _ = utils.get_query_response( + client, query_str, origin=origin.url, visitId=visit.visit + ) assert data["visit"] == { "visitId": visit.visit, "type": visit.type, "date": visit.date.isoformat(), "latestStatus": { "date": statuses[-1].date.isoformat(), "type": statuses[-1].type, "status": statuses[-1].status, "snapshot": ({"swhid": f"swh:1:snp:{statuses[-1].snapshot.hex()}"}) if statuses[-1].snapshot else None, }, "statuses": {"nodes": [{"status": status.status} for status in statuses]}, } def test_invalid_get_visit(client): query_str = """ { visit(originUrl: "http://example.com/forge1", visitId: 3) { type } } """ - assert_missing_object(client, query_str, "visit") + utils.assert_missing_object(client, query_str, "visit") def test_get_latest_visit_status_filter_by_status_return_null(client): query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { visitId date type latestStatus(allowedStatuses: [full]) { status } } } - """ % ( - get_origins()[0].url, - 1, + """ + data, err = utils.get_query_response( + client, query_str, origin=get_origins()[0].url, visitId=1 ) - data, err = get_query_response(client, query_str) assert err is None assert data == { "visit": { "date": "2013-05-07T04:20:39.369271+00:00", "latestStatus": None, "type": "git", "visitId": 1, } } def test_get_latest_visit_status_filter_by_type(client): query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { visitId date type latestStatus(allowedStatuses: [ongoing]) { status date } } } - """ % ( - get_origins()[0].url, - 1, + """ + data, err = utils.get_query_response( + client, query_str, origin=get_origins()[0].url, visitId=1 ) - data, err = get_query_response(client, query_str) assert err is None assert data == { "visit": { "date": "2013-05-07T04:20:39.369271+00:00", "latestStatus": { "date": "2014-05-07T04:20:39.432222+00:00", "status": "ongoing", }, "type": "git", "visitId": 1, } } def test_get_latest_visit_status_filter_by_snapshot(client): query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { visitId date type latestStatus(requireSnapshot: true) { status date snapshot { swhid } } } } - """ % ( - get_origins()[1].url, - 2, + """ + data, err = utils.get_query_response( + client, query_str, origin=get_origins()[1].url, visitId=2 ) - data, err = get_query_response(client, query_str) assert err is None assert data == { "visit": { "date": "2015-11-27T17:20:39+00:00", "latestStatus": { "date": "2015-11-27T17:22:18+00:00", "snapshot": { "swhid": "swh:1:snp:0e7f84ede9a254f2cd55649ad5240783f557e65f" }, "status": "partial", }, "type": "hg", "visitId": 2, } } diff --git a/swh/graphql/tests/functional/test_visit_status.py b/swh/graphql/tests/functional/test_visit_status.py index 13c81ba..4171ffb 100644 --- a/swh/graphql/tests/functional/test_visit_status.py +++ b/swh/graphql/tests/functional/test_visit_status.py @@ -1,103 +1,103 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from . import utils from ..data import get_origins, get_visit_status, get_visits -from .utils import get_query_response @pytest.mark.parametrize( "visit, visit_status", list(zip(get_visits(), get_visit_status())) ) def test_get_visit_status(client, visit, visit_status): query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { statuses(first: 3) { nodes { status date type snapshot { swhid } } } } } - """ % ( - visit.origin, - visit.visit, + """ + data, _ = utils.get_query_response( + client, query_str, origin=visit.origin, visitId=visit.visit ) - data, _ = get_query_response(client, query_str) assert data["visit"]["statuses"]["nodes"][0] == { "date": visit_status.date.isoformat(), "snapshot": {"swhid": f"swh:1:snp:{visit_status.snapshot.hex()}"} if visit_status.snapshot is not None else None, "status": visit_status.status, "type": visit_status.type, } def test_visit_status_pagination(client): # visit status is using a different cursor, hence separate test query_str = """ - { - visit(originUrl: "%s", visitId: %s) { + query getVisit($origin: String!, $visitId: Int!) { + visit(originUrl: $origin, visitId: $visitId) { statuses(first: 1) { pageInfo { hasNextPage endCursor } edges { cursor node { status } } } } } - """ % ( - get_origins()[0].url, - 1, + """ + data, _ = utils.get_query_response( + client, query_str, origin=get_origins()[0].url, visitId=1 ) - data, _ = get_query_response(client, query_str) # request again with the endcursor end_cursor = data["visit"]["statuses"]["pageInfo"]["endCursor"] query_str = """ - { - visit(originUrl: "%s", visitId: %s) { - statuses(first: 1, after: "%s") { + query getVisit($origin: String!, $visitId: Int!, $after: String) { + visit(originUrl: $origin, visitId: $visitId) { + statuses(first: 1, after: $after) { pageInfo { hasNextPage endCursor } edges { cursor node { status } } } } } - """ % ( - get_origins()[0].url, - 1, - end_cursor, + """ + data, _ = utils.get_query_response( + client, + query_str, + origin=get_origins()[0].url, + visitId=1, + after=end_cursor, ) - data, _ = get_query_response(client, query_str) assert data["visit"]["statuses"] == { "edges": [ { "cursor": "MjAxNC0wNS0wN1QwNDoyMDozOS40MzIyMjIrMDA6MDA=", "node": {"status": "ongoing"}, } ], "pageInfo": {"endCursor": None, "hasNextPage": False}, } diff --git a/swh/graphql/tests/functional/utils.py b/swh/graphql/tests/functional/utils.py index 9fba30b..35d2ee3 100644 --- a/swh/graphql/tests/functional/utils.py +++ b/swh/graphql/tests/functional/utils.py @@ -1,37 +1,32 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Dict, Tuple +from ariadne import gql -def get_response(client, query_str: str): - return client.post("/", json={"query": query_str}) - -def get_query_response(client, query_str: str) -> Tuple[Dict, Dict]: - response = get_response(client, query_str) +def get_query_response(client, query_str: str, **kwargs) -> Tuple[Dict, Dict]: + query = gql(query_str) + response = client.post("/", json={"query": query, "variables": kwargs}) assert response.status_code == 200, response.data result = json.loads(response.data) return result.get("data"), result.get("errors") -def assert_missing_object(client, query_str: str, obj_type: str) -> None: - data, errors = get_query_response(client, query_str) +def assert_missing_object(client, query_str: str, obj_type: str, **kwargs) -> None: + data, errors = get_query_response(client, query_str, **kwargs) assert data[obj_type] is None assert len(errors) == 1 assert errors[0]["message"] == "Object error: Requested object is not available" assert errors[0]["path"] == [obj_type] -def get_error_response(client, query_str: str, error_code: int = 400) -> Dict: - response = get_response(client, query_str) - assert response.status_code == error_code - return json.loads(response.data)["errors"] - - -def get_query_params_from_args(**args) -> str: - # build a GraphQL query parameters string from arguments - return ",".join([f"{key}: {val}" for (key, val) in args.items()]) +def get_error_response(client, query_str: str, **kwargs) -> Dict: + data, errors = get_query_response(client, query_str, **kwargs) + assert data is None + assert len(errors) > 0 + return errors