diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index cb45d37..e06ab7e 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,246 +1,246 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Optional from urllib.parse import urlencode # add ExpiredSignatureError alias to avoid leaking jose import # in swh-auth client code from jose.jwt import ExpiredSignatureError # noqa from keycloak import KeycloakOpenID # add KeycloakError alias to avoid leaking keycloak import # in swh-auth client code from keycloak.exceptions import KeycloakError # noqa from swh.core.config import load_from_envvar class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( - server_url=server_url, client_id=client_id, realm_name=realm_name, + server_url=server_url, + client_id=client_id, + realm_name=realm_name, ) self.server_url = server_url self.realm_public_key = realm_public_key @property def realm_name(self): return self._keycloak.realm_name @realm_name.setter def realm_name(self, value): self._keycloak.realm_name = value @property def client_id(self): return self._keycloak.client_id @client_id.setter def client_id(self, value): self._keycloak.client_id = value def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Raises: KeycloakError in case of authentication failures Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) def login( self, username: str, password: str, scope: str = "openid", **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Direct Access Grant flow. Raises: KeycloakError in case of authentication failures Args: username: an existing username in the realm password: password associated to username extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="password", scope=scope, username=username, password=password, **extra_params, ) def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) @classmethod def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from a configuration dict. Args: kwargs: configuration dict for the instance, with one keycloak key, whose value is a Dict with the following keys: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier Returns: the KeycloakOpenIDConnect instance """ cfg = kwargs["keycloak"] return cls( server_url=cfg["server_url"], realm_name=cfg["realm_name"], client_id=cfg["client_id"], ) @classmethod def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to instantiation call Returns: the KeycloakOpenIDConnect instance """ config = dict(load_from_envvar()).get("keycloak", {}) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(keycloak=config) def keycloak_error_message(keycloak_error: KeycloakError) -> str: - """Transform a keycloak exception into an error message. - - """ + """Transform a keycloak exception into an error message.""" try: # keycloak error wrapped in a JSON document msg_dict = json.loads(keycloak_error.error_message.decode()) error_msg = msg_dict["error"] error_desc = msg_dict.get("error_description") if error_desc: error_msg = f"{error_msg}: {error_desc}" return error_msg except Exception: # fallback: return error message string return keycloak_error.error_message diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py index d2b5486..9dd5e5b 100644 --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -1,228 +1,229 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone import json from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.tests.sample_data import ( CLIENT_ID, OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, REALM_NAME, SERVER_URL, USER_INFO, ) class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: server_url: Server main auth url (cf. :py:data:`swh.auth.tests.sample_data.SERVER_URL`) realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) realm_permissions: user permissions configuration at realm level (if any) client_permissions: user permissions configuration at client level (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. :py:data:`swh.auth.tests.sample_data.USER_INFO`) raw_realm_public_key: A raw ascii text representing the realm public key (cf. :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], realm_permissions: List[str] = [], client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): - """Constructor - """ + """Constructor""" super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.realm_permissions = realm_permissions self.client_permissions = client_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration and audience checks as we use a static # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False options["verify_aud"] = False decoded = super().decode_token(token, options) # Merge the user info configured to be part of the decode token userinfo = self.userinfo() if userinfo is not None: decoded = {**decoded, **userinfo} # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp decoded["iat"] = self.exp - expire_in else: now = int(datetime.now(tz=timezone.utc).timestamp()) decoded["iat"] = now decoded["exp"] = now + expire_in decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id decoded["realm_access"]["roles"] += self.realm_permissions if self.client_permissions: decoded["resource_access"][self.client_id] = { "roles": self.client_permissions } return decoded def set_auth_success( self, auth_success: bool, oidc_profile: Optional[Dict] = None, user_info: Optional[Dict] = None, ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.login.return_value = copy(oidc_profile) self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore error = { "error": "invalid_grant", "error_description": "Invalid user credentials", } error_message = json.dumps(error).encode() exception = KeycloakError(error_message=error_message, response_code=401) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception self.login.side_effect = exception def keycloak_oidc_factory( server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], realm_permissions: List[str] = [], client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): """Keycloak mock fixture factory. Report to - :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. + :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture def keycloak_oidc(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, realm_permissions=realm_permissions, client_permissions=client_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, ) return keycloak_oidc # for backward compatibility # TODO: remove that alias once swh-deposit and swh-web use new function name keycloak_mock_factory = keycloak_oidc_factory # generic keycloak fixture that can be used within tests # (cf. test_keycloak.py, test_utils.py, django related tests) # or external modules using that pytest plugin _keycloak_oidc = keycloak_oidc_factory( - server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, + server_url=SERVER_URL, + realm_name=REALM_NAME, + client_id=CLIENT_ID, ) @pytest.fixture def keycloak_oidc(_keycloak_oidc, mocker): for oidc_client_factory in ( "swh.auth.django.views.keycloak_oidc_client", "swh.auth.django.backends.keycloak_oidc_client", ): keycloak_oidc_client = mocker.patch(oidc_client_factory) keycloak_oidc_client.return_value = _keycloak_oidc return _keycloak_oidc diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py index 211e57b..f8ee5fe 100644 --- a/swh/auth/tests/django/app/apptest/settings.py +++ b/swh/auth/tests/django/app/apptest/settings.py @@ -1,51 +1,54 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL SECRET_KEY = "o+&ayiuk(y^wh4ijz5e=c2$$kjj7g^6r%z+8d*c0lbpfs##k#7" INSTALLED_APPS = [ "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "swh.auth.tests.django.app.apptest", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", ] ROOT_URLCONF = "swh.auth.tests.django.app.apptest.urls" DATABASES = { - "default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "swh-auth-test-db",} + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": "swh-auth-test-db", + } } SESSION_ENGINE = "django.contrib.sessions.backends.cache" AUTHENTICATION_BACKENDS = [ "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] SWH_AUTH_SERVER_URL = SERVER_URL SWH_AUTH_REALM_NAME = REALM_NAME SWH_AUTH_CLIENT_ID = CLIENT_ID SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" REST_FRAMEWORK = { "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",), "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], } diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index c2be128..bc5c1da 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,121 +1,123 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) def _check_user(user, is_staff=False, permissions=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff assert user.permissions == permissions assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) def test_oidc_user_with_permissions_from_decoded_token(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["realm_access"] = {"roles": ["swh.ambassador"]} decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_oidc): user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) @override_settings( - SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None, + SWH_AUTH_SERVER_URL=None, + SWH_AUTH_REALM_NAME=None, + SWH_AUTH_CLIENT_ID=None, ) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( SWH_AUTH_SERVER_URL=SERVER_URL, SWH_AUTH_REALM_NAME=REALM_NAME, SWH_AUTH_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(): kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index b8f23a1..ca91b9a 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,182 +1,178 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy import json import os from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest import yaml from swh.auth.keycloak import KeycloakOpenIDConnect, keycloak_error_message from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO from swh.core.config import read def test_keycloak_oidc_well_known(keycloak_oidc): well_known_result = keycloak_oidc.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } def test_keycloak_oidc_authorization_url(keycloak_oidc): actual_auth_uri = keycloak_oidc.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_oidc.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } def test_keycloak_oidc_authorization_code_fail(keycloak_oidc): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_oidc.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_oidc.authorization_code("auth-code", "redirect-uri") with pytest.raises(KeycloakError): keycloak_oidc.login("username", "password") def test_keycloak_oidc_authorization_code(keycloak_oidc): actual_response = keycloak_oidc.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE def test_keycloak_oidc_refresh_token(keycloak_oidc): actual_result = keycloak_oidc.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE def test_keycloak_oidc_userinfo(keycloak_oidc): actual_user_info = keycloak_oidc.userinfo("refresh-token") assert actual_user_info == USER_INFO def test_keycloak_oidc_logout(keycloak_oidc): """Login out does not raise""" keycloak_oidc.logout("refresh-token") def test_keycloak_oidc_decode_token(keycloak_oidc): actual_decoded_data = keycloak_oidc.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "iat", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key, None) expected_decoded_token.pop(dynamic_valued_key, None) assert actual_decoded_data2 == expected_decoded_token def test_keycloak_oidc_login(keycloak_oidc): actual_response = keycloak_oidc.login("username", "password") assert actual_response == OIDC_PROFILE @pytest.fixture def auth_config(): return { "keycloak": { "server_url": "https://auth.swh.org/SWHTest", "realm_name": "SWHTest", "client_id": "client_id", } } @pytest.fixture def auth_config_path(tmp_path, monkeypatch, auth_config): conf_path = os.path.join(tmp_path, "auth.yml") with open(conf_path, "w") as f: f.write(yaml.dump(auth_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path def test_auth_KeycloakOpenIDConnect_from_config(auth_config): - """Instantiating keycloak client out of configuration dict is possible - - """ + """Instantiating keycloak client out of configuration dict is possible""" client = KeycloakOpenIDConnect.from_config(**auth_config) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile(auth_config_path, monkeypatch): - """Instantiating keycloak client out of environment variable is possible - - """ + """Instantiating keycloak client out of environment variable is possible""" client = KeycloakOpenIDConnect.from_configfile() auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile_override( auth_config_path, monkeypatch ): """Instantiating keycloak client out of environment variable is possible And caller can override the configuration at calling - """ + """ client = KeycloakOpenIDConnect.from_configfile(client_id="foobar") auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == "foobar" @pytest.mark.parametrize( "error_dict, expected_result", [ ({"error": "unknown_error"}, "unknown_error"), ( {"error": "invalid_grant", "error_description": "Invalid credentials"}, "invalid_grant: Invalid credentials", ), ], ) def test_auth_keycloak_error_message(error_dict, expected_result): """Conversion from KeycloakError to error message should work with detail or not""" error_message = json.dumps(error_dict).encode() exception = KeycloakError(error_message=error_message, response_code=401) actual_result = keycloak_error_message(exception) assert actual_result == expected_result def test_auth_keycloak_error_message_string(): """Conversion from KeycloakError to error message should work with detail or not""" error_message = "Can't connect to server " exception = KeycloakError(error_message=error_message) assert keycloak_error_message(exception) == error_message