diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index 585724d6..004bf84c 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,164 +1,145 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed import sentry_sdk from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import get_oidc_client from swh.web.auth.models import OIDCUser # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() -def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser: +def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal - user_id = int(''.join(userinfo['sub'].split('-')), 16) + user_id = int(''.join(decoded_token['sub'].split('-')), 16) # create a Django user that will not be saved to database user = OIDCUser(id=user_id, - username=userinfo['preferred_username'], + username=decoded_token['preferred_username'], password='', - first_name=userinfo['given_name'], - last_name=userinfo['family_name'], - email=userinfo['email']) + first_name=decoded_token['given_name'], + last_name=decoded_token['family_name'], + email=decoded_token['email']) # set is_staff user property based on groups - user.is_staff = '/staff' in userinfo['groups'] + if 'groups' in decoded_token: + user.is_staff = '/staff' in decoded_token['groups'] - # add userinfo sub to custom User proxy model - user.sub = userinfo['sub'] + # add user sub to custom User proxy model + user.sub = decoded_token['sub'] return user -def _oidc_user_from_profile(oidc_profile: Dict[str, Any], - userinfo: Optional[Dict[str, Any]] = None - ) -> Tuple[OIDCUser, Dict[str, Any]]: - # get access token - access_token = oidc_profile['access_token'] - - # request OIDC userinfo - if userinfo is None: - userinfo = _oidc_client.userinfo(access_token) - - # create OIDCUser from userinfo - user = _oidc_user_from_info(userinfo) +def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = _oidc_client.decode_token(oidc_profile['access_token']) + + # create OIDCUser from decoded token + user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token['auth_time']) + exp_datetime = datetime.fromtimestamp(decoded_token['exp']) # compute OIDC tokens expiration date - oidc_profile['access_expiration'] = ( - auth_datetime + - timedelta(seconds=oidc_profile['expires_in'])) - oidc_profile['refresh_expiration'] = ( + oidc_profile['expires_at'] = exp_datetime + oidc_profile['refresh_expires_at'] = ( auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in'])) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) - return user, userinfo + return user class OIDCAuthorizationCodePKCEBackend: def authenticate(self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier) # create Django user - user, userinfo = _oidc_user_from_profile(oidc_profile) + user = _oidc_user_from_profile(oidc_profile) - # save authenticated user data in cache - cache.set(f'user_{user.id}', - {'userinfo': userinfo, 'oidc_profile': oidc_profile}, - timeout=oidc_profile['refresh_expires_in']) + # set cache key TTL as access token expiration time + assert user.expires_at + ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) + + # save oidc_profile in cache + cache.set(f'oidc_user_{user.id}', oidc_profile, + timeout=max(0, ttl)) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: - # get user data from cache - user_oidc_data = cache.get(f'user_{user_id}') - if user_oidc_data: + # get oidc profile from cache + oidc_profile = cache.get(f'oidc_user_{user_id}') + if oidc_profile: try: - user, _ = _oidc_user_from_profile( - user_oidc_data['oidc_profile'], user_oidc_data['userinfo']) + user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, 'backend', f'{__name__}.{self.__class__.__name__}') return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get('HTTP_AUTHORIZATION') if auth_header is None: return None try: auth_type, token = auth_header.split(' ', 1) except ValueError: raise AuthenticationFailed( 'Invalid HTTP authorization header format') if auth_type != 'Bearer': raise AuthenticationFailed( (f'Invalid or unsupported HTTP authorization' f' type ({auth_type}).')) - try: # attempt to decode token - decoded = _oidc_client.decode_token(token) - userinfo = cache.get(decoded['sub']) - if userinfo: - user = _oidc_user_from_info(userinfo) - else: - # get OIDC userinfo - userinfo = _oidc_client.userinfo(token) - # create Django user - user = _oidc_user_from_info(userinfo) - # cache userinfo until token expires - max_ttl = decoded['exp'] - decoded['auth_time'] - ttl = decoded['exp'] - int(timezone.now().timestamp()) - ttl = max(0, min(ttl, max_ttl)) - cache.set(decoded['sub'], userinfo, timeout=ttl) - + decoded_token = _oidc_client.decode_token(token) + # create Django user + user = _oidc_user_from_decoded_token(decoded_token) except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py index 53f4be24..91d7c2ee 100644 --- a/swh/web/auth/models.py +++ b/swh/web/auth/models.py @@ -1,43 +1,43 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from typing import Optional from django.contrib.auth.models import User class OIDCUser(User): """ Custom User proxy model for remote users storing OpenID Connect - related data: profile containing authorization tokens and userinfo. + related data: profile containing authentication tokens. The model is also not saved to database as all users are already stored in the Keycloak one. """ # OIDC subject identifier sub: str = '' # OIDC tokens and session related data, only relevant when a user # authenticates from a web browser access_token: Optional[str] = None - access_expiration: Optional[datetime] = None + expires_at: Optional[datetime] = None id_token: Optional[str] = None refresh_token: Optional[str] = None - refresh_expiration: Optional[datetime] = None + refresh_expires_at: Optional[datetime] = None scope: Optional[str] = None session_state: Optional[str] = None class Meta: app_label = 'swh.web.auth' proxy = True def save(self, **kwargs): """ Override django.db.models.Model.save to avoid saving the remote users to web application database. """ pass diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index f48e9c34..12b9aaca 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,120 +1,120 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import uuid from typing import cast from django.conf.urls import url from django.core.cache import cache from django.contrib.auth import authenticate, login, logout from django.http import HttpRequest from django.http.response import HttpResponse, HttpResponseRedirect from swh.web.auth.models import OIDCUser from swh.web.auth.utils import gen_oidc_pkce_codes, get_oidc_client from swh.web.common.exc import handle_view_exception, BadInputExc from swh.web.common.utils import reverse def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ # generate a CSRF token state = str(uuid.uuid4()) redirect_uri = reverse('oidc-login-complete', request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session['login_data'] = { 'code_verifier': code_verifier, 'state': state, 'redirect_uri': redirect_uri, 'next_path': request.GET.get('next_path'), } authorization_url_params = { 'state': state, 'code_challenge': code_challenge, 'code_challenge_method': 'S256', 'scope': 'openid', } try: oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params) return HttpResponseRedirect(authorization_url) except Exception as e: return handle_view_exception(request, e) def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ try: if 'login_data' not in request.session: raise Exception('Login process has not been initialized.') if 'code' not in request.GET and 'state' not in request.GET: raise BadInputExc('Missing query parameters for authentication.') # get CSRF token returned by OIDC server state = request.GET['state'] login_data = request.session['login_data'] if state != login_data['state']: raise BadInputExc('Wrong CSRF token, aborting login process.') user = authenticate(request=request, code=request.GET['code'], code_verifier=login_data['code_verifier'], redirect_uri=login_data['redirect_uri']) if user is None: raise Exception('User authentication failed.') login(request, user) redirect_url = (login_data['next_path'] or request.build_absolute_uri('/')) return HttpResponseRedirect(redirect_url) except Exception as e: return handle_view_exception(request, e) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ try: user = request.user logout(request) if hasattr(user, 'refresh_token'): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache - cache.delete(f'user_{user.id}') + cache.delete(f'oidc_user_{user.id}') logout_url = reverse('logout', query_params={'remote_user': 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) except Exception as e: return handle_view_exception(request, e) urlpatterns = [ url(r'^oidc/login/$', oidc_login, name='oidc-login'), url(r'^oidc/login-complete/$', oidc_login_complete, name='oidc-login-complete'), url(r'^oidc/logout/$', oidc_logout, name='oidc-logout'), ] diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index e6008648..98dcc21a 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,77 +1,81 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): def __init__(self, auth_success=True): swhweb_config = get_config() super().__init__(swhweb_config['keycloak']['server_url'], swhweb_config['keycloak']['realm_name'], OIDC_SWH_WEB_CLIENT_ID) + self.auth_success = auth_success self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { 'issuer': f'{self.server_url}realms/{self.realm_name}', 'authorization_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/auth'), 'token_endpoint': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/token'), 'token_introspection_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/token/' 'introspect'), 'userinfo_endpoint': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/userinfo'), 'end_session_endpoint': (f'{self.server_url}realms/' f'{self.realm_name}/protocol/' 'openid-connect/logout'), 'jwks_uri': (f'{self.server_url}realms/{self.realm_name}/' 'protocol/openid-connect/certs'), } self.authorization_code = Mock() self.userinfo = Mock() self.logout = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) else: self.authorization_url = Mock() exception = Exception('Authentication failed') self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def decode_token(self, token): - # skip signature expiration check as we use a static oidc_profile - # for the tests with expired tokens in it - options = {'verify_exp': False} + options = {} + if self.auth_success: + # skip signature expiration check as we use a static oidc_profile + # for the tests with expired tokens in it + options['verify_exp'] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded['exp'] - decoded['auth_time'] decoded['auth_time'] = int(timezone.now().timestamp()) decoded['exp'] = decoded['auth_time'] + expire_in + decoded['groups'] = ['/staff'] return decoded def mock_keycloak(mocker, auth_success=True): kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success) mock_get_oidc_client = mocker.patch( 'swh.web.auth.views.get_oidc_client') mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch('swh.web.auth.backends._oidc_client', kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index fac696ae..58cc6cb8 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,163 +1,150 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from django.contrib.auth import authenticate, get_backends import pytest from django.conf import settings from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from . import sample_data from .keycloak_mock import mock_keycloak def _authenticate_user(request_factory): request = request_factory.get(reverse('oidc-login-complete')) return authenticate(request=request, code='some-code', code_verifier='some-code-verifier', redirect_uri='https://localhost:5004') -def _check_authenticated_user(user): - userinfo = sample_data.userinfo +def _check_authenticated_user(user, decoded_token): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 - assert user.username == userinfo['preferred_username'] + assert user.username == decoded_token['preferred_username'] assert user.password == '' - assert user.first_name == userinfo['given_name'] - assert user.last_name == userinfo['family_name'] - assert user.email == userinfo['email'] - assert user.is_staff == ('/staff' in userinfo['groups']) - assert user.sub == userinfo['sub'] + assert user.first_name == decoded_token['given_name'] + assert user.last_name == decoded_token['family_name'] + assert user.email == decoded_token['email'] + assert user.is_staff == ('/staff' in decoded_token['groups']) + assert user.sub == decoded_token['sub'] @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): kc_oidc_mock = mock_keycloak(mocker) oidc_profile = sample_data.oidc_profile user = _authenticate_user(request_factory) - _check_authenticated_user(user) + decoded_token = kc_oidc_mock.decode_token(user.access_token) + _check_authenticated_user(user, decoded_token) - decoded_token = kc_oidc_mock.decode_token( - sample_data.oidc_profile['access_token']) auth_datetime = datetime.fromtimestamp(decoded_token['auth_time']) - - access_expiration = ( - auth_datetime + timedelta(seconds=oidc_profile['expires_in'])) - refresh_expiration = ( + exp_datetime = datetime.fromtimestamp(decoded_token['exp']) + refresh_exp_datetime = ( auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in'])) assert user.access_token == oidc_profile['access_token'] - assert user.access_expiration == access_expiration + assert user.expires_at == exp_datetime assert user.id_token == oidc_profile['id_token'] assert user.refresh_token == oidc_profile['refresh_token'] - assert user.refresh_expiration == refresh_expiration + assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile['scope'] assert user.session_state == oidc_profile['session_state'] backend_path = 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend' assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): mock_keycloak(mocker, auth_success=False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): url = reverse('api-1-stat-counters') drf_auth_backend = OIDCBearerTokenAuthentication() kc_oidc_mock = mock_keycloak(mocker) access_token = sample_data.oidc_profile['access_token'] + decoded_token = kc_oidc_mock.decode_token(access_token) request = api_request_factory.get( url, HTTP_AUTHORIZATION=f"Bearer {access_token}") - # first authentication user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user) + _check_authenticated_user(user, decoded_token) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, 'access_token') and user.access_token is None - # second authentication, should fetch userinfo from cache - # until token expires - user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user) - assert hasattr(user, 'access_token') and user.access_token is None - - # check user request to keycloak has been sent only once - kc_oidc_mock.userinfo.assert_called_once_with(access_token) - @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): url = reverse('api-1-stat-counters') drf_auth_backend = OIDCBearerTokenAuthentication() # simulate a failed authentication with a bearer token in expected format mock_keycloak(mocker, auth_success=False) access_token = sample_data.oidc_profile['access_token'] request = api_request_factory.get( url, HTTP_AUTHORIZATION=f"Bearer {access_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format mock_keycloak(mocker) request = api_request_factory.get( url, HTTP_AUTHORIZATION=f"Bearer invalid-token-format") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): url = reverse('api-1-stat-counters') drf_auth_backend = OIDCBearerTokenAuthentication() access_token = sample_data.oidc_profile['access_token'] # Invalid authorization type request = api_request_factory.get( url, HTTP_AUTHORIZATION=f"Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get( url, HTTP_AUTHORIZATION=f"{access_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request)