diff --git a/swh/auth/django/models.py b/swh/auth/django/models.py index 59a41a7..305201e 100644 --- a/swh/auth/django/models.py +++ b/swh/auth/django/models.py @@ -1,108 +1,123 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Any, Dict, Optional, Set -from django.contrib.auth.models import User +from django.contrib.auth.models import Group, User +from django.db.models import Q class OIDCUser(User): """ Custom User proxy model for remote users storing OpenID Connect related data: profile containing authentication tokens. The model is also not saved to database as all users are already stored in the Keycloak one. """ # OIDC subject identifier sub: str = "" # OIDC tokens and session related data, only relevant when a user # authenticates from a web browser access_token: Optional[str] = None expires_in: Optional[int] = None expires_at: Optional[datetime] = None id_token: Optional[str] = None refresh_token: Optional[str] = None refresh_expires_in: Optional[int] = None refresh_expires_at: Optional[datetime] = None scope: Optional[str] = None session_state: Optional[str] = None # User permissions permissions: Set[str] + # User groups + group_names: Set[str] + class Meta: # TODO: To redefine in subclass of this class # Forced to empty otherwise, django complains about it # "Model class swh.auth.django.OIDCUser doesn't declare an explicit app_label # and isn't in an application in INSTALLED_APPS" app_label = "" proxy = True auto_created = True # prevent model to be created in database by migrations def save(self, **kwargs): """ Override django.db.models.Model.save to avoid saving the remote users to web application database. """ pass def get_group_permissions(self, obj=None) -> Set[str]: """ Override django.contrib.auth.models.PermissionsMixin.get_group_permissions to get permissions from OIDC """ return self.get_all_permissions(obj) def get_all_permissions(self, obj=None) -> Set[str]: """ Override django.contrib.auth.models.PermissionsMixin.get_all_permissions to get permissions from OIDC """ return self.permissions def has_perm(self, perm, obj=None) -> bool: """ Override django.contrib.auth.models.PermissionsMixin.has_perm to check permission from OIDC """ if self.is_active and self.is_superuser: return True return perm in self.permissions def has_module_perms(self, app_label) -> bool: """ Override django.contrib.auth.models.PermissionsMixin.has_module_perms to check permissions from OIDC. """ if self.is_active and self.is_superuser: return True return any(perm.startswith(app_label) for perm in self.permissions) + @property + def groups(self): + """ + Override django.contrib.auth.models.PermissionsMixin.groups + to get groups from OIDC. + """ + search_query = Q() + for group_name in self.group_names: + search_query = search_query | Q(name=group_name) + return Group.objects.filter(search_query) + @property def oidc_profile(self) -> Dict[str, Any]: """ Returns OpenID Connect profile associated to the user as a dictionary. """ return { k: getattr(self, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index fe47b9e..aad1729 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,191 +1,202 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional from django.conf import settings +from django.contrib.auth.models import Group from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from swh.auth.django.models import OIDCUser from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) - # set is_staff user property based on groups + # process keycloak groups + group_names = set() if "groups" in decoded_token: + # set is_staff user property based on group membership user.is_staff = "/staff" in decoded_token["groups"] + for group_name in decoded_token["groups"]: + # remove leading slash added by keycloak to group name + django_group_name = group_name.lstrip("/") + # ensure a corresponding django group exist + Group.objects.get_or_create(name=django_group_name) + group_names.add(django_group_name) realm_access = decoded_token.get("realm_access", {}) permissions = realm_access.get("roles", []) if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions += client_resource_access.get("roles", []) # set user permissions and filter out default keycloak realm roles user.permissions = set(permissions) - {"offline_access", "uma_authorization"} + # set user groups + user.group_names = group_names # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = oidc_client.decode_token(access_token) # access token has expired except ExpiredSignatureError: # get a new access token from authentication provider oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}" def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: * SWH_AUTH_SERVER_URL * SWH_AUTH_REALM_NAME * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server Raises: ValueError: at least one mandatory django setting is not set """ server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id ) def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[Dict[str, Any]] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py index 9598635..6540a7c 100644 --- a/swh/auth/tests/django/test_backends.py +++ b/swh/auth/tests/django/test_backends.py @@ -1,289 +1,287 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import json from unittest.mock import Mock from django.conf import settings from django.contrib.auth import authenticate, get_backends from django.core.cache import cache import pytest from rest_framework.exceptions import AuthenticationFailed from swh.auth.django.backends import OIDCBearerTokenAuthentication from swh.auth.django.models import OIDCUser from swh.auth.django.utils import oidc_profile_cache_key, reverse from swh.auth.keycloak import ExpiredSignatureError, KeycloakError +pytestmark = pytest.mark.django_db + def _authenticate_user(request_factory): request = request_factory.get(reverse("root")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, keycloak_oidc): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) + assert {group.name for group in user.groups.all()} == { + group_name.lstrip("/") for group_name in keycloak_oidc.user_groups + } assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) assert user.permissions == set(resource_access_client.get("roles", [])) + assert all(user.has_perm(perm) for perm in resource_access_client.get("roles", [])) -@pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ - keycloak_oidc.user_groups = ["/staff"] + keycloak_oidc.user_groups = ["/staff", "/other_group"] oidc_profile = keycloak_oidc.login() user = _authenticate_user(request_factory) decoded_token = keycloak_oidc.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, keycloak_oidc) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user -@pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None -@pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success( keycloak_oidc, request_factory ): """ Checks access token renewal success using refresh token. """ oidc_profile = keycloak_oidc.login() decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = [ ExpiredSignatureError("access token token has expired"), decoded_token, ] user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None -@pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure( keycloak_oidc, request_factory ): """ Checks access token renewal failure using refresh token. """ # authenticate user user = _authenticate_user(request_factory) assert user is not None # OIDC profile should be in cache cache_key = oidc_profile_cache_key(keycloak_oidc, user.id) assert cache.get(cache_key) is not None # simulate terminated OIDC session keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( "access token token has expired" ) kc_error_dict = { "error": "invalid_grant", "error_description": "Session not active", } keycloak_oidc.refresh_token.side_effect = KeycloakError( error_message=json.dumps(kc_error_dict).encode(), response_code=400 ) backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) # try to authenticate user again from its id and cached OIDC profile user = get_backends()[backend_idx].get_user(user.id) # it should have tried to refresh token oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) # authentication failed assert user is None # invalid OIDC profile should have been removed from cache assert cache.get(cache_key) is None -@pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ realm_permission = "swh.some-permission" client_permission = "webapp.some-permission" keycloak_oidc.realm_permissions = [realm_permission] keycloak_oidc.client_permissions = [client_permission] user = _authenticate_user(request_factory) assert user.has_perm(realm_permission) assert user.has_perm(client_permission) assert user.get_all_permissions() == {realm_permission, client_permission} assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") -@pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] access_token = oidc_profile["access_token"] decoded_token = keycloak_oidc.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, keycloak_oidc) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None -@pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() # simulate a failed authentication with a bearer token in expected format keycloak_oidc.set_auth_success(False) refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) -@pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions( keycloak_oidc, api_request_factory ): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ realm_permission = "swh.some-permission" client_permission = "webapp.some-permission" keycloak_oidc.realm_permissions = [realm_permission] keycloak_oidc.client_permissions = [client_permission] drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] url = reverse("api-test") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(realm_permission) assert user.has_perm(client_permission) assert user.get_all_permissions() == {realm_permission, client_permission} assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index bc5c1da..5e899d2 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,123 +1,132 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) +pytestmark = pytest.mark.django_db -def _check_user(user, is_staff=False, permissions=set()): + +def _check_user(user, is_staff=False, permissions=set(), groups=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff + assert {group.name for group in user.groups.all()} == groups assert user.permissions == permissions + assert all(user.has_perm(perm) for perm in permissions) assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) def test_oidc_user_with_permissions_from_decoded_token(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["realm_access"] = {"roles": ["swh.ambassador"]} decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) - _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"}) + _check_user( + user, + is_staff=True, + permissions={"swh.ambassador", "read-api"}, + groups={group_name.lstrip("/") for group_name in decoded_token["groups"]}, + ) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_oidc): user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) @override_settings( SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None, ) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( SWH_AUTH_SERVER_URL=SERVER_URL, SWH_AUTH_REALM_NAME=REALM_NAME, SWH_AUTH_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(): kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID