diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index 0a83017e..0a3aaaf9 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,178 +1,190 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import hashlib from typing import Any, Dict, Optional import sentry_sdk from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.models import OIDCUser from swh.web.auth.utils import get_oidc_client # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(_oidc_client.client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: # decode JWT token - decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) + try: + access_token = oidc_profile["access_token"] + decoded_token = _oidc_client.decode_token(access_token) + # access token has expired or is invalid + except Exception: + # get a new access token from authentication provider + oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"]) + # decode access token + decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) + # put OIDC profile in cache or update it after token renewal + cache_key = f"oidc_user_{user.id}" + if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: + # set cache key TTL as refresh token expiration time + assert user.refresh_expires_at + ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) + + # save oidc_profile in cache + cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) + return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) - # set cache key TTL as access token expiration time - assert user.expires_at - ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) - - # save oidc_profile in cache - cache.set(f"oidc_user_{user.id}", oidc_profile, timeout=max(0, ttl)) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() hasher.update(refresh_token.encode("ascii")) cache_key = f"api_token_{hasher.hexdigest()}" # check if an access token is cached access_token = cache.get(cache_key) # attempt to decode access token try: decoded_token = _oidc_client.decode_token(access_token) except Exception: # access token is None or it has expired decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider access_token = _oidc_client.refresh_token(refresh_token)["access_token"] # decode access token decoded_token = _oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) # save access token in cache while it is valid cache.set(cache_key, access_token, timeout=max(0, ttl)) # create Django user user = _oidc_user_from_decoded_token(decoded_token) except UnicodeEncodeError as e: sentry_sdk.capture_exception(e) raise ValidationError("Invalid bearer token") except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py index f750138e..2d8e166e 100644 --- a/swh/web/auth/middlewares.py +++ b/swh/web/auth/middlewares.py @@ -1,44 +1,42 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.contrib.auth import BACKEND_SESSION_KEY from django.http.response import HttpResponseRedirect from swh.web.common.utils import reverse -class OIDCSessionRefreshMiddleware: +class OIDCSessionExpiredMiddleware: """ - Middleware for silently refreshing on OpenID Connect session from - the browser and get new access token. + Middleware for checking OIDC user session expiration. """ def __init__(self, get_response=None): self.get_response = get_response self.exempted_urls = [ reverse(v) for v in ("logout", "oidc-login", "oidc-login-complete", "oidc-logout") ] def __call__(self, request): if ( request.method != "GET" or request.user.is_authenticated or BACKEND_SESSION_KEY not in request.session or "OIDC" not in request.session[BACKEND_SESSION_KEY] or request.path in self.exempted_urls ): return self.get_response(request) - # At that point, we know that a OIDC user was previously logged in. - # Access token has expired so we attempt a silent OIDC session refresh. - # If the latter failed because the session expired, user will be - # redirected to logout page and a link will be offered to login again. - # See implementation of "oidc-login-complete" view for more details. + # At that point, we know that a OIDC user was previously logged in + # and his session has expired. + # User will be redirected to logout page and a link will be offered to + # login again. next_path = request.get_full_path() - redirect_url = reverse( - "oidc-login", query_params={"next_path": next_path, "prompt": "none"} + logout_url = reverse( + "logout", query_params={"next_path": next_path, "remote_user": 1} ) - return HttpResponseRedirect(redirect_url) + return HttpResponseRedirect(logout_url) diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 02fe63d6..e68d558e 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,281 +1,272 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast import uuid from cryptography.fernet import InvalidToken from django.conf.urls import url from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import ( decrypt_data, encrypt_data, gen_oidc_pkce_codes, get_oidc_client, ) from swh.web.common.exc import BadInputExc, ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): # generate a CSRF token state = str(uuid.uuid4()) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), - "prompt": request.GET.get("prompt", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": scope, - "prompt": request.GET.get("prompt", ""), } oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) return HttpResponseRedirect(authorization_url) def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ redirect_uri = reverse("oidc-login-complete", request=request) return _oidc_login(request, redirect_uri=redirect_uri) def _get_login_data(request: HttpRequest) -> Dict[str, Any]: if "login_data" not in request.session: raise Exception("Login process has not been initialized.") return request.session["login_data"] def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ login_data = _get_login_data(request) next_path = login_data["next_path"] or request.build_absolute_uri("/") - if "error" in request.GET and login_data["prompt"] == "none": - # Silent login failed because OIDC session expired. - # Redirect to logout page and inform user. - logout(request) - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) - elif "error" in request.GET: + + if "error" in request.GET: raise Exception(request.GET["error"]) _check_login_data(request, login_data) user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: raise Exception("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f"oidc_user_{user.id}") logout_url = reverse("logout", query_params={"remote_user": 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return _oidc_login( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) oidc_client = get_oidc_client() login_data = _get_login_data(request) _check_login_data(request, login_data) oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/generate-bearer-token-complete/$", oidc_generate_bearer_token_complete, name="oidc-generate-bearer-token-complete", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index 3a0c2a1f..cd9f5040 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,282 +1,282 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ import os import sys from typing import Any, Dict from swh.web.config import get_config swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config["secret_key"] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config["debug"] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config["debug"] ALLOWED_HOSTS = ["127.0.0.1", "localhost"] + swh_web_config["allowed_hosts"] # Application definition INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "swh.web.common", "swh.web.api", "swh.web.auth", "swh.web.browse", "webpack_loader", "django_js_reverse", "corsheaders", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "corsheaders.middleware.CorsMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", - "swh.web.auth.middlewares.OIDCSessionRefreshMiddleware", + "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", "swh.web.common.middlewares.ExceptionMiddleware", ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. if swh_web_config["serve_assets"]: MIDDLEWARE.insert(0, "django.middleware.gzip.GZipMiddleware") ROOT_URLCONF = "swh.web.urls" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [os.path.join(PROJECT_DIR, "../templates")], "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", "swh.web.common.utils.context_processor", ], "libraries": {"swh_templatetags": "swh.web.common.swh_templatetags",}, }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config["development_db"], } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", # noqa }, {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",}, {"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",}, {"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",}, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = "en-us" TIME_ZONE = "UTC" USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = "/static/" # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, "../../../static") STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ["127.0.0.1"] throttle_rates = {} http_requests = ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"] throttling = swh_web_config["throttling"] for limiter_scope, limiter_conf in throttling["scopes"].items(): if "default" in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"]["default"] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope + "_" + http_request.lower()] = limiter_conf[ "limiter_rate" ][http_request] REST_FRAMEWORK: Dict[str, Any] = { "DEFAULT_RENDERER_CLASSES": ( "rest_framework.renderers.JSONRenderer", "swh.web.api.renderers.YAMLRenderer", "rest_framework.renderers.TemplateHTMLRenderer", ), "DEFAULT_THROTTLE_CLASSES": ("swh.web.api.throttling.SwhWebRateThrottle",), "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", "swh.web.auth.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } LOGGING = { "version": 1, "disable_existing_loggers": False, "filters": { "require_debug_false": {"()": "django.utils.log.RequireDebugFalse",}, "require_debug_true": {"()": "django.utils.log.RequireDebugTrue",}, }, "formatters": { "request": { "format": "[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "simple": { "format": "[%(asctime)s] [%(levelname)s] %(message)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "verbose": { "format": ( "[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s " "- %(message)s" ), "datefmt": "%d/%b/%Y %H:%M:%S", }, }, "handlers": { "console": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "simple", }, "file": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "simple", }, "file_request": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "request", }, "console_verbose": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "verbose", }, "file_verbose": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "verbose", }, "null": {"class": "logging.NullHandler",}, }, "loggers": { "": { "handlers": ["console_verbose", "file_verbose"], "level": "DEBUG" if DEBUG else "WARNING", }, "django": { "handlers": ["console"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.request": { "handlers": ["file_request"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.db.backends": {"handlers": ["null"], "propagate": False}, "django.utils.autoreload": {"level": "INFO",}, }, } WEBPACK_LOADER = { "DEFAULT": { "CACHE": False, "BUNDLE_DIR_NAME": "./", "STATS_FILE": os.path.join(STATIC_DIR, "webpack-stats.json"), "POLL_INTERVAL": 0.1, "TIMEOUT": None, "IGNORE": [".+\\.hot-update.js", ".+\\.map"], } } LOGIN_URL = "/admin/login/" LOGIN_REDIRECT_URL = "admin" SESSION_ENGINE = "django.contrib.sessions.backends.cache" CACHES = { "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}, } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r"^/(badge|api)/.*$" AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend", ] diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index 24fcdf92..b60e34b3 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,206 +1,268 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta +from unittest.mock import Mock import pytest from django.conf import settings from django.contrib.auth import authenticate, get_backends from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from . import sample_data from .keycloak_mock import mock_keycloak def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(kc_oidc_mock, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) oidc_profile = sample_data.oidc_profile user = _authenticate_user(request_factory) decoded_token = kc_oidc_mock.decode_token(user.access_token) _check_authenticated_user(user, decoded_token, kc_oidc_mock) auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ mock_keycloak(mocker, auth_success=False) user = _authenticate_user(request_factory) assert user is None +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory): + """ + Checks access token renewal success using refresh token. + """ + kc_oidc_mock = mock_keycloak(mocker) + + oidc_profile = sample_data.oidc_profile + decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"]) + new_access_token = "new_access_token" + + def _refresh_token(refresh_token): + oidc_profile = dict(sample_data.oidc_profile) + oidc_profile["access_token"] = new_access_token + return oidc_profile + + def _decode_token(access_token): + if access_token != new_access_token: + raise Exception("access token token has expired") + else: + return decoded_token + + kc_oidc_mock.decode_token = Mock() + kc_oidc_mock.decode_token.side_effect = _decode_token + kc_oidc_mock.refresh_token.side_effect = _refresh_token + + user = _authenticate_user(request_factory) + + kc_oidc_mock.refresh_token.assert_called_with( + sample_data.oidc_profile["refresh_token"] + ) + + assert user is not None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory): + """ + Checks access token renewal failure using refresh token. + """ + kc_oidc_mock = mock_keycloak(mocker) + + def _refresh_token(refresh_token): + raise Exception("OIDC session has expired") + + def _decode_token(access_token): + raise Exception("access token token has expired") + + kc_oidc_mock.decode_token = Mock() + kc_oidc_mock.decode_token.side_effect = _decode_token + kc_oidc_mock.refresh_token.side_effect = _refresh_token + + user = _authenticate_user(request_factory) + + kc_oidc_mock.refresh_token.assert_called_with( + sample_data.oidc_profile["refresh_token"] + ) + + assert user is None + + @pytest.mark.django_db def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() kc_oidc_mock = mock_keycloak(mocker) refresh_token = sample_data.oidc_profile["refresh_token"] access_token = sample_data.oidc_profile["access_token"] decoded_token = kc_oidc_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) _check_authenticated_user(user, decoded_token, kc_oidc_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() # simulate a failed authentication with a bearer token in expected format mock_keycloak(mocker, auth_success=False) refresh_token = sample_data.oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() refresh_token = sample_data.oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" mock_keycloak(mocker, user_permissions=[permission]) drf_auth_backend = OIDCBearerTokenAuthentication() refresh_token = sample_data.oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py index 1c19007e..e72f40b5 100644 --- a/swh/web/tests/auth/test_middlewares.py +++ b/swh/web/tests/auth/test_middlewares.py @@ -1,47 +1,60 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime import pytest +from django.core.cache import cache from django.test import modify_settings from swh.web.common.utils import reverse from swh.web.tests.utils import check_html_get_response from .keycloak_mock import mock_keycloak @pytest.mark.django_db @modify_settings( - MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionRefreshMiddleware"]} + MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} ) -def test_oidc_session_refresh_middleware_disabled(client, mocker): - # authenticate but make session expires immediately - kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) +def test_oidc_session_expired_middleware_disabled(client, mocker): + # authenticate user + kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - # no redirection for silent refresh + + # visit url first to get user from response + response = check_html_get_response(client, url, status_code=200) + + # simulate OIDC session expiration + cache.delete(f"oidc_user_{response.wsgi_request.user.id}") + + # no redirection when session has expired check_html_get_response(client, url, status_code=200) @pytest.mark.django_db -def test_oidc_session_refresh_middleware_enabled(client, mocker): - # authenticate but make session expires immediately - kc_oidc_mock = mock_keycloak(mocker, exp=int(datetime.now().timestamp())) +def test_oidc_session_expired_middleware_enabled(client, mocker): + # authenticate user + kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") - # should redirect for silent session refresh + # visit url first to get user from response + response = check_html_get_response(client, url, status_code=200) + + # simulate OIDC session expiration + cache.delete(f"oidc_user_{response.wsgi_request.user.id}") + + # should redirect to logout page resp = check_html_get_response(client, url, status_code=302) silent_refresh_url = reverse( - "oidc-login", query_params={"next_path": url, "prompt": "none"} + "logout", query_params={"next_path": url, "remote_user": 1} ) assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index b9f9c11e..11c0be23 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,562 +1,518 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view from . import sample_data from .keycloak_mock import mock_keycloak def _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data @pytest.mark.django_db def test_oidc_login_views_success(client, mocker): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # user initiates login process login_url = reverse("oidc-login") # should redirect to Keycloak authentication page in order # for a user to login with its username / password response = check_html_get_response(client, login_url, status_code=302) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=reverse("oidc-login-complete", request=request), ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to root url by default response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, mocker): """ Simulate a successful logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") # should redirect to logout page response = check_html_get_response(client, oidc_logout_url, status_code=302) request = response.wsgi_request logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, mocker): """ Simulate a failed authentication with OpenID Connect. """ # mock Keycloak client mock_keycloak(mocker, auth_success=False) # user initiates login process login_url = reverse("oidc-login") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, mocker): # mock Keycloak client mock_keycloak(mocker) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, mocker): # mock Keycloak client mock_keycloak(mocker, auth_success=False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", - "prompt": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, mocker): """ Simulate a failed logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" kc_oidc_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") # should render an error page response = check_html_get_response( client, logout_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) -@pytest.mark.django_db -def test_oidc_silent_refresh_failure(client, mocker): - # mock Keycloak client - mock_keycloak(mocker) - - next_path = reverse("swh-web-homepage") - - # silent session refresh initialization - login_url = reverse( - "oidc-login", query_params={"next_path": next_path, "prompt": "none"} - ) - response = check_http_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - login_data = request.session["login_data"] - - # check prompt value has been registered in user session - assert "prompt" in login_data - assert login_data["prompt"] == "none" - - # simulate a failed silent session refresh - session_state = str(uuid.uuid4()) - - login_complete_url = reverse( - "oidc-login-complete", - query_params={ - "error": "login_required", - "state": login_data["state"], - "session_state": session_state, - }, - ) - - # login process finalization, should redirect to logout page - response = check_http_get_response(client, login_complete_url, status_code=302) - request = response.wsgi_request - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - assert response["location"] == logout_url - - def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_html_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ kc_oidc_mock = mock_keycloak(mocker) _generate_and_test_bearer_token(client, kc_oidc_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to list his tokens. """ kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, mocker): """ User with correct credentials should be allowed to display a token. """ kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for i in range(nb_tokens): token = _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to revoke tokens. """ kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_html_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db def test_oidc_profile_view(client, mocker): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] mock_keycloak(mocker, user_permissions=user_permissions) client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm)