diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index 9c9cfb03..ad4f17f1 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,145 +1,150 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed import sentry_sdk from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import get_oidc_client from swh.web.auth.models import OIDCUser # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] + # extract user permissions if any + resource_access = decoded_token.get("resource_access", {}) + client_resource_access = resource_access.get(_oidc_client.client_id, {}) + user.permissions = set(client_resource_access.get("roles", [])) + # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) # set cache key TTL as access token expiration time assert user.expires_at ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl)) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: # attempt to decode token decoded_token = _oidc_client.decode_token(token) # create Django user user = _oidc_user_from_decoded_token(decoded_token) except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py index 01783857..94bf8da4 100644 --- a/swh/web/auth/models.py +++ b/swh/web/auth/models.py @@ -1,43 +1,80 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime -from typing import Optional +from typing import Optional, Set from django.contrib.auth.models import User class OIDCUser(User): """ Custom User proxy model for remote users storing OpenID Connect related data: profile containing authentication tokens. The model is also not saved to database as all users are already stored in the Keycloak one. """ # OIDC subject identifier sub: str = "" # OIDC tokens and session related data, only relevant when a user # authenticates from a web browser access_token: Optional[str] = None expires_at: Optional[datetime] = None id_token: Optional[str] = None refresh_token: Optional[str] = None refresh_expires_at: Optional[datetime] = None scope: Optional[str] = None session_state: Optional[str] = None + # User permissions + permissions: Set[str] + class Meta: app_label = "swh.web.auth" proxy = True def save(self, **kwargs): """ Override django.db.models.Model.save to avoid saving the remote users to web application database. """ pass + + def get_group_permissions(self, obj=None) -> Set[str]: + """ + Override django.contrib.auth.models.PermissionsMixin.get_group_permissions + to get permissions from OIDC + """ + return self.get_all_permissions(obj) + + def get_all_permissions(self, obj=None) -> Set[str]: + """ + Override django.contrib.auth.models.PermissionsMixin.get_all_permissions + to get permissions from OIDC + """ + return self.permissions + + def has_perm(self, perm, obj=None) -> bool: + """ + Override django.contrib.auth.models.PermissionsMixin.has_perm + to check permission from OIDC + """ + if self.is_active and self.is_superuser: + return True + + return perm in self.permissions + + def has_module_perms(self, app_label) -> bool: + """ + Override django.contrib.auth.models.PermissionsMixin.has_module_perms + to check permissions from OIDC. + """ + if self.is_active and self.is_superuser: + return True + + return any(perm.startswith(app_label) for perm in self.permissions) diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index 3cbe9448..779ea359 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,98 +1,110 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): - def __init__(self, auth_success=True, exp=None): + def __init__( + self, auth_success=True, exp=None, user_groups=[], user_permissions=[] + ): swhweb_config = get_config() super().__init__( swhweb_config["keycloak"]["server_url"], swhweb_config["keycloak"]["realm_name"], OIDC_SWH_WEB_CLIENT_ID, ) self.auth_success = auth_success self.exp = exp + self.user_groups = user_groups + self.user_permissions = user_permissions self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.authorization_code = Mock() self.userinfo = Mock() self.logout = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) else: self.authorization_url = Mock() exception = Exception("Authentication failed") self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options["verify_exp"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["auth_time"] if self.exp is not None: decoded["exp"] = self.exp decoded["auth_time"] = self.exp - expire_in else: decoded["auth_time"] = int(timezone.now().timestamp()) decoded["exp"] = decoded["auth_time"] + expire_in - decoded["groups"] = ["/staff"] + decoded["groups"] = self.user_groups + if self.user_permissions: + decoded["resource_access"][self.client_id] = { + "roles": self.user_permissions + } return decoded -def mock_keycloak(mocker, auth_success=True, exp=None): - kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success, exp) +def mock_keycloak( + mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] +): + kc_oidc_mock = KeycloackOpenIDConnectMock( + auth_success, exp, user_groups, user_permissions + ) mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py index 02cd843e..b22d7b30 100644 --- a/swh/web/tests/auth/sample_data.py +++ b/swh/web/tests/auth/sample_data.py @@ -1,101 +1,128 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information realm_public_key = ( "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u" "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY" "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy" "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr" "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie" "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB" ) oidc_profile = { "access_token": ( + # decoded token: + # {'acr': '1', + # 'allowed-origins': ['*'], + # 'aud': ['swh-web', 'account'], + # 'auth_time': 1592395601, + # 'azp': 'swh-web', + # 'email': 'john.doe@example.com', + # 'email_verified': False, + # 'exp': 1592396202, + # 'family_name': 'Doe', + # 'given_name': 'John', + # 'groups': ['/staff'], + # 'iat': 1582723101, + # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage', + # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e', + # 'name': 'John Doe', + # 'nbf': 0, + # 'preferred_username': 'johndoe', + # 'realm_access': {'roles': ['offline_access', 'uma_authorization']}, + # 'resource_access': {'account': {'roles': ['manage-account', + # 'manage-account-links', + # 'view-profile']}}, + # 'scope': 'openid email profile', + # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d', + # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200', + # 'typ': 'Bearer' + # } "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV" "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0." "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz" "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz" "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs" "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj" "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2" "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi" "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy" "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi" "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6" "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0" "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl" "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz" "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg" "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv" "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi" "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi" "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-" "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB" "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO" "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc" "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl" "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE" "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ" "6A" ), "expires_in": 600, "id_token": ( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0" "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki" "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi" "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo" "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp" "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh" "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13" "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk" "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx" "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn" "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2" "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi" "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee" "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL" "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_" "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE" "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN" "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX" "ZbYnitD1Typ6Q" ), "not-before-policy": 0, "refresh_expires_in": 1800, "refresh_token": ( "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM" "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk" "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC" "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL" "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv" "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q" "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj" "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid" "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi" "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ" "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl" "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic" "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu" "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc" "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG" "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI" ), "scope": "openid email profile", "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", "token_type": "bearer", } userinfo = { "email": "john.doe@example.com", "email_verified": False, "family_name": "Doe", "given_name": "John", "groups": ["/staff"], "name": "John Doe", "preferred_username": "johndoe", "sub": "feacd344-b468-4a65-a236-14f61e6b7200", } diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index f2acf805..b34a9159 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,148 +1,181 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from django.contrib.auth import authenticate, get_backends import pytest from django.conf import settings from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from . import sample_data from .keycloak_mock import mock_keycloak def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) -def _check_authenticated_user(user, decoded_token): +def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] + resource_access = decoded_token.get("resource_access", {}) + resource_access_client = resource_access.get(kc_oidc_mock, {}) + assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): - kc_oidc_mock = mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) oidc_profile = sample_data.oidc_profile user = _authenticate_user(request_factory) decoded_token = kc_oidc_mock.decode_token(user.access_token) - _check_authenticated_user(user, decoded_token) + _check_authenticated_user(user, decoded_token, kc_oidc_mock) auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): mock_keycloak(mocker, auth_success=False) user = _authenticate_user(request_factory) assert user is None +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): + permission = "webapp.some-permission" + mock_keycloak(mocker, user_permissions=[permission]) + user = _authenticate_user(request_factory) + assert user.has_perm(permission) + assert user.get_all_permissions() == {permission} + assert user.get_group_permissions() == {permission} + assert user.has_module_perms("webapp") + assert not user.has_module_perms("foo") + + @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() kc_oidc_mock = mock_keycloak(mocker) access_token = sample_data.oidc_profile["access_token"] decoded_token = kc_oidc_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user, decoded_token) + _check_authenticated_user(user, decoded_token, kc_oidc_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() # simulate a failed authentication with a bearer token in expected format mock_keycloak(mocker, auth_success=False) access_token = sample_data.oidc_profile["access_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format mock_keycloak(mocker) request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() access_token = sample_data.oidc_profile["access_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{access_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): + permission = "webapp.some-permission" + mock_keycloak(mocker, user_permissions=[permission]) + + drf_auth_backend = OIDCBearerTokenAuthentication() + access_token = sample_data.oidc_profile["access_token"] + url = reverse("api-1-stat-counters") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") + user, _ = drf_auth_backend.authenticate(request) + + assert user.has_perm(permission) + assert user.get_all_permissions() == {permission} + assert user.get_group_permissions() == {permission} + assert user.has_module_perms("webapp") + assert not user.has_module_perms("foo")