diff --git a/swh/auth/django/middlewares.py b/swh/auth/django/middlewares.py
new file mode 100644
--- /dev/null
+++ b/swh/auth/django/middlewares.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.conf import settings
+from django.contrib.auth import BACKEND_SESSION_KEY
+from django.http.response import HttpResponseRedirect
+
+from swh.auth.django.utils import reverse
+
+
+class OIDCSessionExpiredMiddleware:
+    """
+    Middleware for checking OpenID Connect user session expiration.
+
+    This middleware detects when the session of a user previously logged
+    in using the OpenID Connect authentication backend has expired.
+
+    In that case it will perform a redirection to a django view whose
+    name must be set in the ``SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW``
+    django setting (typically a logout view).
+
+    The following query parameters will be set for that view:
+
+    * ``next_path``: requested URL before the detection of the session expiration
+    * ``remote_user``: indicates that the user was previously authenticated with OIDC
+
+    Only GET requests are checked; the redirect view and the ``oidc-login``,
+    ``oidc-login-complete`` and ``oidc-logout`` views are never redirected.
+    """
+
+    def __init__(self, get_response=None):
+        """
+        Read the redirect view name from the django settings and resolve
+        the URLs that must never trigger a redirection.
+
+        Args:
+            get_response: callable invoked with the request when no
+                redirection is needed
+
+        Raises:
+            ValueError: the ``SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW``
+                django setting is not set
+        """
+        self.get_response = get_response
+        self.redirect_view = getattr(
+            settings, "SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW", None
+        )
+        if self.redirect_view is None:
+            raise ValueError(
+                "SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW django setting "
+                "is mandatory to instantiate OIDCSessionExpiredMiddleware class"
+            )
+        # Requests to these URLs are always passed through, so that the
+        # redirect target and the OIDC login/logout views stay reachable.
+        self.exempted_urls = [
+            reverse(v)
+            for v in (
+                self.redirect_view,
+                "oidc-login",
+                "oidc-login-complete",
+                "oidc-logout",
+            )
+        ]
+
+    def __call__(self, request):
+        """
+        Process a request, redirecting it when an expired OIDC session
+        is detected.
+
+        Args:
+            request: the incoming django request
+
+        Returns:
+            A redirection to the configured view when the session has
+            expired, the result of ``get_response`` otherwise.
+        """
+        if (
+            request.method != "GET"
+            or request.user.is_authenticated
+            or BACKEND_SESSION_KEY not in request.session
+            or "OIDC" not in request.session[BACKEND_SESSION_KEY]
+            or request.path in self.exempted_urls
+        ):
+            return self.get_response(request)
+
+        # At that point, we know that an OIDC user was previously logged in
+        # and their session has expired.
+        # Redirect to a view specified in django settings.
+        next_path = request.get_full_path()
+        logout_url = reverse(
+            self.redirect_view, query_params={"next_path": next_path, "remote_user": 1}
+        )
+        return HttpResponseRedirect(logout_url)
diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py
--- a/swh/auth/tests/django/app/apptest/settings.py
+++ b/swh/auth/tests/django/app/apptest/settings.py
@@ -20,6 +20,7 @@
     "django.middleware.common.CommonMiddleware",
     "django.middleware.csrf.CsrfViewMiddleware",
     "django.contrib.auth.middleware.AuthenticationMiddleware",
+    "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware",
     "django.contrib.messages.middleware.MessageMiddleware",
     "django.middleware.clickjacking.XFrameOptionsMiddleware",
 ]
@@ -39,6 +40,7 @@
 SWH_AUTH_SERVER_URL = SERVER_URL
 SWH_AUTH_REALM_NAME = REALM_NAME
 SWH_AUTH_CLIENT_ID = CLIENT_ID
+SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout"
 
 REST_FRAMEWORK = {
     "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",),
diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py
--- a/swh/auth/tests/django/app/apptest/urls.py
+++ b/swh/auth/tests/django/app/apptest/urls.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 from django.conf.urls import url
+from django.contrib.auth.views import LogoutView
 from django.http import HttpResponse
 from rest_framework.decorators import api_view
 from rest_framework.response import Response
@@ -23,4 +24,5 @@
 urlpatterns = [
     url(r"^$", _root_view, name="root"),
     url(r"^api/test$", _api_view_test, name="api-test"),
+    url(r"^logout/$", LogoutView.as_view(), name="logout"),
 ] + auth_urlpatterns
diff --git a/swh/auth/tests/django/test_middlewares.py b/swh/auth/tests/django/test_middlewares.py
new file mode 100644
--- /dev/null
+++ b/swh/auth/tests/django/test_middlewares.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from django.core.cache import cache
+from django.test import modify_settings, override_settings
+import pytest
+
+from swh.auth.django.utils import oidc_profile_cache_key, reverse
+
+
+@pytest.mark.django_db
+@override_settings(SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW=None)
+def test_oidc_session_expired_middleware_missing_setting(client, keycloak_oidc):
+    client.login(code="", code_verifier="", redirect_uri="")
+    keycloak_oidc.authorization_code.assert_called()
+
+    url = reverse("root")
+
+    with pytest.raises(ValueError, match="setting is mandatory"):
+        client.get(url)
+
+
+@pytest.mark.django_db
+@modify_settings(
+    MIDDLEWARE={"remove": ["swh.auth.django.middlewares.OIDCSessionExpiredMiddleware"]}
+)
+def test_oidc_session_expired_middleware_disabled(client, keycloak_oidc):
+    # authenticate user
+    client.login(code="", code_verifier="", redirect_uri="")
+    keycloak_oidc.authorization_code.assert_called()
+
+    url = reverse("root")
+
+    # visit url first to get user from response
+    response = client.get(url)
+    assert response.status_code == 200
+
+    # simulate OIDC session expiration
+    cache.delete(oidc_profile_cache_key(keycloak_oidc, response.wsgi_request.user.id))
+
+    # no redirection when session has expired
+    response = client.get(url)
+    assert response.status_code == 200
+
+
+@pytest.mark.django_db
+def test_oidc_session_expired_middleware_enabled(client, keycloak_oidc):
+    # authenticate user
+    client.login(code="", code_verifier="", redirect_uri="")
+    keycloak_oidc.authorization_code.assert_called()
+
+    url = reverse("root")
+
+    # visit url first to get user from response
+    response = client.get(url)
+    assert response.status_code == 200
+
+    # simulate OIDC session expiration
+    cache.delete(oidc_profile_cache_key(keycloak_oidc, response.wsgi_request.user.id))
+
+    # should redirect to logout page
+    response = client.get(url)
+    assert response.status_code == 302
+    logout_url = reverse(
+        "logout", query_params={"next_path": url, "remote_user": 1}
+    )
+    assert response["location"] == logout_url