diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index 9697c12..fe47b9e 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,189 +1,191 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional from django.conf import settings from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from swh.auth.django.models import OIDCUser from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] + realm_access = decoded_token.get("realm_access", {}) + permissions = realm_access.get("roles", []) + if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) - permissions = client_resource_access.get("roles", []) - else: - permissions = [] + permissions += client_resource_access.get("roles", []) - user.permissions = set(permissions) + # set user permissions and filter out default keycloak realm roles + user.permissions = set(permissions) - {"offline_access", "uma_authorization"} # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = oidc_client.decode_token(access_token) # access token has expired except ExpiredSignatureError: # get a new access token from authentication provider oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}" def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: * SWH_AUTH_SERVER_URL * SWH_AUTH_REALM_NAME * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server Raises: ValueError: at least one mandatory django setting is not set """ server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id ) def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[Dict[str, Any]] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py index e8581a1..330cd45 100644 --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -1,220 +1,226 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone import json from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.tests.sample_data import ( CLIENT_ID, OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, REALM_NAME, SERVER_URL, USER_INFO, ) class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: server_url: Server main auth url (cf. :py:data:`swh.auth.tests.sample_data.SERVER_URL`) realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) - user_permissions: user permissions configuration (if any) + realm_permissions: user permissions configuration at realm level (if any) + client_permissions: user permissions configuration at client level (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. :py:data:`swh.auth.tests.sample_data.USER_INFO`) raw_realm_public_key: A raw ascii text representing the realm public key (cf. :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], - user_permissions: List[str] = [], + realm_permissions: List[str] = [], + client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups - self.user_permissions = user_permissions + self.realm_permissions = realm_permissions + self.client_permissions = client_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration and audience checks as we use a static # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False options["verify_aud"] = False decoded = super().decode_token(token, options) # Merge the user info configured to be part of the decode token userinfo = self.userinfo() if userinfo is not None: decoded = {**decoded, **userinfo} # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp decoded["iat"] = self.exp - expire_in else: now = int(datetime.now(tz=timezone.utc).timestamp()) decoded["iat"] = now decoded["exp"] = now + expire_in decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id - if self.user_permissions: + decoded["realm_access"]["roles"] += self.realm_permissions + if self.client_permissions: decoded["resource_access"][self.client_id] = { - "roles": self.user_permissions + "roles": self.client_permissions } return decoded def set_auth_success( self, auth_success: bool, oidc_profile: Optional[Dict] = None, user_info: Optional[Dict] = None, ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.login.return_value = copy(oidc_profile) self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore error = { "error": "invalid_grant", "error_description": "Invalid user credentials", } error_message = json.dumps(error).encode() exception = KeycloakError(error_message=error_message, response_code=401) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception self.login.side_effect = exception def keycloak_oidc_factory( server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], - user_permissions: List[str] = [], + realm_permissions: List[str] = [], + client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): """Keycloak mock fixture factory. Report to :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture def keycloak_oidc(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, - user_permissions=user_permissions, + realm_permissions=realm_permissions, + client_permissions=client_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, ) return keycloak_oidc # for backward compatibility # TODO: remove that alias once swh-deposit and swh-web use new function name keycloak_mock_factory = keycloak_oidc_factory # generic keycloak fixture that can be used within tests # (cf. test_keycloak.py, test_utils.py, django related tests) # or external modules using that pytest plugin _keycloak_oidc = keycloak_oidc_factory( server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, ) @pytest.fixture def keycloak_oidc(_keycloak_oidc, mocker): for oidc_client_factory in ( "swh.auth.django.views.keycloak_oidc_client", "swh.auth.django.backends.keycloak_oidc_client", ): keycloak_oidc_client = mocker.patch(oidc_client_factory) keycloak_oidc_client.return_value = _keycloak_oidc return _keycloak_oidc diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py index 9b57813..19eabca 100644 --- a/swh/auth/tests/django/test_backends.py +++ b/swh/auth/tests/django/test_backends.py @@ -1,256 +1,262 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from unittest.mock import Mock from django.conf import settings from django.contrib.auth import authenticate, get_backends import pytest from rest_framework.exceptions import AuthenticationFailed from swh.auth.django.backends import OIDCBearerTokenAuthentication from swh.auth.django.models import OIDCUser from swh.auth.django.utils import reverse from swh.auth.keycloak import ExpiredSignatureError def _authenticate_user(request_factory): request = request_factory.get(reverse("root")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, keycloak_oidc): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.user_groups = ["/staff"] oidc_profile = keycloak_oidc.login() user = _authenticate_user(request_factory) decoded_token = keycloak_oidc.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, keycloak_oidc) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success( keycloak_oidc, request_factory ): """ Checks access token renewal success using refresh token. """ oidc_profile = keycloak_oidc.login() decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = [ ExpiredSignatureError("access token token has expired"), decoded_token, ] user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure( keycloak_oidc, request_factory ): """ Checks access token renewal failure using refresh token. """ keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( "access token token has expired" ) keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired") user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ - permission = "webapp.some-permission" - keycloak_oidc.user_permissions = [permission] + realm_permission = "swh.some-permission" + client_permission = "webapp.some-permission" + keycloak_oidc.realm_permissions = [realm_permission] + keycloak_oidc.client_permissions = [client_permission] user = _authenticate_user(request_factory) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} + assert user.has_perm(realm_permission) + assert user.has_perm(client_permission) + assert user.get_all_permissions() == {realm_permission, client_permission} + assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] access_token = oidc_profile["access_token"] decoded_token = keycloak_oidc.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, keycloak_oidc) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() # simulate a failed authentication with a bearer token in expected format keycloak_oidc.set_auth_success(False) refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions( keycloak_oidc, api_request_factory ): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ - permission = "webapp.some-permission" - keycloak_oidc.user_permissions = [permission] + realm_permission = "swh.some-permission" + client_permission = "webapp.some-permission" + keycloak_oidc.realm_permissions = [realm_permission] + keycloak_oidc.client_permissions = [client_permission] drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] url = reverse("api-test") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} + assert user.has_perm(realm_permission) + assert user.has_perm(client_permission) + assert user.get_all_permissions() == {realm_permission, client_permission} + assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index a7878d4..c2be128 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,120 +1,121 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) def _check_user(user, is_staff=False, permissions=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff assert user.permissions == permissions assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) -def test_oidc_user_from_decoded_token2(): +def test_oidc_user_with_permissions_from_decoded_token(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] + decoded_token["realm_access"] = {"roles": ["swh.ambassador"]} decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) - _check_user(user, is_staff=True, permissions={"read-api"}) + _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_oidc): user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) @override_settings( SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None, ) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( SWH_AUTH_SERVER_URL=SERVER_URL, SWH_AUTH_REALM_NAME=REALM_NAME, SWH_AUTH_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(): kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID