Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9347566
D4697.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D4697.diff
View Options
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -57,7 +57,15 @@
def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
# decode JWT token
- decoded_token = _oidc_client.decode_token(oidc_profile["access_token"])
+ try:
+ access_token = oidc_profile["access_token"]
+ decoded_token = _oidc_client.decode_token(access_token)
+ # access token has expired or is invalid
+ except Exception:
+ # get a new access token from authentication provider
+ oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"])
+ # decode access token
+ decoded_token = _oidc_client.decode_token(oidc_profile["access_token"])
# create OIDCUser from decoded token
user = _oidc_user_from_decoded_token(decoded_token)
@@ -77,6 +85,16 @@
if hasattr(user, key):
setattr(user, key, val)
+ # put OIDC profile in cache or update it after token renewal
+ cache_key = f"oidc_user_{user.id}"
+ if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]:
+ # set cache key TTL as refresh token expiration time
+ assert user.refresh_expires_at
+ ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp())
+
+ # save oidc_profile in cache
+ cache.set(cache_key, oidc_profile, timeout=max(0, ttl))
+
return user
@@ -95,12 +113,6 @@
# create Django user
user = _oidc_user_from_profile(oidc_profile)
- # set cache key TTL as access token expiration time
- assert user.expires_at
- ttl = int(user.expires_at.timestamp() - timezone.now().timestamp())
-
- # save oidc_profile in cache
- cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl))
except Exception as e:
sentry_sdk.capture_exception(e)
diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py
--- a/swh/web/auth/middlewares.py
+++ b/swh/web/auth/middlewares.py
@@ -9,10 +9,9 @@
from swh.web.common.utils import reverse
-class OIDCSessionRefreshMiddleware:
+class OIDCSessionExpiredMiddleware:
"""
- Middleware for silently refreshing on OpenID Connect session from
- the browser and get new access token.
+ Middleware for checking OIDC user session expiration.
"""
def __init__(self, get_response=None):
@@ -32,13 +31,12 @@
):
return self.get_response(request)
- # At that point, we know that a OIDC user was previously logged in.
- # Access token has expired so we attempt a silent OIDC session refresh.
- # If the latter failed because the session expired, user will be
- # redirected to logout page and a link will be offered to login again.
- # See implementation of "oidc-login-complete" view for more details.
+ # At this point, we know that an OIDC user was previously logged in
+ # and that their session has expired.
+ # The user will be redirected to the logout page, where a link is offered
+ # to log in again.
next_path = request.get_full_path()
- redirect_url = reverse(
- "oidc-login", query_params={"next_path": next_path, "prompt": "none"}
+ logout_url = reverse(
+ "logout", query_params={"next_path": next_path, "remote_user": 1}
)
- return HttpResponseRedirect(redirect_url)
+ return HttpResponseRedirect(logout_url)
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -47,7 +47,6 @@
"state": state,
"redirect_uri": redirect_uri,
"next_path": request.GET.get("next_path", ""),
- "prompt": request.GET.get("prompt", ""),
}
authorization_url_params = {
@@ -55,7 +54,6 @@
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"scope": scope,
- "prompt": request.GET.get("prompt", ""),
}
oidc_client = get_oidc_client()
@@ -101,15 +99,8 @@
"""
login_data = _get_login_data(request)
next_path = login_data["next_path"] or request.build_absolute_uri("/")
- if "error" in request.GET and login_data["prompt"] == "none":
- # Silent login failed because OIDC session expired.
- # Redirect to logout page and inform user.
- logout(request)
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
- elif "error" in request.GET:
+
+ if "error" in request.GET:
raise Exception(request.GET["error"])
_check_login_data(request, login_data)
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -57,7 +57,7 @@
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
- "swh.web.auth.middlewares.OIDCSessionRefreshMiddleware",
+ "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"swh.web.common.middlewares.ThrottlingHeadersMiddleware",
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
+from unittest.mock import Mock
import pytest
@@ -92,6 +93,67 @@
assert user is None
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory):
+ """
+ Checks access token renewal success using refresh token.
+ """
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ oidc_profile = sample_data.oidc_profile
+ decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"])
+ new_access_token = "new_access_token"
+
+ def _refresh_token(refresh_token):
+ oidc_profile = dict(sample_data.oidc_profile)
+ oidc_profile["access_token"] = new_access_token
+ return oidc_profile
+
+ def _decode_token(access_token):
+ if access_token != new_access_token:
+ raise Exception("access token token has expired")
+ else:
+ return decoded_token
+
+ kc_oidc_mock.decode_token = Mock()
+ kc_oidc_mock.decode_token.side_effect = _decode_token
+ kc_oidc_mock.refresh_token.side_effect = _refresh_token
+
+ user = _authenticate_user(request_factory)
+
+ kc_oidc_mock.refresh_token.assert_called_with(
+ sample_data.oidc_profile["refresh_token"]
+ )
+
+ assert user is not None
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory):
+ """
+ Checks access token renewal failure using refresh token.
+ """
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ def _refresh_token(refresh_token):
+ raise Exception("OIDC session has expired")
+
+ def _decode_token(access_token):
+ raise Exception("access token token has expired")
+
+ kc_oidc_mock.decode_token = Mock()
+ kc_oidc_mock.decode_token.side_effect = _decode_token
+ kc_oidc_mock.refresh_token.side_effect = _refresh_token
+
+ user = _authenticate_user(request_factory)
+
+ kc_oidc_mock.refresh_token.assert_called_with(
+ sample_data.oidc_profile["refresh_token"]
+ )
+
+ assert user is None
+
+
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory):
"""
diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py
--- a/swh/web/tests/auth/test_middlewares.py
+++ b/swh/web/tests/auth/test_middlewares.py
@@ -3,10 +3,10 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from datetime import datetime
import pytest
+from django.core.cache import cache
from django.test import modify_settings
from swh.web.common.utils import reverse
@@ -17,31 +17,44 @@
@pytest.mark.django_db
@modify_settings(
- MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionRefreshMiddleware"]}
+ MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]}
)
-def test_oidc_session_refresh_middleware_disabled(client, mocker):
- # authenticate but make session expires immediately
- kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp()))
+def test_oidc_session_expired_middleware_disabled(client, mocker):
+ # authenticate user
+ kc_oidc_mock = mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
kc_oidc_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
- # no redirection for silent refresh
+
+ # visit url first to get user from response
+ response = check_html_get_response(client, url, status_code=200)
+
+ # simulate OIDC session expiration
+ cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
+
+ # no redirection when session has expired
check_html_get_response(client, url, status_code=200)
@pytest.mark.django_db
-def test_oidc_session_refresh_middleware_enabled(client, mocker):
- # authenticate but make session expires immediately
- kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp()))
+def test_oidc_session_expired_middleware_enabled(client, mocker):
+ # authenticate user
+ kc_oidc_mock = mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
kc_oidc_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
- # should redirect for silent session refresh
+ # visit url first to get user from response
+ response = check_html_get_response(client, url, status_code=200)
+
+ # simulate OIDC session expiration
+ cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
+
+ # should redirect to logout page
resp = check_html_get_response(client, url, status_code=302)
silent_refresh_url = reverse(
- "oidc-login", query_params={"next_path": url, "prompt": "none"}
+ "logout", query_params={"next_path": url, "remote_user": 1}
)
assert resp["location"] == silent_refresh_url
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -191,7 +191,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -221,7 +220,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -255,7 +253,6 @@
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
- "prompt": "",
}
session.save()
@@ -302,47 +299,6 @@
assert isinstance(request.user, AnonymousUser)
-@pytest.mark.django_db
-def test_oidc_silent_refresh_failure(client, mocker):
- # mock Keycloak client
- mock_keycloak(mocker)
-
- next_path = reverse("swh-web-homepage")
-
- # silent session refresh initialization
- login_url = reverse(
- "oidc-login", query_params={"next_path": next_path, "prompt": "none"}
- )
- response = check_http_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- login_data = request.session["login_data"]
-
- # check prompt value has been registered in user session
- assert "prompt" in login_data
- assert login_data["prompt"] == "none"
-
- # simulate a failed silent session refresh
- session_state = str(uuid.uuid4())
-
- login_complete_url = reverse(
- "oidc-login-complete",
- query_params={
- "error": "login_required",
- "state": login_data["state"],
- "session_state": session_state,
- },
- )
-
- # login process finalization, should redirect to logout page
- response = check_http_get_response(client, login_complete_url, status_code=302)
- request = response.wsgi_request
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- assert response["location"] == logout_url
-
-
def test_view_rendering_when_user_not_set_in_request(request_factory):
request = request_factory.get("/")
# Django RequestFactory do not set any user by default
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 5:42 PM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224222
Attached To
D4697: auth: Implement access token renewal in OIDC Authorization Code backend
Event Timeline
Log In to Comment