diff --git a/requirements-django.txt b/requirements-django.txt index 6f7287b..07ef416 100644 --- a/requirements-django.txt +++ b/requirements-django.txt @@ -1,3 +1,3 @@ django < 3 +djangorestframework sentry-sdk - diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py index 8057e05..29e992c 100644 --- a/swh/auth/django/backends.py +++ b/swh/auth/django/backends.py @@ -1,112 +1,201 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime +import hashlib from typing import Any, Dict, Optional from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone +from rest_framework.authentication import BaseAuthentication +from rest_framework.exceptions import AuthenticationFailed, ValidationError import sentry_sdk from swh.auth.django.models import OIDCUser from swh.auth.django.utils import ( keycloak_oidc_client, oidc_profile_cache_key, + oidc_user_from_decoded_token, oidc_user_from_profile, ) -from swh.auth.keycloak import KeycloakOpenIDConnect +from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect def _update_cached_oidc_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any], user: OIDCUser ) -> None: """ Update cached OIDC profile associated to a user if needed: when the profile is not stored in cache or when the authentication tokens have changed. Args: oidc_client: KeycloakOpenID wrapper oidc_profile: OIDC profile used to authenticate a user user: django model representing the authenticated user """ # put OIDC profile in cache or update it after token renewal cache_key = oidc_profile_cache_key(oidc_client, user.id) if ( cache.get(cache_key) is None or user.access_token != oidc_profile["access_token"] ): # set cache key TTL as refresh token expiration time assert user.refresh_expires_at ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(cache_key, user.oidc_profile, timeout=max(0, ttl)) class OIDCAuthorizationCodePKCEBackend: """ Django authentication backend using Keycloak OpenID Connect authorization code flow with PKCE ("Proof Key for Code Exchange"). To use that backend globally in your django application, proceed as follow: * add ``"swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend"`` to the ``AUTHENTICATION_BACKENDS`` django setting * configure Keycloak URL, realm and client by adding ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` in django settings * add ``swh.auth.django.views.urlpatterns`` to your django application URLs * add an HTML link targeting the ``"oidc-login"`` django view in your application views * once a user is logged in, add an HTML link targeting the ``"oidc-logout"`` django view in your application views (a ``next_path`` query parameter can be used to redirect to a view of choice once the user is logged out) """ def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: oidc_client = keycloak_oidc_client() # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = oidc_user_from_profile(oidc_client, oidc_profile) # update cached oidc profile if needed _update_cached_oidc_profile(oidc_client, oidc_profile, user) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_client = keycloak_oidc_client() oidc_profile = cache.get(oidc_profile_cache_key(oidc_client, user_id)) if oidc_profile: try: user = oidc_user_from_profile(oidc_client, oidc_profile) # update cached oidc profile if needed _update_cached_oidc_profile(oidc_client, oidc_profile, user) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None + + +class OIDCBearerTokenAuthentication(BaseAuthentication): + """ + Django REST Framework authentication backend using bearer tokens for + Keycloak OpenID Connect. + + It enables to authenticate a Web API user by sending a long-lived + OpenID Connect refresh token in HTTP Authorization headers. + Long lived refresh tokens can be generated by opening an OpenID Connect + session with the following scope: ``openid offline_access``. + + To use that backend globally in your DRF application, proceed as follow: + + * add ``"swh.auth.django.backends.OIDCBearerTokenAuthentication"`` + to the ``REST_FRAMEWORK["DEFAULT_AUTHENTICATION_CLASSES"]`` + django setting. + + * configure Keycloak URL, realm and client by adding + ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` + in django settings + + Users will then be able to perform authenticated Web API calls by sending + their refresh token in HTTP Authorization headers, for instance: + ``curl -H "Authorization: Bearer ${TOKEN}" https://...``. + + """ + + def authenticate(self, request): + auth_header = request.META.get("HTTP_AUTHORIZATION") + if auth_header is None: + return None + + try: + auth_type, refresh_token = auth_header.split(" ", 1) + except ValueError: + raise AuthenticationFailed("Invalid HTTP authorization header format") + + if auth_type != "Bearer": + raise AuthenticationFailed( + (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") + ) + try: + + oidc_client = keycloak_oidc_client() + + # compute a cache key from the token that does not exceed + # memcached key size limit + hasher = hashlib.sha1() + hasher.update(refresh_token.encode("ascii")) + cache_key = f"api_token_{hasher.hexdigest()}" + + # check if an access token is cached + access_token = cache.get(cache_key) + + if access_token is not None: + # attempt to decode access token + try: + decoded_token = oidc_client.decode_token(access_token) + # access token has expired + except ExpiredSignatureError: + decoded_token = None + + if access_token is None or decoded_token is None: + # get a new access token from authentication provider + access_token = oidc_client.refresh_token(refresh_token)["access_token"] + # decode access token + decoded_token = oidc_client.decode_token(access_token) + # compute access token expiration + exp = datetime.fromtimestamp(decoded_token["exp"]) + ttl = int(exp.timestamp() - timezone.now().timestamp()) + # save access token in cache while it is valid + cache.set(cache_key, access_token, timeout=max(0, ttl)) + + # create Django user + user = oidc_user_from_decoded_token(decoded_token, oidc_client.client_id) + except UnicodeEncodeError as e: + sentry_sdk.capture_exception(e) + raise ValidationError("Invalid bearer token") + except Exception as e: + sentry_sdk.capture_exception(e) + raise AuthenticationFailed(str(e)) + + return user, None diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py index ab201d6..2a1c929 100644 --- a/swh/auth/tests/conftest.py +++ b/swh/auth/tests/conftest.py @@ -1,12 +1,25 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from rest_framework.test import APIClient, APIRequestFactory # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf + + +# Fixture to get test client from Django REST Framework +@pytest.fixture(scope="module") +def api_client(): + return APIClient() + + +# Fixture to get API request factory from Django REST Framework +@pytest.fixture(scope="module") +def api_request_factory(): + return APIRequestFactory() diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py index 08f16de..aefc252 100644 --- a/swh/auth/tests/django/app/apptest/settings.py +++ b/swh/auth/tests/django/app/apptest/settings.py @@ -1,41 +1,49 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL SECRET_KEY = "o+&ayiuk(y^wh4ijz5e=c2$$kjj7g^6r%z+8d*c0lbpfs##k#7" INSTALLED_APPS = [ "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "swh.auth.tests.django.app.apptest", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", ] ROOT_URLCONF = "swh.auth.tests.django.app.apptest.urls" DATABASES = { "default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "swh-auth-test-db",} } SESSION_ENGINE = "django.contrib.sessions.backends.cache" AUTHENTICATION_BACKENDS = [ "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] SWH_AUTH_SERVER_URL = SERVER_URL SWH_AUTH_REALM_NAME = REALM_NAME SWH_AUTH_CLIENT_ID = CLIENT_ID + +REST_FRAMEWORK = { + "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",), + "DEFAULT_AUTHENTICATION_CLASSES": [ + "rest_framework.authentication.SessionAuthentication", + "swh.auth.django.backends.OIDCBearerTokenAuthentication", + ], +} diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py index 0235d1c..9011183 100644 --- a/swh/auth/tests/django/app/apptest/urls.py +++ b/swh/auth/tests/django/app/apptest/urls.py @@ -1,16 +1,26 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.conf.urls import url from django.http import HttpResponse +from rest_framework.decorators import api_view +from rest_framework.response import Response from swh.auth.django.views import urlpatterns as auth_urlpatterns def _root_view(request): return HttpResponse("Hello World !") -urlpatterns = [url(r"^$", _root_view, name="root")] + auth_urlpatterns +@api_view() +def _api_view_test(request): + return Response({"message": "Hello World !"}) + + +urlpatterns = [ + url(r"^$", _root_view, name="root"), + url(r"^api/test$", _api_view_test, name="api-test"), +] + auth_urlpatterns diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py index dcda102..9b57813 100644 --- a/swh/auth/tests/django/test_backends.py +++ b/swh/auth/tests/django/test_backends.py @@ -1,151 +1,256 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from unittest.mock import Mock from django.conf import settings from django.contrib.auth import authenticate, get_backends import pytest +from rest_framework.exceptions import AuthenticationFailed +from swh.auth.django.backends import OIDCBearerTokenAuthentication from swh.auth.django.models import OIDCUser from swh.auth.django.utils import reverse from swh.auth.keycloak import ExpiredSignatureError def _authenticate_user(request_factory): request = request_factory.get(reverse("root")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, keycloak_oidc): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.user_groups = ["/staff"] oidc_profile = keycloak_oidc.login() user = _authenticate_user(request_factory) decoded_token = keycloak_oidc.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, keycloak_oidc) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_oidc.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success( keycloak_oidc, request_factory ): """ Checks access token renewal success using refresh token. """ oidc_profile = keycloak_oidc.login() decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = [ ExpiredSignatureError("access token token has expired"), decoded_token, ] user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure( keycloak_oidc, request_factory ): """ Checks access token renewal failure using refresh token. """ keycloak_oidc.decode_token = Mock() keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( "access token token has expired" ) keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired") user = _authenticate_user(request_factory) oidc_profile = keycloak_oidc.login() keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" keycloak_oidc.user_permissions = [permission] user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory): + """ + Checks successful login based on OpenID Connect bearer token Django REST + Framework authentication backend (Web API login). + """ + url = reverse("api-test") + drf_auth_backend = OIDCBearerTokenAuthentication() + + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + access_token = oidc_profile["access_token"] + + decoded_token = keycloak_oidc.decode_token(access_token) + + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") + + user, _ = drf_auth_backend.authenticate(request) + _check_authenticated_user(user, decoded_token, keycloak_oidc) + # oidc_profile is not filled when authenticating through bearer token + assert hasattr(user, "access_token") and user.access_token is None + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory): + """ + Checks failed login based on OpenID Connect bearer token Django REST + Framework authentication backend (Web API login). + """ + url = reverse("api-test") + drf_auth_backend = OIDCBearerTokenAuthentication() + + oidc_profile = keycloak_oidc.login() + + # simulate a failed authentication with a bearer token in expected format + keycloak_oidc.set_auth_success(False) + + refresh_token = oidc_profile["refresh_token"] + + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") + + with pytest.raises(AuthenticationFailed): + drf_auth_backend.authenticate(request) + + # simulate a failed authentication with an invalid bearer token format + request = api_request_factory.get( + url, HTTP_AUTHORIZATION="Bearer invalid-token-format" + ) + + with pytest.raises(AuthenticationFailed): + drf_auth_backend.authenticate(request) + + +def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory): + """ + Checks failed login based on OpenID Connect bearer token Django REST + Framework authentication backend (Web API login) due to invalid + authorization header value. + """ + url = reverse("api-test") + drf_auth_backend = OIDCBearerTokenAuthentication() + + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + + # Invalid authorization type + request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") + + with pytest.raises(AuthenticationFailed): + drf_auth_backend.authenticate(request) + + # Missing authorization type + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") + + with pytest.raises(AuthenticationFailed): + drf_auth_backend.authenticate(request) + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_backend_permissions( + keycloak_oidc, api_request_factory +): + """ + Checks that a permission defined with OpenID Connect is correctly mapped + to a Django one when using bearer token authentication. + """ + permission = "webapp.some-permission" + keycloak_oidc.user_permissions = [permission] + + drf_auth_backend = OIDCBearerTokenAuthentication() + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + url = reverse("api-test") + request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") + user, _ = drf_auth_backend.authenticate(request) + + assert user.has_perm(permission) + assert user.get_all_permissions() == {permission} + assert user.get_group_permissions() == {permission} + assert user.has_module_perms("webapp") + assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_drf_bearer_token_auth.py b/swh/auth/tests/django/test_drf_bearer_token_auth.py new file mode 100644 index 0000000..44d1190 --- /dev/null +++ b/swh/auth/tests/django/test_drf_bearer_token_auth.py @@ -0,0 +1,109 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.contrib.auth.models import AnonymousUser, User +import pytest + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import reverse + + +@pytest.mark.django_db +def test_drf_django_session_auth_success(keycloak_oidc, client): + """ + Check user gets authenticated when querying the web api + through a web browser. + """ + url = reverse("api-test") + + client.login(code="", code_verifier="", redirect_uri="") + + response = client.get(url) + assert response.status_code == 200 + request = response.wsgi_request + + # user should be authenticated + assert isinstance(request.user, OIDCUser) + + # check remoter used has not been saved to Django database + with pytest.raises(User.DoesNotExist): + User.objects.get(username=request.user.username) + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_success(keycloak_oidc, api_client): + """ + Check user gets authenticated when querying the web api + through an HTTP client using bearer token authentication. + """ + url = reverse("api-test") + + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + + api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") + + response = api_client.get(url) + assert response.status_code == 200 + request = response.wsgi_request + + # user should be authenticated + assert isinstance(request.user, OIDCUser) + + # check remoter used has not been saved to Django database + with pytest.raises(User.DoesNotExist): + User.objects.get(username=request.user.username) + + +@pytest.mark.django_db +def test_drf_oidc_bearer_token_auth_failure(keycloak_oidc, api_client): + url = reverse("api-test") + + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + + # check for failed authentication but with expected token format + keycloak_oidc.set_auth_success(False) + api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") + + response = api_client.get(url) + assert response.status_code == 403 + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser) + + # check for failed authentication when token format is invalid + api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") + + response = api_client.get(url) + assert response.status_code == 400 + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser) + + +def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_oidc, api_client): + url = reverse("api-test") + + oidc_profile = keycloak_oidc.login() + refresh_token = oidc_profile["refresh_token"] + + # missing authorization type + api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") + + response = api_client.get(url) + assert response.status_code == 403 + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser) + + # invalid authorization type + api_client.credentials(HTTP_AUTHORIZATION="Foo token") + + response = api_client.get(url) + assert response.status_code == 403 + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser)