diff --git a/requirements-swh.txt b/requirements-swh.txt index 16747686..b95db0f5 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,8 +1,8 @@ -swh.auth >= 0.3.4 +swh.auth[django] >= 0.3.7 swh.core >= 0.0.95 swh.indexer >= 0.4.1 swh.model >= 0.5.0 swh.scheduler >= 0.7.0 swh.search >= 0.2.0 swh.storage >= 0.11.10 swh.vault >= 0.0.33 diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index f0022893..1ad9523d 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,190 +1,190 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import hashlib from typing import Any, Dict, Optional import sentry_sdk from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError -from swh.web.auth.models import OIDCUser +from swh.auth.django.models import OIDCUser from swh.web.auth.utils import get_oidc_client def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(get_oidc_client().client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: oidc_client = get_oidc_client() # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = oidc_client.decode_token(access_token) # access token has expired or is invalid except Exception: # get a new access token from authentication provider oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) # put OIDC profile in cache or update it after token renewal cache_key = f"oidc_user_{user.id}" if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: # set cache key TTL as refresh token expiration time assert user.refresh_expires_at ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = get_oidc_client().authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: oidc_client = get_oidc_client() # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() hasher.update(refresh_token.encode("ascii")) cache_key = f"api_token_{hasher.hexdigest()}" # check if an access token is cached access_token = cache.get(cache_key) # attempt to decode access token try: decoded_token = oidc_client.decode_token(access_token) except Exception: # access token is None or it has expired decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider access_token = oidc_client.refresh_token(refresh_token)["access_token"] # decode access token decoded_token = oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) # save access token in cache while it is valid cache.set(cache_key, access_token, timeout=max(0, ttl)) # create Django user user = _oidc_user_from_decoded_token(decoded_token) except UnicodeEncodeError as e: sentry_sdk.capture_exception(e) raise ValidationError("Invalid bearer token") except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py index a3e0356f..5ac29aea 100644 --- a/swh/web/auth/models.py +++ b/swh/web/auth/models.py @@ -1,37 +1,20 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.db import models -from swh.auth.django.models import OIDCUser as OIDCUserBase - - -class OIDCUser(OIDCUserBase): - """ - Custom User proxy model for remote users storing OpenID Connect - related data: profile containing authentication tokens. - - The model is also not saved to database as all users are already stored - in the Keycloak one. - """ - - class Meta: - app_label = "swh_web_auth" - proxy = True - auto_created = True # prevent model to be created in database by migrations - class OIDCUserOfflineTokens(models.Model): """ Model storing encrypted bearer tokens generated by users. """ user_id = models.CharField(max_length=50) creation_date = models.DateTimeField(auto_now_add=True) offline_token = models.BinaryField() class Meta: app_label = "swh_web_auth" db_table = "oidc_user_offline_tokens" diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index e68d558e..f1b56647 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,272 +1,273 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast import uuid from cryptography.fernet import InvalidToken from django.conf.urls import url from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods -from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens +from swh.auth.django.models import OIDCUser +from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import ( decrypt_data, encrypt_data, gen_oidc_pkce_codes, get_oidc_client, ) from swh.web.common.exc import BadInputExc, ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): # generate a CSRF token state = str(uuid.uuid4()) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": scope, } oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) return HttpResponseRedirect(authorization_url) def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ redirect_uri = reverse("oidc-login-complete", request=request) return _oidc_login(request, redirect_uri=redirect_uri) def _get_login_data(request: HttpRequest) -> Dict[str, Any]: if "login_data" not in request.session: raise Exception("Login process has not been initialized.") return request.session["login_data"] def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ login_data = _get_login_data(request) next_path = login_data["next_path"] or request.build_absolute_uri("/") if "error" in request.GET: raise Exception(request.GET["error"]) _check_login_data(request, login_data) user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: raise Exception("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f"oidc_user_{user.id}") logout_url = reverse("logout", query_params={"remote_user": 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return _oidc_login( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) oidc_client = get_oidc_client() login_data = _get_login_data(request) _check_login_data(request, login_data) oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/generate-bearer-token-complete/$", oidc_generate_bearer_token_complete, name="oidc-generate-bearer-token-complete", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py index f65c17f1..c71ec520 100644 --- a/swh/web/tests/auth/test_api_auth.py +++ b/swh/web/tests/auth/test_api_auth.py @@ -1,105 +1,105 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.contrib.auth.models import AnonymousUser, User -from swh.web.auth.models import OIDCUser +from swh.auth.django.models import OIDCUser from swh.web.common.utils import reverse from swh.web.tests.utils import check_api_get_responses, check_http_get_response @pytest.mark.django_db def test_drf_django_session_auth_success(keycloak_mock, client): """ Check user gets authenticated when querying the web api through a web browser. """ url = reverse("api-1-stat-counters") client.login(code="", code_verifier="", redirect_uri="") response = check_http_get_response(client, url, status_code=200) request = response.wsgi_request # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client): """ Check user gets authenticated when querying the web api through an HTTP client using bearer token authentication. """ url = reverse("api-1-stat-counters") oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=200) request = response.wsgi_request # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client): url = reverse("api-1-stat-counters") oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] # check for failed authentication but with expected token format keycloak_mock.set_auth_success(False) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) # check for failed authentication when token format is invalid api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") response = check_api_get_responses(api_client, url, status_code=400) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client): url = reverse("api-1-stat-counters") oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] # missing authorization type api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) # invalid authorization type api_client.credentials(HTTP_AUTHORIZATION="Foo token") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index c38b953e..37cad77b 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,271 +1,271 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from unittest.mock import Mock import pytest from django.conf import settings from django.contrib.auth import authenticate, get_backends from rest_framework.exceptions import AuthenticationFailed +from swh.auth.django.models import OIDCUser from swh.web.auth.backends import OIDCBearerTokenAuthentication -from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(kc_oidc_mock, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_mock.user_groups = ["/staff"] oidc_profile = keycloak_mock.login() user = _authenticate_user(request_factory) decoded_token = keycloak_mock.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, keycloak_mock) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ keycloak_mock.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_success( keycloak_mock, request_factory ): """ Checks access token renewal success using refresh token. """ oidc_profile = keycloak_mock.login() decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"]) new_access_token = "new_access_token" def _refresh_token(refresh_token): oidc_profile = dict(keycloak_mock.login()) oidc_profile["access_token"] = new_access_token return oidc_profile def _decode_token(access_token): if access_token != new_access_token: raise Exception("access token token has expired") else: return decoded_token keycloak_mock.decode_token = Mock() keycloak_mock.decode_token.side_effect = _decode_token keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) oidc_profile = keycloak_mock.login() keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_refresh_token_failure( keycloak_mock, request_factory ): """ Checks access token renewal failure using refresh token. """ def _refresh_token(refresh_token): raise Exception("OIDC session has expired") def _decode_token(access_token): raise Exception("access token token has expired") keycloak_mock.decode_token = Mock() keycloak_mock.decode_token.side_effect = _decode_token keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) oidc_profile = keycloak_mock.login() keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is None @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" keycloak_mock.user_permissions = [permission] user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] access_token = oidc_profile["access_token"] decoded_token = keycloak_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, keycloak_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_mock.login() # simulate a failed authentication with a bearer token in expected format keycloak_mock.set_auth_success(False) refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions( keycloak_mock, api_request_factory ): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" keycloak_mock.user_permissions = [permission] drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_mock.login() refresh_token = oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 8a6abcc4..2cc37dda 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,500 +1,501 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict -from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens +from swh.auth.django.models import OIDCUser +from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view def _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data @pytest.mark.django_db def test_oidc_login_views_success(client, keycloak_mock): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ # user initiates login process login_url = reverse("oidc-login") # should redirect to Keycloak authentication page in order # for a user to login with its username / password response = check_html_get_response(client, login_url, status_code=302) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) login_data = _check_oidc_login_code_flow_data( request, response, keycloak_mock, redirect_uri=reverse("oidc-login-complete", request=request), ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to root url by default response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, keycloak_mock): """ Simulate a successful logout operation with OpenID Connect. """ # login our test user client.login(code="", code_verifier="", redirect_uri="") keycloak_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") # should redirect to logout page response = check_html_get_response(client, oidc_logout_url, status_code=302) request = response.wsgi_request logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak oidc_profile = keycloak_mock.login() keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, keycloak_mock): """ Simulate a failed authentication with OpenID Connect. """ keycloak_mock.set_auth_success(False) # user initiates login process login_url = reverse("oidc-login") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock): keycloak_mock.set_auth_success(False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, keycloak_mock): """ Simulate a failed logout operation with OpenID Connect. """ # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" keycloak_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") # should render an error page response = check_html_get_response( client, logout_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_html_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ _generate_and_test_bearer_token(client, keycloak_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, keycloak_mock): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): token = _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_html_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db def test_oidc_profile_view(client, keycloak_mock): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] keycloak_mock.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm)