diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,6 @@ prometheus-client pybadges pygments -python-keycloak >= 0.19.0 python-magic >= 0.4.0 python-memcached pyyaml diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py deleted file mode 100644 --- a/swh/web/auth/backends.py +++ /dev/null @@ -1,190 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from datetime import datetime, timedelta -import hashlib -from typing import Any, Dict, Optional - -import sentry_sdk - -from django.core.cache import cache -from django.http import HttpRequest -from django.utils import timezone -from rest_framework.authentication import BaseAuthentication -from rest_framework.exceptions import AuthenticationFailed, ValidationError - -from swh.auth.django.models import OIDCUser -from swh.web.auth.utils import get_oidc_client - - -def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: - # compute an integer user identifier for Django User model - # by concatenating all groups of the UUID4 user identifier - # generated by Keycloak and converting it from hex to decimal - user_id = int("".join(decoded_token["sub"].split("-")), 16) - - # create a Django user that will not be saved to database - user = OIDCUser( - id=user_id, - username=decoded_token["preferred_username"], - password="", - first_name=decoded_token["given_name"], - last_name=decoded_token["family_name"], - email=decoded_token["email"], - ) - - # set is_staff user property based on groups - if "groups" in decoded_token: - user.is_staff = "/staff" in decoded_token["groups"] - - # extract user permissions if any - resource_access = decoded_token.get("resource_access", {}) - client_resource_access = resource_access.get(get_oidc_client().client_id, {}) - user.permissions = set(client_resource_access.get("roles", [])) - - # add user sub to custom User proxy model - user.sub = decoded_token["sub"] - - return user - - -def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: - - oidc_client = get_oidc_client() - - # decode JWT token - try: - access_token = oidc_profile["access_token"] - decoded_token = oidc_client.decode_token(access_token) - # access token has expired or is invalid - except Exception: - # get a new access token from authentication provider - oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) - # decode access token - decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) - - # create OIDCUser from decoded token - user = _oidc_user_from_decoded_token(decoded_token) - - # get authentication init datetime - auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) - exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) - - # compute OIDC tokens expiration date - oidc_profile["expires_at"] = exp_datetime - oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( - seconds=oidc_profile["refresh_expires_in"] - ) - - # add OIDC profile data to custom User proxy model - for key, val in oidc_profile.items(): - if hasattr(user, key): - setattr(user, key, val) - - # put OIDC profile in cache or update it after token renewal - cache_key = f"oidc_user_{user.id}" - if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: - # set cache key TTL as refresh token expiration time - assert user.refresh_expires_at - ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) - - # save oidc_profile in cache - cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) - - return user - - -class OIDCAuthorizationCodePKCEBackend: - def authenticate( - self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str - ) -> Optional[OIDCUser]: - - user = None - try: - # try to authenticate user with OIDC PKCE authorization code flow - oidc_profile = get_oidc_client().authorization_code( - code, redirect_uri, code_verifier=code_verifier - ) - - # create Django user - user = _oidc_user_from_profile(oidc_profile) - - except Exception as e: - sentry_sdk.capture_exception(e) - - return user - - def get_user(self, user_id: int) -> Optional[OIDCUser]: - # get oidc profile from cache - oidc_profile = cache.get(f"oidc_user_{user_id}") - if oidc_profile: - try: - user = _oidc_user_from_profile(oidc_profile) - # restore auth backend - setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") - return user - except Exception as e: - sentry_sdk.capture_exception(e) - return None - else: - return None - - -class OIDCBearerTokenAuthentication(BaseAuthentication): - def authenticate(self, request): - auth_header = request.META.get("HTTP_AUTHORIZATION") - if auth_header is None: - return None - - try: - auth_type, refresh_token = auth_header.split(" ", 1) - except ValueError: - raise AuthenticationFailed("Invalid HTTP authorization header format") - - if auth_type != "Bearer": - raise AuthenticationFailed( - (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") - ) - try: - - oidc_client = get_oidc_client() - - # compute a cache key from the token that does not exceed - # memcached key size limit - hasher = hashlib.sha1() - hasher.update(refresh_token.encode("ascii")) - cache_key = f"api_token_{hasher.hexdigest()}" - - # check if an access token is cached - access_token = cache.get(cache_key) - - # attempt to decode access token - try: - decoded_token = oidc_client.decode_token(access_token) - except Exception: - # access token is None or it has expired - decoded_token = None - - if access_token is None or decoded_token is None: - # get a new access token from authentication provider - access_token = oidc_client.refresh_token(refresh_token)["access_token"] - # decode access token - decoded_token = oidc_client.decode_token(access_token) - # compute access token expiration - exp = datetime.fromtimestamp(decoded_token["exp"]) - ttl = int(exp.timestamp() - timezone.now().timestamp()) - # save access token in cache while it is valid - cache.set(cache_key, access_token, timeout=max(0, ttl)) - - # create Django user - user = _oidc_user_from_decoded_token(decoded_token) - except UnicodeEncodeError as e: - sentry_sdk.capture_exception(e) - raise ValidationError("Invalid bearer token") - except Exception as e: - sentry_sdk.capture_exception(e) - raise AuthenticationFailed(str(e)) - - return user, None diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py deleted file mode 100644 --- a/swh/web/auth/middlewares.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from django.contrib.auth import BACKEND_SESSION_KEY -from django.http.response import HttpResponseRedirect - -from swh.web.common.utils import reverse - - -class OIDCSessionExpiredMiddleware: - """ - Middleware for checking OIDC user session expiration. - """ - - def __init__(self, get_response=None): - self.get_response = get_response - self.exempted_urls = [ - reverse(v) - for v in ("logout", "oidc-login", "oidc-login-complete", "oidc-logout") - ] - - def __call__(self, request): - if ( - request.method != "GET" - or request.user.is_authenticated - or BACKEND_SESSION_KEY not in request.session - or "OIDC" not in request.session[BACKEND_SESSION_KEY] - or request.path in self.exempted_urls - ): - return self.get_response(request) - - # At that point, we know that a OIDC user was previously logged in - # and his session has expired. - # User will be redirected to logout page and a link will be offered to - # login again. - next_path = request.get_full_path() - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -4,45 +4,12 @@ # See top-level LICENSE file for more information from base64 import urlsafe_b64encode -import hashlib -import secrets -from typing import Dict, Tuple from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from swh.auth.keycloak import KeycloakOpenIDConnect -from swh.web.config import get_config - - -def gen_oidc_pkce_codes() -> Tuple[str, str]: - """ - Generates a code verifier and a code challenge to be used - with the OpenID Connect authorization code flow with PKCE - ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636). - - PKCE replaces the static secret used in the standard authorization - code flow with a temporary one-time challenge, making it feasible - to use in public clients. - - The implementation is inspired from that blog post: - https://www.stefaanlippens.net/oauth-code-flow-pkce.html - """ - # generate a code verifier which is a long enough random alphanumeric - # string, only to be used "client side" - code_verifier_str = secrets.token_urlsafe(60) - - # create the PKCE code challenge by hashing the code verifier with SHA256 - # and encoding the result in URL-safe base64 (without padding) - code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest() - code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii") - code_challenge_str = code_challenge_str.replace("=", "") - - return code_verifier_str, code_challenge_str - - OIDC_SWH_WEB_CLIENT_ID = "swh-web" @@ -101,28 +68,3 @@ The decrypted data """ return _get_fernet(password, salt).decrypt(data) - - -# stores instances of KeycloakOpenIDConnect class -# dict keys are (realm_name, client_id) tuples -_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {} - - -def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect: - """ - Instantiate a KeycloakOpenIDConnect class for a given client in the - SoftwareHeritage realm. - - Args: - client_id: client identifier in the SoftwareHeritage realm - - Returns: - An object to ease the interaction with the Keycloak server - """ - keycloak_config = get_config()["keycloak"] - - if client_id not in _keycloak_oidc: - _keycloak_oidc[client_id] = KeycloakOpenIDConnect( - keycloak_config["server_url"], keycloak_config["realm_name"], client_id - ) - return _keycloak_oidc[client_id] diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -5,14 +5,11 @@ import json from typing import Any, Dict, cast -import uuid from cryptography.fernet import InvalidToken from django.conf.urls import url -from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required -from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( @@ -25,126 +22,21 @@ from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import keycloak_oidc_client +from swh.auth.django.views import get_oidc_login_data, oidc_login_view +from swh.auth.django.views import urlpatterns as auth_urlpatterns from swh.web.auth.models import OIDCUserOfflineTokens -from swh.web.auth.utils import ( - decrypt_data, - encrypt_data, - gen_oidc_pkce_codes, - get_oidc_client, -) -from swh.web.common.exc import BadInputExc, ForbiddenExc +from swh.web.auth.utils import decrypt_data, encrypt_data +from swh.web.common.exc import ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config -def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): - # generate a CSRF token - state = str(uuid.uuid4()) - - code_verifier, code_challenge = gen_oidc_pkce_codes() - - request.session["login_data"] = { - "code_verifier": code_verifier, - "state": state, - "redirect_uri": redirect_uri, - "next_path": request.GET.get("next_path", ""), - } - - authorization_url_params = { - "state": state, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - "scope": scope, - } - - oidc_client = get_oidc_client() - authorization_url = oidc_client.authorization_url( - redirect_uri, **authorization_url_params - ) - - return HttpResponseRedirect(authorization_url) - - -def oidc_login(request: HttpRequest) -> HttpResponse: - """ - Django view to initiate login process using OpenID Connect. - """ - - redirect_uri = reverse("oidc-login-complete", request=request) - - return _oidc_login(request, redirect_uri=redirect_uri) - - -def _get_login_data(request: HttpRequest) -> Dict[str, Any]: - if "login_data" not in request.session: - raise Exception("Login process has not been initialized.") - - return request.session["login_data"] - - -def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): - - if "code" not in request.GET or "state" not in request.GET: - raise BadInputExc("Missing query parameters for authentication.") - - # get CSRF token returned by OIDC server - state = request.GET["state"] - - if state != login_data["state"]: - raise BadInputExc("Wrong CSRF token, aborting login process.") - - -def oidc_login_complete(request: HttpRequest) -> HttpResponse: - """ - Django view to finalize login process using OpenID Connect. - """ - login_data = _get_login_data(request) - next_path = login_data["next_path"] or request.build_absolute_uri("/") - - if "error" in request.GET: - raise Exception(request.GET["error"]) - - _check_login_data(request, login_data) - - user = authenticate( - request=request, - code=request.GET["code"], - code_verifier=login_data["code_verifier"], - redirect_uri=login_data["redirect_uri"], - ) - - if user is None: - raise Exception("User authentication failed.") - - login(request, user) - - return HttpResponseRedirect(next_path) - - -def oidc_logout(request: HttpRequest) -> HttpResponse: - """ - Django view to logout using OpenID Connect. - """ - user = request.user - logout(request) - if hasattr(user, "refresh_token"): - oidc_client = get_oidc_client() - user = cast(OIDCUser, user) - refresh_token = cast(str, user.refresh_token) - # end OpenID Connect session - oidc_client.logout(refresh_token) - # remove user data from cache - cache.delete(f"oidc_user_{user.id}") - - logout_url = reverse("logout", query_params={"remote_user": 1}) - return HttpResponseRedirect(request.build_absolute_uri(logout_url)) - - def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) - return _oidc_login( + return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) @@ -155,9 +47,8 @@ if "error" in request.GET: raise Exception(request.GET["error"]) - oidc_client = get_oidc_client() - login_data = _get_login_data(request) - _check_login_data(request, login_data) + login_data = get_oidc_login_data(request) + oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], @@ -227,7 +118,7 @@ secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) - oidc_client = get_oidc_client() + oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) @@ -240,10 +131,7 @@ return render(request, "auth/profile.html") -urlpatterns = [ - url(r"^oidc/login/$", oidc_login, name="oidc-login"), - url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), - url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), +urlpatterns = auth_urlpatterns + [ url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -12,6 +12,7 @@ import sys from typing import Any, Dict +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config swh_web_config = get_config() @@ -57,7 +58,7 @@ "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", - "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware", + "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", @@ -166,7 +167,7 @@ "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", - "swh.web.auth.backends.OIDCBearerTokenAuthentication", + "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } @@ -278,5 +279,10 @@ AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", - "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend", + "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] + +SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"] +SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"] +SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID +SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html --- a/swh/web/templates/layout.html +++ b/swh/web/templates/layout.html @@ -118,7 +118,7 @@ Logged in as {% if 'OIDC' in user.backend %} {{ user.username }}, - logout + logout {% else %} {{ user.username }}, logout diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -38,21 +38,21 @@ check_http_get_response(api_client, url, status_code=401) -def _authenticate_graph_user(api_client, keycloak_mock): - keycloak_mock.user_permissions = [API_GRAPH_PERM] - oidc_profile = keycloak_mock.login() +def _authenticate_graph_user(api_client, keycloak_oidc): + keycloak_oidc.user_permissions = [API_GRAPH_PERM] + oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") -def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock): +def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) - oidc_profile = keycloak_mock.login() + oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") check_http_get_response(api_client, url, status_code=403) - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, @@ -61,8 +61,8 @@ check_http_get_response(api_client, url, status_code=200) -def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" @@ -114,8 +114,8 @@ } -def test_graph_json_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_json_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" @@ -132,8 +132,8 @@ assert resp.content == json.dumps(_response_json).encode() -def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" @@ -167,7 +167,7 @@ @given(origin()) def test_graph_response_resolve_origins( - archive_data, api_client, keycloak_mock, requests_mock, origin + archive_data, api_client, keycloak_oidc, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) @@ -182,7 +182,7 @@ ) ) - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) for graph_query, response_text, content_type in ( ( @@ -238,9 +238,9 @@ def test_graph_response_resolve_origins_nothing_to_do( - api_client, keycloak_mock, requests_mock + api_client, keycloak_oidc, requests_mock ): - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py deleted file mode 100644 --- a/swh/web/tests/auth/test_api_auth.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from django.contrib.auth.models import AnonymousUser, User - -from swh.auth.django.models import OIDCUser -from swh.web.common.utils import reverse -from swh.web.tests.utils import check_api_get_responses, check_http_get_response - - -@pytest.mark.django_db -def test_drf_django_session_auth_success(keycloak_mock, client): - """ - Check user gets authenticated when querying the web api - through a web browser. - """ - url = reverse("api-1-stat-counters") - - client.login(code="", code_verifier="", redirect_uri="") - - response = check_http_get_response(client, url, status_code=200) - request = response.wsgi_request - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remoter used has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client): - """ - Check user gets authenticated when querying the web api - through an HTTP client using bearer token authentication. - """ - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=200) - request = response.wsgi_request - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remoter used has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client): - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # check for failed authentication but with expected token format - keycloak_mock.set_auth_success(False) - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - # check for failed authentication when token format is invalid - api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") - - response = check_api_get_responses(api_client, url, status_code=400) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - -def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client): - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # missing authorization type - api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - # invalid authorization type - api_client.credentials(HTTP_AUTHORIZATION="Foo token") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py deleted file mode 100644 --- a/swh/web/tests/auth/test_backends.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from datetime import datetime, timedelta -from unittest.mock import Mock - -import pytest - -from django.conf import settings -from django.contrib.auth import authenticate, get_backends -from rest_framework.exceptions import AuthenticationFailed - -from swh.auth.django.models import OIDCUser -from swh.web.auth.backends import OIDCBearerTokenAuthentication -from swh.web.common.utils import reverse - - -def _authenticate_user(request_factory): - request = request_factory.get(reverse("oidc-login-complete")) - - return authenticate( - request=request, - code="some-code", - code_verifier="some-code-verifier", - redirect_uri="https://localhost:5004", - ) - - -def _check_authenticated_user(user, decoded_token, kc_oidc_mock): - assert user is not None - assert isinstance(user, OIDCUser) - assert user.id != 0 - assert user.username == decoded_token["preferred_username"] - assert user.password == "" - assert user.first_name == decoded_token["given_name"] - assert user.last_name == decoded_token["family_name"] - assert user.email == decoded_token["email"] - assert user.is_staff == ("/staff" in decoded_token["groups"]) - assert user.sub == decoded_token["sub"] - resource_access = decoded_token.get("resource_access", {}) - resource_access_client = resource_access.get(kc_oidc_mock, {}) - assert user.permissions == set(resource_access_client.get("roles", [])) - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory): - """ - Checks successful login based on OpenID Connect with PKCE extension - Django authentication backend (login from Web UI). - """ - keycloak_mock.user_groups = ["/staff"] - - oidc_profile = keycloak_mock.login() - user = _authenticate_user(request_factory) - - decoded_token = keycloak_mock.decode_token(user.access_token) - _check_authenticated_user(user, decoded_token, keycloak_mock) - - auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) - exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) - refresh_exp_datetime = auth_datetime + timedelta( - seconds=oidc_profile["refresh_expires_in"] - ) - - assert user.access_token == oidc_profile["access_token"] - assert user.expires_at == exp_datetime - assert user.id_token == oidc_profile["id_token"] - assert user.refresh_token == oidc_profile["refresh_token"] - assert user.refresh_expires_at == refresh_exp_datetime - assert user.scope == oidc_profile["scope"] - assert user.session_state == oidc_profile["session_state"] - - backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" - assert user.backend == backend_path - backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) - assert get_backends()[backend_idx].get_user(user.id) == user - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory): - """ - Checks failed login based on OpenID Connect with PKCE extension Django - authentication backend (login from Web UI). - """ - keycloak_mock.set_auth_success(False) - - user = _authenticate_user(request_factory) - - assert user is None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_success( - keycloak_mock, request_factory -): - """ - Checks access token renewal success using refresh token. - """ - - oidc_profile = keycloak_mock.login() - decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"]) - new_access_token = "new_access_token" - - def _refresh_token(refresh_token): - oidc_profile = dict(keycloak_mock.login()) - oidc_profile["access_token"] = new_access_token - return oidc_profile - - def _decode_token(access_token): - if access_token != new_access_token: - raise Exception("access token token has expired") - else: - return decoded_token - - keycloak_mock.decode_token = Mock() - keycloak_mock.decode_token.side_effect = _decode_token - keycloak_mock.refresh_token.side_effect = _refresh_token - - user = _authenticate_user(request_factory) - - oidc_profile = keycloak_mock.login() - keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) - - assert user is not None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_failure( - keycloak_mock, request_factory -): - """ - Checks access token renewal failure using refresh token. - """ - - def _refresh_token(refresh_token): - raise Exception("OIDC session has expired") - - def _decode_token(access_token): - raise Exception("access token token has expired") - - keycloak_mock.decode_token = Mock() - keycloak_mock.decode_token.side_effect = _decode_token - keycloak_mock.refresh_token.side_effect = _refresh_token - - user = _authenticate_user(request_factory) - - oidc_profile = keycloak_mock.login() - keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) - - assert user is None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory): - """ - Checks that a permission defined with OpenID Connect is correctly mapped - to a Django one when logging from Web UI. - """ - permission = "webapp.some-permission" - keycloak_mock.user_permissions = [permission] - user = _authenticate_user(request_factory) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} - assert user.has_module_perms("webapp") - assert not user.has_module_perms("foo") - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory): - """ - Checks successful login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login). - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - access_token = oidc_profile["access_token"] - - decoded_token = keycloak_mock.decode_token(access_token) - - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user, decoded_token, keycloak_mock) - # oidc_profile is not filled when authenticating through bearer token - assert hasattr(user, "access_token") and user.access_token is None - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory): - """ - Checks failed login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login). - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - - # simulate a failed authentication with a bearer token in expected format - keycloak_mock.set_auth_success(False) - - refresh_token = oidc_profile["refresh_token"] - - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - # simulate a failed authentication with an invalid bearer token format - request = api_request_factory.get( - url, HTTP_AUTHORIZATION="Bearer invalid-token-format" - ) - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - -def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory): - """ - Checks failed login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login) due to invalid - authorization header value. - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # Invalid authorization type - request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - # Missing authorization type - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_permissions( - keycloak_mock, api_request_factory -): - """ - Checks that a permission defined with OpenID Connect is correctly mapped - to a Django one when using bearer token authentication. - """ - permission = "webapp.some-permission" - keycloak_mock.user_permissions = [permission] - - drf_auth_backend = OIDCBearerTokenAuthentication() - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - url = reverse("api-1-stat-counters") - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - user, _ = drf_auth_backend.authenticate(request) - - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} - assert user.has_module_perms("webapp") - assert not user.has_module_perms("foo") diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py deleted file mode 100644 --- a/swh/web/tests/auth/test_middlewares.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -import pytest - -from django.core.cache import cache -from django.test import modify_settings - -from swh.web.common.utils import reverse -from swh.web.tests.utils import check_html_get_response - - -@pytest.mark.django_db -@modify_settings( - MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} -) -def test_oidc_session_expired_middleware_disabled(client, keycloak_mock): - # authenticate user - - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - url = reverse("swh-web-homepage") - - # visit url first to get user from response - response = check_html_get_response(client, url, status_code=200) - - # simulate OIDC session expiration - cache.delete(f"oidc_user_{response.wsgi_request.user.id}") - - # no redirection when session has expired - check_html_get_response(client, url, status_code=200) - - -@pytest.mark.django_db -def test_oidc_session_expired_middleware_enabled(client, keycloak_mock): - # authenticate user - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - url = reverse("swh-web-homepage") - - # visit url first to get user from response - response = check_html_get_response(client, url, status_code=200) - - # simulate OIDC session expiration - cache.delete(f"oidc_user_{response.wsgi_request.user.id}") - - # should redirect to logout page - resp = check_html_get_response(client, url, status_code=302) - silent_refresh_url = reverse( - "logout", query_params={"next_path": url, "remote_user": 1} - ) - assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py --- a/swh/web/tests/auth/test_utils.py +++ b/swh/web/tests/auth/test_utils.py @@ -3,40 +3,11 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from base64 import urlsafe_b64encode -import hashlib -import re from cryptography.fernet import InvalidToken import pytest -from swh.web.auth.utils import decrypt_data, encrypt_data, gen_oidc_pkce_codes - - -def test_gen_oidc_pkce_codes(): - """ - Check generated PKCE codes respect the specification - (see https://tools.ietf.org/html/rfc7636#section-4.1) - """ - code_verifier, code_challenge = gen_oidc_pkce_codes() - - # check the code verifier only contains allowed characters - assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier) - - # check minimum and maximum authorized length for the - # code verifier - assert len(code_verifier) >= 43 - assert len(code_verifier) <= 128 - - # compute code challenge from code verifier - challenge = hashlib.sha256(code_verifier.encode("ascii")).digest() - challenge = urlsafe_b64encode(challenge).decode("ascii") - challenge = challenge.replace("=", "") - - # check base64 padding is not present - assert not code_challenge[-1].endswith("=") - # check code challenge is valid - assert code_challenge == challenge +from swh.web.auth.utils import decrypt_data, encrypt_data def test_encrypt_decrypt_data_ok(): diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -9,10 +9,8 @@ import pytest -from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict -from swh.auth.django.models import OIDCUser from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse @@ -27,11 +25,11 @@ def _check_oidc_login_code_flow_data( - request, response, kc_oidc_mock, redirect_uri, scope="openid" + request, response, keycloak_oidc, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) - authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] + authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid @@ -59,233 +57,6 @@ return login_data -@pytest.mark.django_db -def test_oidc_login_views_success(client, keycloak_mock): - """ - Simulate a successful login authentication with OpenID Connect - authorization code flow with PKCE. - """ - # user initiates login process - login_url = reverse("oidc-login") - - # should redirect to Keycloak authentication page in order - # for a user to login with its username / password - response = check_html_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - login_data = _check_oidc_login_code_flow_data( - request, - response, - keycloak_mock, - redirect_uri=reverse("oidc-login-complete", request=request), - ) - - # once a user has identified himself in Keycloak, he is - # redirected to the 'oidc-login-complete' view to - # login in Django. - - # generate authorization code / session state in the same - # manner as Keycloak - code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" - session_state = str(uuid.uuid4()) - - login_complete_url = reverse( - "oidc-login-complete", - query_params={ - "code": code, - "state": login_data["state"], - "session_state": session_state, - }, - ) - - # login process finalization, should redirect to root url by default - response = check_html_get_response(client, login_complete_url, status_code=302) - request = response.wsgi_request - - assert response["location"] == request.build_absolute_uri("/") - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remote user has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_oidc_logout_view_success(client, keycloak_mock): - """ - Simulate a successful logout operation with OpenID Connect. - """ - # login our test user - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - # user initiates logout - oidc_logout_url = reverse("oidc-logout") - - # should redirect to logout page - response = check_html_get_response(client, oidc_logout_url, status_code=302) - request = response.wsgi_request - - logout_url = reverse("logout", query_params={"remote_user": 1}) - assert response["location"] == request.build_absolute_uri(logout_url) - - # should have been logged out in Keycloak - oidc_profile = keycloak_mock.login() - keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"]) - - # check effective logout in Django - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_login_view_failure(client, keycloak_mock): - """ - Simulate a failed authentication with OpenID Connect. - """ - keycloak_mock.set_auth_success(False) - - # user initiates login process - login_url = reverse("oidc-login") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - - # no users should be logged in - assert isinstance(request.user, AnonymousUser) - - -# Simulate possible errors with OpenID Connect in the login complete view. - - -def test_oidc_login_complete_view_no_login_data(client, mocker): - # user initiates login process - login_url = reverse("oidc-login-complete") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - - assert_contains( - response, "Login process has not been initialized.", status_code=500 - ) - - -def test_oidc_login_complete_view_missing_parameters(client, mocker): - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # user initiates login process - login_url = reverse("oidc-login-complete") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=400, template_used="error.html" - ) - request = response.wsgi_request - assert_contains( - response, "Missing query parameters for authentication.", status_code=400 - ) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock): - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # user initiates login process - login_url = reverse( - "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} - ) - - # should render an error page - response = check_html_get_response( - client, login_url, status_code=400, template_used="error.html" - ) - request = response.wsgi_request - assert_contains( - response, "Wrong CSRF token, aborting login process.", status_code=400 - ) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock): - keycloak_mock.set_auth_success(False) - - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # check authentication error is reported - login_url = reverse( - "oidc-login-complete", - query_params={"code": "some-code", "state": session["login_data"]["state"]}, - ) - - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - assert_contains(response, "User authentication failed.", status_code=500) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_logout_view_failure(client, keycloak_mock): - """ - Simulate a failed logout operation with OpenID Connect. - """ - # login our test user - client.login(code="", code_verifier="", redirect_uri="") - - err_msg = "Authentication server error" - keycloak_mock.logout.side_effect = Exception(err_msg) - - # user initiates logout process - logout_url = reverse("oidc-logout") - # should render an error page - response = check_html_get_response( - client, logout_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - assert_contains(response, err_msg, status_code=500) - - # user should be logged out from Django anyway - assert isinstance(request.user, AnonymousUser) - - def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default @@ -341,7 +112,7 @@ ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) - response = check_html_get_response(client, token_complete_url, status_code=302) + response = check_http_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database @@ -360,12 +131,12 @@ @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock): +def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) def test_oidc_list_bearer_tokens_anonymous_user(client): @@ -379,14 +150,14 @@ @pytest.mark.django_db -def test_oidc_list_bearer_tokens(client, keycloak_mock): +def test_oidc_list_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} @@ -411,14 +182,14 @@ @pytest.mark.django_db -def test_oidc_get_bearer_token(client, keycloak_mock): +def test_oidc_get_bearer_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): - token = _generate_and_test_bearer_token(client, keycloak_mock) + token = _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-get-bearer-token") @@ -441,14 +212,14 @@ @pytest.mark.django_db -def test_oidc_revoke_bearer_tokens(client, keycloak_mock): +def test_oidc_revoke_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-revoke-bearer-tokens") @@ -470,12 +241,12 @@ """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) - resp = check_html_get_response(client, url, status_code=302) + resp = check_http_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db -def test_oidc_profile_view(client, keycloak_mock): +def test_oidc_profile_view(client, keycloak_oidc): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. @@ -483,7 +254,7 @@ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] - keycloak_mock.user_permissions = user_permissions + keycloak_oidc.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -18,7 +18,6 @@ from django.core.cache import cache from rest_framework.test import APIClient, APIRequestFactory -from swh.auth.pytest_plugin import keycloak_mock_factory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest @@ -28,6 +27,8 @@ from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages +pytest_plugins = ["swh.auth.pytest_plugin"] + # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None @@ -361,22 +362,15 @@ yield converters.from_swh(ctag, hashess={"id"}) -_keycloak_config = get_config()["keycloak"] - -_keycloak_mock = keycloak_mock_factory( - server_url=_keycloak_config["server_url"], - realm_name=_keycloak_config["realm_name"], - client_id=OIDC_SWH_WEB_CLIENT_ID, -) +@pytest.fixture +def keycloak_oidc(keycloak_oidc, mocker): + keycloak_config = get_config()["keycloak"] + keycloak_oidc.server_url = keycloak_config["server_url"] + keycloak_oidc.realm_name = keycloak_config["realm_name"] + keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID -@pytest.fixture -def keycloak_mock(_keycloak_mock, mocker): - for oidc_client_factory in ( - "swh.web.auth.views.get_oidc_client", - "swh.web.auth.backends.get_oidc_client", - ): - mock_get_oidc_client = mocker.patch(oidc_client_factory) - mock_get_oidc_client.return_value = _keycloak_mock + keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") + keycloak_oidc_client.return_value = keycloak_oidc - return _keycloak_mock + return keycloak_oidc