diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -57,7 +57,15 @@ def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token - decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) + try: + access_token = oidc_profile["access_token"] + decoded_token = _oidc_client.decode_token(access_token) + # access token has expired or is invalid + except Exception: + # get a new access token from authentication provider + oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"]) + # decode access token + decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) @@ -77,6 +85,16 @@ if hasattr(user, key): setattr(user, key, val) + # put OIDC profile in cache or update it after token renewal + cache_key = f"oidc_user_{user.id}" + if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: + # set cache key TTL as refresh token expiration time + assert user.refresh_expires_at + ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) + + # save oidc_profile in cache + cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) + return user @@ -95,12 +113,6 @@ # create Django user user = _oidc_user_from_profile(oidc_profile) - # set cache key TTL as access token expiration time - assert user.expires_at - ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) - - # save oidc_profile in cache - cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl)) except Exception as e: sentry_sdk.capture_exception(e) diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py --- a/swh/web/auth/middlewares.py +++ b/swh/web/auth/middlewares.py @@ -9,10 +9,9 @@ from swh.web.common.utils import reverse -class OIDCSessionRefreshMiddleware: +class OIDCSessionExpiredMiddleware: """ - 
Middleware for silently refreshing on OpenID Connect session from - the browser and get new access token. + Middleware for checking OIDC user session expiration. """ def __init__(self, get_response=None): @@ -32,13 +31,12 @@ ): return self.get_response(request) - # At that point, we know that a OIDC user was previously logged in. - # Access token has expired so we attempt a silent OIDC session refresh. - # If the latter failed because the session expired, user will be - # redirected to logout page and a link will be offered to login again. - # See implementation of "oidc-login-complete" view for more details. + # At that point, we know that an OIDC user was previously logged in + # and their session has expired. + # User will be redirected to logout page and a link will be offered to + # login again. next_path = request.get_full_path() - redirect_url = reverse( - "oidc-login", query_params={"next_path": next_path, "prompt": "none"} + logout_url = reverse( + "logout", query_params={"next_path": next_path, "remote_user": 1} ) - return HttpResponseRedirect(redirect_url) + return HttpResponseRedirect(logout_url) diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -47,7 +47,6 @@ "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), - "prompt": request.GET.get("prompt", ""), } authorization_url_params = { @@ -55,7 +54,6 @@ "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": scope, - "prompt": request.GET.get("prompt", ""), } oidc_client = get_oidc_client() @@ -101,15 +99,8 @@ """ login_data = _get_login_data(request) next_path = login_data["next_path"] or request.build_absolute_uri("/") - if "error" in request.GET and login_data["prompt"] == "none": - # Silent login failed because OIDC session expired. - # Redirect to logout page and inform user. 
- logout(request) - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) - elif "error" in request.GET: + + if "error" in request.GET: raise Exception(request.GET["error"]) _check_login_data(request, login_data) diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -57,7 +57,7 @@ "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", - "swh.web.auth.middlewares.OIDCSessionRefreshMiddleware", + "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information from datetime import datetime, timedelta +from unittest.mock import Mock import pytest @@ -92,6 +93,67 @@ assert user is None +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory): + """ + Checks access token renewal success using refresh token. 
+ """ + kc_oidc_mock = mock_keycloak(mocker) + + oidc_profile = sample_data.oidc_profile + decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"]) + new_access_token = "new_access_token" + + def _refresh_token(refresh_token): + oidc_profile = dict(sample_data.oidc_profile) + oidc_profile["access_token"] = new_access_token + return oidc_profile + + def _decode_token(access_token): + if access_token != new_access_token: + raise Exception("access token token has expired") + else: + return decoded_token + + kc_oidc_mock.decode_token = Mock() + kc_oidc_mock.decode_token.side_effect = _decode_token + kc_oidc_mock.refresh_token.side_effect = _refresh_token + + user = _authenticate_user(request_factory) + + kc_oidc_mock.refresh_token.assert_called_with( + sample_data.oidc_profile["refresh_token"] + ) + + assert user is not None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory): + """ + Checks access token renewal failure using refresh token. 
+ """ + kc_oidc_mock = mock_keycloak(mocker) + + def _refresh_token(refresh_token): + raise Exception("OIDC session has expired") + + def _decode_token(access_token): + raise Exception("access token token has expired") + + kc_oidc_mock.decode_token = Mock() + kc_oidc_mock.decode_token.side_effect = _decode_token + kc_oidc_mock.refresh_token.side_effect = _refresh_token + + user = _authenticate_user(request_factory) + + kc_oidc_mock.refresh_token.assert_called_with( + sample_data.oidc_profile["refresh_token"] + ) + + assert user is None + + @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): """ diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py --- a/swh/web/tests/auth/test_middlewares.py +++ b/swh/web/tests/auth/test_middlewares.py @@ -3,10 +3,10 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime import pytest +from django.core.cache import cache from django.test import modify_settings from swh.web.common.utils import reverse @@ -17,31 +17,44 @@ @pytest.mark.django_db @modify_settings( - MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionRefreshMiddleware"]} + MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} ) -def test_oidc_session_refresh_middleware_disabled(client, mocker): - # authenticate but make session expires immediately - kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) +def test_oidc_session_expired_middleware_disabled(client, mocker): + # authenticate user + kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - # no redirection for silent refresh + + # visit url first to get user from response + response = check_html_get_response(client, url, status_code=200) + + # 
simulate OIDC session expiration + cache.delete(f"oidc_user_{response.wsgi_request.user.id}") + + # no redirection when session has expired check_html_get_response(client, url, status_code=200) @pytest.mark.django_db -def test_oidc_session_refresh_middleware_enabled(client, mocker): - # authenticate but make session expires immediately - kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) +def test_oidc_session_expired_middleware_enabled(client, mocker): + # authenticate user + kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - # should redirect for silent session refresh + # visit url first to get user from response + response = check_html_get_response(client, url, status_code=200) + + # simulate OIDC session expiration + cache.delete(f"oidc_user_{response.wsgi_request.user.id}") + + # should redirect to logout page resp = check_html_get_response(client, url, status_code=302) silent_refresh_url = reverse( - "oidc-login", query_params={"next_path": url, "prompt": "none"} + "logout", query_params={"next_path": url, "remote_user": 1} ) assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -191,7 +191,6 @@ "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() @@ -221,7 +220,6 @@ "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() @@ -255,7 +253,6 @@ "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() @@ -302,47 +299,6 @@ assert isinstance(request.user, AnonymousUser) -@pytest.mark.django_db -def test_oidc_silent_refresh_failure(client, mocker): - # mock Keycloak client - mock_keycloak(mocker) - - next_path = reverse("swh-web-homepage") - 
- # silent session refresh initialization - login_url = reverse( - "oidc-login", query_params={"next_path": next_path, "prompt": "none"} - ) - response = check_http_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - login_data = request.session["login_data"] - - # check prompt value has been registered in user session - assert "prompt" in login_data - assert login_data["prompt"] == "none" - - # simulate a failed silent session refresh - session_state = str(uuid.uuid4()) - - login_complete_url = reverse( - "oidc-login-complete", - query_params={ - "error": "login_required", - "state": login_data["state"], - "session_state": session_state, - }, - ) - - # login process finalization, should redirect to logout page - response = check_http_get_response(client, login_complete_url, status_code=302) - request = response.wsgi_request - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - assert response["location"] == logout_url - - def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default