Page MenuHomeSoftware Heritage

D4697.diff
No OneTemporary

D4697.diff

diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -57,7 +57,15 @@
def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
# decode JWT token
- decoded_token = _oidc_client.decode_token(oidc_profile["access_token"])
+ try:
+ access_token = oidc_profile["access_token"]
+ decoded_token = _oidc_client.decode_token(access_token)
+ # access token has expired or is invalid
+ except Exception:
+ # get a new access token from authentication provider
+ oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"])
+ # decode access token
+ decoded_token = _oidc_client.decode_token(oidc_profile["access_token"])
# create OIDCUser from decoded token
user = _oidc_user_from_decoded_token(decoded_token)
@@ -77,6 +85,16 @@
if hasattr(user, key):
setattr(user, key, val)
+ # put OIDC profile in cache or update it after token renewal
+ cache_key = f"oidc_user_{user.id}"
+ if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]:
+ # set cache key TTL as refresh token expiration time
+ assert user.refresh_expires_at
+ ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp())
+
+ # save oidc_profile in cache
+ cache.set(cache_key, oidc_profile, timeout=max(0, ttl))
+
return user
@@ -95,12 +113,6 @@
# create Django user
user = _oidc_user_from_profile(oidc_profile)
- # set cache key TTL as access token expiration time
- assert user.expires_at
- ttl = int(user.expires_at.timestamp() - timezone.now().timestamp())
-
- # save oidc_profile in cache
- cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl))
except Exception as e:
sentry_sdk.capture_exception(e)
diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py
--- a/swh/web/auth/middlewares.py
+++ b/swh/web/auth/middlewares.py
@@ -9,10 +9,9 @@
from swh.web.common.utils import reverse
-class OIDCSessionRefreshMiddleware:
+class OIDCSessionExpiredMiddleware:
"""
- Middleware for silently refreshing on OpenID Connect session from
- the browser and get new access token.
+ Middleware for checking OIDC user session expiration.
"""
def __init__(self, get_response=None):
@@ -32,13 +31,12 @@
):
return self.get_response(request)
- # At that point, we know that a OIDC user was previously logged in.
- # Access token has expired so we attempt a silent OIDC session refresh.
- # If the latter failed because the session expired, user will be
- # redirected to logout page and a link will be offered to login again.
- # See implementation of "oidc-login-complete" view for more details.
+ # At that point, we know that a OIDC user was previously logged in
+ # and his session has expired.
+ # User will be redirected to logout page and a link will be offered to
+ # login again.
next_path = request.get_full_path()
- redirect_url = reverse(
- "oidc-login", query_params={"next_path": next_path, "prompt": "none"}
+ logout_url = reverse(
+ "logout", query_params={"next_path": next_path, "remote_user": 1}
)
- return HttpResponseRedirect(redirect_url)
+ return HttpResponseRedirect(logout_url)
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -47,7 +47,6 @@
"state": state,
"redirect_uri": redirect_uri,
"next_path": request.GET.get("next_path", ""),
- "prompt": request.GET.get("prompt", ""),
}
authorization_url_params = {
@@ -55,7 +54,6 @@
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"scope": scope,
- "prompt": request.GET.get("prompt", ""),
}
oidc_client = get_oidc_client()
@@ -101,15 +99,8 @@
"""
login_data = _get_login_data(request)
next_path = login_data["next_path"] or request.build_absolute_uri("/")
- if "error" in request.GET and login_data["prompt"] == "none":
- # Silent login failed because OIDC session expired.
- # Redirect to logout page and inform user.
- logout(request)
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
- elif "error" in request.GET:
+
+ if "error" in request.GET:
raise Exception(request.GET["error"])
_check_login_data(request, login_data)
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -57,7 +57,7 @@
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
- "swh.web.auth.middlewares.OIDCSessionRefreshMiddleware",
+ "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"swh.web.common.middlewares.ThrottlingHeadersMiddleware",
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
+from unittest.mock import Mock
import pytest
@@ -92,6 +93,67 @@
assert user is None
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory):
+ """
+ Checks access token renewal success using refresh token.
+ """
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ oidc_profile = sample_data.oidc_profile
+ decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"])
+ new_access_token = "new_access_token"
+
+ def _refresh_token(refresh_token):
+ oidc_profile = dict(sample_data.oidc_profile)
+ oidc_profile["access_token"] = new_access_token
+ return oidc_profile
+
+ def _decode_token(access_token):
+ if access_token != new_access_token:
+ raise Exception("access token token has expired")
+ else:
+ return decoded_token
+
+ kc_oidc_mock.decode_token = Mock()
+ kc_oidc_mock.decode_token.side_effect = _decode_token
+ kc_oidc_mock.refresh_token.side_effect = _refresh_token
+
+ user = _authenticate_user(request_factory)
+
+ kc_oidc_mock.refresh_token.assert_called_with(
+ sample_data.oidc_profile["refresh_token"]
+ )
+
+ assert user is not None
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory):
+ """
+ Checks access token renewal failure using refresh token.
+ """
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ def _refresh_token(refresh_token):
+ raise Exception("OIDC session has expired")
+
+ def _decode_token(access_token):
+ raise Exception("access token token has expired")
+
+ kc_oidc_mock.decode_token = Mock()
+ kc_oidc_mock.decode_token.side_effect = _decode_token
+ kc_oidc_mock.refresh_token.side_effect = _refresh_token
+
+ user = _authenticate_user(request_factory)
+
+ kc_oidc_mock.refresh_token.assert_called_with(
+ sample_data.oidc_profile["refresh_token"]
+ )
+
+ assert user is None
+
+
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory):
"""
diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py
--- a/swh/web/tests/auth/test_middlewares.py
+++ b/swh/web/tests/auth/test_middlewares.py
@@ -3,10 +3,10 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
import pytest
+from django.core.cache import cache
from django.test import modify_settings
from swh.web.common.utils import reverse
@@ -17,31 +17,44 @@
@pytest.mark.django_db
@modify_settings(
- MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionRefreshMiddleware"]}
+ MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]}
)
-def test_oidc_session_refresh_middleware_disabled(client, mocker):
- # authenticate but make session expires immediately
- kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp()))
+def test_oidc_session_expired_middleware_disabled(client, mocker):
+ # authenticate user
+ kc_oidc_mock = mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
kc_oidc_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
- # no redirection for silent refresh
+
+ # visit url first to get user from response
+ response = check_html_get_response(client, url, status_code=200)
+
+ # simulate OIDC session expiration
+ cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
+
+ # no redirection when session has expired
check_html_get_response(client, url, status_code=200)
@pytest.mark.django_db
-def test_oidc_session_refresh_middleware_enabled(client, mocker):
- # authenticate but make session expires immediately
- kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp()))
+def test_oidc_session_expired_middleware_enabled(client, mocker):
+ # authenticate user
+ kc_oidc_mock = mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
kc_oidc_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
- # should redirect for silent session refresh
+ # visit url first to get user from response
+ response = check_html_get_response(client, url, status_code=200)
+
+ # simulate OIDC session expiration
+ cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
+
+ # should redirect to logout page
resp = check_html_get_response(client, url, status_code=302)
silent_refresh_url = reverse(
- "oidc-login", query_params={"next_path": url, "prompt": "none"}
+ "logout", query_params={"next_path": url, "remote_user": 1}
)
assert resp["location"] == silent_refresh_url
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -191,7 +191,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -221,7 +220,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -255,7 +253,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -302,47 +299,6 @@
assert isinstance(request.user, AnonymousUser)
-@pytest.mark.django_db
-def test_oidc_silent_refresh_failure(client, mocker):
- # mock Keycloak client
- mock_keycloak(mocker)
-
- next_path = reverse("swh-web-homepage")
-
- # silent session refresh initialization
- login_url = reverse(
- "oidc-login", query_params={"next_path": next_path, "prompt": "none"}
- )
- response = check_http_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- login_data = request.session["login_data"]
-
- # check prompt value has been registered in user session
- assert "prompt" in login_data
- assert login_data["prompt"] == "none"
-
- # simulate a failed silent session refresh
- session_state = str(uuid.uuid4())
-
- login_complete_url = reverse(
- "oidc-login-complete",
- query_params={
- "error": "login_required",
- "state": login_data["state"],
- "session_state": session_state,
- },
- )
-
- # login process finalization, should redirect to logout page
- response = check_http_get_response(client, login_complete_url, status_code=302)
- request = response.wsgi_request
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- assert response["location"] == logout_url
-
-
def test_view_rendering_when_user_not_set_in_request(request_factory):
request = request_factory.get("/")
# Django RequestFactory do not set any user by default

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 5:42 PM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224222

Event Timeline