diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -15,13 +15,9 @@ from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError -from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.models import OIDCUser from swh.web.auth.utils import get_oidc_client -# OpenID Connect client to communicate with Keycloak server -_oidc_client: KeycloakOpenIDConnect = get_oidc_client() - def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model @@ -45,7 +41,7 @@ # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) - client_resource_access = resource_access.get(_oidc_client.client_id, {}) + client_resource_access = resource_access.get(get_oidc_client().client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model @@ -56,16 +52,18 @@ def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: + oidc_client = get_oidc_client() + # decode JWT token try: access_token = oidc_profile["access_token"] - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) # access token has expired or is invalid except Exception: # get a new access token from authentication provider - oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"]) + oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token - decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) + decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) @@ -106,7 +104,7 @@ user = None try: # try to authenticate user with OIDC PKCE authorization code flow - oidc_profile = _oidc_client.authorization_code( + oidc_profile = get_oidc_client().authorization_code( code, redirect_uri, code_verifier=code_verifier ) @@ -151,6 +149,8 @@ ) try: + oidc_client = get_oidc_client() + # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() @@ -162,16 +162,16 @@ # attempt to decode access token try: - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) except Exception: # access token is None or it has expired decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider - access_token = _oidc_client.refresh_token(refresh_token)["access_token"] + access_token = oidc_client.refresh_token(refresh_token)["access_token"] # decode access token - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py deleted file mode 100644 --- a/swh/web/auth/keycloak.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Any, Dict, Optional, Tuple -from urllib.parse import urlencode - -from keycloak import KeycloakOpenID - - -class KeycloakOpenIDConnect: - """ - Wrapper class around python-keycloak to ease the interaction with Keycloak - for managing authentication and user permissions with OpenID Connect. - """ - - def __init__( - self, - server_url: str, - realm_name: str, - client_id: str, - realm_public_key: str = "", - ): - """ - Args: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier - realm_public_key: The realm public key (will be dynamically - retrieved if not provided) - """ - self._keycloak = KeycloakOpenID( - server_url=server_url, client_id=client_id, realm_name=realm_name, - ) - - self.server_url = server_url - self.realm_name = realm_name - self.client_id = client_id - self.realm_public_key = realm_public_key - - def well_known(self) -> Dict[str, Any]: - """ - Retrieve the OpenID Connect Well-Known URI registry from Keycloak. - - Returns: - A dictionary filled with OpenID Connect URIS. - """ - return self._keycloak.well_know() - - def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: - """ - Get OpenID Connect authorization URL to authenticate users. - - Args: - redirect_uri: URI to redirect to once a user is authenticated - extra_params: Extra query parameters to add to the - authorization URL - """ - auth_url = self._keycloak.auth_url(redirect_uri) - if extra_params: - auth_url += "&%s" % urlencode(extra_params) - return auth_url - - def authorization_code( - self, code: str, redirect_uri: str, **extra_params: str - ) -> Dict[str, Any]: - """ - Get OpenID Connect authentication tokens using Authorization - Code flow. - - Args: - code: Authorization code provided by Keycloak - redirect_uri: URI to redirect to once a user is authenticated - (must be the same as the one provided to authorization_url): - extra_params: Extra parameters to add in the authorization request - payload. - """ - return self._keycloak.token( - grant_type="authorization_code", - code=code, - redirect_uri=redirect_uri, - **extra_params, - ) - - def refresh_token(self, refresh_token: str) -> Dict[str, Any]: - """ - Request a new access token from Keycloak using a refresh token. - - Args: - refresh_token: A refresh token provided by Keycloak - - Returns: - A dictionary filled with tokens info - """ - return self._keycloak.refresh_token(refresh_token) - - def decode_token( - self, token: str, options: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: - """ - Try to decode a JWT token. - - Args: - token: A JWT token to decode - options: Options for jose.jwt.decode - - Returns: - A dictionary filled with decoded token content - """ - if not self.realm_public_key: - realm_public_key = self._keycloak.public_key() - self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" - self.realm_public_key += realm_public_key - self.realm_public_key += "\n-----END PUBLIC KEY-----" - - return self._keycloak.decode_token( - token, key=self.realm_public_key, options=options - ) - - def logout(self, refresh_token: str) -> None: - """ - Logout a user by closing its authenticated session. - - Args: - refresh_token: A refresh token provided by Keycloak - """ - self._keycloak.logout(refresh_token) - - def userinfo(self, access_token: str) -> Dict[str, Any]: - """ - Return user information from its access token. - - Args: - access_token: An access token provided by Keycloak - - Returns: - A dictionary fillled with user information - """ - return self._keycloak.userinfo(access_token) - - -# stores instances of KeycloakOpenIDConnect class -# dict keys are (realm_name, client_id) tuples -_keycloak_oidc: Dict[Tuple[str, str], KeycloakOpenIDConnect] = {} - - -def get_keycloak_oidc_client( - server_url: str, realm_name: str, client_id: str -) -> KeycloakOpenIDConnect: - """ - Instantiate a KeycloakOpenIDConnect class for a given client in a - given realm. - - Args: - server_url: Base URL of a Keycloak server - realm_name: Name of the realm in Keycloak - client_id: Client identifier in the realm - - Returns: - An object to ease the interaction with the Keycloak server - """ - realm_client_key = (realm_name, client_id) - if realm_client_key not in _keycloak_oidc: - _keycloak_oidc[realm_client_key] = KeycloakOpenIDConnect( - server_url, realm_name, client_id - ) - return _keycloak_oidc[realm_client_key] diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -6,14 +6,14 @@ from base64 import urlsafe_b64encode import hashlib import secrets -from typing import Tuple +from typing import Dict, Tuple from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from swh.web.auth.keycloak import KeycloakOpenIDConnect, get_keycloak_oidc_client +from swh.auth.keycloak import KeycloakOpenIDConnect from swh.web.config import get_config @@ -103,6 +103,11 @@ return _get_fernet(password, salt).decrypt(data) +# stores instances of KeycloakOpenIDConnect class +# dict keys are (realm_name, client_id) tuples +_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {} + + def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class for a given client in the @@ -114,9 +119,10 @@ Returns: An object to ease the interaction with the Keycloak server """ - swhweb_config = get_config() - return get_keycloak_oidc_client( - swhweb_config["keycloak"]["server_url"], - swhweb_config["keycloak"]["realm_name"], - client_id, - ) + keycloak_config = get_config()["keycloak"] + + if client_id not in _keycloak_oidc: + _keycloak_oidc[client_id] = KeycloakOpenIDConnect( + keycloak_config["server_url"], keycloak_config["realm_name"], client_id + ) + return _keycloak_oidc[client_id] diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -16,8 +16,6 @@ from swh.web.api.views.graph import API_GRAPH_PERM from swh.web.common.utils import reverse from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config -from swh.web.tests.auth.keycloak_mock import mock_keycloak -from swh.web.tests.auth.sample_data import oidc_profile from swh.web.tests.strategies import origin from swh.web.tests.utils import check_http_get_response @@ -40,20 +38,21 @@ check_http_get_response(api_client, url, status_code=401) -def _authenticate_graph_user(api_client, mocker): - mock_keycloak(mocker, user_permissions=[API_GRAPH_PERM]) +def _authenticate_graph_user(api_client, keycloak_mock): + keycloak_mock.user_permissions = [API_GRAPH_PERM] + oidc_profile = keycloak_mock.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") -def test_graph_endpoint_needs_permission(api_client, mocker, requests_mock): +def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) + oidc_profile = keycloak_mock.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") - mock_keycloak(mocker, user_permissions=[]) check_http_get_response(api_client, url, status_code=403) - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, @@ -62,8 +61,8 @@ check_http_get_response(api_client, url, status_code=200) -def test_graph_text_plain_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" @@ -115,8 +114,8 @@ } -def test_graph_json_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_json_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "stats" @@ -133,8 +132,8 @@ assert resp.content == json.dumps(_response_json).encode() -def test_graph_ndjson_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" @@ -168,7 +167,7 @@ @given(origin()) def test_graph_response_resolve_origins( - archive_data, api_client, mocker, requests_mock, origin + archive_data, api_client, keycloak_mock, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) @@ -183,7 +182,7 @@ ) ) - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) for graph_query, response_text, content_type in ( ( @@ -239,9 +238,9 @@ def test_graph_response_resolve_origins_nothing_to_do( - api_client, mocker, requests_mock + api_client, keycloak_mock, requests_mock ): - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "stats" diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py deleted file mode 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from copy import copy -from unittest.mock import Mock - -from keycloak.exceptions import KeycloakError - -from django.utils import timezone - -from swh.web.auth.keycloak import KeycloakOpenIDConnect -from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID -from swh.web.config import get_config - -from .sample_data import oidc_profile, realm_public_key, userinfo - - -class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): - def __init__( - self, auth_success=True, exp=None, user_groups=[], user_permissions=[] - ): - swhweb_config = get_config() - super().__init__( - swhweb_config["keycloak"]["server_url"], - swhweb_config["keycloak"]["realm_name"], - OIDC_SWH_WEB_CLIENT_ID, - ) - self.auth_success = auth_success - self.exp = exp - self.user_groups = user_groups - self.user_permissions = user_permissions - self._keycloak.public_key = lambda: realm_public_key - self._keycloak.well_know = lambda: { - "issuer": f"{self.server_url}realms/{self.realm_name}", - "authorization_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/auth" - ), - "token_endpoint": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/token" - ), - "token_introspection_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/token/" - "introspect" - ), - "userinfo_endpoint": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/userinfo" - ), - "end_session_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/logout" - ), - "jwks_uri": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/certs" - ), - } - self.authorization_code = Mock() - self.refresh_token = Mock() - self.userinfo = Mock() - self.logout = Mock() - if auth_success: - self.authorization_code.return_value = copy(oidc_profile) - self.refresh_token.return_value = copy(oidc_profile) - self.userinfo.return_value = copy(userinfo) - else: - self.authorization_url = Mock() - exception = KeycloakError( - error_message="Authentication failed", response_code=401 - ) - self.authorization_code.side_effect = exception - self.authorization_url.side_effect = exception - self.refresh_token.side_effect = exception - self.userinfo.side_effect = exception - self.logout.side_effect = exception - - def decode_token(self, token): - options = {} - if self.auth_success: - # skip signature expiration check as we use a static oidc_profile - # for the tests with expired tokens in it - options["verify_exp"] = False - decoded = super().decode_token(token, options) - # tweak auth and exp time for tests - expire_in = decoded["exp"] - decoded["iat"] - if self.exp is not None: - decoded["exp"] = self.exp - decoded["iat"] = self.exp - expire_in - else: - decoded["iat"] = int(timezone.now().timestamp()) - decoded["exp"] = decoded["iat"] + expire_in - decoded["groups"] = self.user_groups - if self.user_permissions: - decoded["resource_access"][self.client_id] = { - "roles": self.user_permissions - } - return decoded - - -def mock_keycloak( - mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] -): - kc_oidc_mock = KeycloackOpenIDConnectMock( - auth_success, exp, user_groups, user_permissions - ) - mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") - mock_get_oidc_client.return_value = kc_oidc_mock - mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) - return kc_oidc_mock diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py deleted file mode 100644 --- a/swh/web/tests/auth/sample_data.py +++ /dev/null @@ -1,128 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -realm_public_key = ( - "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u" - "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY" - "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy" - "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr" - "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie" - "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB" -) - -oidc_profile = { - "access_token": ( - # decoded token: - # {'acr': '1', - # 'allowed-origins': ['*'], - # 'aud': ['swh-web', 'account'], - # 'auth_time': 1582723101, - # 'azp': 'swh-web', - # 'email': 'john.doe@example.com', - # 'email_verified': False, - # 'exp': 1592396202, - # 'family_name': 'Doe', - # 'given_name': 'John', - # 'groups': ['/staff'], - # 'iat': 1592395601, - # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage', - # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e', - # 'name': 'John Doe', - # 'nbf': 0, - # 'preferred_username': 'johndoe', - # 'realm_access': {'roles': ['offline_access', 'uma_authorization']}, - # 'resource_access': {'account': {'roles': ['manage-account', - # 'manage-account-links', - # 'view-profile']}}, - # 'scope': 'openid email profile', - # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d', - # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200', - # 'typ': 'Bearer' - # } - "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV" - "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0." - "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz" - "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz" - "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs" - "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj" - "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2" - "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi" - "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy" - "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi" - "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6" - "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0" - "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl" - "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz" - "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg" - "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv" - "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi" - "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi" - "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-" - "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB" - "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO" - "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc" - "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl" - "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE" - "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ" - "6A" - ), - "expires_in": 600, - "id_token": ( - "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0" - "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki" - "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi" - "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo" - "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp" - "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh" - "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13" - "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk" - "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx" - "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn" - "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2" - "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi" - "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee" - "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL" - "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_" - "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE" - "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN" - "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX" - "ZbYnitD1Typ6Q" - ), - "not-before-policy": 0, - "refresh_expires_in": 1800, - "refresh_token": ( - "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM" - "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk" - "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC" - "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL" - "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv" - "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q" - "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj" - "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid" - "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi" - "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ" - "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl" - "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic" - "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu" - "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc" - "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG" - "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI" - ), - "scope": "openid email profile", - "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", - "token_type": "bearer", -} - -userinfo = { - "email": "john.doe@example.com", - "email_verified": False, - "family_name": "Doe", - "given_name": "John", - "groups": ["/staff"], - "name": "John Doe", - "preferred_username": "johndoe", - "sub": "feacd344-b468-4a65-a236-14f61e6b7200", -} diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py --- a/swh/web/tests/auth/test_api_auth.py +++ b/swh/web/tests/auth/test_api_auth.py @@ -11,19 +11,15 @@ from swh.web.common.utils import reverse from swh.web.tests.utils import check_api_get_responses, check_http_get_response -from . import sample_data -from .keycloak_mock import mock_keycloak - @pytest.mark.django_db -def test_drf_django_session_auth_success(mocker, client): +def test_drf_django_session_auth_success(keycloak_mock, client): """ Check user gets authenticated when querying the web api through a web browser. """ url = reverse("api-1-stat-counters") - mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") response = check_http_get_response(client, url, status_code=200) @@ -38,16 +34,16 @@ @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_success(mocker, api_client): +def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client): """ Check user gets authenticated when querying the web api through an HTTP client using bearer token authentication. """ url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] - mock_keycloak(mocker) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=200) @@ -62,13 +58,14 @@ @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_failure(mocker, api_client): +def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client): url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # check for failed authentication but with expected token format - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=403) @@ -85,10 +82,11 @@ assert isinstance(request.user, AnonymousUser) -def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client): +def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client): url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # missing authorization type api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -16,9 +16,6 @@ from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse -from . import sample_data -from .keycloak_mock import mock_keycloak - def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) @@ -48,17 +45,18 @@ @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ - kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) - oidc_profile = sample_data.oidc_profile + keycloak_mock.user_groups = ["/staff"] + + oidc_profile = keycloak_mock.login() user = _authenticate_user(request_factory) - decoded_token = kc_oidc_mock.decode_token(user.access_token) - _check_authenticated_user(user, decoded_token, kc_oidc_mock) + decoded_token = keycloak_mock.decode_token(user.access_token) + _check_authenticated_user(user, decoded_token, keycloak_mock) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) @@ -81,12 +79,12 @@ @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) user = _authenticate_user(request_factory) @@ -94,18 +92,19 @@ @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_refresh_token_success( + keycloak_mock, request_factory +): """ Checks access token renewal success using refresh token. """ - kc_oidc_mock = mock_keycloak(mocker) - oidc_profile = sample_data.oidc_profile - decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"]) + oidc_profile = keycloak_mock.login() + decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"]) new_access_token = "new_access_token" def _refresh_token(refresh_token): - oidc_profile = dict(sample_data.oidc_profile) + oidc_profile = dict(keycloak_mock.login()) oidc_profile["access_token"] = new_access_token return oidc_profile @@ -115,25 +114,25 @@ else: return decoded_token - kc_oidc_mock.decode_token = Mock() - kc_oidc_mock.decode_token.side_effect = _decode_token - kc_oidc_mock.refresh_token.side_effect = _refresh_token + keycloak_mock.decode_token = Mock() + keycloak_mock.decode_token.side_effect = _decode_token + keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) - kc_oidc_mock.refresh_token.assert_called_with( - sample_data.oidc_profile["refresh_token"] - ) + oidc_profile = keycloak_mock.login() + keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_refresh_token_failure( + keycloak_mock, request_factory +): """ Checks access token renewal failure using refresh token. """ - kc_oidc_mock = mock_keycloak(mocker) def _refresh_token(refresh_token): raise Exception("OIDC session has expired") @@ -141,27 +140,26 @@ def _decode_token(access_token): raise Exception("access token token has expired") - kc_oidc_mock.decode_token = Mock() - kc_oidc_mock.decode_token.side_effect = _decode_token - kc_oidc_mock.refresh_token.side_effect = _refresh_token + keycloak_mock.decode_token = Mock() + keycloak_mock.decode_token.side_effect = _decode_token + keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) - kc_oidc_mock.refresh_token.assert_called_with( - sample_data.oidc_profile["refresh_token"] - ) + oidc_profile = keycloak_mock.login() + keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is None @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" - mock_keycloak(mocker, user_permissions=[permission]) + keycloak_mock.user_permissions = [permission] user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} @@ -171,7 +169,7 @@ @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). @@ -179,23 +177,22 @@ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() - kc_oidc_mock = mock_keycloak(mocker) - - refresh_token = sample_data.oidc_profile["refresh_token"] - access_token = sample_data.oidc_profile["access_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] + access_token = oidc_profile["access_token"] - decoded_token = kc_oidc_mock.decode_token(access_token) + decoded_token = keycloak_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user, decoded_token, kc_oidc_mock) + _check_authenticated_user(user, decoded_token, keycloak_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). @@ -203,10 +200,12 @@ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() + oidc_profile = keycloak_mock.login() + # simulate a failed authentication with a bearer token in expected format - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) - refresh_token = sample_data.oidc_profile["refresh_token"] + refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") @@ -222,7 +221,7 @@ drf_auth_backend.authenticate(request) -def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): +def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid @@ -231,7 +230,8 @@ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") @@ -247,16 +247,19 @@ @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_permissions( + keycloak_mock, api_request_factory +): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" - mock_keycloak(mocker, user_permissions=[permission]) + keycloak_mock.user_permissions = [permission] drf_auth_backend = OIDCBearerTokenAuthentication() - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py --- a/swh/web/tests/auth/test_middlewares.py +++ b/swh/web/tests/auth/test_middlewares.py @@ -12,18 +12,16 @@ from swh.web.common.utils import reverse from swh.web.tests.utils import check_html_get_response -from .keycloak_mock import mock_keycloak - @pytest.mark.django_db @modify_settings( MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} ) -def test_oidc_session_expired_middleware_disabled(client, mocker): +def test_oidc_session_expired_middleware_disabled(client, keycloak_mock): # authenticate user - kc_oidc_mock = mock_keycloak(mocker) + client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") @@ -38,11 +36,10 @@ @pytest.mark.django_db -def test_oidc_session_expired_middleware_enabled(client, mocker): +def test_oidc_session_expired_middleware_enabled(client, keycloak_mock): # authenticate user - kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -24,9 +24,6 @@ ) from swh.web.urls import _default_view as homepage_view -from . import sample_data -from .keycloak_mock import mock_keycloak - def _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri, scope="openid" @@ -62,14 +59,11 @@ @pytest.mark.django_db -def test_oidc_login_views_success(client, mocker): +def test_oidc_login_views_success(client, keycloak_mock): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) - # user initiates login process login_url = reverse("oidc-login") @@ -83,7 +77,7 @@ login_data = _check_oidc_login_code_flow_data( request, response, - kc_oidc_mock, + keycloak_mock, redirect_uri=reverse("oidc-login-complete", request=request), ) @@ -120,15 +114,13 @@ @pytest.mark.django_db -def test_oidc_logout_view_success(client, mocker): +def test_oidc_logout_view_success(client, keycloak_mock): """ Simulate a successful logout operation with OpenID Connect. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") @@ -141,19 +133,19 @@ assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak - kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) + oidc_profile = keycloak_mock.login() + keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db -def test_oidc_login_view_failure(client, mocker): +def test_oidc_login_view_failure(client, keycloak_mock): """ Simulate a failed authentication with OpenID Connect. """ - # mock Keycloak client - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) # user initiates login process login_url = reverse("oidc-login") @@ -209,10 +201,7 @@ assert isinstance(request.user, AnonymousUser) -def test_oidc_login_complete_wrong_csrf_token(client, mocker): - # mock Keycloak client - mock_keycloak(mocker) - +def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock): # simulate login process has been initialized session = client.session session["login_data"] = { @@ -242,9 +231,8 @@ @pytest.mark.django_db -def test_oidc_login_complete_wrong_code_verifier(client, mocker): - # mock Keycloak client - mock_keycloak(mocker, auth_success=False) +def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock): + keycloak_mock.set_auth_success(False) # simulate login process has been initialized session = client.session @@ -274,17 +262,15 @@ @pytest.mark.django_db -def test_oidc_logout_view_failure(client, mocker): +def test_oidc_logout_view_failure(client, keycloak_mock): """ Simulate a failed logout operation with OpenID Connect. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" - kc_oidc_mock.logout.side_effect = Exception(err_msg) + keycloak_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") @@ -373,13 +359,12 @@ @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): +def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ - kc_oidc_mock = mock_keycloak(mocker) - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): @@ -393,15 +378,14 @@ @pytest.mark.django_db -def test_oidc_list_bearer_tokens(client, mocker): +def test_oidc_list_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to list his tokens. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} @@ -426,15 +410,14 @@ @pytest.mark.django_db -def test_oidc_get_bearer_token(client, mocker): +def test_oidc_get_bearer_token(client, keycloak_mock): """ User with correct credentials should be allowed to display a token. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for i in range(nb_tokens): - token = _generate_and_test_bearer_token(client, kc_oidc_mock) + token = _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-get-bearer-token") @@ -457,15 +440,14 @@ @pytest.mark.django_db -def test_oidc_revoke_bearer_tokens(client, mocker): +def test_oidc_revoke_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to revoke tokens. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-revoke-bearer-tokens") @@ -492,7 +474,7 @@ @pytest.mark.django_db -def test_oidc_profile_view(client, mocker): +def test_oidc_profile_view(client, keycloak_mock): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. @@ -500,7 +482,7 @@ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] - mock_keycloak(mocker, user_permissions=user_permissions) + keycloak_mock.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -18,11 +18,14 @@ from django.core.cache import cache from rest_framework.test import APIClient, APIRequestFactory +from swh.auth.pytest_plugin import keycloak_mock_factory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.typing import OriginVisitInfo +from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages # Used to skip some tests @@ -356,3 +359,24 @@ ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) + + +_keycloak_config = get_config()["keycloak"] + +_keycloak_mock = keycloak_mock_factory( + server_url=_keycloak_config["server_url"], + realm_name=_keycloak_config["realm_name"], + client_id=OIDC_SWH_WEB_CLIENT_ID, +) + + +@pytest.fixture +def keycloak_mock(_keycloak_mock, mocker): + for oidc_client_factory in ( + "swh.web.auth.views.get_oidc_client", + "swh.web.auth.backends.get_oidc_client", + ): + mock_get_oidc_client = mocker.patch(oidc_client_factory) + mock_get_oidc_client.return_value = _keycloak_mock + + return _keycloak_mock