Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7343114
D5303.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
47 KB
Subscribers
None
D5303.id.diff
View Options
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -15,13 +15,9 @@
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed, ValidationError
-from swh.web.auth.keycloak import KeycloakOpenIDConnect
from swh.web.auth.models import OIDCUser
from swh.web.auth.utils import get_oidc_client
-# OpenID Connect client to communicate with Keycloak server
-_oidc_client: KeycloakOpenIDConnect = get_oidc_client()
-
def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
# compute an integer user identifier for Django User model
@@ -45,7 +41,7 @@
# extract user permissions if any
resource_access = decoded_token.get("resource_access", {})
- client_resource_access = resource_access.get(_oidc_client.client_id, {})
+ client_resource_access = resource_access.get(get_oidc_client().client_id, {})
user.permissions = set(client_resource_access.get("roles", []))
# add user sub to custom User proxy model
@@ -56,16 +52,18 @@
def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
+ oidc_client = get_oidc_client()
+
# decode JWT token
try:
access_token = oidc_profile["access_token"]
- decoded_token = _oidc_client.decode_token(access_token)
+ decoded_token = oidc_client.decode_token(access_token)
# access token has expired or is invalid
except Exception:
# get a new access token from authentication provider
- oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"])
+ oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
# decode access token
- decoded_token = _oidc_client.decode_token(oidc_profile["access_token"])
+ decoded_token = oidc_client.decode_token(oidc_profile["access_token"])
# create OIDCUser from decoded token
user = _oidc_user_from_decoded_token(decoded_token)
@@ -106,7 +104,7 @@
user = None
try:
# try to authenticate user with OIDC PKCE authorization code flow
- oidc_profile = _oidc_client.authorization_code(
+ oidc_profile = get_oidc_client().authorization_code(
code, redirect_uri, code_verifier=code_verifier
)
@@ -151,6 +149,8 @@
)
try:
+ oidc_client = get_oidc_client()
+
# compute a cache key from the token that does not exceed
# memcached key size limit
hasher = hashlib.sha1()
@@ -162,16 +162,16 @@
# attempt to decode access token
try:
- decoded_token = _oidc_client.decode_token(access_token)
+ decoded_token = oidc_client.decode_token(access_token)
except Exception:
# access token is None or it has expired
decoded_token = None
if access_token is None or decoded_token is None:
# get a new access token from authentication provider
- access_token = _oidc_client.refresh_token(refresh_token)["access_token"]
+ access_token = oidc_client.refresh_token(refresh_token)["access_token"]
# decode access token
- decoded_token = _oidc_client.decode_token(access_token)
+ decoded_token = oidc_client.decode_token(access_token)
# compute access token expiration
exp = datetime.fromtimestamp(decoded_token["exp"])
ttl = int(exp.timestamp() - timezone.now().timestamp())
diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py
deleted file mode 100644
--- a/swh/web/auth/keycloak.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from typing import Any, Dict, Optional, Tuple
-from urllib.parse import urlencode
-
-from keycloak import KeycloakOpenID
-
-
-class KeycloakOpenIDConnect:
- """
- Wrapper class around python-keycloak to ease the interaction with Keycloak
- for managing authentication and user permissions with OpenID Connect.
- """
-
- def __init__(
- self,
- server_url: str,
- realm_name: str,
- client_id: str,
- realm_public_key: str = "",
- ):
- """
- Args:
- server_url: URL of the Keycloak server
- realm_name: The realm name
- client_id: The OpenID Connect client identifier
- realm_public_key: The realm public key (will be dynamically
- retrieved if not provided)
- """
- self._keycloak = KeycloakOpenID(
- server_url=server_url, client_id=client_id, realm_name=realm_name,
- )
-
- self.server_url = server_url
- self.realm_name = realm_name
- self.client_id = client_id
- self.realm_public_key = realm_public_key
-
- def well_known(self) -> Dict[str, Any]:
- """
- Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
-
- Returns:
- A dictionary filled with OpenID Connect URIS.
- """
- return self._keycloak.well_know()
-
- def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
- """
- Get OpenID Connect authorization URL to authenticate users.
-
- Args:
- redirect_uri: URI to redirect to once a user is authenticated
- extra_params: Extra query parameters to add to the
- authorization URL
- """
- auth_url = self._keycloak.auth_url(redirect_uri)
- if extra_params:
- auth_url += "&%s" % urlencode(extra_params)
- return auth_url
-
- def authorization_code(
- self, code: str, redirect_uri: str, **extra_params: str
- ) -> Dict[str, Any]:
- """
- Get OpenID Connect authentication tokens using Authorization
- Code flow.
-
- Args:
- code: Authorization code provided by Keycloak
- redirect_uri: URI to redirect to once a user is authenticated
- (must be the same as the one provided to authorization_url):
- extra_params: Extra parameters to add in the authorization request
- payload.
- """
- return self._keycloak.token(
- grant_type="authorization_code",
- code=code,
- redirect_uri=redirect_uri,
- **extra_params,
- )
-
- def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
- """
- Request a new access token from Keycloak using a refresh token.
-
- Args:
- refresh_token: A refresh token provided by Keycloak
-
- Returns:
- A dictionary filled with tokens info
- """
- return self._keycloak.refresh_token(refresh_token)
-
- def decode_token(
- self, token: str, options: Optional[Dict[str, Any]] = None
- ) -> Dict[str, Any]:
- """
- Try to decode a JWT token.
-
- Args:
- token: A JWT token to decode
- options: Options for jose.jwt.decode
-
- Returns:
- A dictionary filled with decoded token content
- """
- if not self.realm_public_key:
- realm_public_key = self._keycloak.public_key()
- self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n"
- self.realm_public_key += realm_public_key
- self.realm_public_key += "\n-----END PUBLIC KEY-----"
-
- return self._keycloak.decode_token(
- token, key=self.realm_public_key, options=options
- )
-
- def logout(self, refresh_token: str) -> None:
- """
- Logout a user by closing its authenticated session.
-
- Args:
- refresh_token: A refresh token provided by Keycloak
- """
- self._keycloak.logout(refresh_token)
-
- def userinfo(self, access_token: str) -> Dict[str, Any]:
- """
- Return user information from its access token.
-
- Args:
- access_token: An access token provided by Keycloak
-
- Returns:
- A dictionary fillled with user information
- """
- return self._keycloak.userinfo(access_token)
-
-
-# stores instances of KeycloakOpenIDConnect class
-# dict keys are (realm_name, client_id) tuples
-_keycloak_oidc: Dict[Tuple[str, str], KeycloakOpenIDConnect] = {}
-
-
-def get_keycloak_oidc_client(
- server_url: str, realm_name: str, client_id: str
-) -> KeycloakOpenIDConnect:
- """
- Instantiate a KeycloakOpenIDConnect class for a given client in a
- given realm.
-
- Args:
- server_url: Base URL of a Keycloak server
- realm_name: Name of the realm in Keycloak
- client_id: Client identifier in the realm
-
- Returns:
- An object to ease the interaction with the Keycloak server
- """
- realm_client_key = (realm_name, client_id)
- if realm_client_key not in _keycloak_oidc:
- _keycloak_oidc[realm_client_key] = KeycloakOpenIDConnect(
- server_url, realm_name, client_id
- )
- return _keycloak_oidc[realm_client_key]
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
--- a/swh/web/auth/utils.py
+++ b/swh/web/auth/utils.py
@@ -6,14 +6,14 @@
from base64 import urlsafe_b64encode
import hashlib
import secrets
-from typing import Tuple
+from typing import Dict, Tuple
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
-from swh.web.auth.keycloak import KeycloakOpenIDConnect, get_keycloak_oidc_client
+from swh.auth.keycloak import KeycloakOpenIDConnect
from swh.web.config import get_config
@@ -103,6 +103,11 @@
return _get_fernet(password, salt).decrypt(data)
+# stores instances of KeycloakOpenIDConnect class
+# dict keys are client identifiers (one cached instance per client_id)
+_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {}
+
+
def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect:
"""
Instantiate a KeycloakOpenIDConnect class for a given client in the
@@ -114,9 +119,10 @@
Returns:
An object to ease the interaction with the Keycloak server
"""
- swhweb_config = get_config()
- return get_keycloak_oidc_client(
- swhweb_config["keycloak"]["server_url"],
- swhweb_config["keycloak"]["realm_name"],
- client_id,
- )
+ keycloak_config = get_config()["keycloak"]
+
+ if client_id not in _keycloak_oidc:
+ _keycloak_oidc[client_id] = KeycloakOpenIDConnect(
+ keycloak_config["server_url"], keycloak_config["realm_name"], client_id
+ )
+ return _keycloak_oidc[client_id]
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py
--- a/swh/web/tests/api/views/test_graph.py
+++ b/swh/web/tests/api/views/test_graph.py
@@ -16,8 +16,6 @@
from swh.web.api.views.graph import API_GRAPH_PERM
from swh.web.common.utils import reverse
from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config
-from swh.web.tests.auth.keycloak_mock import mock_keycloak
-from swh.web.tests.auth.sample_data import oidc_profile
from swh.web.tests.strategies import origin
from swh.web.tests.utils import check_http_get_response
@@ -40,20 +38,21 @@
check_http_get_response(api_client, url, status_code=401)
-def _authenticate_graph_user(api_client, mocker):
- mock_keycloak(mocker, user_permissions=[API_GRAPH_PERM])
+def _authenticate_graph_user(api_client, keycloak_mock):
+ keycloak_mock.user_permissions = [API_GRAPH_PERM]
+ oidc_profile = keycloak_mock.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
-def test_graph_endpoint_needs_permission(api_client, mocker, requests_mock):
+def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock):
graph_query = "stats"
url = reverse("api-1-graph", url_args={"graph_query": graph_query})
+ oidc_profile = keycloak_mock.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
- mock_keycloak(mocker, user_permissions=[])
check_http_get_response(api_client, url, status_code=403)
- _authenticate_graph_user(api_client, mocker)
+ _authenticate_graph_user(api_client, keycloak_mock)
requests_mock.get(
get_config()["graph"]["server_url"] + graph_query,
json={},
@@ -62,8 +61,8 @@
check_http_get_response(api_client, url, status_code=200)
-def test_graph_text_plain_response(api_client, mocker, requests_mock):
- _authenticate_graph_user(api_client, mocker)
+def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_mock)
graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323"
@@ -115,8 +114,8 @@
}
-def test_graph_json_response(api_client, mocker, requests_mock):
- _authenticate_graph_user(api_client, mocker)
+def test_graph_json_response(api_client, keycloak_mock, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_mock)
graph_query = "stats"
@@ -133,8 +132,8 @@
assert resp.content == json.dumps(_response_json).encode()
-def test_graph_ndjson_response(api_client, mocker, requests_mock):
- _authenticate_graph_user(api_client, mocker)
+def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_mock)
graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb"
@@ -168,7 +167,7 @@
@given(origin())
def test_graph_response_resolve_origins(
- archive_data, api_client, mocker, requests_mock, origin
+ archive_data, api_client, keycloak_mock, requests_mock, origin
):
hasher = hashlib.sha1()
hasher.update(origin["url"].encode())
@@ -183,7 +182,7 @@
)
)
- _authenticate_graph_user(api_client, mocker)
+ _authenticate_graph_user(api_client, keycloak_mock)
for graph_query, response_text, content_type in (
(
@@ -239,9 +238,9 @@
def test_graph_response_resolve_origins_nothing_to_do(
- api_client, mocker, requests_mock
+ api_client, keycloak_mock, requests_mock
):
- _authenticate_graph_user(api_client, mocker)
+ _authenticate_graph_user(api_client, keycloak_mock)
graph_query = "stats"
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
deleted file mode 100644
--- a/swh/web/tests/auth/keycloak_mock.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from copy import copy
-from unittest.mock import Mock
-
-from keycloak.exceptions import KeycloakError
-
-from django.utils import timezone
-
-from swh.web.auth.keycloak import KeycloakOpenIDConnect
-from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
-from swh.web.config import get_config
-
-from .sample_data import oidc_profile, realm_public_key, userinfo
-
-
-class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
- def __init__(
- self, auth_success=True, exp=None, user_groups=[], user_permissions=[]
- ):
- swhweb_config = get_config()
- super().__init__(
- swhweb_config["keycloak"]["server_url"],
- swhweb_config["keycloak"]["realm_name"],
- OIDC_SWH_WEB_CLIENT_ID,
- )
- self.auth_success = auth_success
- self.exp = exp
- self.user_groups = user_groups
- self.user_permissions = user_permissions
- self._keycloak.public_key = lambda: realm_public_key
- self._keycloak.well_know = lambda: {
- "issuer": f"{self.server_url}realms/{self.realm_name}",
- "authorization_endpoint": (
- f"{self.server_url}realms/"
- f"{self.realm_name}/protocol/"
- "openid-connect/auth"
- ),
- "token_endpoint": (
- f"{self.server_url}realms/{self.realm_name}/"
- "protocol/openid-connect/token"
- ),
- "token_introspection_endpoint": (
- f"{self.server_url}realms/"
- f"{self.realm_name}/protocol/"
- "openid-connect/token/"
- "introspect"
- ),
- "userinfo_endpoint": (
- f"{self.server_url}realms/{self.realm_name}/"
- "protocol/openid-connect/userinfo"
- ),
- "end_session_endpoint": (
- f"{self.server_url}realms/"
- f"{self.realm_name}/protocol/"
- "openid-connect/logout"
- ),
- "jwks_uri": (
- f"{self.server_url}realms/{self.realm_name}/"
- "protocol/openid-connect/certs"
- ),
- }
- self.authorization_code = Mock()
- self.refresh_token = Mock()
- self.userinfo = Mock()
- self.logout = Mock()
- if auth_success:
- self.authorization_code.return_value = copy(oidc_profile)
- self.refresh_token.return_value = copy(oidc_profile)
- self.userinfo.return_value = copy(userinfo)
- else:
- self.authorization_url = Mock()
- exception = KeycloakError(
- error_message="Authentication failed", response_code=401
- )
- self.authorization_code.side_effect = exception
- self.authorization_url.side_effect = exception
- self.refresh_token.side_effect = exception
- self.userinfo.side_effect = exception
- self.logout.side_effect = exception
-
- def decode_token(self, token):
- options = {}
- if self.auth_success:
- # skip signature expiration check as we use a static oidc_profile
- # for the tests with expired tokens in it
- options["verify_exp"] = False
- decoded = super().decode_token(token, options)
- # tweak auth and exp time for tests
- expire_in = decoded["exp"] - decoded["iat"]
- if self.exp is not None:
- decoded["exp"] = self.exp
- decoded["iat"] = self.exp - expire_in
- else:
- decoded["iat"] = int(timezone.now().timestamp())
- decoded["exp"] = decoded["iat"] + expire_in
- decoded["groups"] = self.user_groups
- if self.user_permissions:
- decoded["resource_access"][self.client_id] = {
- "roles": self.user_permissions
- }
- return decoded
-
-
-def mock_keycloak(
- mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[]
-):
- kc_oidc_mock = KeycloackOpenIDConnectMock(
- auth_success, exp, user_groups, user_permissions
- )
- mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client")
- mock_get_oidc_client.return_value = kc_oidc_mock
- mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock)
- return kc_oidc_mock
diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py
deleted file mode 100644
--- a/swh/web/tests/auth/sample_data.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-realm_public_key = (
- "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u"
- "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY"
- "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy"
- "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr"
- "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie"
- "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB"
-)
-
-oidc_profile = {
- "access_token": (
- # decoded token:
- # {'acr': '1',
- # 'allowed-origins': ['*'],
- # 'aud': ['swh-web', 'account'],
- # 'auth_time': 1582723101,
- # 'azp': 'swh-web',
- # 'email': 'john.doe@example.com',
- # 'email_verified': False,
- # 'exp': 1592396202,
- # 'family_name': 'Doe',
- # 'given_name': 'John',
- # 'groups': ['/staff'],
- # 'iat': 1592395601,
- # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage',
- # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e',
- # 'name': 'John Doe',
- # 'nbf': 0,
- # 'preferred_username': 'johndoe',
- # 'realm_access': {'roles': ['offline_access', 'uma_authorization']},
- # 'resource_access': {'account': {'roles': ['manage-account',
- # 'manage-account-links',
- # 'view-profile']}},
- # 'scope': 'openid email profile',
- # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d',
- # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200',
- # 'typ': 'Bearer'
- # }
- "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV"
- "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0."
- "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz"
- "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz"
- "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs"
- "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj"
- "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2"
- "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi"
- "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy"
- "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi"
- "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6"
- "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0"
- "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl"
- "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz"
- "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg"
- "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv"
- "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi"
- "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi"
- "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-"
- "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB"
- "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO"
- "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc"
- "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl"
- "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE"
- "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ"
- "6A"
- ),
- "expires_in": 600,
- "id_token": (
- "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0"
- "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki"
- "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi"
- "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo"
- "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp"
- "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh"
- "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13"
- "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk"
- "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx"
- "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn"
- "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2"
- "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi"
- "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee"
- "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL"
- "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_"
- "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE"
- "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN"
- "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX"
- "ZbYnitD1Typ6Q"
- ),
- "not-before-policy": 0,
- "refresh_expires_in": 1800,
- "refresh_token": (
- "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM"
- "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk"
- "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC"
- "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL"
- "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv"
- "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q"
- "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj"
- "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid"
- "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi"
- "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ"
- "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl"
- "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic"
- "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu"
- "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc"
- "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG"
- "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI"
- ),
- "scope": "openid email profile",
- "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d",
- "token_type": "bearer",
-}
-
-userinfo = {
- "email": "john.doe@example.com",
- "email_verified": False,
- "family_name": "Doe",
- "given_name": "John",
- "groups": ["/staff"],
- "name": "John Doe",
- "preferred_username": "johndoe",
- "sub": "feacd344-b468-4a65-a236-14f61e6b7200",
-}
diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py
--- a/swh/web/tests/auth/test_api_auth.py
+++ b/swh/web/tests/auth/test_api_auth.py
@@ -11,19 +11,15 @@
from swh.web.common.utils import reverse
from swh.web.tests.utils import check_api_get_responses, check_http_get_response
-from . import sample_data
-from .keycloak_mock import mock_keycloak
-
@pytest.mark.django_db
-def test_drf_django_session_auth_success(mocker, client):
+def test_drf_django_session_auth_success(keycloak_mock, client):
"""
Check user gets authenticated when querying the web api
through a web browser.
"""
url = reverse("api-1-stat-counters")
- mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
response = check_http_get_response(client, url, status_code=200)
@@ -38,16 +34,16 @@
@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_success(mocker, api_client):
+def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client):
"""
Check user gets authenticated when querying the web api
through an HTTP client using bearer token authentication.
"""
url = reverse("api-1-stat-counters")
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
- mock_keycloak(mocker)
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
response = check_api_get_responses(api_client, url, status_code=200)
@@ -62,13 +58,14 @@
@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_failure(mocker, api_client):
+def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client):
url = reverse("api-1-stat-counters")
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
# check for failed authentication but with expected token format
- mock_keycloak(mocker, auth_success=False)
+ keycloak_mock.set_auth_success(False)
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
response = check_api_get_responses(api_client, url, status_code=403)
@@ -85,10 +82,11 @@
assert isinstance(request.user, AnonymousUser)
-def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client):
+def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client):
url = reverse("api-1-stat-counters")
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
# missing authorization type
api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}")
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -16,9 +16,6 @@
from swh.web.auth.models import OIDCUser
from swh.web.common.utils import reverse
-from . import sample_data
-from .keycloak_mock import mock_keycloak
-
def _authenticate_user(request_factory):
request = request_factory.get(reverse("oidc-login-complete"))
@@ -48,17 +45,18 @@
@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_success(mocker, request_factory):
+def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory):
"""
Checks successful login based on OpenID Connect with PKCE extension
Django authentication backend (login from Web UI).
"""
- kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"])
- oidc_profile = sample_data.oidc_profile
+ keycloak_mock.user_groups = ["/staff"]
+
+ oidc_profile = keycloak_mock.login()
user = _authenticate_user(request_factory)
- decoded_token = kc_oidc_mock.decode_token(user.access_token)
- _check_authenticated_user(user, decoded_token, kc_oidc_mock)
+ decoded_token = keycloak_mock.decode_token(user.access_token)
+ _check_authenticated_user(user, decoded_token, keycloak_mock)
auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
@@ -81,12 +79,12 @@
@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory):
+def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory):
"""
Checks failed login based on OpenID Connect with PKCE extension Django
authentication backend (login from Web UI).
"""
- mock_keycloak(mocker, auth_success=False)
+ keycloak_mock.set_auth_success(False)
user = _authenticate_user(request_factory)
@@ -94,18 +92,19 @@
@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory):
+def test_oidc_code_pkce_auth_backend_refresh_token_success(
+ keycloak_mock, request_factory
+):
"""
Checks access token renewal success using refresh token.
"""
- kc_oidc_mock = mock_keycloak(mocker)
- oidc_profile = sample_data.oidc_profile
- decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"])
+ oidc_profile = keycloak_mock.login()
+ decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"])
new_access_token = "new_access_token"
def _refresh_token(refresh_token):
- oidc_profile = dict(sample_data.oidc_profile)
+ oidc_profile = dict(keycloak_mock.login())
oidc_profile["access_token"] = new_access_token
return oidc_profile
@@ -115,25 +114,25 @@
else:
return decoded_token
- kc_oidc_mock.decode_token = Mock()
- kc_oidc_mock.decode_token.side_effect = _decode_token
- kc_oidc_mock.refresh_token.side_effect = _refresh_token
+ keycloak_mock.decode_token = Mock()
+ keycloak_mock.decode_token.side_effect = _decode_token
+ keycloak_mock.refresh_token.side_effect = _refresh_token
user = _authenticate_user(request_factory)
- kc_oidc_mock.refresh_token.assert_called_with(
- sample_data.oidc_profile["refresh_token"]
- )
+ oidc_profile = keycloak_mock.login()
+ keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
assert user is not None
@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory):
+def test_oidc_code_pkce_auth_backend_refresh_token_failure(
+ keycloak_mock, request_factory
+):
"""
Checks access token renewal failure using refresh token.
"""
- kc_oidc_mock = mock_keycloak(mocker)
def _refresh_token(refresh_token):
raise Exception("OIDC session has expired")
@@ -141,27 +140,26 @@
def _decode_token(access_token):
raise Exception("access token token has expired")
- kc_oidc_mock.decode_token = Mock()
- kc_oidc_mock.decode_token.side_effect = _decode_token
- kc_oidc_mock.refresh_token.side_effect = _refresh_token
+ keycloak_mock.decode_token = Mock()
+ keycloak_mock.decode_token.side_effect = _decode_token
+ keycloak_mock.refresh_token.side_effect = _refresh_token
user = _authenticate_user(request_factory)
- kc_oidc_mock.refresh_token.assert_called_with(
- sample_data.oidc_profile["refresh_token"]
- )
+ oidc_profile = keycloak_mock.login()
+ keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
assert user is None
@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory):
+def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory):
"""
Checks that a permission defined with OpenID Connect is correctly mapped
to a Django one when logging from Web UI.
"""
permission = "webapp.some-permission"
- mock_keycloak(mocker, user_permissions=[permission])
+ keycloak_mock.user_permissions = [permission]
user = _authenticate_user(request_factory)
assert user.has_perm(permission)
assert user.get_all_permissions() == {permission}
@@ -171,7 +169,7 @@
@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory):
+def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory):
"""
Checks successful login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login).
@@ -179,23 +177,22 @@
url = reverse("api-1-stat-counters")
drf_auth_backend = OIDCBearerTokenAuthentication()
- kc_oidc_mock = mock_keycloak(mocker)
-
- refresh_token = sample_data.oidc_profile["refresh_token"]
- access_token = sample_data.oidc_profile["access_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
+ access_token = oidc_profile["access_token"]
- decoded_token = kc_oidc_mock.decode_token(access_token)
+ decoded_token = keycloak_mock.decode_token(access_token)
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user, decoded_token, kc_oidc_mock)
+ _check_authenticated_user(user, decoded_token, keycloak_mock)
# oidc_profile is not filled when authenticating through bearer token
assert hasattr(user, "access_token") and user.access_token is None
@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory):
+def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory):
"""
Checks failed login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login).
@@ -203,10 +200,12 @@
url = reverse("api-1-stat-counters")
drf_auth_backend = OIDCBearerTokenAuthentication()
+ oidc_profile = keycloak_mock.login()
+
# simulate a failed authentication with a bearer token in expected format
- mock_keycloak(mocker, auth_success=False)
+ keycloak_mock.set_auth_success(False)
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ refresh_token = oidc_profile["refresh_token"]
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
@@ -222,7 +221,7 @@
drf_auth_backend.authenticate(request)
-def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory):
+def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory):
"""
Checks failed login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login) due to invalid
@@ -231,7 +230,8 @@
url = reverse("api-1-stat-counters")
drf_auth_backend = OIDCBearerTokenAuthentication()
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
# Invalid authorization type
request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
@@ -247,16 +247,19 @@
@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory):
+def test_drf_oidc_bearer_token_auth_backend_permissions(
+ keycloak_mock, api_request_factory
+):
"""
Checks that a permission defined with OpenID Connect is correctly mapped
to a Django one when using bearer token authentication.
"""
permission = "webapp.some-permission"
- mock_keycloak(mocker, user_permissions=[permission])
+ keycloak_mock.user_permissions = [permission]
drf_auth_backend = OIDCBearerTokenAuthentication()
- refresh_token = sample_data.oidc_profile["refresh_token"]
+ oidc_profile = keycloak_mock.login()
+ refresh_token = oidc_profile["refresh_token"]
url = reverse("api-1-stat-counters")
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
user, _ = drf_auth_backend.authenticate(request)
diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py
--- a/swh/web/tests/auth/test_middlewares.py
+++ b/swh/web/tests/auth/test_middlewares.py
@@ -12,18 +12,16 @@
from swh.web.common.utils import reverse
from swh.web.tests.utils import check_html_get_response
-from .keycloak_mock import mock_keycloak
-
@pytest.mark.django_db
@modify_settings(
MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]}
)
-def test_oidc_session_expired_middleware_disabled(client, mocker):
+def test_oidc_session_expired_middleware_disabled(client, keycloak_mock):
# authenticate user
- kc_oidc_mock = mock_keycloak(mocker)
+
client.login(code="", code_verifier="", redirect_uri="")
- kc_oidc_mock.authorization_code.assert_called()
+ keycloak_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
@@ -38,11 +36,10 @@
@pytest.mark.django_db
-def test_oidc_session_expired_middleware_enabled(client, mocker):
+def test_oidc_session_expired_middleware_enabled(client, keycloak_mock):
# authenticate user
- kc_oidc_mock = mock_keycloak(mocker)
client.login(code="", code_verifier="", redirect_uri="")
- kc_oidc_mock.authorization_code.assert_called()
+ keycloak_mock.authorization_code.assert_called()
url = reverse("swh-web-homepage")
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -24,9 +24,6 @@
)
from swh.web.urls import _default_view as homepage_view
-from . import sample_data
-from .keycloak_mock import mock_keycloak
-
def _check_oidc_login_code_flow_data(
request, response, kc_oidc_mock, redirect_uri, scope="openid"
@@ -62,14 +59,11 @@
@pytest.mark.django_db
-def test_oidc_login_views_success(client, mocker):
+def test_oidc_login_views_success(client, keycloak_mock):
"""
Simulate a successful login authentication with OpenID Connect
authorization code flow with PKCE.
"""
- # mock Keycloak client
- kc_oidc_mock = mock_keycloak(mocker)
-
# user initiates login process
login_url = reverse("oidc-login")
@@ -83,7 +77,7 @@
login_data = _check_oidc_login_code_flow_data(
request,
response,
- kc_oidc_mock,
+ keycloak_mock,
redirect_uri=reverse("oidc-login-complete", request=request),
)
@@ -120,15 +114,13 @@
@pytest.mark.django_db
-def test_oidc_logout_view_success(client, mocker):
+def test_oidc_logout_view_success(client, keycloak_mock):
"""
Simulate a successful logout operation with OpenID Connect.
"""
- # mock Keycloak client
- kc_oidc_mock = mock_keycloak(mocker)
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
- kc_oidc_mock.authorization_code.assert_called()
+ keycloak_mock.authorization_code.assert_called()
# user initiates logout
oidc_logout_url = reverse("oidc-logout")
@@ -141,19 +133,19 @@
assert response["location"] == request.build_absolute_uri(logout_url)
# should have been logged out in Keycloak
- kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"])
+ oidc_profile = keycloak_mock.login()
+ keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"])
# check effective logout in Django
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
-def test_oidc_login_view_failure(client, mocker):
+def test_oidc_login_view_failure(client, keycloak_mock):
"""
Simulate a failed authentication with OpenID Connect.
"""
- # mock Keycloak client
- mock_keycloak(mocker, auth_success=False)
+ keycloak_mock.set_auth_success(False)
# user initiates login process
login_url = reverse("oidc-login")
@@ -209,10 +201,7 @@
assert isinstance(request.user, AnonymousUser)
-def test_oidc_login_complete_wrong_csrf_token(client, mocker):
- # mock Keycloak client
- mock_keycloak(mocker)
-
+def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock):
# simulate login process has been initialized
session = client.session
session["login_data"] = {
@@ -242,9 +231,8 @@
@pytest.mark.django_db
-def test_oidc_login_complete_wrong_code_verifier(client, mocker):
- # mock Keycloak client
- mock_keycloak(mocker, auth_success=False)
+def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock):
+ keycloak_mock.set_auth_success(False)
# simulate login process has been initialized
session = client.session
@@ -274,17 +262,15 @@
@pytest.mark.django_db
-def test_oidc_logout_view_failure(client, mocker):
+def test_oidc_logout_view_failure(client, keycloak_mock):
"""
Simulate a failed logout operation with OpenID Connect.
"""
- # mock Keycloak client
- kc_oidc_mock = mock_keycloak(mocker)
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
err_msg = "Authentication server error"
- kc_oidc_mock.logout.side_effect = Exception(err_msg)
+ keycloak_mock.logout.side_effect = Exception(err_msg)
# user initiates logout process
logout_url = reverse("oidc-logout")
@@ -373,13 +359,12 @@
@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker):
+def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock):
"""
Authenticated user should be able to generate a bearer token using OIDC
Authorization Code Flow.
"""
- kc_oidc_mock = mock_keycloak(mocker)
- _generate_and_test_bearer_token(client, kc_oidc_mock)
+ _generate_and_test_bearer_token(client, keycloak_mock)
def test_oidc_list_bearer_tokens_anonymous_user(client):
@@ -393,15 +378,14 @@
@pytest.mark.django_db
-def test_oidc_list_bearer_tokens(client, mocker):
+def test_oidc_list_bearer_tokens(client, keycloak_mock):
"""
User with correct credentials should be allowed to list his tokens.
"""
- kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, kc_oidc_mock)
+ _generate_and_test_bearer_token(client, keycloak_mock)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
@@ -426,15 +410,14 @@
@pytest.mark.django_db
-def test_oidc_get_bearer_token(client, mocker):
+def test_oidc_get_bearer_token(client, keycloak_mock):
"""
User with correct credentials should be allowed to display a token.
"""
- kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
for i in range(nb_tokens):
- token = _generate_and_test_bearer_token(client, kc_oidc_mock)
+ token = _generate_and_test_bearer_token(client, keycloak_mock)
url = reverse("oidc-get-bearer-token")
@@ -457,15 +440,14 @@
@pytest.mark.django_db
-def test_oidc_revoke_bearer_tokens(client, mocker):
+def test_oidc_revoke_bearer_tokens(client, keycloak_mock):
"""
User with correct credentials should be allowed to revoke tokens.
"""
- kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, kc_oidc_mock)
+ _generate_and_test_bearer_token(client, keycloak_mock)
url = reverse("oidc-revoke-bearer-tokens")
@@ -492,7 +474,7 @@
@pytest.mark.django_db
-def test_oidc_profile_view(client, mocker):
+def test_oidc_profile_view(client, keycloak_mock):
"""
Authenticated users should be able to request the profile page
and link to Keycloak account UI should be present.
@@ -500,7 +482,7 @@
url = reverse("oidc-profile")
kc_config = get_config()["keycloak"]
user_permissions = ["perm1", "perm2"]
- mock_keycloak(mocker, user_permissions=user_permissions)
+ keycloak_mock.user_permissions = user_permissions
client.login(code="", code_verifier="", redirect_uri="")
resp = check_html_get_response(
client, url, status_code=200, template_used="auth/profile.html"
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -18,11 +18,14 @@
from django.core.cache import cache
from rest_framework.test import APIClient, APIRequestFactory
+from swh.auth.pytest_plugin import keycloak_mock_factory
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
+from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.common import converters
from swh.web.common.typing import OriginVisitInfo
+from swh.web.config import get_config
from swh.web.tests.data import get_tests_data, override_storages
# Used to skip some tests
@@ -356,3 +359,24 @@
ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
for ctag in ctags:
yield converters.from_swh(ctag, hashess={"id"})
+
+
+_keycloak_config = get_config()["keycloak"]
+
+_keycloak_mock = keycloak_mock_factory(
+ server_url=_keycloak_config["server_url"],
+ realm_name=_keycloak_config["realm_name"],
+ client_id=OIDC_SWH_WEB_CLIENT_ID,
+)
+
+
+@pytest.fixture
+def keycloak_mock(_keycloak_mock, mocker):
+ for oidc_client_factory in (
+ "swh.web.auth.views.get_oidc_client",
+ "swh.web.auth.backends.get_oidc_client",
+ ):
+ mock_get_oidc_client = mocker.patch(oidc_client_factory)
+ mock_get_oidc_client.return_value = _keycloak_mock
+
+ return _keycloak_mock
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 7:29 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216642
Attached To
D5303: auth: Use KeycloakOpenIDConnect class and fixture factory from swh-auth
Event Timeline
Log In to Comment