diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index ad4f17f1..6f1db104 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,150 +1,180 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta +import hashlib from typing import Any, Dict, Optional from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication -from rest_framework.exceptions import AuthenticationFailed +from rest_framework.exceptions import AuthenticationFailed, ValidationError import sentry_sdk from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import get_oidc_client from swh.web.auth.models import OIDCUser # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(_oidc_client.client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) # set cache key TTL as access token expiration time assert user.expires_at ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl)) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: - auth_type, token = auth_header.split(" ", 1) + auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: - # attempt to decode token - decoded_token = _oidc_client.decode_token(token) + + # compute a cache key from the token that does not exceed + # memcached key size limit + hasher = hashlib.sha1() + hasher.update(refresh_token.encode("ascii")) + cache_key = f"api_token_{hasher.hexdigest()}" + + # check if an access token is cached + access_token = cache.get(cache_key) + + # attempt to decode access token + try: + decoded_token = _oidc_client.decode_token(access_token) + except Exception: + # access token is None or it has expired + decoded_token = None + + if access_token is None or decoded_token is None: + # get a new access token from authentication provider + access_token = _oidc_client.refresh_token(refresh_token)["access_token"] + # decode access token + decoded_token = _oidc_client.decode_token(access_token) + # compute access token expiration + exp = datetime.fromtimestamp(decoded_token["exp"]) + ttl = int(exp.timestamp() - timezone.now().timestamp()) + # save access token in cache while it is valid + cache.set(cache_key, access_token, timeout=max(0, ttl)) + # create Django user user = _oidc_user_from_decoded_token(decoded_token) + except UnicodeEncodeError as e: + sentry_sdk.capture_exception(e) + raise ValidationError("Invalid bearer token") except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index 779ea359..e34563c3 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,110 +1,113 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): def __init__( self, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): swhweb_config = get_config() super().__init__( swhweb_config["keycloak"]["server_url"], swhweb_config["keycloak"]["realm_name"], OIDC_SWH_WEB_CLIENT_ID, ) self.auth_success = auth_success self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.authorization_code = Mock() + self.refresh_token = Mock() self.userinfo = Mock() self.logout = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) + self.refresh_token.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) else: self.authorization_url = Mock() exception = Exception("Authentication failed") self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception + self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options["verify_exp"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["auth_time"] if self.exp is not None: decoded["exp"] = self.exp decoded["auth_time"] = self.exp - expire_in else: decoded["auth_time"] = int(timezone.now().timestamp()) decoded["exp"] = decoded["auth_time"] + expire_in decoded["groups"] = self.user_groups if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def mock_keycloak( mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): kc_oidc_mock = KeycloackOpenIDConnectMock( auth_success, exp, user_groups, user_permissions ) mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py index fe0929d6..9c0b954f 100644 --- a/swh/web/tests/auth/test_api_auth.py +++ b/swh/web/tests/auth/test_api_auth.py @@ -1,115 +1,114 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.contrib.auth.models import AnonymousUser, User from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from .keycloak_mock import mock_keycloak from . import sample_data @pytest.mark.django_db def test_drf_django_session_auth_success(mocker, client): """ Check user gets authenticated when querying the web api through a web browser. """ url = reverse("api-1-stat-counters") mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") response = client.get(url) request = response.wsgi_request assert response.status_code == 200 # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_success(mocker, api_client): """ Check user gets authenticated when querying the web api through an HTTP client using bearer token authentication. """ url = reverse("api-1-stat-counters") - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] mock_keycloak(mocker) - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}") + api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = api_client.get(url) request = response.wsgi_request assert response.status_code == 200 # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_failure(mocker, api_client): url = reverse("api-1-stat-counters") - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] # check for failed authentication but with expected token format mock_keycloak(mocker, auth_success=False) - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}") + api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = api_client.get(url) request = response.wsgi_request assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) # check for failed authentication when token format is invalid - mock_keycloak(mocker) - api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format") + api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") response = api_client.get(url) request = response.wsgi_request - assert response.status_code == 403 + assert response.status_code == 400 assert isinstance(request.user, AnonymousUser) def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client): url = reverse("api-1-stat-counters") - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] # missing authorization type - api_client.credentials(HTTP_AUTHORIZATION=f"{access_token}") + api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") response = api_client.get(url) request = response.wsgi_request assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) # invalid authorization type api_client.credentials(HTTP_AUTHORIZATION="Foo token") response = api_client.get(url) request = response.wsgi_request assert response.status_code == 403 assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index ea0388cd..3c21efdf 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,208 +1,208 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from django.contrib.auth import authenticate, get_backends import pytest from django.conf import settings from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from . import sample_data from .keycloak_mock import mock_keycloak def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(kc_oidc_mock, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) oidc_profile = sample_data.oidc_profile user = _authenticate_user(request_factory) decoded_token = kc_oidc_mock.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, kc_oidc_mock) auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ mock_keycloak(mocker, auth_success=False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() kc_oidc_mock = mock_keycloak(mocker) + refresh_token = sample_data.oidc_profile["refresh_token"] access_token = sample_data.oidc_profile["access_token"] + decoded_token = kc_oidc_mock.decode_token(access_token) - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, kc_oidc_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() # simulate a failed authentication with a bearer token in expected format mock_keycloak(mocker, auth_success=False) - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format - mock_keycloak(mocker) - request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{access_token}") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) drf_auth_backend = OIDCBearerTokenAuthentication() - access_token = sample_data.oidc_profile["access_token"] + refresh_token = sample_data.oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {access_token}") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo")