diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -47,15 +47,17 @@ if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] + realm_access = decoded_token.get("realm_access", {}) + permissions = realm_access.get("roles", []) + if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) - permissions = client_resource_access.get("roles", []) - else: - permissions = [] + permissions += client_resource_access.get("roles", []) - user.permissions = set(permissions) + # set user permissions and filter out default keycloak realm roles + user.permissions = set(permissions) - {"offline_access", "uma_authorization"} # add user sub to custom User proxy model user.sub = decoded_token["sub"] diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -34,7 +34,8 @@ auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) - user_permissions: user permissions configuration (if any) + realm_permissions: user permissions configuration at realm level (if any) + client_permissions: user permissions configuration at client level (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. @@ -52,7 +53,8 @@ auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], - user_permissions: List[str] = [], + realm_permissions: List[str] = [], + client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, @@ -62,7 +64,8 @@ ) self.exp = exp self.user_groups = user_groups - self.user_permissions = user_permissions + self.realm_permissions = realm_permissions + self.client_permissions = client_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", @@ -121,9 +124,10 @@ decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id - if self.user_permissions: + decoded["realm_access"]["roles"] += self.realm_permissions + if self.client_permissions: decoded["resource_access"][self.client_id] = { - "roles": self.user_permissions + "roles": self.client_permissions } return decoded @@ -169,7 +173,8 @@ auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], - user_permissions: List[str] = [], + realm_permissions: List[str] = [], + client_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, @@ -188,7 +193,8 @@ auth_success=auth_success, exp=exp, user_groups=user_groups, - user_permissions=user_permissions, + realm_permissions=realm_permissions, + client_permissions=client_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py --- a/swh/auth/tests/django/test_backends.py +++ b/swh/auth/tests/django/test_backends.py @@ -143,12 +143,15 @@ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ - permission = "webapp.some-permission" - keycloak_oidc.user_permissions = [permission] + realm_permission = "swh.some-permission" + client_permission = "webapp.some-permission" + keycloak_oidc.realm_permissions = [realm_permission] + keycloak_oidc.client_permissions = [client_permission] user = _authenticate_user(request_factory) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} + assert user.has_perm(realm_permission) + assert user.has_perm(client_permission) + assert user.get_all_permissions() == {realm_permission, client_permission} + assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @@ -239,8 +242,10 @@ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ - permission = "webapp.some-permission" - keycloak_oidc.user_permissions = [permission] + realm_permission = "swh.some-permission" + client_permission = "webapp.some-permission" + keycloak_oidc.realm_permissions = [realm_permission] + keycloak_oidc.client_permissions = [client_permission] drf_auth_backend = OIDCBearerTokenAuthentication() oidc_profile = keycloak_oidc.login() @@ -249,8 +254,9 @@ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} + assert user.has_perm(realm_permission) + assert user.has_perm(client_permission) + assert user.get_all_permissions() == {realm_permission, client_permission} + assert user.get_group_permissions() == {realm_permission, client_permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -63,14 +63,15 @@ _check_user(user) -def test_oidc_user_from_decoded_token2(): +def test_oidc_user_with_permissions_from_decoded_token(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] + decoded_token["realm_access"] = {"roles": ["swh.ambassador"]} decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) - _check_user(user, is_staff=True, permissions={"read-api"}) + _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"}) @pytest.mark.parametrize(