diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index 72d243c..4d72d63 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,160 +1,202 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Optional from urllib.parse import urlencode from keycloak import KeycloakOpenID +from swh.core.config import load_from_envvar + class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) self.server_url = server_url self.realm_name = realm_name self.client_id = client_id self.realm_public_key = realm_public_key def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) def login( self, username: str, password: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Direct Access Grant flow. Args: username: an existing username in the realm password: password associated to username extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="password", scope="openid", username=username, password=password, **extra_params, ) def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) + + @classmethod + def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": + """Instantiate a KeycloakOpenIDConnect class from a configuration dict. + + Args: + + kwargs: configuration dict for the instance, with one keycloak key, whose + value is a Dict with the following keys: + - server_url: URL of the Keycloak server + - realm_name: The realm name + - client_id: The OpenID Connect client identifier + + Returns: + the KeycloakOpenIDConnect instance + + """ + cfg = kwargs["keycloak"] + return cls( + server_url=cfg["server_url"], + realm_name=cfg["realm_name"], + client_id=cfg["client_id"], + ) + + @classmethod + def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": + + """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the + SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their + value is not None. + + Args: + kwargs: kwargs passed to instantiation call + + Returns: + the KeycloakOpenIDConnect instance + """ + config = dict(load_from_envvar()).get("keycloak", {}) + config.update({k: v for k, v in kwargs.items() if v is not None}) + return cls.from_config(keycloak=config) diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index 5cf774e..3bcda9f 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,100 +1,164 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy +import os from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest +import yaml +from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.pytest_plugin import keycloak_mock_factory from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, USER_INFO, ) +from swh.core.config import read # Make keycloak fixture to use for tests below. keycloak_mock = keycloak_mock_factory( server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, ) def test_keycloak_well_known(keycloak_mock): well_known_result = keycloak_mock.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } def test_keycloak_authorization_url(keycloak_mock): actual_auth_uri = keycloak_mock.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_mock.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } def test_keycloak_authorization_code_fail(keycloak_mock): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_mock.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_mock.authorization_code("auth-code", "redirect-uri") def test_keycloak_authorization_code(keycloak_mock): actual_response = keycloak_mock.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE def test_keycloak_refresh_token(keycloak_mock): actual_result = keycloak_mock.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE def test_keycloak_userinfo(keycloak_mock): actual_user_info = keycloak_mock.userinfo("refresh-token") assert actual_user_info == USER_INFO def test_keycloak_logout(keycloak_mock): """Login out does not raise""" keycloak_mock.logout("refresh-token") def test_keycloak_decode_token(keycloak_mock): actual_decoded_data = keycloak_mock.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key) expected_decoded_token.pop(dynamic_valued_key) assert actual_decoded_data2 == expected_decoded_token def test_keycloak_login(keycloak_mock): actual_response = keycloak_mock.login("username", "password") assert actual_response == OIDC_PROFILE + + +@pytest.fixture +def auth_config(): + return { + "keycloak": { + "server_url": "https://auth.swh.org/SWHTest", + "realm_name": "SWHTest", + "client_id": "client_id", + } + } + + +@pytest.fixture +def auth_config_path(tmp_path, monkeypatch, auth_config): + conf_path = os.path.join(tmp_path, "auth.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(auth_config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + +def test_auth_KeycloakOpenIDConnect_from_config(auth_config): + """Instantiating keycloak client out of configuration dict is possible + + """ + client = KeycloakOpenIDConnect.from_config(**auth_config) + + assert client.server_url == auth_config["keycloak"]["server_url"] + assert client.realm_name == auth_config["keycloak"]["realm_name"] + assert client.client_id == auth_config["keycloak"]["client_id"] + + +def test_auth_KeycloakOpenIDConnect_from_configfile(auth_config_path, monkeypatch): + """Instantiating keycloak client out of environment variable is possible + + """ + client = KeycloakOpenIDConnect.from_configfile() + + auth_config = read(auth_config_path) + + assert client.server_url == auth_config["keycloak"]["server_url"] + assert client.realm_name == auth_config["keycloak"]["realm_name"] + assert client.client_id == auth_config["keycloak"]["client_id"] + + +def test_auth_KeycloakOpenIDConnect_from_configfile_override( + auth_config_path, monkeypatch +): + """Instantiating keycloak client out of environment variable is possible + And caller can override the configuration at calling + + """ + client = KeycloakOpenIDConnect.from_configfile(client_id="foobar") + + auth_config = read(auth_config_path) + + assert client.server_url == auth_config["keycloak"]["server_url"] + assert client.realm_name == auth_config["keycloak"]["realm_name"] + assert client.client_id == "foobar"