diff --git a/requirements-test.txt b/requirements-test.txt index edec91d..e3285f3 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,6 @@ django-stubs djangorestframework-stubs pytest pytest-django +pytest-mock requests_mock diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index 93087fb..b59073e 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,97 +1,129 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional +from django.conf import settings + from swh.auth.django.models import OIDCUser from swh.auth.keycloak import KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions = client_resource_access.get("roles", []) else: permissions = [] user.permissions = set(permissions) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user + + +def keycloak_oidc_client() -> KeycloakOpenIDConnect: + """ + Instantiate a KeycloakOpenIDConnect class from the following django settings: + + * KEYCLOAK_SERVER_URL + * KEYCLOAK_REALM_NAME + * KEYCLOAK_CLIENT_ID + + Returns: + An object to ease the interaction with the Keycloak server + + Raises: + ValueError: at least one mandatory django setting is not set + """ + + server_url = getattr(settings, "KEYCLOAK_SERVER_URL", None) + realm_name = getattr(settings, "KEYCLOAK_REALM_NAME", None) + client_id = getattr(settings, "KEYCLOAK_CLIENT_ID", None) + + if server_url is None or realm_name is None or client_id is None: + raise ValueError( + "KEYCLOAK_SERVER_URL, KEYCLOAK_REALM_NAME and KEYCLOAK_CLIENT_ID django " + "settings are mandatory to instantiate KeycloakOpenIDConnect class" + ) + + return KeycloakOpenIDConnect( + server_url=server_url, realm_name=realm_name, client_id=client_id + ) diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py index 61e345d..28c0f85 100644 --- a/swh/auth/tests/test_utils.py +++ b/swh/auth/tests/test_utils.py @@ -1,89 +1,121 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime +from django.test import override_settings import pytest -from swh.auth.django.utils import oidc_user_from_decoded_token, oidc_user_from_profile -from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE +from swh.auth.django.utils import ( + keycloak_oidc_client, + oidc_user_from_decoded_token, + oidc_user_from_profile, +) +from swh.auth.tests.sample_data import ( + CLIENT_ID, + DECODED_TOKEN, + OIDC_PROFILE, + REALM_NAME, + SERVER_URL, +) def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is True assert user.permissions == {"read-api"} assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) assert user.id == 338521271020811424925120118444075479552 assert user.password == "" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_mock): date_now = datetime.now() user = oidc_user_from_profile(keycloak_mock, OIDC_PROFILE) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at + + +def test_keycloak_oidc_client_missing_django_settings(): + + with pytest.raises(ValueError, match="settings are mandatory"): + keycloak_oidc_client() + + +@override_settings( + KEYCLOAK_SERVER_URL=SERVER_URL, + KEYCLOAK_REALM_NAME=REALM_NAME, + KEYCLOAK_CLIENT_ID=CLIENT_ID, +) +def test_keycloak_oidc_client_parameters_from_django_settings(mocker): + mocker.patch("swh.auth.keycloak.KeycloakOpenID") + + kc_oidc_client = keycloak_oidc_client() + + assert kc_oidc_client.server_url == SERVER_URL + assert kc_oidc_client.realm_name == REALM_NAME + assert kc_oidc_client.client_id == CLIENT_ID