diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py index c5eeba4..c61e5fc 100644 --- a/swh/auth/django/backends.py +++ b/swh/auth/django/backends.py @@ -1,218 +1,225 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import hashlib from typing import Any, Dict, Optional from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError import sentry_sdk from swh.auth.django.models import OIDCUser from swh.auth.django.utils import ( keycloak_oidc_client, oidc_profile_cache_key, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.keycloak import ( ExpiredSignatureError, KeycloakError, KeycloakOpenIDConnect, keycloak_error_message, ) def _update_cached_oidc_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any], user: OIDCUser ) -> None: """ Update cached OIDC profile associated to a user if needed: when the profile is not stored in cache or when the authentication tokens have changed. Args: oidc_client: KeycloakOpenID wrapper oidc_profile: OIDC profile used to authenticate a user user: django model representing the authenticated user """ # put OIDC profile in cache or update it after token renewal cache_key = oidc_profile_cache_key(oidc_client, user.id) if ( cache.get(cache_key) is None or user.access_token != oidc_profile["access_token"] ): # set cache key TTL as refresh token expiration time assert user.refresh_expires_at ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(cache_key, user.oidc_profile, timeout=max(0, ttl)) class OIDCAuthorizationCodePKCEBackend: """ Django authentication backend using Keycloak OpenID Connect authorization code flow with PKCE ("Proof Key for Code Exchange"). To use that backend globally in your django application, proceed as follow: * add ``"swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend"`` to the ``AUTHENTICATION_BACKENDS`` django setting * configure Keycloak URL, realm and client by adding ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` in django settings * add ``swh.auth.django.views.urlpatterns`` to your django application URLs * add an HTML link targeting the ``"oidc-login"`` django view in your application views * once a user is logged in, add an HTML link targeting the ``"oidc-logout"`` django view in your application views (a ``next_path`` query parameter can be used to redirect to a view of choice once the user is logged out) """ def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: oidc_client = keycloak_oidc_client() # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = oidc_user_from_profile(oidc_client, oidc_profile) # update cached oidc profile if needed _update_cached_oidc_profile(oidc_client, oidc_profile, user) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_client = keycloak_oidc_client() - oidc_profile = cache.get(oidc_profile_cache_key(oidc_client, user_id)) + cache_key = oidc_profile_cache_key(oidc_client, user_id) + oidc_profile = cache.get(cache_key) if oidc_profile: try: user = oidc_user_from_profile(oidc_client, oidc_profile) # update cached oidc profile if needed _update_cached_oidc_profile(oidc_client, oidc_profile, user) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user + except KeycloakError as ke: + error_msg = keycloak_error_message(ke) + if error_msg == "invalid_grant: Session not active": + # user session no longer active, remove oidc profile from cache + cache.delete(cache_key) + return None except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): """ Django REST Framework authentication backend using bearer tokens for Keycloak OpenID Connect. It enables to authenticate a Web API user by sending a long-lived OpenID Connect refresh token in HTTP Authorization headers. Long lived refresh tokens can be generated by opening an OpenID Connect session with the following scope: ``openid offline_access``. To use that backend globally in your DRF application, proceed as follow: * add ``"swh.auth.django.backends.OIDCBearerTokenAuthentication"`` to the ``REST_FRAMEWORK["DEFAULT_AUTHENTICATION_CLASSES"]`` django setting. * configure Keycloak URL, realm and client by adding ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` in django settings Users will then be able to perform authenticated Web API calls by sending their refresh token in HTTP Authorization headers, for instance: ``curl -H "Authorization: Bearer ${TOKEN}" https://...``. """ def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: oidc_client = keycloak_oidc_client() # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() hasher.update(refresh_token.encode("ascii")) cache_key = f"api_token_{hasher.hexdigest()}" # check if an access token is cached access_token = cache.get(cache_key) if access_token is not None: # attempt to decode access token try: decoded_token = oidc_client.decode_token(access_token) # access token has expired except ExpiredSignatureError: decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider access_token = oidc_client.refresh_token(refresh_token)["access_token"] # decode access token decoded_token = oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) # save access token in cache while it is valid cache.set(cache_key, access_token, timeout=max(0, ttl)) # create Django user user = oidc_user_from_decoded_token(decoded_token, oidc_client.client_id) except UnicodeEncodeError as e: sentry_sdk.capture_exception(e) raise ValidationError("Invalid bearer token") except KeycloakError as ke: error_msg = keycloak_error_message(ke) if error_msg in ( "invalid_grant: Offline session not active", "invalid_grant: Offline user session not found", ): error_msg = ( "Bearer token expired after a long period of inactivity; " "please generate a new one." ) sentry_sdk.capture_exception(ke) raise AuthenticationFailed(error_msg) except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py index 19eabca..9598635 100644 --- a/swh/auth/tests/django/test_backends.py +++ b/swh/auth/tests/django/test_backends.py @@ -1,262 +1,289 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta +import json from unittest.mock import Mock from django.conf import settings from django.contrib.auth import authenticate, get_backends +from django.core.cache import cache import pytest from rest_framework.exceptions import AuthenticationFailed from swh.auth.django.backends import OIDCBearerTokenAuthentication from swh.auth.django.models import OIDCUser -from swh.auth.django.utils import reverse -from swh.auth.keycloak import ExpiredSignatureError +from swh.auth.django.utils import oidc_profile_cache_key, reverse +from swh.auth.keycloak import ExpiredSignatureError, KeycloakError def _authenticate_user(request_factory): request = request_factory.get(reverse("root")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, keycloak_oidc): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.user_groups = ["/staff"] oidc_profile = keycloak_oidc.login() user = _authenticate_user(request_factory) decoded_token = keycloak_oidc.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, keycloak_oidc) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success( keycloak_oidc, request_factory ): """ Checks access token renewal success using refresh token. """ oidc_profile = keycloak_oidc.login() decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = [ ExpiredSignatureError("access token token has expired"), decoded_token, ] user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure( keycloak_oidc, request_factory ): """ Checks access token renewal failure using refresh token. """ + + # authenticate user + user = _authenticate_user(request_factory) + assert user is not None + # OIDC profile should be in cache + cache_key = oidc_profile_cache_key(keycloak_oidc, user.id) + assert cache.get(cache_key) is not None + + # simulate terminated OIDC session keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( "access token token has expired" ) - keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired") - user = _authenticate_user(request_factory) + kc_error_dict = { + "error": "invalid_grant", + "error_description": "Session not active", + } + keycloak_oidc.refresh_token.side_effect = KeycloakError( + error_message=json.dumps(kc_error_dict).encode(), response_code=400 + ) + + backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" + assert user.backend == backend_path + backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) + + # try to authenticate user again from its id and cached OIDC profile + user = get_backends()[backend_idx].get_user(user.id) + # it should have tried to refresh token oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) + # authentication failed assert user is None + # invalid OIDC profile should have been removed from cache + assert cache.get(cache_key) is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ realm_permission = "swh.some-permission" client_permission = "webapp.some-permission" keycloak_oidc.realm_permissions = [realm_permission] keycloak_oidc.client_permissions = [client_permission] user = _authenticate_user(request_factory) assert user.has_perm(realm_permission) assert user.has_perm(client_permission) assert user.get_all_permissions() == {realm_permission, client_permission} assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] access_token = oidc_profile["access_token"] decoded_token = keycloak_oidc.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, keycloak_oidc) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() # simulate a failed authentication with a bearer token in expected format keycloak_oidc.set_auth_success(False) refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-test") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions( keycloak_oidc, api_request_factory ): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ realm_permission = "swh.some-permission" client_permission = "webapp.some-permission" keycloak_oidc.realm_permissions = [realm_permission] keycloak_oidc.client_permissions = [client_permission] drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() refresh_token = oidc_profile["refresh_token"] url = reverse("api-test") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(realm_permission) assert user.has_perm(client_permission) assert user.get_all_permissions() == {realm_permission, client_permission} assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo")