diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py index f956c75..b0457ab 100644 --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -1,190 +1,195 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone import json from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.tests.sample_data import OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, USER_INFO class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: server_url: Server main auth url (cf. :py:data:`swh.auth.tests.sample_data.SERVER_URL`) realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) user_permissions: user permissions configuration (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. :py:data:`swh.auth.tests.sample_data.USER_INFO`) raw_realm_public_key: A raw ascii text representing the realm public key (cf. :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration and audience checks as we use a static # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False options["verify_aud"] = False decoded = super().decode_token(token, options) # Merge the user info configured to be part of the decode token userinfo = self.userinfo() if userinfo is not None: decoded = {**decoded, **userinfo} # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp decoded["iat"] = self.exp - expire_in else: now = int(datetime.now(tz=timezone.utc).timestamp()) decoded["iat"] = now decoded["exp"] = now + expire_in decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def set_auth_success( self, auth_success: bool, oidc_profile: Optional[Dict] = None, user_info: Optional[Dict] = None, ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.login.return_value = copy(oidc_profile) self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore error = { "error": "invalid_grant", "error_description": "Invalid user credentials", } error_message = json.dumps(error).encode() exception = KeycloakError(error_message=error_message, response_code=401) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception self.login.side_effect = exception -def keycloak_mock_factory( +def keycloak_oidc_factory( server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): """Keycloak mock fixture factory. Report to :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture - def keycloak_open_id_connect(): + def keycloak_oidc(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, user_permissions=user_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, ) - return keycloak_open_id_connect + return keycloak_oidc + + +# for backward compatibility +# TODO: remove that alias once swh-deposit and swh-web use new function name +keycloak_mock_factory = keycloak_oidc_factory diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py index 9172938..b73f7b1 100644 --- a/swh/auth/tests/conftest.py +++ b/swh/auth/tests/conftest.py @@ -1,13 +1,13 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.auth.pytest_plugin import keycloak_mock_factory +from swh.auth.pytest_plugin import keycloak_oidc_factory from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL # keycloak fixture used within tests (cf. test_keycloak.py, test_utils.py) -keycloak_mock = keycloak_mock_factory( +keycloak_oidc = keycloak_oidc_factory( server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, ) diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index b90ef30..db3567e 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,118 +1,118 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) def _check_user(user, is_staff=False, permissions=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff assert user.permissions == permissions assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) _check_user(user, is_staff=True, permissions={"read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" -def test_oidc_user_from_profile(keycloak_mock): - user = oidc_user_from_profile(keycloak_mock, OIDC_PROFILE) +def test_oidc_user_from_profile(keycloak_oidc): + user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( KEYCLOAK_SERVER_URL=SERVER_URL, KEYCLOAK_REALM_NAME=REALM_NAME, KEYCLOAK_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(mocker): mocker.patch("swh.auth.keycloak.KeycloakOpenID") kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index edd1f58..33b7abe 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,175 +1,175 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy import json import os from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest import yaml from swh.auth.keycloak import KeycloakOpenIDConnect, keycloak_error_message from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO from swh.core.config import read -def test_keycloak_well_known(keycloak_mock): - well_known_result = keycloak_mock.well_known() +def test_keycloak_oidc_well_known(keycloak_oidc): + well_known_result = keycloak_oidc.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } -def test_keycloak_authorization_url(keycloak_mock): - actual_auth_uri = keycloak_mock.authorization_url("http://redirect-uri", foo="bar") +def test_keycloak_oidc_authorization_url(keycloak_oidc): + actual_auth_uri = keycloak_oidc.authorization_url("http://redirect-uri", foo="bar") - expected_auth_url = keycloak_mock.well_known()["authorization_endpoint"] + expected_auth_url = keycloak_oidc.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } -def test_keycloak_authorization_code_fail(keycloak_mock): +def test_keycloak_oidc_authorization_code_fail(keycloak_oidc): "Authorization failure raise error" # Simulate failed authentication with Keycloak - keycloak_mock.set_auth_success(False) + keycloak_oidc.set_auth_success(False) with pytest.raises(KeycloakError): - keycloak_mock.authorization_code("auth-code", "redirect-uri") + keycloak_oidc.authorization_code("auth-code", "redirect-uri") with pytest.raises(KeycloakError): - keycloak_mock.login("username", "password") + keycloak_oidc.login("username", "password") -def test_keycloak_authorization_code(keycloak_mock): - actual_response = keycloak_mock.authorization_code("auth-code", "redirect-uri") +def test_keycloak_oidc_authorization_code(keycloak_oidc): + actual_response = keycloak_oidc.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE -def test_keycloak_refresh_token(keycloak_mock): - actual_result = keycloak_mock.refresh_token("refresh-token") +def test_keycloak_oidc_refresh_token(keycloak_oidc): + actual_result = keycloak_oidc.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE -def test_keycloak_userinfo(keycloak_mock): - actual_user_info = keycloak_mock.userinfo("refresh-token") +def test_keycloak_oidc_userinfo(keycloak_oidc): + actual_user_info = keycloak_oidc.userinfo("refresh-token") assert actual_user_info == USER_INFO -def test_keycloak_logout(keycloak_mock): +def test_keycloak_oidc_logout(keycloak_oidc): """Login out does not raise""" - keycloak_mock.logout("refresh-token") + keycloak_oidc.logout("refresh-token") -def test_keycloak_decode_token(keycloak_mock): - actual_decoded_data = keycloak_mock.decode_token(OIDC_PROFILE["access_token"]) +def test_keycloak_oidc_decode_token(keycloak_oidc): + actual_decoded_data = keycloak_oidc.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "iat", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key, None) expected_decoded_token.pop(dynamic_valued_key, None) assert actual_decoded_data2 == expected_decoded_token -def test_keycloak_login(keycloak_mock): - actual_response = keycloak_mock.login("username", "password") +def test_keycloak_oidc_login(keycloak_oidc): + actual_response = keycloak_oidc.login("username", "password") assert actual_response == OIDC_PROFILE @pytest.fixture def auth_config(): return { "keycloak": { "server_url": "https://auth.swh.org/SWHTest", "realm_name": "SWHTest", "client_id": "client_id", } } @pytest.fixture def auth_config_path(tmp_path, monkeypatch, auth_config): conf_path = os.path.join(tmp_path, "auth.yml") with open(conf_path, "w") as f: f.write(yaml.dump(auth_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path def test_auth_KeycloakOpenIDConnect_from_config(auth_config): """Instantiating keycloak client out of configuration dict is possible """ client = KeycloakOpenIDConnect.from_config(**auth_config) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile(auth_config_path, monkeypatch): """Instantiating keycloak client out of environment variable is possible """ client = KeycloakOpenIDConnect.from_configfile() auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile_override( auth_config_path, monkeypatch ): """Instantiating keycloak client out of environment variable is possible And caller can override the configuration at calling """ client = KeycloakOpenIDConnect.from_configfile(client_id="foobar") auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == "foobar" @pytest.mark.parametrize( "error_dict, expected_result", [ ({"error": "unknown_error"}, "unknown_error"), ( {"error": "invalid_grant", "error_description": "Invalid credentials"}, "invalid_grant: Invalid credentials", ), ], ) def test_auth_keycloak_error_message(error_dict, expected_result): """Conversion from KeycloakError to error message should work with detail or not""" error_message = json.dumps(error_dict).encode() exception = KeycloakError(error_message=error_message, response_code=401) actual_result = keycloak_error_message(exception) assert actual_result == expected_result