diff --git a/PKG-INFO b/PKG-INFO index febc005..e8ed227 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.auth -Version: 0.3.3 +Version: 0.3.4 Summary: Software Heritage Authentication Utilities Home-page: https://forge.softwareheritage.org/source/swh-auth/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh- Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-/ Description: swh-auth ========== Authentication library for SWH (keycloak common utilities) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: django Provides-Extra: testing diff --git a/swh.auth.egg-info/PKG-INFO b/swh.auth.egg-info/PKG-INFO index febc005..e8ed227 100644 --- a/swh.auth.egg-info/PKG-INFO +++ b/swh.auth.egg-info/PKG-INFO @@ -1,28 +1,28 @@ Metadata-Version: 2.1 Name: swh.auth -Version: 0.3.3 +Version: 0.3.4 Summary: Software Heritage Authentication Utilities Home-page: https://forge.softwareheritage.org/source/swh-auth/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh- Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-/ Description: swh-auth ========== Authentication library for SWH (keycloak common utilities) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: django Provides-Extra: testing diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index 0b71c4e..93087fb 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,98 +1,97 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional from swh.auth.django.models import OIDCUser from swh.auth.keycloak import KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, - username=decoded_token["preferred_username"], + username=decoded_token.get("preferred_username", ""), password="", - first_name=decoded_token["given_name"], - last_name=decoded_token["family_name"], - email=decoded_token["email"], + first_name=decoded_token.get("given_name", ""), + last_name=decoded_token.get("family_name", ""), + email=decoded_token.get("email", ""), ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions = client_resource_access.get("roles", []) else: permissions = [] user.permissions = set(permissions) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime - auth_time = decoded_token.get("auth_time", decoded_token["iat"]) - auth_datetime = datetime.fromtimestamp(auth_time) + auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py index 06a6c9d..bb61c69 100644 --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -1,190 +1,186 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.tests.sample_data import OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, USER_INFO class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: server_url: Server main auth url (cf. :py:data:`swh.auth.tests.sample_data.SERVER_URL`) realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) user_permissions: user permissions configuration (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. :py:data:`swh.auth.tests.sample_data.USER_INFO`) raw_realm_public_key: A raw ascii text representing the realm public key (cf. :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration and audience checks as we use a static # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False options["verify_aud"] = False decoded = super().decode_token(token, options) # Merge the user info configured to be part of the decode token userinfo = self.userinfo() if userinfo is not None: decoded = {**decoded, **userinfo} # tweak auth and exp time for tests - auth_time = decoded.get("auth_time", decoded["iat"]) - expire_in = decoded["exp"] - auth_time + expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp - auth_time = self.exp - expire_in - decoded["iat"] = auth_time - decoded["auth_time"] = auth_time + decoded["iat"] = self.exp - expire_in else: now = int(datetime.now(tz=timezone.utc).timestamp()) decoded["iat"] = now - decoded["auth_time"] = now decoded["exp"] = now + expire_in decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def set_auth_success( self, auth_success: bool, oidc_profile: Optional[Dict] = None, user_info: Optional[Dict] = None, ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.login.return_value = copy(oidc_profile) self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore exception = KeycloakError( error_message="Authentication failed", response_code=401 ) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception self.login.side_effect = exception def keycloak_mock_factory( server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): """Keycloak mock fixture factory. Report to :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture def keycloak_open_id_connect(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, user_permissions=user_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, ) return keycloak_open_id_connect diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py index 64954b7..61e345d 100644 --- a/swh/auth/tests/test_utils.py +++ b/swh/auth/tests/test_utils.py @@ -1,63 +1,89 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime +import pytest + from swh.auth.django.utils import oidc_user_from_decoded_token, oidc_user_from_profile from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is True assert user.permissions == {"read-api"} assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" +@pytest.mark.parametrize( + "key,mapped_key", + [ + ("preferred_username", "username"), + ("given_name", "first_name"), + ("family_name", "last_name"), + ("email", "email"), + ], +) +def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): + decoded_token = copy(DECODED_TOKEN) + decoded_token.pop(key, None) + + user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) + + assert user.id == 338521271020811424925120118444075479552 + assert user.password == "" + assert user.is_staff is False + assert user.permissions == set() + assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + # Ensure the missing field is mapped to an empty value + assert getattr(user, mapped_key) == "" + + def test_oidc_user_from_profile(keycloak_mock): date_now = datetime.now() user = oidc_user_from_profile(keycloak_mock, OIDC_PROFILE) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at