Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
index 9c9cfb03..ad4f17f1 100644
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -1,145 +1,150 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from django.core.cache import cache
from django.http import HttpRequest
from django.utils import timezone
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
import sentry_sdk
from swh.web.auth.keycloak import KeycloakOpenIDConnect
from swh.web.auth.utils import get_oidc_client
from swh.web.auth.models import OIDCUser
# OpenID Connect client to communicate with Keycloak server
_oidc_client: KeycloakOpenIDConnect = get_oidc_client()
def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
    """
    Build an in-memory OIDCUser from the claims of a decoded access token.

    The Django user identifier is derived from the subject identifier
    ("sub" claim): its dashes are removed and the remaining hexadecimal
    digits are converted to an integer.
    """
    subject = decoded_token["sub"]

    # the instance is only constructed here, this function never saves it
    user = OIDCUser(
        id=int(subject.replace("-", ""), 16),
        username=decoded_token["preferred_username"],
        password="",
        first_name=decoded_token["given_name"],
        last_name=decoded_token["family_name"],
        email=decoded_token["email"],
    )

    # staff status follows membership of the "/staff" group, when the
    # token carries a "groups" claim at all
    if "groups" in decoded_token:
        user.is_staff = "/staff" in decoded_token["groups"]

    # permissions are the roles granted for this OIDC client, if any
    client_roles = (
        decoded_token.get("resource_access", {})
        .get(_oidc_client.client_id, {})
        .get("roles", [])
    )
    user.permissions = set(client_roles)

    # keep the original subject identifier on the user object
    user.sub = subject
    return user
def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
    """
    Build an OIDCUser from an OIDC profile holding the authentication tokens.

    The profile is updated in place with two computed entries,
    "expires_at" and "refresh_expires_at", before its values are copied
    onto the matching attributes of the user.
    """
    claims = _oidc_client.decode_token(oidc_profile["access_token"])
    user = _oidc_user_from_decoded_token(claims)

    # authentication start and access token expiration, as datetimes
    authenticated_at = datetime.fromtimestamp(claims["auth_time"])
    access_expiry = datetime.fromtimestamp(claims["exp"])

    # the refresh token lifetime is counted from the authentication time
    oidc_profile["expires_at"] = access_expiry
    refresh_lifetime = timedelta(seconds=oidc_profile["refresh_expires_in"])
    oidc_profile["refresh_expires_at"] = authenticated_at + refresh_lifetime

    # only profile entries that the user object already exposes are copied
    for name in oidc_profile:
        if hasattr(user, name):
            setattr(user, name, oidc_profile[name])

    return user
class OIDCAuthorizationCodePKCEBackend:
    """
    Django authentication backend relying on the OIDC authorization code
    flow with PKCE. The OIDC profile of an authenticated user is kept in
    the Django cache so that the user can be rebuilt from its identifier.
    """

    def authenticate(
        self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str
    ) -> Optional[OIDCUser]:
        """
        Exchange an authorization code for tokens and build the matching user.

        Any exception raised along the way is reported to Sentry instead of
        being propagated; the value of ``user`` reached at that point
        (None if the user could not be created) is returned.
        """
        user = None
        try:
            oidc_profile = _oidc_client.authorization_code(
                code, redirect_uri, code_verifier=code_verifier
            )
            user = _oidc_user_from_profile(oidc_profile)

            # the cached profile must not outlive the access token
            assert user.expires_at
            remaining = user.expires_at.timestamp() - timezone.now().timestamp()
            cache.set(
                f"oidc_user_{user.id}", oidc_profile, timeout=max(0, int(remaining))
            )
        except Exception as e:
            sentry_sdk.capture_exception(e)

        return user

    def get_user(self, user_id: int) -> Optional[OIDCUser]:
        """
        Rebuild a user from the OIDC profile cached under its identifier.

        Returns None when no profile is cached or when the user cannot be
        rebuilt from it (the error is then reported to Sentry).
        """
        oidc_profile = cache.get(f"oidc_user_{user_id}")
        if not oidc_profile:
            return None

        try:
            user = _oidc_user_from_profile(oidc_profile)
            # restore auth backend
            setattr(user, "backend", f"{__name__}.{self.__class__.__name__}")
        except Exception as e:
            sentry_sdk.capture_exception(e)
            return None

        return user
class OIDCBearerTokenAuthentication(BaseAuthentication):
    """
    Django REST framework authentication class validating an OpenID Connect
    access token sent as a bearer token in the HTTP Authorization header.
    """

    def authenticate(self, request):
        """
        Authenticate a request from its Authorization header.

        Returns:
            None when the request carries no Authorization header,
            otherwise a ``(user, None)`` tuple.

        Raises:
            AuthenticationFailed: when the header is malformed, when the
                authentication scheme is not "Bearer", or when the token
                cannot be decoded into a user.
        """
        auth_header = request.META.get("HTTP_AUTHORIZATION")
        if auth_header is None:
            return None

        try:
            auth_type, token = auth_header.split(" ", 1)
        except ValueError as e:
            raise AuthenticationFailed(
                "Invalid HTTP authorization header format"
            ) from e

        # authentication scheme names are case-insensitive (RFC 7235),
        # so "bearer" and "BEARER" must be accepted as well
        if auth_type.lower() != "bearer":
            raise AuthenticationFailed(
                f"Invalid or unsupported HTTP authorization type ({auth_type})."
            )

        try:
            # attempt to decode token
            decoded_token = _oidc_client.decode_token(token)
            # create Django user
            user = _oidc_user_from_decoded_token(decoded_token)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            # keep the original error as the cause of the failure
            raise AuthenticationFailed(str(e)) from e

        return user, None
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
index 01783857..94bf8da4 100644
--- a/swh/web/auth/models.py
+++ b/swh/web/auth/models.py
@@ -1,43 +1,80 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
-from typing import Optional
+from typing import Optional, Set
from django.contrib.auth.models import User
class OIDCUser(User):
    """
    Custom User proxy model for remote users storing OpenID Connect
    related data: profile containing authentication tokens.

    The model is also not saved to database as all users are already stored
    in the Keycloak one.
    """

    # OIDC subject identifier
    sub: str = ""

    # OIDC tokens and session related data, only relevant when a user
    # authenticates from a web browser
    access_token: Optional[str] = None
    expires_at: Optional[datetime] = None
    id_token: Optional[str] = None
    refresh_token: Optional[str] = None
    refresh_expires_at: Optional[datetime] = None
    scope: Optional[str] = None
    session_state: Optional[str] = None

    # User permissions; declared without a default, so the attribute must be
    # assigned on the instance before any permission method is called
    permissions: Set[str]

    class Meta:
        app_label = "swh.web.auth"
        proxy = True

    def save(self, **kwargs):
        """
        Override django.db.models.Model.save to avoid saving the remote
        users to web application database.
        """
        pass

    def get_group_permissions(self, obj=None) -> Set[str]:
        """
        Override django.contrib.auth.models.PermissionsMixin.get_group_permissions
        to get permissions from OIDC
        """
        return self.get_all_permissions(obj)

    def get_all_permissions(self, obj=None) -> Set[str]:
        """
        Override django.contrib.auth.models.PermissionsMixin.get_all_permissions
        to get permissions from OIDC
        """
        return self.permissions

    def has_perm(self, perm, obj=None) -> bool:
        """
        Override django.contrib.auth.models.PermissionsMixin.has_perm
        to check permission from OIDC
        """
        if self.is_active and self.is_superuser:
            return True

        return perm in self.permissions

    def has_module_perms(self, app_label) -> bool:
        """
        Override django.contrib.auth.models.PermissionsMixin.has_module_perms
        to check permissions from OIDC.

        A permission belongs to ``app_label`` when it is exactly that label
        or starts with the label followed by a dot.
        """
        if self.is_active and self.is_superuser:
            return True

        # a bare prefix test would let the label "webapp" match a permission
        # of another application such as "webapp2.some-permission"
        label_prefix = f"{app_label}."
        return any(
            perm == app_label or perm.startswith(label_prefix)
            for perm in self.permissions
        )
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
index 3cbe9448..779ea359 100644
--- a/swh/web/tests/auth/keycloak_mock.py
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -1,98 +1,110 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from unittest.mock import Mock
from django.utils import timezone
from swh.web.auth.keycloak import KeycloakOpenIDConnect
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.config import get_config
from .sample_data import oidc_profile, realm_public_key, userinfo
class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
    """
    Keycloak OpenID Connect client for tests: the realm public key and
    well-known endpoints are served locally, and the authorization code,
    userinfo and logout operations are replaced by mocks.

    Args:
        auth_success: whether the mocked operations succeed or raise
        exp: expiration timestamp to force in decoded tokens, if any
        user_groups: groups to put in decoded tokens (none by default)
        user_permissions: client roles to put in decoded tokens
            (none by default)
    """

    def __init__(
        self, auth_success=True, exp=None, user_groups=None, user_permissions=None
    ):
        swhweb_config = get_config()
        super().__init__(
            swhweb_config["keycloak"]["server_url"],
            swhweb_config["keycloak"]["realm_name"],
            OIDC_SWH_WEB_CLIENT_ID,
        )
        self.auth_success = auth_success
        self.exp = exp
        # store private copies: a shared mutable default (or a list owned by
        # the caller) must not leak state from one mock to another
        self.user_groups = list(user_groups) if user_groups is not None else []
        self.user_permissions = (
            list(user_permissions) if user_permissions is not None else []
        )
        self._keycloak.public_key = lambda: realm_public_key
        self._keycloak.well_know = lambda: {
            "issuer": f"{self.server_url}realms/{self.realm_name}",
            "authorization_endpoint": (
                f"{self.server_url}realms/"
                f"{self.realm_name}/protocol/"
                "openid-connect/auth"
            ),
            "token_endpoint": (
                f"{self.server_url}realms/{self.realm_name}/"
                "protocol/openid-connect/token"
            ),
            "token_introspection_endpoint": (
                f"{self.server_url}realms/"
                f"{self.realm_name}/protocol/"
                "openid-connect/token/"
                "introspect"
            ),
            "userinfo_endpoint": (
                f"{self.server_url}realms/{self.realm_name}/"
                "protocol/openid-connect/userinfo"
            ),
            "end_session_endpoint": (
                f"{self.server_url}realms/"
                f"{self.realm_name}/protocol/"
                "openid-connect/logout"
            ),
            "jwks_uri": (
                f"{self.server_url}realms/{self.realm_name}/"
                "protocol/openid-connect/certs"
            ),
        }
        self.authorization_code = Mock()
        self.userinfo = Mock()
        self.logout = Mock()
        if auth_success:
            self.authorization_code.return_value = copy(oidc_profile)
            self.userinfo.return_value = copy(userinfo)
        else:
            self.authorization_url = Mock()
            exception = Exception("Authentication failed")
            self.authorization_code.side_effect = exception
            self.authorization_url.side_effect = exception
            self.userinfo.side_effect = exception
            self.logout.side_effect = exception

    def decode_token(self, token):
        """
        Decode ``token`` with the parent implementation, then rewrite its
        authentication/expiration times, groups and client roles according
        to the mock configuration.
        """
        options = {}
        if self.auth_success:
            # skip signature expiration check as we use a static oidc_profile
            # for the tests with expired tokens in it
            options["verify_exp"] = False
        decoded = super().decode_token(token, options)
        # tweak auth and exp time for tests
        expire_in = decoded["exp"] - decoded["auth_time"]
        if self.exp is not None:
            decoded["exp"] = self.exp
            decoded["auth_time"] = self.exp - expire_in
        else:
            decoded["auth_time"] = int(timezone.now().timestamp())
            decoded["exp"] = decoded["auth_time"] + expire_in
        # hand out copies so that a consumer mutating the decoded token
        # cannot alter the configuration of the mock
        decoded["groups"] = list(self.user_groups)
        if self.user_permissions:
            # the token may lack a "resource_access" claim altogether
            decoded.setdefault("resource_access", {})[self.client_id] = {
                "roles": list(self.user_permissions)
            }
        return decoded
def mock_keycloak(
    mocker, auth_success=True, exp=None, user_groups=None, user_permissions=None
):
    """
    Create a KeycloackOpenIDConnectMock and patch it in place of the OIDC
    client used by the authentication views and backends.

    Args:
        mocker: pytest-mock fixture used to install the patches
        auth_success: forwarded to the mock
        exp: forwarded to the mock
        user_groups: groups of the mocked user (none by default)
        user_permissions: permissions of the mocked user (none by default)

    Returns:
        the mocked OIDC client
    """
    # None stands for "no groups/permissions"; fresh lists are created on
    # each call instead of sharing one mutable default between all calls
    kc_oidc_mock = KeycloackOpenIDConnectMock(
        auth_success,
        exp,
        [] if user_groups is None else user_groups,
        [] if user_permissions is None else user_permissions,
    )
    mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client")
    mock_get_oidc_client.return_value = kc_oidc_mock
    mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock)
    return kc_oidc_mock
diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py
index 02cd843e..b22d7b30 100644
--- a/swh/web/tests/auth/sample_data.py
+++ b/swh/web/tests/auth/sample_data.py
@@ -1,101 +1,128 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Public key of the Keycloak realm used in tests: KeycloackOpenIDConnectMock
# serves it through its `_keycloak.public_key` replacement.
# NOTE(review): presumably the key matching the signature of the sample
# tokens defined in this module -- confirm.
realm_public_key = (
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u"
"NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY"
"y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy"
"5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr"
"drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie"
"Hl5Lv7Iig4FOIXIVCaDGQIDAQAB"
)
oidc_profile = {
"access_token": (
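+ # decoded token (NOTE: the JWT payload below actually carries 'auth_time': 1582723100, 'exp': 1582723701 and 'groups': []; the values listed here for these three claims appear to be test mock overrides):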
+ # {'acr': '1',
+ # 'allowed-origins': ['*'],
+ # 'aud': ['swh-web', 'account'],
+ # 'auth_time': 1592395601,
+ # 'azp': 'swh-web',
+ # 'email': 'john.doe@example.com',
+ # 'email_verified': False,
+ # 'exp': 1592396202,
+ # 'family_name': 'Doe',
+ # 'given_name': 'John',
+ # 'groups': ['/staff'],
+ # 'iat': 1582723101,
+ # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage',
+ # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e',
+ # 'name': 'John Doe',
+ # 'nbf': 0,
+ # 'preferred_username': 'johndoe',
+ # 'realm_access': {'roles': ['offline_access', 'uma_authorization']},
+ # 'resource_access': {'account': {'roles': ['manage-account',
+ # 'manage-account-links',
+ # 'view-profile']}},
+ # 'scope': 'openid email profile',
+ # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d',
+ # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200',
+ # 'typ': 'Bearer'
+ # }
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV"
"Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0."
"eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz"
"MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz"
"MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs"
"bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj"
"b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2"
"MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi"
"YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy"
"YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi"
"MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6"
"eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0"
"aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl"
"cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz"
"Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg"
"cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv"
"aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi"
"am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi"
"OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-"
"Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB"
"AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO"
"kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc"
"HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl"
"rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE"
"oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ"
"6A"
),
"expires_in": 600,
"id_token": (
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0"
"TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki"
"OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi"
"OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo"
"dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp"
"dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh"
"NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13"
"ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk"
"ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx"
"IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn"
"cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2"
"ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi"
"am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee"
"JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL"
"aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_"
"PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE"
"0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN"
"ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX"
"ZbYnitD1Typ6Q"
),
"not-before-policy": 0,
"refresh_expires_in": 1800,
"refresh_token": (
"eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM"
"zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk"
"iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC"
"JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL"
"CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv"
"U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q"
"6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj"
"oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid"
"HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi"
"OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ"
"2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl"
"sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic"
"mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu"
"YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc"
"tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG"
"UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI"
),
"scope": "openid email profile",
"session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d",
"token_type": "bearer",
}
# Sample user information: a copy of this dict is what the mocked
# `userinfo` call of KeycloackOpenIDConnectMock returns when authentication
# is configured to succeed.
userinfo = {
"email": "john.doe@example.com",
"email_verified": False,
"family_name": "Doe",
"given_name": "John",
"groups": ["/staff"],
"name": "John Doe",
"preferred_username": "johndoe",
"sub": "feacd344-b468-4a65-a236-14f61e6b7200",
}
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
index f2acf805..b34a9159 100644
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -1,148 +1,181 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from django.contrib.auth import authenticate, get_backends
import pytest
from django.conf import settings
from rest_framework.exceptions import AuthenticationFailed
from swh.web.auth.backends import OIDCBearerTokenAuthentication
from swh.web.auth.models import OIDCUser
from swh.web.common.utils import reverse
from . import sample_data
from .keycloak_mock import mock_keycloak
def _authenticate_user(request_factory):
    """
    Run Django authentication for a request targeting the OIDC login
    completion view, using placeholder PKCE credentials.
    """
    login_complete_request = request_factory.get(reverse("oidc-login-complete"))
    credentials = {
        "code": "some-code",
        "code_verifier": "some-code-verifier",
        "redirect_uri": "https://localhost:5004",
    }
    return authenticate(request=login_complete_request, **credentials)
def _check_authenticated_user(user, decoded_token, kc_oidc_mock):
    """
    Assert that ``user`` is an OIDCUser whose fields mirror the claims of
    ``decoded_token``, including the permissions granted through the roles
    of the OIDC client mocked by ``kc_oidc_mock``.
    """
    assert user is not None
    assert isinstance(user, OIDCUser)
    assert user.id != 0
    assert user.username == decoded_token["preferred_username"]
    assert user.password == ""
    assert user.first_name == decoded_token["given_name"]
    assert user.last_name == decoded_token["family_name"]
    assert user.email == decoded_token["email"]
    assert user.is_staff == ("/staff" in decoded_token["groups"])
    assert user.sub == decoded_token["sub"]
    resource_access = decoded_token.get("resource_access", {})
    # client roles are keyed by client identifier in the token: looking them
    # up with the mock object itself would never match and would always
    # yield an empty set of expected permissions
    resource_access_client = resource_access.get(kc_oidc_mock.client_id, {})
    assert user.permissions == set(resource_access_client.get("roles", []))
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_success(mocker, request_factory):
    """
    A successful PKCE authentication yields a user filled from the decoded
    token and the OIDC profile, retrievable again through its backend.
    """
    kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"])
    expected_profile = sample_data.oidc_profile

    user = _authenticate_user(request_factory)

    decoded_token = kc_oidc_mock.decode_token(user.access_token)
    _check_authenticated_user(user, decoded_token, kc_oidc_mock)

    # expected token expiration dates, computed like the backend does
    authenticated_at = datetime.fromtimestamp(decoded_token["auth_time"])
    access_expiry = datetime.fromtimestamp(decoded_token["exp"])
    refresh_lifetime = timedelta(seconds=expected_profile["refresh_expires_in"])

    assert user.access_token == expected_profile["access_token"]
    assert user.expires_at == access_expiry
    assert user.id_token == expected_profile["id_token"]
    assert user.refresh_token == expected_profile["refresh_token"]
    assert user.refresh_expires_at == authenticated_at + refresh_lifetime
    assert user.scope == expected_profile["scope"]
    assert user.session_state == expected_profile["session_state"]

    # the user must reference its backend and be retrievable through it
    backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend"
    assert user.backend == backend_path

    backend = get_backends()[settings.AUTHENTICATION_BACKENDS.index(backend_path)]
    assert backend.get_user(user.id) == user
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory):
    """No user is returned when the OIDC client fails to authenticate."""
    mock_keycloak(mocker, auth_success=False)

    authenticated_user = _authenticate_user(request_factory)

    assert authenticated_user is None
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory):
    """
    Permissions configured on the Keycloak mock are exposed by the user
    authenticated through the PKCE backend.
    """
    granted = "webapp.some-permission"
    mock_keycloak(mocker, user_permissions=[granted])

    user = _authenticate_user(request_factory)

    expected_permissions = {granted}
    assert user.has_perm(granted)
    assert user.get_all_permissions() == expected_permissions
    assert user.get_group_permissions() == expected_permissions
    assert user.has_module_perms("webapp")
    assert not user.has_module_perms("foo")
+
+
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory):
    """
    A valid bearer token authenticates a user built from the decoded
    token only, without any OIDC profile data.
    """
    kc_oidc_mock = mock_keycloak(mocker)
    bearer_token = sample_data.oidc_profile["access_token"]
    expected_claims = kc_oidc_mock.decode_token(bearer_token)

    request = api_request_factory.get(
        reverse("api-1-stat-counters"), HTTP_AUTHORIZATION=f"Bearer {bearer_token}"
    )
    user, _ = OIDCBearerTokenAuthentication().authenticate(request)

    _check_authenticated_user(user, expected_claims, kc_oidc_mock)
    # oidc_profile is not filled when authenticating through bearer token
    assert hasattr(user, "access_token") and user.access_token is None
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory):
    """
    Bearer token authentication fails both when Keycloak rejects the token
    and when the token is not in the expected format.
    """
    url = reverse("api-1-stat-counters")
    authenticator = OIDCBearerTokenAuthentication()

    def assert_rejected(authorization):
        request = api_request_factory.get(url, HTTP_AUTHORIZATION=authorization)
        with pytest.raises(AuthenticationFailed):
            authenticator.authenticate(request)

    # simulate a failed authentication with a bearer token in expected format
    mock_keycloak(mocker, auth_success=False)
    assert_rejected(f"Bearer {sample_data.oidc_profile['access_token']}")

    # simulate a failed authentication with an invalid bearer token format
    mock_keycloak(mocker)
    assert_rejected("Bearer invalid-token-format")
def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory):
    """
    Authorization headers with an unsupported type, or with no type at
    all, are rejected.
    """
    url = reverse("api-1-stat-counters")
    authenticator = OIDCBearerTokenAuthentication()
    token_only = f"{sample_data.oidc_profile['access_token']}"

    rejected_headers = (
        "Foo token",  # invalid authorization type
        token_only,  # missing authorization type
    )
    for authorization in rejected_headers:
        request = api_request_factory.get(url, HTTP_AUTHORIZATION=authorization)
        with pytest.raises(AuthenticationFailed):
            authenticator.authenticate(request)
+
+
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory):
    """
    Permissions configured on the Keycloak mock are exposed by the user
    authenticated through a bearer token.
    """
    granted = "webapp.some-permission"
    mock_keycloak(mocker, user_permissions=[granted])

    bearer_token = sample_data.oidc_profile["access_token"]
    request = api_request_factory.get(
        reverse("api-1-stat-counters"), HTTP_AUTHORIZATION=f"Bearer {bearer_token}"
    )
    user, _ = OIDCBearerTokenAuthentication().authenticate(request)

    expected_permissions = {granted}
    assert user.has_perm(granted)
    assert user.get_all_permissions() == expected_permissions
    assert user.get_group_permissions() == expected_permissions
    assert user.has_module_perms("webapp")
    assert not user.has_module_perms("foo")

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 3:38 PM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276912

Event Timeline