diff --git a/swh/auth/django/models.py b/swh/auth/django/models.py index 8ff689f..59a41a7 100644 --- a/swh/auth/django/models.py +++ b/swh/auth/django/models.py @@ -1,86 +1,108 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime -from typing import Optional, Set +from typing import Any, Dict, Optional, Set from django.contrib.auth.models import User class OIDCUser(User): """ Custom User proxy model for remote users storing OpenID Connect related data: profile containing authentication tokens. The model is also not saved to database as all users are already stored in the Keycloak one. """ # OIDC subject identifier sub: str = "" # OIDC tokens and session related data, only relevant when a user # authenticates from a web browser access_token: Optional[str] = None + expires_in: Optional[int] = None expires_at: Optional[datetime] = None id_token: Optional[str] = None refresh_token: Optional[str] = None + refresh_expires_in: Optional[int] = None refresh_expires_at: Optional[datetime] = None scope: Optional[str] = None session_state: Optional[str] = None # User permissions permissions: Set[str] class Meta: # TODO: To redefine in subclass of this class # Forced to empty otherwise, django complains about it # "Model class swh.auth.django.OIDCUser doesn't declare an explicit app_label # and isn't in an application in INSTALLED_APPS" app_label = "" proxy = True auto_created = True # prevent model to be created in database by migrations def save(self, **kwargs): """ Override django.db.models.Model.save to avoid saving the remote users to web application database. """ pass def get_group_permissions(self, obj=None) -> Set[str]: """ Override django.contrib.auth.models.PermissionsMixin.get_group_permissions to get permissions from OIDC """ return self.get_all_permissions(obj) def get_all_permissions(self, obj=None) -> Set[str]: """ Override django.contrib.auth.models.PermissionsMixin.get_all_permissions to get permissions from OIDC """ return self.permissions def has_perm(self, perm, obj=None) -> bool: """ Override django.contrib.auth.models.PermissionsMixin.has_perm to check permission from OIDC """ if self.is_active and self.is_superuser: return True return perm in self.permissions def has_module_perms(self, app_label) -> bool: """ Override django.contrib.auth.models.PermissionsMixin.has_module_perms to check permissions from OIDC. """ if self.is_active and self.is_superuser: return True return any(perm.startswith(app_label) for perm in self.permissions) + + @property + def oidc_profile(self) -> Dict[str, Any]: + """ + Returns OpenID Connect profile associated to the user as a dictionary. + """ + return { + k: getattr(self, k) + for k in ( + "access_token", + "expires_in", + "expires_at", + "id_token", + "refresh_token", + "refresh_expires_in", + "refresh_expires_at", + "scope", + "session_state", + ) + } diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index 28c0f85..b90ef30 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,121 +1,118 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) +def _check_user(user, is_staff=False, permissions=set()): + assert user.id > 0 + assert user.username == DECODED_TOKEN["preferred_username"] + assert user.password == "" + assert user.first_name == DECODED_TOKEN["given_name"] + assert user.last_name == DECODED_TOKEN["family_name"] + assert user.email == DECODED_TOKEN["email"] + assert user.is_staff == is_staff + assert user.permissions == permissions + assert user.sub == DECODED_TOKEN["sub"] + + date_now = datetime.now() + if user.expires_at is not None: + assert isinstance(user.expires_at, datetime) + assert date_now <= user.expires_at + if user.refresh_expires_at is not None: + assert isinstance(user.refresh_expires_at, datetime) + assert date_now <= user.refresh_expires_at + + assert user.oidc_profile == { + k: getattr(user, k) + for k in ( + "access_token", + "expires_in", + "expires_at", + "id_token", + "refresh_token", + "refresh_expires_in", + "refresh_expires_at", + "scope", + "session_state", + ) + } + + def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) - - assert user.id == 338521271020811424925120118444075479552 - assert user.username == "johndoe" - assert user.password == "" - assert user.first_name == "John" - assert user.last_name == "Doe" - assert user.email == "john.doe@example.com" - assert user.is_staff is False - assert user.permissions == set() - assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + _check_user(user) def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) - assert user.id == 338521271020811424925120118444075479552 - assert user.username == "johndoe" - assert user.password == "" - assert user.first_name == "John" - assert user.last_name == "Doe" - assert user.email == "john.doe@example.com" - assert user.is_staff is True - assert user.permissions == {"read-api"} - assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + _check_user(user, is_staff=True, permissions={"read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) - assert user.id == 338521271020811424925120118444075479552 - assert user.password == "" - assert user.is_staff is False - assert user.permissions == set() - assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_mock): - date_now = datetime.now() - user = oidc_user_from_profile(keycloak_mock, OIDC_PROFILE) - - assert user.id == 338521271020811424925120118444075479552 - assert user.username == "johndoe" - assert user.password == "" - assert user.first_name == "John" - assert user.last_name == "Doe" - assert user.email == "john.doe@example.com" - assert user.is_staff is False - assert user.permissions == set() - assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" - - assert isinstance(user.expires_at, datetime) - assert date_now <= user.expires_at - assert isinstance(user.refresh_expires_at, datetime) - assert date_now <= user.refresh_expires_at + _check_user(user) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( KEYCLOAK_SERVER_URL=SERVER_URL, KEYCLOAK_REALM_NAME=REALM_NAME, KEYCLOAK_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(mocker): mocker.patch("swh.auth.keycloak.KeycloakOpenID") kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID