Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py
index 9697c12..fe47b9e 100644
--- a/swh/auth/django/utils.py
+++ b/swh/auth/django/utils.py
@@ -1,189 +1,191 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from django.conf import settings
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse
from swh.auth.django.models import OIDCUser
from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect
def oidc_user_from_decoded_token(
decoded_token: Dict[str, Any], client_id: Optional[str] = None
) -> OIDCUser:
"""Create an OIDCUser out of a decoded token
Args:
decoded_token: Decoded token Dict
client_id: Optional client id of the keycloak client instance used to decode
the token. If not provided, the permissions will be empty.
Returns:
The OIDCUser instance
"""
# compute an integer user identifier for Django User model
# by concatenating all groups of the UUID4 user identifier
# generated by Keycloak and converting it from hex to decimal
user_id = int("".join(decoded_token["sub"].split("-")), 16)
# create a Django user that will not be saved to database
user = OIDCUser(
id=user_id,
username=decoded_token.get("preferred_username", ""),
password="",
first_name=decoded_token.get("given_name", ""),
last_name=decoded_token.get("family_name", ""),
email=decoded_token.get("email", ""),
)
# set is_staff user property based on groups
if "groups" in decoded_token:
user.is_staff = "/staff" in decoded_token["groups"]
+ realm_access = decoded_token.get("realm_access", {})
+ permissions = realm_access.get("roles", [])
+
if client_id:
# extract user permissions if any
resource_access = decoded_token.get("resource_access", {})
client_resource_access = resource_access.get(client_id, {})
- permissions = client_resource_access.get("roles", [])
- else:
- permissions = []
+ permissions += client_resource_access.get("roles", [])
- user.permissions = set(permissions)
+ # set user permissions and filter out default keycloak realm roles
+ user.permissions = set(permissions) - {"offline_access", "uma_authorization"}
# add user sub to custom User proxy model
user.sub = decoded_token["sub"]
return user
def oidc_user_from_profile(
oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any]
) -> OIDCUser:
"""Initialize an OIDCUser out of an oidc profile dict.
Args:
oidc_client: KeycloakOpenIDConnect used to discuss with keycloak
oidc_profile: OIDC profile retrieved once connected to keycloak
Returns:
OIDCUser instance parsed out of the token received.
"""
# decode JWT token
try:
access_token = oidc_profile["access_token"]
decoded_token = oidc_client.decode_token(access_token)
# access token has expired
except ExpiredSignatureError:
# get a new access token from authentication provider
oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
# decode access token
decoded_token = oidc_client.decode_token(oidc_profile["access_token"])
# create OIDCUser from decoded token
user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id)
# get authentication init datetime
auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
# compute OIDC tokens expiration date
oidc_profile["expires_at"] = exp_datetime
oidc_profile["refresh_expires_at"] = auth_datetime + timedelta(
seconds=oidc_profile["refresh_expires_in"]
)
# add OIDC profile data to custom User proxy model
for key, val in oidc_profile.items():
if hasattr(user, key):
setattr(user, key, val)
return user
def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str:
return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}"
def keycloak_oidc_client() -> KeycloakOpenIDConnect:
"""
Instantiate a KeycloakOpenIDConnect class from the following django settings:
* SWH_AUTH_SERVER_URL
* SWH_AUTH_REALM_NAME
* SWH_AUTH_CLIENT_ID
Returns:
An object to ease the interaction with the Keycloak server
Raises:
ValueError: at least one mandatory django setting is not set
"""
server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None)
realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None)
client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None)
if server_url is None or realm_name is None or client_id is None:
raise ValueError(
"SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django "
"settings are mandatory to instantiate KeycloakOpenIDConnect class"
)
return KeycloakOpenIDConnect(
server_url=server_url, realm_name=realm_name, client_id=client_id
)
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[Dict[str, Any]] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py
index e8581a1..330cd45 100644
--- a/swh/auth/pytest_plugin.py
+++ b/swh/auth/pytest_plugin.py
@@ -1,220 +1,226 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from datetime import datetime, timezone
import json
from typing import Dict, List, Optional
from unittest.mock import Mock
from keycloak.exceptions import KeycloakError
import pytest
from swh.auth.keycloak import KeycloakOpenIDConnect
from swh.auth.tests.sample_data import (
CLIENT_ID,
OIDC_PROFILE,
RAW_REALM_PUBLIC_KEY,
REALM_NAME,
SERVER_URL,
USER_INFO,
)
class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
"""Mock KeycloakOpenIDConnect class to allow testing
Args:
server_url: Server main auth url (cf.
:py:data:`swh.auth.tests.sample_data.SERVER_URL`)
realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`)
client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`)
auth_success: boolean flag to simulate authentication success or failure
exp: expiration delay
user_groups: user groups configuration (if any)
- user_permissions: user permissions configuration (if any)
+ realm_permissions: user permissions configuration at realm level (if any)
+ client_permissions: user permissions configuration at client level (if any)
oidc_profile: Dict response from a call to a token authentication query (cf.
:py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`)
user_info: Dict response from a call to userinfo query (cf.
:py:data:`swh.auth.tests.sample_data.USER_INFO`)
raw_realm_public_key: A raw ascii text representing the realm public key (cf.
:py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`)
"""
def __init__(
self,
server_url: str,
realm_name: str,
client_id: str,
auth_success: bool = True,
exp: Optional[int] = None,
user_groups: List[str] = [],
- user_permissions: List[str] = [],
+ realm_permissions: List[str] = [],
+ client_permissions: List[str] = [],
oidc_profile: Dict = OIDC_PROFILE,
user_info: Dict = USER_INFO,
raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
):
super().__init__(
server_url=server_url, realm_name=realm_name, client_id=client_id
)
self.exp = exp
self.user_groups = user_groups
- self.user_permissions = user_permissions
+ self.realm_permissions = realm_permissions
+ self.client_permissions = client_permissions
self._keycloak.public_key = lambda: raw_realm_public_key
self._keycloak.well_know = lambda: {
"issuer": f"{self.server_url}realms/{self.realm_name}",
"authorization_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/auth"
),
"token_endpoint": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/token"
),
"token_introspection_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/token/"
"introspect"
),
"userinfo_endpoint": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/userinfo"
),
"end_session_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/logout"
),
"jwks_uri": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/certs"
),
}
self.set_auth_success(auth_success, oidc_profile, user_info)
def decode_token(self, token):
options = {}
if self.auth_success:
# skip signature expiration and audience checks as we use a static
# oidc_profile for the tests with expired tokens in it
options["verify_exp"] = False
options["verify_aud"] = False
decoded = super().decode_token(token, options)
# Merge the user info configured to be part of the decode token
userinfo = self.userinfo()
if userinfo is not None:
decoded = {**decoded, **userinfo}
# tweak auth and exp time for tests
expire_in = decoded["exp"] - decoded["iat"]
if self.exp is not None:
decoded["exp"] = self.exp
decoded["iat"] = self.exp - expire_in
else:
now = int(datetime.now(tz=timezone.utc).timestamp())
decoded["iat"] = now
decoded["exp"] = now + expire_in
decoded["groups"] = self.user_groups
decoded["aud"] = [self.client_id, "account"]
decoded["azp"] = self.client_id
- if self.user_permissions:
+ decoded["realm_access"]["roles"] += self.realm_permissions
+ if self.client_permissions:
decoded["resource_access"][self.client_id] = {
- "roles": self.user_permissions
+ "roles": self.client_permissions
}
return decoded
def set_auth_success(
self,
auth_success: bool,
oidc_profile: Optional[Dict] = None,
user_info: Optional[Dict] = None,
) -> None:
# following type ignore because mypy is not too happy about affecting mock to
# method "Cannot assign to a method affecting mock". Ignore for now.
self.authorization_code = Mock() # type: ignore
self.refresh_token = Mock() # type: ignore
self.login = Mock() # type: ignore
self.userinfo = Mock() # type: ignore
self.logout = Mock() # type: ignore
self.auth_success = auth_success
if auth_success:
self.authorization_code.return_value = copy(oidc_profile)
self.refresh_token.return_value = copy(oidc_profile)
self.login.return_value = copy(oidc_profile)
self.userinfo.return_value = copy(user_info)
else:
self.authorization_url = Mock() # type: ignore
error = {
"error": "invalid_grant",
"error_description": "Invalid user credentials",
}
error_message = json.dumps(error).encode()
exception = KeycloakError(error_message=error_message, response_code=401)
self.authorization_code.side_effect = exception
self.authorization_url.side_effect = exception
self.refresh_token.side_effect = exception
self.userinfo.side_effect = exception
self.logout.side_effect = exception
self.login.side_effect = exception
def keycloak_oidc_factory(
server_url: str,
realm_name: str,
client_id: str,
auth_success: bool = True,
exp: Optional[int] = None,
user_groups: List[str] = [],
- user_permissions: List[str] = [],
+ realm_permissions: List[str] = [],
+ client_permissions: List[str] = [],
oidc_profile: Dict = OIDC_PROFILE,
user_info: Dict = USER_INFO,
raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
):
"""Keycloak mock fixture factory. Report to
:py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring.
"""
@pytest.fixture
def keycloak_oidc():
return KeycloackOpenIDConnectMock(
server_url=server_url,
realm_name=realm_name,
client_id=client_id,
auth_success=auth_success,
exp=exp,
user_groups=user_groups,
- user_permissions=user_permissions,
+ realm_permissions=realm_permissions,
+ client_permissions=client_permissions,
oidc_profile=oidc_profile,
user_info=user_info,
raw_realm_public_key=raw_realm_public_key,
)
return keycloak_oidc
# for backward compatibility
# TODO: remove that alias once swh-deposit and swh-web use new function name
keycloak_mock_factory = keycloak_oidc_factory
# generic keycloak fixture that can be used within tests
# (cf. test_keycloak.py, test_utils.py, django related tests)
# or external modules using that pytest plugin
_keycloak_oidc = keycloak_oidc_factory(
server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID,
)
@pytest.fixture
def keycloak_oidc(_keycloak_oidc, mocker):
for oidc_client_factory in (
"swh.auth.django.views.keycloak_oidc_client",
"swh.auth.django.backends.keycloak_oidc_client",
):
keycloak_oidc_client = mocker.patch(oidc_client_factory)
keycloak_oidc_client.return_value = _keycloak_oidc
return _keycloak_oidc
diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py
index 9b57813..19eabca 100644
--- a/swh/auth/tests/django/test_backends.py
+++ b/swh/auth/tests/django/test_backends.py
@@ -1,256 +1,262 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from unittest.mock import Mock
from django.conf import settings
from django.contrib.auth import authenticate, get_backends
import pytest
from rest_framework.exceptions import AuthenticationFailed
from swh.auth.django.backends import OIDCBearerTokenAuthentication
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import reverse
from swh.auth.keycloak import ExpiredSignatureError
def _authenticate_user(request_factory):
request = request_factory.get(reverse("root"))
return authenticate(
request=request,
code="some-code",
code_verifier="some-code-verifier",
redirect_uri="https://localhost:5004",
)
def _check_authenticated_user(user, decoded_token, keycloak_oidc):
assert user is not None
assert isinstance(user, OIDCUser)
assert user.id != 0
assert user.username == decoded_token["preferred_username"]
assert user.password == ""
assert user.first_name == decoded_token["given_name"]
assert user.last_name == decoded_token["family_name"]
assert user.email == decoded_token["email"]
assert user.is_staff == ("/staff" in decoded_token["groups"])
assert user.sub == decoded_token["sub"]
resource_access = decoded_token.get("resource_access", {})
resource_access_client = resource_access.get(keycloak_oidc.client_id, {})
assert user.permissions == set(resource_access_client.get("roles", []))
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory):
"""
Checks successful login based on OpenID Connect with PKCE extension
Django authentication backend (login from Web UI).
"""
keycloak_oidc.user_groups = ["/staff"]
oidc_profile = keycloak_oidc.login()
user = _authenticate_user(request_factory)
decoded_token = keycloak_oidc.decode_token(user.access_token)
_check_authenticated_user(user, decoded_token, keycloak_oidc)
auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
refresh_exp_datetime = auth_datetime + timedelta(
seconds=oidc_profile["refresh_expires_in"]
)
assert user.access_token == oidc_profile["access_token"]
assert user.expires_at == exp_datetime
assert user.id_token == oidc_profile["id_token"]
assert user.refresh_token == oidc_profile["refresh_token"]
assert user.refresh_expires_at == refresh_exp_datetime
assert user.scope == oidc_profile["scope"]
assert user.session_state == oidc_profile["session_state"]
backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend"
assert user.backend == backend_path
backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
assert get_backends()[backend_idx].get_user(user.id) == user
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory):
"""
Checks failed login based on OpenID Connect with PKCE extension Django
authentication backend (login from Web UI).
"""
keycloak_oidc.set_auth_success(False)
user = _authenticate_user(request_factory)
assert user is None
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_refresh_token_success(
keycloak_oidc, request_factory
):
"""
Checks access token renewal success using refresh token.
"""
oidc_profile = keycloak_oidc.login()
decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"])
keycloak_oidc.decode_token = Mock()
keycloak_oidc.decode_token.side_effect = [
ExpiredSignatureError("access token token has expired"),
decoded_token,
]
user = _authenticate_user(request_factory)
oidc_profile = keycloak_oidc.login()
keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"])
assert user is not None
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_refresh_token_failure(
keycloak_oidc, request_factory
):
"""
Checks access token renewal failure using refresh token.
"""
keycloak_oidc.decode_token = Mock()
keycloak_oidc.decode_token.side_effect = ExpiredSignatureError(
"access token token has expired"
)
keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired")
user = _authenticate_user(request_factory)
oidc_profile = keycloak_oidc.login()
keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"])
assert user is None
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory):
"""
Checks that a permission defined with OpenID Connect is correctly mapped
to a Django one when logging from Web UI.
"""
- permission = "webapp.some-permission"
- keycloak_oidc.user_permissions = [permission]
+ realm_permission = "swh.some-permission"
+ client_permission = "webapp.some-permission"
+ keycloak_oidc.realm_permissions = [realm_permission]
+ keycloak_oidc.client_permissions = [client_permission]
user = _authenticate_user(request_factory)
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
+ assert user.has_perm(realm_permission)
+ assert user.has_perm(client_permission)
+ assert user.get_all_permissions() == {realm_permission, client_permission}
+ assert user.get_group_permissions() == {realm_permission, client_permission}
assert user.has_module_perms("webapp")
assert not user.has_module_perms("foo")
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory):
"""
Checks successful login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login).
"""
url = reverse("api-test")
drf_auth_backend = OIDCBearerTokenAuthentication()
oidc_profile = keycloak_oidc.login()
refresh_token = oidc_profile["refresh_token"]
access_token = oidc_profile["access_token"]
decoded_token = keycloak_oidc.decode_token(access_token)
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
user, _ = drf_auth_backend.authenticate(request)
_check_authenticated_user(user, decoded_token, keycloak_oidc)
# oidc_profile is not filled when authenticating through bearer token
assert hasattr(user, "access_token") and user.access_token is None
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory):
"""
Checks failed login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login).
"""
url = reverse("api-test")
drf_auth_backend = OIDCBearerTokenAuthentication()
oidc_profile = keycloak_oidc.login()
# simulate a failed authentication with a bearer token in expected format
keycloak_oidc.set_auth_success(False)
refresh_token = oidc_profile["refresh_token"]
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
with pytest.raises(AuthenticationFailed):
drf_auth_backend.authenticate(request)
# simulate a failed authentication with an invalid bearer token format
request = api_request_factory.get(
url, HTTP_AUTHORIZATION="Bearer invalid-token-format"
)
with pytest.raises(AuthenticationFailed):
drf_auth_backend.authenticate(request)
def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory):
"""
Checks failed login based on OpenID Connect bearer token Django REST
Framework authentication backend (Web API login) due to invalid
authorization header value.
"""
url = reverse("api-test")
drf_auth_backend = OIDCBearerTokenAuthentication()
oidc_profile = keycloak_oidc.login()
refresh_token = oidc_profile["refresh_token"]
# Invalid authorization type
request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
with pytest.raises(AuthenticationFailed):
drf_auth_backend.authenticate(request)
# Missing authorization type
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}")
with pytest.raises(AuthenticationFailed):
drf_auth_backend.authenticate(request)
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_permissions(
keycloak_oidc, api_request_factory
):
"""
Checks that a permission defined with OpenID Connect is correctly mapped
to a Django one when using bearer token authentication.
"""
- permission = "webapp.some-permission"
- keycloak_oidc.user_permissions = [permission]
+ realm_permission = "swh.some-permission"
+ client_permission = "webapp.some-permission"
+ keycloak_oidc.realm_permissions = [realm_permission]
+ keycloak_oidc.client_permissions = [client_permission]
drf_auth_backend = OIDCBearerTokenAuthentication()
oidc_profile = keycloak_oidc.login()
refresh_token = oidc_profile["refresh_token"]
url = reverse("api-test")
request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
user, _ = drf_auth_backend.authenticate(request)
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
+ assert user.has_perm(realm_permission)
+ assert user.has_perm(client_permission)
+ assert user.get_all_permissions() == {realm_permission, client_permission}
+ assert user.get_group_permissions() == {realm_permission, client_permission}
assert user.has_module_perms("webapp")
assert not user.has_module_perms("foo")
diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py
index a7878d4..c2be128 100644
--- a/swh/auth/tests/django/test_utils.py
+++ b/swh/auth/tests/django/test_utils.py
@@ -1,120 +1,121 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from datetime import datetime
from django.test import override_settings
import pytest
from swh.auth.django.utils import (
keycloak_oidc_client,
oidc_user_from_decoded_token,
oidc_user_from_profile,
)
from swh.auth.tests.sample_data import (
CLIENT_ID,
DECODED_TOKEN,
OIDC_PROFILE,
REALM_NAME,
SERVER_URL,
)
def _check_user(user, is_staff=False, permissions=set()):
assert user.id > 0
assert user.username == DECODED_TOKEN["preferred_username"]
assert user.password == ""
assert user.first_name == DECODED_TOKEN["given_name"]
assert user.last_name == DECODED_TOKEN["family_name"]
assert user.email == DECODED_TOKEN["email"]
assert user.is_staff == is_staff
assert user.permissions == permissions
assert user.sub == DECODED_TOKEN["sub"]
date_now = datetime.now()
if user.expires_at is not None:
assert isinstance(user.expires_at, datetime)
assert date_now <= user.expires_at
if user.refresh_expires_at is not None:
assert isinstance(user.refresh_expires_at, datetime)
assert date_now <= user.refresh_expires_at
assert user.oidc_profile == {
k: getattr(user, k)
for k in (
"access_token",
"expires_in",
"expires_at",
"id_token",
"refresh_token",
"refresh_expires_in",
"refresh_expires_at",
"scope",
"session_state",
)
}
def test_oidc_user_from_decoded_token():
user = oidc_user_from_decoded_token(DECODED_TOKEN)
_check_user(user)
-def test_oidc_user_from_decoded_token2():
+def test_oidc_user_with_permissions_from_decoded_token():
decoded_token = copy(DECODED_TOKEN)
decoded_token["groups"] = ["/staff", "api"]
+ decoded_token["realm_access"] = {"roles": ["swh.ambassador"]}
decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}}
user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID)
- _check_user(user, is_staff=True, permissions={"read-api"})
+ _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"})
@pytest.mark.parametrize(
"key,mapped_key",
[
("preferred_username", "username"),
("given_name", "first_name"),
("family_name", "last_name"),
("email", "email"),
],
)
def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key):
decoded_token = copy(DECODED_TOKEN)
decoded_token.pop(key, None)
user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID)
# Ensure the missing field is mapped to an empty value
assert getattr(user, mapped_key) == ""
def test_oidc_user_from_profile(keycloak_oidc):
user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE)
_check_user(user)
@override_settings(
SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None,
)
def test_keycloak_oidc_client_missing_django_settings():
with pytest.raises(ValueError, match="settings are mandatory"):
keycloak_oidc_client()
@override_settings(
SWH_AUTH_SERVER_URL=SERVER_URL,
SWH_AUTH_REALM_NAME=REALM_NAME,
SWH_AUTH_CLIENT_ID=CLIENT_ID,
)
def test_keycloak_oidc_client_parameters_from_django_settings():
kc_oidc_client = keycloak_oidc_client()
assert kc_oidc_client.server_url == SERVER_URL
assert kc_oidc_client.realm_name == REALM_NAME
assert kc_oidc_client.client_id == CLIENT_ID

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 3:33 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3272142

Event Timeline