diff --git a/swh/auth/__init__.py b/swh/auth/__init__.py index 53e8601..e69de29 100644 --- a/swh/auth/__init__.py +++ b/swh/auth/__init__.py @@ -1,140 +0,0 @@ -# Copyright (C) 2020-2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Any, Dict, Optional -from urllib.parse import urlencode - -from keycloak import KeycloakOpenID - - -class KeycloakOpenIDConnect: - """ - Wrapper class around python-keycloak to ease the interaction with Keycloak - for managing authentication and user permissions with OpenID Connect. - """ - - def __init__( - self, - server_url: str, - realm_name: str, - client_id: str, - realm_public_key: str = "", - ): - """ - Args: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier - realm_public_key: The realm public key (will be dynamically - retrieved if not provided) - """ - self._keycloak = KeycloakOpenID( - server_url=server_url, client_id=client_id, realm_name=realm_name, - ) - - self.server_url = server_url - self.realm_name = realm_name - self.client_id = client_id - self.realm_public_key = realm_public_key - - def well_known(self) -> Dict[str, Any]: - """ - Retrieve the OpenID Connect Well-Known URI registry from Keycloak. - - Returns: - A dictionary filled with OpenID Connect URIS. - """ - return self._keycloak.well_know() - - def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: - """ - Get OpenID Connect authorization URL to authenticate users. - - Args: - redirect_uri: URI to redirect to once a user is authenticated - extra_params: Extra query parameters to add to the - authorization URL - """ - auth_url = self._keycloak.auth_url(redirect_uri) - if extra_params: - auth_url += "&%s" % urlencode(extra_params) - return auth_url - - def authorization_code( - self, code: str, redirect_uri: str, **extra_params: str - ) -> Dict[str, Any]: - """ - Get OpenID Connect authentication tokens using Authorization - Code flow. - - Args: - code: Authorization code provided by Keycloak - redirect_uri: URI to redirect to once a user is authenticated - (must be the same as the one provided to authorization_url): - extra_params: Extra parameters to add in the authorization request - payload. - """ - return self._keycloak.token( - grant_type="authorization_code", - code=code, - redirect_uri=redirect_uri, - **extra_params, - ) - - def refresh_token(self, refresh_token: str) -> Dict[str, Any]: - """ - Request a new access token from Keycloak using a refresh token. - - Args: - refresh_token: A refresh token provided by Keycloak - - Returns: - A dictionary filled with tokens info - """ - return self._keycloak.refresh_token(refresh_token) - - def decode_token( - self, token: str, options: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: - """ - Try to decode a JWT token. - - Args: - token: A JWT token to decode - options: Options for jose.jwt.decode - - Returns: - A dictionary filled with decoded token content - """ - if not self.realm_public_key: - realm_public_key = self._keycloak.public_key() - self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" - self.realm_public_key += realm_public_key - self.realm_public_key += "\n-----END PUBLIC KEY-----" - - return self._keycloak.decode_token( - token, key=self.realm_public_key, options=options - ) - - def logout(self, refresh_token: str) -> None: - """ - Logout a user by closing its authenticated session. - - Args: - refresh_token: A refresh token provided by Keycloak - """ - self._keycloak.logout(refresh_token) - - def userinfo(self, access_token: str) -> Dict[str, Any]: - """ - Return user information from its access token. - - Args: - access_token: An access token provided by Keycloak - - Returns: - A dictionary fillled with user information - """ - return self._keycloak.userinfo(access_token) diff --git a/swh/auth/__init__.py b/swh/auth/keycloak.py similarity index 100% copy from swh/auth/__init__.py copy to swh/auth/keycloak.py diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py index 151c53f..f5cd5b9 100644 --- a/swh/auth/tests/conftest.py +++ b/swh/auth/tests/conftest.py @@ -1,157 +1,157 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone from typing import Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest -from swh.auth import KeycloakOpenIDConnect +from swh.auth.keycloak import KeycloakOpenIDConnect from .sample_data import ( OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, REALM, SERVER_URL, USER_INFO, ) class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: auth_success: boolean flag to simulate authentication success or failure exp: expiration user_groups: user groups configuration (if any) user_permissions: user permissions configuration (if any) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups=[], user_permissions=[], ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: RAW_REALM_PUBLIC_KEY self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options["verify_exp"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["auth_time"] if self.exp is not None: decoded["exp"] = self.exp decoded["auth_time"] = self.exp - expire_in else: decoded["auth_time"] = int(datetime.now(tz=timezone.utc).timestamp()) decoded["exp"] = decoded["auth_time"] + expire_in decoded["groups"] = self.user_groups if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def set_auth_success(self, auth_success: bool) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(OIDC_PROFILE) self.refresh_token.return_value = copy(OIDC_PROFILE) self.userinfo.return_value = copy(USER_INFO) else: self.authorization_url = Mock() # type: ignore exception = KeycloakError( error_message="Authentication failed", response_code=401 ) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def keycloak_mock_factory( server_url=SERVER_URL, realm_name=REALM, client_id="swh-client-id", auth_success=True, exp=None, user_groups=[], user_permissions=[], ): """Keycloak mock fixture factory """ @pytest.fixture def keycloak_open_id_connect(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, user_permissions=user_permissions, ) return keycloak_open_id_connect diff --git a/swh/auth/tests/test_auth.py b/swh/auth/tests/test_keycloak.py similarity index 86% rename from swh/auth/tests/test_auth.py rename to swh/auth/tests/test_keycloak.py index 442c916..19226f3 100644 --- a/swh/auth/tests/test_auth.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,86 +1,86 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest from swh.auth.tests.conftest import keycloak_mock_factory from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO # dataset we have here is bound to swh-web keycloak_mock = keycloak_mock_factory(client_id=CLIENT_ID) -def test_auth_well_known(keycloak_mock): +def test_keycloak_well_known(keycloak_mock): well_known_result = keycloak_mock.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } -def test_auth_authorization_url(keycloak_mock): +def test_keycloak_authorization_url(keycloak_mock): actual_auth_uri = keycloak_mock.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_mock.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } -def test_auth_authorization_code_fail(keycloak_mock): +def test_keycloak_authorization_code_fail(keycloak_mock): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_mock.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_mock.authorization_code("auth-code", "redirect-uri") -def test_auth_authorization_code(keycloak_mock): +def test_keycloak_authorization_code(keycloak_mock): actual_response = keycloak_mock.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE -def test_auth_refresh_token(keycloak_mock): +def test_keycloak_refresh_token(keycloak_mock): actual_result = keycloak_mock.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE -def test_auth_userinfo(keycloak_mock): +def test_keycloak_userinfo(keycloak_mock): actual_user_info = keycloak_mock.userinfo("refresh-token") assert actual_user_info == USER_INFO -def test_auth_logout(keycloak_mock): +def test_keycloak_logout(keycloak_mock): """Login out does not raise""" keycloak_mock.logout("refresh-token") -def test_auth_decode_token(keycloak_mock): +def test_keycloak_decode_token(keycloak_mock): actual_decoded_data = keycloak_mock.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key) expected_decoded_token.pop(dynamic_valued_key) assert actual_decoded_data2 == expected_decoded_token