diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py new file mode 100644 index 00000000..0664c368 --- /dev/null +++ b/swh/web/auth/middlewares.py @@ -0,0 +1,43 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.contrib.auth import BACKEND_SESSION_KEY +from django.http.response import HttpResponseRedirect + +from swh.web.common.utils import reverse + + +class OIDCSessionRefreshMiddleware: + """ + Middleware for silently refreshing on OpenID Connect session from + the browser and get new access token. + """ + def __init__(self, get_response=None): + self.get_response = get_response + self.exempted_urls = [ + reverse(v) for v in ('logout', + 'oidc-login', + 'oidc-login-complete', + 'oidc-logout') + ] + + def __call__(self, request): + if (request.method != 'GET' + or request.user.is_authenticated + or BACKEND_SESSION_KEY not in request.session + or 'OIDC' not in request.session[BACKEND_SESSION_KEY] + or request.path in self.exempted_urls): + return self.get_response(request) + + # At that point, we know that a OIDC user was previously logged in. + # Access token has expired so we attempt a silent OIDC session refresh. + # If the latter failed because the session expired, user will be + # redirected to logout page and a link will be offered to login again. + # See implementation of "oidc-login-complete" view for more details. + next_path = request.get_full_path() + redirect_url = reverse('oidc-login', + query_params={'next_path': next_path, + 'prompt': 'none'}) + return HttpResponseRedirect(redirect_url) diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 12b9aaca..85c211d3 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,120 +1,133 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import uuid from typing import cast from django.conf.urls import url from django.core.cache import cache from django.contrib.auth import authenticate, login, logout from django.http import HttpRequest -from django.http.response import HttpResponse, HttpResponseRedirect +from django.http.response import ( + HttpResponse, HttpResponseRedirect, HttpResponseServerError +) from swh.web.auth.models import OIDCUser from swh.web.auth.utils import gen_oidc_pkce_codes, get_oidc_client from swh.web.common.exc import handle_view_exception, BadInputExc from swh.web.common.utils import reverse def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ # generate a CSRF token state = str(uuid.uuid4()) redirect_uri = reverse('oidc-login-complete', request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session['login_data'] = { 'code_verifier': code_verifier, 'state': state, 'redirect_uri': redirect_uri, - 'next_path': request.GET.get('next_path'), + 'next_path': request.GET.get('next_path', ''), + 'prompt': request.GET.get('prompt', ''), } authorization_url_params = { 'state': state, 'code_challenge': code_challenge, 'code_challenge_method': 'S256', 'scope': 'openid', + 'prompt': request.GET.get('prompt', ''), } try: oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params) return HttpResponseRedirect(authorization_url) except Exception as e: return handle_view_exception(request, e) def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ try: if 'login_data' not in request.session: raise Exception('Login process has not been initialized.') - if 'code' not in request.GET and 'state' not in request.GET: + login_data = request.session['login_data'] + next_path = login_data['next_path'] or request.build_absolute_uri('/') + + if 'error' in request.GET: + if login_data['prompt'] == 'none': + # Silent login failed because OIDC session expired. + # Redirect to logout page and inform user. + logout(request) + logout_url = reverse('logout', + query_params={'next_path': next_path, + 'remote_user': 1}) + return HttpResponseRedirect(logout_url) + return HttpResponseServerError(request.GET['error']) + + if 'code' not in request.GET or 'state' not in request.GET: raise BadInputExc('Missing query parameters for authentication.') # get CSRF token returned by OIDC server state = request.GET['state'] - login_data = request.session['login_data'] - if state != login_data['state']: raise BadInputExc('Wrong CSRF token, aborting login process.') user = authenticate(request=request, code=request.GET['code'], code_verifier=login_data['code_verifier'], redirect_uri=login_data['redirect_uri']) if user is None: raise Exception('User authentication failed.') login(request, user) - redirect_url = (login_data['next_path'] or - request.build_absolute_uri('/')) - - return HttpResponseRedirect(redirect_url) + return HttpResponseRedirect(next_path) except Exception as e: return handle_view_exception(request, e) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ try: user = request.user logout(request) if hasattr(user, 'refresh_token'): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f'oidc_user_{user.id}') logout_url = reverse('logout', query_params={'remote_user': 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) except Exception as e: return handle_view_exception(request, e) urlpatterns = [ url(r'^oidc/login/$', oidc_login, name='oidc-login'), url(r'^oidc/login-complete/$', oidc_login_complete, name='oidc-login-complete'), url(r'^oidc/logout/$', oidc_logout, name='oidc-logout'), ] diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index 87df8f70..110b6e5b 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,302 +1,303 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ import os import sys from typing import Any, Dict from swh.web.config import get_config swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config['secret_key'] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config['debug'] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config['debug'] ALLOWED_HOSTS = ['127.0.0.1', 'localhost'] + swh_web_config['allowed_hosts'] # Application definition INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'rest_framework', 'swh.web.common', 'swh.web.api', 'swh.web.browse', 'webpack_loader', 'django_js_reverse', 'corsheaders', ] MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'corsheaders.middleware.CorsMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'swh.web.auth.middlewares.OIDCSessionRefreshMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', 'swh.web.common.middlewares.ThrottlingHeadersMiddleware', ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. if swh_web_config['serve_assets']: MIDDLEWARE.insert(0, 'django.middleware.gzip.GZipMiddleware') ROOT_URLCONF = 'swh.web.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(PROJECT_DIR, "../templates")], 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', 'swh.web.common.utils.context_processor' ], 'libraries': { 'swh_templatetags': 'swh.web.common.swh_templatetags', }, }, }, ] DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': swh_web_config['development_db'], } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', # noqa }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', # noqa }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', # noqa }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', # noqa }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = 'en-us' TIME_ZONE = 'UTC' USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = '/static/' # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, 'share/swh/web/static') if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, '../../../static') STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ['127.0.0.1'] throttle_rates = {} http_requests = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'] throttling = swh_web_config['throttling'] for limiter_scope, limiter_conf in throttling['scopes'].items(): if 'default' in limiter_conf['limiter_rate']: throttle_rates[limiter_scope] = limiter_conf['limiter_rate']['default'] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf['limiter_rate'] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf['limiter_rate']: throttle_rates[limiter_scope + '_' + http_request.lower()] = \ limiter_conf['limiter_rate'][http_request] REST_FRAMEWORK: Dict[str, Any] = { 'DEFAULT_RENDERER_CLASSES': ( 'rest_framework.renderers.JSONRenderer', 'swh.web.api.renderers.YAMLRenderer', 'rest_framework.renderers.TemplateHTMLRenderer' ), 'DEFAULT_THROTTLE_CLASSES': ( 'swh.web.api.throttling.SwhWebRateThrottle', ), 'DEFAULT_THROTTLE_RATES': throttle_rates, 'DEFAULT_AUTHENTICATION_CLASSES': [ 'rest_framework.authentication.SessionAuthentication', 'swh.web.auth.backends.OIDCBearerTokenAuthentication', ], } LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'filters': { 'require_debug_false': { '()': 'django.utils.log.RequireDebugFalse', }, 'require_debug_true': { '()': 'django.utils.log.RequireDebugTrue', }, }, 'formatters': { 'request': { 'format': '[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s', # noqa 'datefmt': "%d/%b/%Y %H:%M:%S" }, 'simple': { 'format': '[%(asctime)s] [%(levelname)s] %(message)s', 'datefmt': "%d/%b/%Y %H:%M:%S" }, 'verbose': { 'format': '[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s - %(message)s', # noqa 'datefmt': "%d/%b/%Y %H:%M:%S" }, }, 'handlers': { 'console': { 'level': 'DEBUG', 'filters': ['require_debug_true'], 'class': 'logging.StreamHandler', 'formatter': 'simple' }, 'file': { 'level': 'WARNING', 'filters': ['require_debug_false'], 'class': 'logging.FileHandler', 'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'), 'formatter': 'simple' }, 'file_request': { 'level': 'WARNING', 'filters': ['require_debug_false'], 'class': 'logging.FileHandler', 'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'), 'formatter': 'request' }, 'console_verbose': { 'level': 'DEBUG', 'filters': ['require_debug_true'], 'class': 'logging.StreamHandler', 'formatter': 'verbose' }, 'file_verbose': { 'level': 'WARNING', 'filters': ['require_debug_false'], 'class': 'logging.FileHandler', 'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'), 'formatter': 'verbose' }, 'null': { 'class': 'logging.NullHandler', }, }, 'loggers': { '': { 'handlers': ['console_verbose', 'file_verbose'], 'level': 'DEBUG' if DEBUG else 'WARNING', }, 'django': { 'handlers': ['console'], 'level': 'DEBUG' if DEBUG else 'WARNING', 'propagate': False, }, 'django.request': { 'handlers': ['file_request'], 'level': 'DEBUG' if DEBUG else 'WARNING', 'propagate': False, }, 'django.db.backends': { 'handlers': ['null'], 'propagate': False }, 'django.utils.autoreload': { 'level': 'INFO', }, }, } WEBPACK_LOADER = { 'DEFAULT': { 'CACHE': False, 'BUNDLE_DIR_NAME': './', 'STATS_FILE': os.path.join(STATIC_DIR, 'webpack-stats.json'), 'POLL_INTERVAL': 0.1, 'TIMEOUT': None, 'IGNORE': ['.+\\.hot-update.js', '.+\\.map'] } } LOGIN_URL = '/admin/login/' LOGIN_REDIRECT_URL = 'admin' SESSION_ENGINE = 'django.contrib.sessions.backends.cache' CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache' }, 'db_cache': { 'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'swh_web_cache', } } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r'^/badge/.*$' AUTHENTICATION_BACKENDS = [ 'django.contrib.auth.backends.ModelBackend', 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend', ] diff --git a/swh/web/templates/logout.html b/swh/web/templates/logout.html index 989ab0a7..744e7337 100644 --- a/swh/web/templates/logout.html +++ b/swh/web/templates/logout.html @@ -1,27 +1,42 @@ {% extends "layout.html" %} {% comment %} Copyright (C) 2018 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load static %} {% block title %}Logged out – Software Heritage archive {% endblock %} {% block navbar-content %}
Your authenticated session expired so you have been automatically logged out.
+{% else %}You have been successfully logged out.
+{% endif %}{% if oidc_enabled and 'remote_user' in request.GET %} +{% if 'next_path' in request.GET %} + +{% else %} +{% endif %} {% else %} {% endif %} -Log in again.
+Log in again ? +{% if 'next_path' in request.GET %} ++Or go back to + +previous page. +
+{% endif %} {% endblock %} diff --git a/swh/web/tests/api/test_throttling.py b/swh/web/tests/api/test_throttling.py index 2d42ef14..5d861125 100644 --- a/swh/web/tests/api/test_throttling.py +++ b/swh/web/tests/api/test_throttling.py @@ -1,191 +1,192 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.conf.urls import url from django.contrib.auth.models import User from django.test.utils import override_settings from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.decorators import api_view from swh.web.api.throttling import SwhWebRateThrottle, throttle_scope from swh.web.settings.tests import ( scope1_limiter_rate, scope1_limiter_rate_post, scope2_limiter_rate, scope2_limiter_rate_post, scope3_limiter_rate, scope3_limiter_rate_post ) +from swh.web.urls import urlpatterns class MockViewScope1(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = 'scope1' def get(self, request): return Response('foo_get') def post(self, request): return Response('foo_post') @api_view(['GET', 'POST']) @throttle_scope('scope2') def mock_view_scope2(request): if request.method == 'GET': return Response('bar_get') elif request.method == 'POST': return Response('bar_post') class MockViewScope3(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = 'scope3' def get(self, request): return Response('foo_get') def post(self, request): return Response('foo_post') @api_view(['GET', 'POST']) @throttle_scope('scope3') def mock_view_scope3(request): if request.method == 'GET': return Response('bar_get') elif request.method == 'POST': return Response('bar_post') -urlpatterns = [ +urlpatterns += [ url(r'^scope1_class$', MockViewScope1.as_view()), url(r'^scope2_func$', mock_view_scope2), url(r'^scope3_class$', MockViewScope3.as_view()), url(r'^scope3_func$', mock_view_scope3) ] def check_response(response, status_code, limit=None, remaining=None): assert response.status_code == status_code if limit is not None: assert response['X-RateLimit-Limit'] == str(limit) else: assert 'X-RateLimit-Limit' not in response if remaining is not None: assert response['X-RateLimit-Remaining'] == str(remaining) else: assert 'X-RateLimit-Remaining' not in response @override_settings(ROOT_URLCONF=__name__) def test_scope1_requests_are_throttled(api_client): """ Ensure request rate is limited in scope1 """ for i in range(scope1_limiter_rate): response = api_client.get('/scope1_class') check_response(response, 200, scope1_limiter_rate, scope1_limiter_rate - i - 1) response = api_client.get('/scope1_class') check_response(response, 429, scope1_limiter_rate, 0) for i in range(scope1_limiter_rate_post): response = api_client.post('/scope1_class') check_response(response, 200, scope1_limiter_rate_post, scope1_limiter_rate_post - i - 1) response = api_client.post('/scope1_class') check_response(response, 429, scope1_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope2_requests_are_throttled(api_client): """ Ensure request rate is limited in scope2 """ for i in range(scope2_limiter_rate): response = api_client.get('/scope2_func') check_response(response, 200, scope2_limiter_rate, scope2_limiter_rate - i - 1) response = api_client.get('/scope2_func') check_response(response, 429, scope2_limiter_rate, 0) for i in range(scope2_limiter_rate_post): response = api_client.post('/scope2_func') check_response(response, 200, scope2_limiter_rate_post, scope2_limiter_rate_post - i - 1) response = api_client.post('/scope2_func') check_response(response, 429, scope2_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope3_requests_are_throttled_exempted(api_client): """ Ensure request rate is not limited in scope3 as requests coming from localhost are exempted from rate limit. """ for _ in range(scope3_limiter_rate+1): response = api_client.get('/scope3_class') check_response(response, 200) for _ in range(scope3_limiter_rate_post+1): response = api_client.post('/scope3_class') check_response(response, 200) for _ in range(scope3_limiter_rate+1): response = api_client.get('/scope3_func') check_response(response, 200) for _ in range(scope3_limiter_rate_post+1): response = api_client.post('/scope3_func') check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_staff_users_are_not_rate_limited(api_client): staff_user = User.objects.create_user( username='johndoe', password='', is_staff=True) api_client.force_login(staff_user) for _ in range(scope2_limiter_rate+1): response = api_client.get('/scope2_func') check_response(response, 200) for _ in range(scope2_limiter_rate_post+1): response = api_client.post('/scope2_func') check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_non_staff_users_are_rate_limited(api_client): user = User.objects.create_user( username='johndoe', password='', is_staff=False) api_client.force_login(user) for i in range(scope2_limiter_rate): response = api_client.get('/scope2_func') check_response(response, 200, scope2_limiter_rate, scope2_limiter_rate - i - 1) response = api_client.get('/scope2_func') check_response(response, 429, scope2_limiter_rate, 0) for i in range(scope2_limiter_rate_post): response = api_client.post('/scope2_func') check_response(response, 200, scope2_limiter_rate_post, scope2_limiter_rate_post - i - 1) response = api_client.post('/scope2_func') check_response(response, 429, scope2_limiter_rate_post, 0) diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index 98dcc21a..a6cdf3df 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,81 +1,86 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): - def __init__(self, auth_success=True): + def __init__(self, auth_success=True, exp=None): swhweb_config = get_config() super().__init__(swhweb_config['keycloak']['server_url'], swhweb_config['keycloak']['realm_name'], OIDC_SWH_WEB_CLIENT_ID) self.auth_success = auth_success + self.exp = exp self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { 'issuer': f'{self.server_url}realms/{self.realm_name}', 'authorization_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/auth'), 'token_endpoint': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/token'), 'token_introspection_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/token/' 'introspect'), 'userinfo_endpoint': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/userinfo'), 'end_session_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/logout'), 'jwks_uri': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/certs'), } self.authorization_code = Mock() self.userinfo = Mock() self.logout = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) else: self.authorization_url = Mock() exception = Exception('Authentication failed') self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options['verify_exp'] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded['exp'] - decoded['auth_time'] - decoded['auth_time'] = int(timezone.now().timestamp()) - decoded['exp'] = decoded['auth_time'] + expire_in + if self.exp is not None: + decoded['exp'] = self.exp + decoded['auth_time'] = self.exp - expire_in + else: + decoded['auth_time'] = int(timezone.now().timestamp()) + decoded['exp'] = decoded['auth_time'] + expire_in decoded['groups'] = ['/staff'] return decoded -def mock_keycloak(mocker, auth_success=True): - kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success) +def mock_keycloak(mocker, auth_success=True, exp=None): + kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success, exp) mock_get_oidc_client = mocker.patch( 'swh.web.auth.views.get_oidc_client') mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch('swh.web.auth.backends._oidc_client', kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py new file mode 100644 index 00000000..93c06935 --- /dev/null +++ b/swh/web/tests/auth/test_middlewares.py @@ -0,0 +1,46 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime + +from django.test import modify_settings +import pytest + +from .keycloak_mock import mock_keycloak + +from swh.web.common.utils import reverse + + +@pytest.mark.django_db +@modify_settings(MIDDLEWARE={ + 'remove': ['swh.web.auth.middlewares.OIDCSessionRefreshMiddleware'] +}) +def test_oidc_session_refresh_middleware_disabled(client, mocker): + # authenticate but make session expires immediately + kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) + client.login(code='', code_verifier='', redirect_uri='') + kc_oidc_mock.authorization_code.assert_called() + + url = reverse('swh-web-homepage') + resp = client.get(url) + # no redirection for silent refresh + assert resp.status_code != 302 + + +@pytest.mark.django_db +def test_oidc_session_refresh_middleware_enabled(client, mocker): + # authenticate but make session expires immediately + kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) + client.login(code='', code_verifier='', redirect_uri='') + kc_oidc_mock.authorization_code.assert_called() + + url = reverse('swh-web-homepage') + resp = client.get(url) + + # should redirect for silent session refresh + assert resp.status_code == 302 + silent_refresh_url = reverse('oidc-login', query_params={'next_path': url, + 'prompt': 'none'}) + assert resp['location'] == silent_refresh_url diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 19bd7360..e344dceb 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,275 +1,316 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from urllib.parse import urljoin, urlparse import uuid from django.http import QueryDict from django.contrib.auth.models import AnonymousUser, User import pytest from swh.web.auth.models import OIDCUser from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common.utils import reverse from swh.web.tests.django_asserts import assert_template_used, assert_contains from . import sample_data from .keycloak_mock import mock_keycloak @pytest.mark.django_db def test_oidc_login_views_success(client, mocker): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # user initiates login process login_url = reverse('oidc-login') response = client.get(login_url) request = response.wsgi_request # should redirect to Keycloak authentication page in order # for a user to login with its username / password assert response.status_code == 302 assert isinstance(request.user, AnonymousUser) parsed_url = urlparse(response['location']) authorization_url = kc_oidc_mock.well_known()['authorization_endpoint'] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response['location'], parsed_url.path) == authorization_url assert 'client_id' in query_dict assert query_dict['client_id'] == OIDC_SWH_WEB_CLIENT_ID assert 'response_type' in query_dict assert query_dict['response_type'] == 'code' assert 'redirect_uri' in query_dict assert query_dict['redirect_uri'] == reverse('oidc-login-complete', request=request) assert 'code_challenge_method' in query_dict assert query_dict['code_challenge_method'] == 'S256' assert 'scope' in query_dict assert query_dict['scope'] == 'openid' assert 'state' in query_dict assert 'code_challenge' in query_dict # check a login_data has been registered in user session assert 'login_data' in request.session login_data = request.session['login_data'] assert 'code_verifier' in login_data assert 'state' in login_data assert 'redirect_uri' in login_data assert login_data['redirect_uri'] == query_dict['redirect_uri'] # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f'{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}' session_state = str(uuid.uuid4()) login_complete_url = reverse('oidc-login-complete', query_params={'code': code, 'state': login_data['state'], 'session_state': session_state}) # login process finalization response = client.get(login_complete_url) request = response.wsgi_request # should redirect to root url by default assert response.status_code == 302 assert response['location'] == request.build_absolute_uri('/') # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, mocker): """ Simulate a successful logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code='', code_verifier='', redirect_uri='') kc_oidc_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse('oidc-logout') response = client.get(oidc_logout_url) request = response.wsgi_request # should redirect to logout page assert response.status_code == 302 logout_url = reverse('logout', query_params={'remote_user': 1}) assert response['location'] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak kc_oidc_mock.logout.assert_called_with( sample_data.oidc_profile['refresh_token']) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, mocker): """ Simulate a failed authentication with OpenID Connect. """ # mock Keycloak client mock_keycloak(mocker, auth_success=False) # user initiates login process login_url = reverse('oidc-login') response = client.get(login_url) request = response.wsgi_request # should render an error page assert response.status_code == 500 assert_template_used(response, "error.html") # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse('oidc-login-complete') response = client.get(login_url) # should render an error page assert_template_used(response, "error.html") assert_contains(response, 'Login process has not been initialized.', status_code=500) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session['login_data'] = { 'code_verifier': '', 'state': str(uuid.uuid4()), 'redirect_uri': '', - 'next': None, + 'next_path': '', + 'prompt': '', } session.save() # user initiates login process login_url = reverse('oidc-login-complete') response = client.get(login_url) request = response.wsgi_request # should render an error page assert_template_used(response, "error.html") assert_contains(response, 'Missing query parameters for authentication.', status_code=400) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, mocker): # mock Keycloak client mock_keycloak(mocker) # simulate login process has been initialized session = client.session session['login_data'] = { 'code_verifier': '', 'state': str(uuid.uuid4()), 'redirect_uri': '', - 'next': None, + 'next_path': '', + 'prompt': '', } session.save() # user initiates login process login_url = reverse('oidc-login-complete', query_params={'code': 'some-code', 'state': 'some-state'}) response = client.get(login_url) request = response.wsgi_request # should render an error page assert_template_used(response, "error.html") assert_contains(response, 'Wrong CSRF token, aborting login process.', status_code=400) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, mocker): # mock Keycloak client mock_keycloak(mocker, auth_success=False) # simulate login process has been initialized session = client.session session['login_data'] = { 'code_verifier': '', 'state': str(uuid.uuid4()), 'redirect_uri': '', - 'next': None, + 'next_path': '', + 'prompt': '', } session.save() # check authentication error is reported login_url = reverse('oidc-login-complete', query_params={'code': 'some-code', 'state': session['login_data']['state']}) response = client.get(login_url) request = response.wsgi_request # should render an error page assert_template_used(response, "error.html") assert_contains(response, 'User authentication failed.', status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, mocker): """ Simulate a failed logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code='', code_verifier='', redirect_uri='') err_msg = 'Authentication server error' kc_oidc_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse('oidc-logout') response = client.get(logout_url) request = response.wsgi_request # should render an error page assert_template_used(response, "error.html") assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_silent_refresh_failure(client, mocker): + # mock Keycloak client + mock_keycloak(mocker) + + next_path = reverse('swh-web-homepage') + + # silent session refresh initialization + login_url = reverse('oidc-login', query_params={'next_path': next_path, + 'prompt': 'none'}) + response = client.get(login_url) + request = response.wsgi_request + + login_data = request.session['login_data'] + + # check prompt value has been registered in user session + assert 'prompt' in login_data + assert login_data['prompt'] == 'none' + + # simulate a failed silent session refresh + session_state = str(uuid.uuid4()) + + login_complete_url = reverse('oidc-login-complete', + query_params={'error': 'login_required', + 'state': login_data['state'], + 'session_state': session_state}) + + # login process finalization + response = client.get(login_complete_url) + request = response.wsgi_request + + # should redirect to logout page + assert response.status_code == 302 + logout_url = reverse('logout', query_params={'next_path': next_path, + 'remote_user': 1}) + assert response['location'] == logout_url