diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index 042d0a8..370ba64 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,225 +1,238 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Optional from urllib.parse import urlencode from keycloak import KeycloakOpenID # The next import is required to allow callers to catch on their own term the following # exception from keycloak.exceptions import KeycloakError # noqa from swh.core.config import load_from_envvar class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) - self.server_url = server_url - self.realm_name = realm_name - self.client_id = client_id self.realm_public_key = realm_public_key + @property + def realm_name(self): + return self._keycloak.realm_name + + @realm_name.setter + def realm_name(self, value): + self._keycloak.realm_name = value + + @property + def client_id(self): + return self._keycloak.client_id + + @client_id.setter + def client_id(self, value): + self._keycloak.client_id = value + def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Raises: KeycloakError in case of authentication failures Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) def login( self, username: str, password: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Direct Access Grant flow. Raises: KeycloakError in case of authentication failures Args: username: an existing username in the realm password: password associated to username extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="password", scope="openid", username=username, password=password, **extra_params, ) def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) @classmethod def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from a configuration dict. Args: kwargs: configuration dict for the instance, with one keycloak key, whose value is a Dict with the following keys: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier Returns: the KeycloakOpenIDConnect instance """ cfg = kwargs["keycloak"] return cls( server_url=cfg["server_url"], realm_name=cfg["realm_name"], client_id=cfg["client_id"], ) @classmethod def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to instantiation call Returns: the KeycloakOpenIDConnect instance """ config = dict(load_from_envvar()).get("keycloak", {}) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(keycloak=config) def keycloak_error_message(keycloak_error: KeycloakError) -> str: """Transform a keycloak exception into an error message. """ msg_dict = json.loads(keycloak_error.error_message.decode()) error_msg = msg_dict["error"] error_desc = msg_dict.get("error_description") if error_desc: error_msg = f"{error_msg}: {error_desc}" return error_msg diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index db3567e..3853851 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,118 +1,117 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) def _check_user(user, is_staff=False, permissions=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff assert user.permissions == permissions assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) _check_user(user, is_staff=True, permissions={"read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_oidc): user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( KEYCLOAK_SERVER_URL=SERVER_URL, KEYCLOAK_REALM_NAME=REALM_NAME, KEYCLOAK_CLIENT_ID=CLIENT_ID, ) -def test_keycloak_oidc_client_parameters_from_django_settings(mocker): - mocker.patch("swh.auth.keycloak.KeycloakOpenID") +def test_keycloak_oidc_client_parameters_from_django_settings(): kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID