diff --git a/swh/graphql/resolvers/directory_entry.py b/swh/graphql/resolvers/directory_entry.py
index d595be4..cddff3d 100644
--- a/swh/graphql/resolvers/directory_entry.py
+++ b/swh/graphql/resolvers/directory_entry.py
@@ -1,61 +1,63 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.graphql.utils import utils
 from swh.storage.interface import PagedResult
 
 from .base_connection import BaseConnection
 from .base_node import BaseNode
 
 
 class BaseDirectoryEntryNode(BaseNode):
     @property
     def target_hash(self):  # for DirectoryNode
         return self._node.target
 
     @property
     def targetType(self):  # To support the schema naming convention
         mapping = {"file": "content", "dir": "directory", "rev": "revision"}
         return mapping.get(self._node.type)
 
 
 class DirectoryEntryNode(BaseDirectoryEntryNode):
     """
     Node resolver for a directory entry requested with
     a directory SWHID and a relative path
     """
 
     def _get_node_data(self):
         # STORAGE-TODO, archive is returning a dict
         # return DirectoryEntry object instead
         return self.archive.get_directory_entry_by_path(
             directory_id=self.kwargs.get("directorySwhid").object_id,
             path=self.kwargs.get("path"),
         )
 
 
 class DirectoryEntryConnection(BaseConnection):
     """
     Connection resolver for entries in a directory
     """
 
     from .directory import BaseDirectoryNode
 
     obj: BaseDirectoryNode
 
     _node_class = BaseDirectoryEntryNode
 
     def _get_paged_result(self) -> PagedResult:
         # FIXME, using dummy(local) pagination, move pagination to backend
         # To remove localpagination, just drop the paginated call
         # STORAGE-TODO
         entries = self.archive.get_directory_entries(self.obj.swhid.object_id).results
         name_include = self.kwargs.get("nameInclude")
         if name_include is not None:
             # STORAGE-TODO, move this filter to swh-storage
             entries = [
-                x for x in entries if name_include.lower().encode() in x.name.lower()
+                x
+                for x in entries
+                if name_include.casefold() in x.name.decode().casefold()
             ]
         return utils.paginated(entries, self._get_first_arg(), self._get_after_arg())
diff --git a/swh/graphql/tests/data.py b/swh/graphql/tests/data.py
index dc7e3f4..71367aa 100644
--- a/swh/graphql/tests/data.py
+++ b/swh/graphql/tests/data.py
@@ -1,158 +1,174 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 from swh.model.model import (
     Directory,
     DirectoryEntry,
     ObjectType,
     OriginVisitStatus,
     Release,
     Revision,
     RevisionType,
 )
 from swh.model.tests import swh_model_data
 
 UTC = datetime.timezone.utc
 
 
 def populate_search_data(search):
     search.origin_update({"url": origin.url} for origin in get_origins())
 
 
 def get_origins():
     return swh_model_data.ORIGINS
 
 
 def get_visits():
     return swh_model_data.ORIGIN_VISITS
 
 
 def get_visit_status():
     return swh_model_data.ORIGIN_VISIT_STATUSES
 
 
 def get_snapshots():
     return swh_model_data.SNAPSHOTS
 
 
 def get_releases():
     return swh_model_data.RELEASES
 
 
 def get_revisions():
     return swh_model_data.REVISIONS
 
 
 def get_contents():
     return swh_model_data.CONTENTS
 
 
 def get_directories():
     return swh_model_data.DIRECTORIES
 
 
 def get_releases_with_target():
     """
     GraphQL will not return a target object unless the target id
     is present in the DB.
     Return release objects with real targets instead of
     dummy targets in swh.model.tests.swh_model_data
     """
     with_revision = Release(
         name=b"v0.0.1",
         target_type=ObjectType.REVISION,
         target=get_revisions()[0].id,
         message=b"foo",
         synthetic=False,
     )
     with_release = Release(
         name=b"v0.0.1",
         target_type=ObjectType.RELEASE,
         target=get_releases()[0].id,
         message=b"foo",
         synthetic=False,
     )
     with_directory = Release(
         name=b"v0.0.1",
         target_type=ObjectType.DIRECTORY,
         target=get_directories()[0].id,
         message=b"foo",
         synthetic=False,
     )
     with_content = Release(
         name=b"v0.0.1",
         target_type=ObjectType.CONTENT,
         target=get_contents()[0].sha1_git,
         message=b"foo",
         synthetic=False,
     )
     return [with_revision, with_release, with_directory, with_content]
 
 
 def get_revisions_with_parents():
     """
     Revisions with real revisions as parents
     """
     return [
         Revision(
             message=b"hello",
             date=swh_model_data.DATES[0],
             committer=swh_model_data.COMMITTERS[0],
             author=swh_model_data.COMMITTERS[0],
             committer_date=swh_model_data.DATES[0],
             type=RevisionType.GIT,
             directory=b"\x01" * 20,
             synthetic=False,
             parents=(get_revisions()[0].id, get_revisions()[1].id),
         )
     ]
 
 
 def get_directories_with_nested_path():
     return [
         Directory(
             entries=(
                 DirectoryEntry(
                     name=b"sub-dir",
                     perms=0o644,
                     type="dir",
                     target=get_directories()[1].id,
                 ),
             )
         )
     ]
 
 
+def get_directories_with_special_name_entries():
+    return [
+        Directory(
+            entries=(
+                DirectoryEntry(
+                    name="ßßétEÉt".encode(),
+                    perms=0o644,
+                    type="file",
+                    target=get_contents()[0].sha1_git,
+                ),
+            )
+        )
+    ]
+
+
 def get_visit_with_multiple_status():
     return [
         OriginVisitStatus(
             origin=get_origins()[0].url,
             date=datetime.datetime(2014, 5, 7, 4, 20, 39, 432222, tzinfo=UTC),
             visit=1,
             type="git",
             status="ongoing",
             snapshot=None,
             metadata=None,
         )
     ]
 
 
 GRAPHQL_EXTRA_TEST_OBJECTS = {
     "release": get_releases_with_target(),
     "revision": get_revisions_with_parents(),
-    "directory": get_directories_with_nested_path(),
+    "directory": get_directories_with_nested_path()
+    + get_directories_with_special_name_entries(),
     "origin_visit_status": get_visit_with_multiple_status(),
 }
 
 
 def populate_dummy_data(storage):
     for object_type, objects in swh_model_data.TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         method(objects)
     for object_type, objects in GRAPHQL_EXTRA_TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
         method(objects)
diff --git a/swh/graphql/tests/functional/test_directory_entry.py b/swh/graphql/tests/functional/test_directory_entry.py
index 02235c1..7498282 100644
--- a/swh/graphql/tests/functional/test_directory_entry.py
+++ b/swh/graphql/tests/functional/test_directory_entry.py
@@ -1,156 +1,188 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.graphql import server
 from swh.model.swhids import CoreSWHID, ObjectType
 
 from . import utils
-from ..data import get_directories, get_directories_with_nested_path
+from ..data import (
+    get_directories,
+    get_directories_with_nested_path,
+    get_directories_with_special_name_entries,
+)
 
 
 def get_target_type(target_type):
     mapping = {"file": "content", "dir": "directory", "rev": "revision"}
     return mapping.get(target_type)
 
 
 def test_get_directory_entry_missing_path(client):
     directory = get_directories()[0]
     path = "missing"
     query_str = """
     query getDirEntry($swhid: SWHID!, $path: String!) {
       directoryEntry(directorySwhid: $swhid, path: $path) {
         name {
           text
         }
         targetType
         target {
           ...on Content {
             swhid
           }
         }
       }
     }
     """
     utils.assert_missing_object(
         client,
         query_str,
         "directoryEntry",
         swhid=str(directory.swhid()),
         path=path,
     )
 
 
 @pytest.mark.parametrize(
     "directory", get_directories() + get_directories_with_nested_path()
 )
 def test_get_directory_entry(client, directory):
     storage = server.get_storage()
     query_str = """
     query getDirEntry($swhid: SWHID!, $path: String!) {
       directoryEntry(directorySwhid: $swhid, path: $path) {
         name {
           text
         }
         targetType
         target {
           ...on Content {
             swhid
           }
           ...on Directory {
             swhid
           }
           ...on Revision {
             swhid
           }
         }
       }
     }
     """
     for entry in storage.directory_ls(directory.id, recursive=True):
         data, _ = utils.get_query_response(
             client,
             query_str,
             swhid=str(directory.swhid()),
             path=entry["name"].decode(),
         )
         swhid = None
         if entry["type"] == "file" and entry["sha1_git"] is not None:
             swhid = CoreSWHID(
                 object_type=ObjectType.CONTENT, object_id=entry["sha1_git"]
             )
         elif entry["type"] == "dir" and entry["target"] is not None:
             swhid = CoreSWHID(
                 object_type=ObjectType.DIRECTORY, object_id=entry["target"]
             )
         elif entry["type"] == "rev" and entry["target"] is not None:
             swhid = CoreSWHID(
                 object_type=ObjectType.REVISION, object_id=entry["target"]
             )
         assert data["directoryEntry"] == {
             "name": {"text": entry["name"].decode()},
             "target": {"swhid": str(swhid)} if swhid else None,
             "targetType": get_target_type(entry["type"]),
         }
 
 
 @pytest.mark.parametrize("directory", get_directories())
 def test_get_directory_entry_connection(client, directory):
     query_str = """
     query getDirectory($swhid: SWHID!) {
       directory(swhid: $swhid) {
         swhid
         entries {
           nodes {
             targetType
             name {
               text
             }
           }
         }
       }
     }
     """
     data, _ = utils.get_query_response(client, query_str, swhid=str(directory.swhid()))
     directory_entries = data["directory"]["entries"]["nodes"]
     assert len(directory_entries) == len(directory.entries)
     output = [
         {"name": {"text": de.name.decode()}, "targetType": get_target_type(de.type)}
         for de in directory.entries
     ]
     for each_entry in output:
         assert each_entry in directory_entries
 
 
 @pytest.mark.parametrize("directory", get_directories())
 def test_directory_entry_connection_filter_by_name(client, directory):
     storage = server.get_storage()
     for dir_entry in storage.directory_ls(directory.id):
         name_include = dir_entry["name"][:-1].decode()
         query_str = """
         query getDirectory($swhid: SWHID!, $nameInclude: String) {
           directory(swhid: $swhid) {
             swhid
             entries(nameInclude: $nameInclude) {
               nodes {
                 targetType
                 name {
                   text
                 }
               }
             }
           }
         }
         """
         data, _ = utils.get_query_response(
             client,
             query_str,
             swhid=str(directory.swhid()),
             nameInclude=name_include,
         )
         for entry in data["directory"]["entries"]["nodes"]:
             assert name_include in entry["name"]["text"]
             assert entry["targetType"] == get_target_type(dir_entry["type"])
+
+
+def test_directory_entry_connection_filter_by_name_special_chars(client):
+    directory = get_directories_with_special_name_entries()[0]
+    query_str = """
+    query getDirectory($swhid: SWHID!, $nameInclude: String) {
+      directory(swhid: $swhid) {
+        entries(nameInclude: $nameInclude) {
+          nodes {
+            targetType
+            name {
+              text
+            }
+          }
+        }
+      }
+    }
+    """
+    data, _ = utils.get_query_response(
+        client,
+        query_str,
+        swhid=str(directory.swhid()),
+        nameInclude="ssSSé",
+    )
+    assert data["directory"]["entries"]["nodes"][0] == {
+        "name": {"text": "ßßétEÉt"},
+        "targetType": "content",
+    }
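Note on the nameInclude change (not part of the patch, a minimal standalone sketch; the helper name filter_names is illustrative only): str.casefold() applies full Unicode case folding, so "ß" folds to "ss" and a query such as "ssSSé" can match the entry name "ßßétEÉt", which the previous lower()/encode() comparison on raw bytes could not do.

    def filter_names(names, name_include):
        # Mirrors the resolver's nameInclude filter: case-insensitive
        # substring match against the decoded entry name.
        return [
            n for n in names if name_include.casefold() in n.decode().casefold()
        ]

    names = ["ßßétEÉt".encode(), b"sub-dir"]
    assert filter_names(names, "ssSSé") == ["ßßétEÉt".encode()]  # "ß" folds to "ss"
    assert "ssssé" not in "ßßétEÉt".lower()  # lower() alone would not match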