Changeset View
Changeset View
Standalone View
Standalone View
swh/graphql/tests/functional/test_directory_entry.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import pytest | import pytest | ||||
from swh.graphql import server | from swh.graphql import server | ||||
from swh.model.swhids import CoreSWHID, ObjectType | from swh.model.swhids import CoreSWHID, ObjectType | ||||
from . import utils | from . import utils | ||||
from ..data import get_directories, get_directories_with_nested_path | from ..data import get_directories, get_directories_with_nested_path | ||||
def get_target_type(target_type): | |||||
mapping = {"file": "content", "dir": "directory", "rev": "revision"} | |||||
return mapping.get(target_type) | |||||
def test_get_directory_entry_missing_path(client): | def test_get_directory_entry_missing_path(client): | ||||
directory = get_directories()[0] | directory = get_directories()[0] | ||||
path = "missing" | path = "missing" | ||||
query_str = """ | query_str = """ | ||||
{ | { | ||||
directoryEntry(directorySwhid: "%s", path: "%s") { | directoryEntry(directorySwhid: "%s", path: "%s") { | ||||
name { | name { | ||||
text | text | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | for entry in storage.directory_ls(directory.id, recursive=True): | ||||
) | ) | ||||
elif entry["type"] == "rev" and entry["target"] is not None: | elif entry["type"] == "rev" and entry["target"] is not None: | ||||
swhid = CoreSWHID( | swhid = CoreSWHID( | ||||
object_type=ObjectType.REVISION, object_id=entry["target"] | object_type=ObjectType.REVISION, object_id=entry["target"] | ||||
) | ) | ||||
assert data["directoryEntry"] == { | assert data["directoryEntry"] == { | ||||
"name": {"text": entry["name"].decode()}, | "name": {"text": entry["name"].decode()}, | ||||
"target": {"swhid": str(swhid)} if swhid else None, | "target": {"swhid": str(swhid)} if swhid else None, | ||||
"targetType": entry["type"], | "targetType": get_target_type(entry["type"]), | ||||
} | } | ||||
@pytest.mark.parametrize("directory", get_directories()) | @pytest.mark.parametrize("directory", get_directories()) | ||||
def test_get_directory_entry_connection(client, directory): | def test_get_directory_entry_connection(client, directory): | ||||
query_str = """ | query_str = """ | ||||
{ | { | ||||
directory(swhid: "%s") { | directory(swhid: "%s") { | ||||
swhid | swhid | ||||
entries { | entries { | ||||
nodes { | nodes { | ||||
targetType | targetType | ||||
name { | name { | ||||
text | text | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
data, _ = utils.get_query_response(client, query_str % directory.swhid()) | data, _ = utils.get_query_response(client, query_str % directory.swhid()) | ||||
directory_entries = data["directory"]["entries"]["nodes"] | directory_entries = data["directory"]["entries"]["nodes"] | ||||
assert len(directory_entries) == len(directory.entries) | assert len(directory_entries) == len(directory.entries) | ||||
output = [ | output = [ | ||||
{"name": {"text": de.name.decode()}, "targetType": de.type} | {"name": {"text": de.name.decode()}, "targetType": get_target_type(de.type)} | ||||
for de in directory.entries | for de in directory.entries | ||||
] | ] | ||||
for each_entry in output: | for each_entry in output: | ||||
assert each_entry in directory_entries | assert each_entry in directory_entries | ||||
@pytest.mark.parametrize("directory", get_directories()) | |||||
def test_directory_entry_connection_filter_by_name(client, directory): | |||||
storage = server.get_storage() | |||||
for dir_entry in storage.directory_ls(directory.id): | |||||
name_include = dir_entry["name"][:-1].decode() | |||||
query_str = """ | |||||
{ | |||||
directory(swhid: "%s") { | |||||
swhid | |||||
entries(nameInclude: "%s") { | |||||
nodes { | |||||
targetType | |||||
name { | |||||
text | |||||
} | |||||
} | |||||
} | |||||
} | |||||
} | |||||
""" % ( | |||||
directory.swhid(), | |||||
name_include, | |||||
) | |||||
anlambert: You should try to make your tests cover more cases and more dynamic by avoiding to hard code… | |||||
data, _ = utils.get_query_response(client, query_str) | |||||
for entry in data["directory"]["entries"]["nodes"]: | |||||
assert name_include in entry["name"]["text"] | |||||
assert entry["targetType"] == get_target_type(dir_entry["type"]) |
You should try to make your tests cover more cases and more dynamic by avoiding to hard code test inputs, something like: