diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index 0a3aaaf9..4f99c7ad 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,190 +1,190 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import hashlib from typing import Any, Dict, Optional import sentry_sdk from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.models import OIDCUser from swh.web.auth.utils import get_oidc_client # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(_oidc_client.client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = _oidc_client.decode_token(access_token) # access token has expired or is invalid except Exception: # get a new access token from authentication provider oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime - auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) + auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) # put OIDC profile in cache or update it after token renewal cache_key = f"oidc_user_{user.id}" if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: # set cache key TTL as refresh token expiration time assert user.refresh_expires_at ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() hasher.update(refresh_token.encode("ascii")) cache_key = f"api_token_{hasher.hexdigest()}" # check if an access token is cached access_token = cache.get(cache_key) # attempt to decode access token try: decoded_token = _oidc_client.decode_token(access_token) except Exception: # access token is None or it has expired decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider access_token = _oidc_client.refresh_token(refresh_token)["access_token"] # decode access token decoded_token = _oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) # save access token in cache while it is valid cache.set(cache_key, access_token, timeout=max(0, ttl)) # create Django user user = _oidc_user_from_decoded_token(decoded_token) except UnicodeEncodeError as e: sentry_sdk.capture_exception(e) raise ValidationError("Invalid bearer token") except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index 093d12ff..7e61b061 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,117 +1,117 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from keycloak.exceptions import KeycloakError from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): def __init__( self, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): swhweb_config = get_config() super().__init__( swhweb_config["keycloak"]["server_url"], swhweb_config["keycloak"]["realm_name"], OIDC_SWH_WEB_CLIENT_ID, ) self.auth_success = auth_success self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.authorization_code = Mock() self.refresh_token = Mock() self.userinfo = Mock() self.logout = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) else: self.authorization_url = Mock() exception = KeycloakError( error_message="Authentication failed", response_code=401 ) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options["verify_exp"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests - expire_in = decoded["exp"] - decoded["auth_time"] + expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp - decoded["auth_time"] = self.exp - expire_in + decoded["iat"] = self.exp - expire_in else: - decoded["auth_time"] = int(timezone.now().timestamp()) - decoded["exp"] = decoded["auth_time"] + expire_in + decoded["iat"] = int(timezone.now().timestamp()) + decoded["exp"] = decoded["iat"] + expire_in decoded["groups"] = self.user_groups if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def mock_keycloak( mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): kc_oidc_mock = KeycloackOpenIDConnectMock( auth_success, exp, user_groups, user_permissions ) mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py index b22d7b30..edccc9af 100644 --- a/swh/web/tests/auth/sample_data.py +++ b/swh/web/tests/auth/sample_data.py @@ -1,128 +1,128 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information realm_public_key = ( "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u" "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY" "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy" "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr" "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie" "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB" ) oidc_profile = { "access_token": ( # decoded token: # {'acr': '1', # 'allowed-origins': ['*'], # 'aud': ['swh-web', 'account'], - # 'auth_time': 1592395601, + # 'auth_time': 1582723101, # 'azp': 'swh-web', # 'email': 'john.doe@example.com', # 'email_verified': False, # 'exp': 1592396202, # 'family_name': 'Doe', # 'given_name': 'John', # 'groups': ['/staff'], - # 'iat': 1582723101, + # 'iat': 1592395601, # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage', # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e', # 'name': 'John Doe', # 'nbf': 0, # 'preferred_username': 'johndoe', # 'realm_access': {'roles': ['offline_access', 'uma_authorization']}, # 'resource_access': {'account': {'roles': ['manage-account', # 'manage-account-links', # 'view-profile']}}, # 'scope': 'openid email profile', # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d', # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200', # 'typ': 'Bearer' # } "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV" "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0." "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz" "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz" "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs" "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj" "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2" "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi" "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy" "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi" "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6" "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0" "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl" "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz" "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg" "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv" "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi" "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi" "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-" "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB" "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO" "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc" "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl" "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE" "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ" "6A" ), "expires_in": 600, "id_token": ( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0" "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki" "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi" "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo" "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp" "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh" "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13" "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk" "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx" "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn" "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2" "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi" "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee" "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL" "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_" "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE" "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN" "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX" "ZbYnitD1Typ6Q" ), "not-before-policy": 0, "refresh_expires_in": 1800, "refresh_token": ( "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM" "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk" "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC" "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL" "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv" "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q" "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj" "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid" "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi" "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ" "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl" "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic" "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu" "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc" "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG" "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI" ), "scope": "openid email profile", "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", "token_type": "bearer", } userinfo = { "email": "john.doe@example.com", "email_verified": False, "family_name": "Doe", "given_name": "John", "groups": ["/staff"], "name": "John Doe", "preferred_username": "johndoe", "sub": "feacd344-b468-4a65-a236-14f61e6b7200", } diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index b60e34b3..1e3e9470 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,268 +1,268 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from unittest.mock import Mock import pytest from django.conf import settings from django.contrib.auth import authenticate, get_backends from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from . import sample_data from .keycloak_mock import mock_keycloak def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(kc_oidc_mock, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) oidc_profile = sample_data.oidc_profile user = _authenticate_user(request_factory) decoded_token = kc_oidc_mock.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, kc_oidc_mock) - auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) + auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ mock_keycloak(mocker, auth_success=False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory): """ Checks access token renewal success using refresh token. """ kc_oidc_mock = mock_keycloak(mocker) oidc_profile = sample_data.oidc_profile decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"]) new_access_token = "new_access_token" def _refresh_token(refresh_token): oidc_profile = dict(sample_data.oidc_profile) oidc_profile["access_token"] = new_access_token return oidc_profile def _decode_token(access_token): if access_token != new_access_token: raise Exception("access token token has expired") else: return decoded_token kc_oidc_mock.decode_token = Mock() kc_oidc_mock.decode_token.side_effect = _decode_token kc_oidc_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) kc_oidc_mock.refresh_token.assert_called_with( sample_data.oidc_profile["refresh_token"] ) assert user is not None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory): """ Checks access token renewal failure using refresh token. """ kc_oidc_mock = mock_keycloak(mocker) def _refresh_token(refresh_token): raise Exception("OIDC session has expired") def _decode_token(access_token): raise Exception("access token token has expired") kc_oidc_mock.decode_token = Mock() kc_oidc_mock.decode_token.side_effect = _decode_token kc_oidc_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) kc_oidc_mock.refresh_token.assert_called_with( sample_data.oidc_profile["refresh_token"] ) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() kc_oidc_mock = mock_keycloak(mocker) refresh_token = sample_data.oidc_profile["refresh_token"] access_token = sample_data.oidc_profile["access_token"] decoded_token = kc_oidc_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, kc_oidc_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() # simulate a failed authentication with a bearer token in expected format mock_keycloak(mocker, auth_success=False) refresh_token = sample_data.oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() refresh_token = sample_data.oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) drf_auth_backend = OIDCBearerTokenAuthentication() refresh_token = sample_data.oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo")