diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index 40bbf8c..31f1d8f 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,57 +1,97 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime, timedelta from typing import Any, Dict, Optional from swh.auth.django.models import OIDCUser +from swh.auth.keycloak import KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions = client_resource_access.get("roles", []) else: permissions = [] user.permissions = set(permissions) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user + + +def oidc_user_from_profile( + oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] +) -> OIDCUser: + """Initialize an OIDCUser out of an oidc profile dict. + + Args: + oidc_client: KeycloakOpenIDConnect used to discuss with keycloak + oidc_profile: OIDC profile retrieved once connected to keycloak + + Returns: + OIDCUser instance parsed out of the token received. + + """ + + # decode JWT token + decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) + + # create OIDCUser from decoded token + user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) + + # get authentication init datetime + auth_datetime = datetime.fromtimestamp(decoded_token["auth_time"]) + exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) + + # compute OIDC tokens expiration date + oidc_profile["expires_at"] = exp_datetime + oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( + seconds=oidc_profile["refresh_expires_in"] + ) + + # add OIDC profile data to custom User proxy model + for key, val in oidc_profile.items(): + if hasattr(user, key): + setattr(user, key, val) + + return user diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py new file mode 100644 index 0000000..9172938 --- /dev/null +++ b/swh/auth/tests/conftest.py @@ -0,0 +1,13 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from swh.auth.pytest_plugin import keycloak_mock_factory +from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL + +# keycloak fixture used within tests (cf. test_keycloak.py, test_utils.py) +keycloak_mock = keycloak_mock_factory( + server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, +) diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index 3bcda9f..2546b5f 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,164 +1,151 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy import os from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest import yaml from swh.auth.keycloak import KeycloakOpenIDConnect -from swh.auth.pytest_plugin import keycloak_mock_factory -from swh.auth.tests.sample_data import ( - CLIENT_ID, - DECODED_TOKEN, - OIDC_PROFILE, - REALM_NAME, - SERVER_URL, - USER_INFO, -) +from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO from swh.core.config import read -# Make keycloak fixture to use for tests below. -keycloak_mock = keycloak_mock_factory( - server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, -) - def test_keycloak_well_known(keycloak_mock): well_known_result = keycloak_mock.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } def test_keycloak_authorization_url(keycloak_mock): actual_auth_uri = keycloak_mock.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_mock.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } def test_keycloak_authorization_code_fail(keycloak_mock): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_mock.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_mock.authorization_code("auth-code", "redirect-uri") def test_keycloak_authorization_code(keycloak_mock): actual_response = keycloak_mock.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE def test_keycloak_refresh_token(keycloak_mock): actual_result = keycloak_mock.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE def test_keycloak_userinfo(keycloak_mock): actual_user_info = keycloak_mock.userinfo("refresh-token") assert actual_user_info == USER_INFO def test_keycloak_logout(keycloak_mock): """Login out does not raise""" keycloak_mock.logout("refresh-token") def test_keycloak_decode_token(keycloak_mock): actual_decoded_data = keycloak_mock.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key) expected_decoded_token.pop(dynamic_valued_key) assert actual_decoded_data2 == expected_decoded_token def test_keycloak_login(keycloak_mock): actual_response = keycloak_mock.login("username", "password") assert actual_response == OIDC_PROFILE @pytest.fixture def auth_config(): return { "keycloak": { "server_url": "https://auth.swh.org/SWHTest", "realm_name": "SWHTest", "client_id": "client_id", } } @pytest.fixture def auth_config_path(tmp_path, monkeypatch, auth_config): conf_path = os.path.join(tmp_path, "auth.yml") with open(conf_path, "w") as f: f.write(yaml.dump(auth_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path def test_auth_KeycloakOpenIDConnect_from_config(auth_config): """Instantiating keycloak client out of configuration dict is possible """ client = KeycloakOpenIDConnect.from_config(**auth_config) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile(auth_config_path, monkeypatch): """Instantiating keycloak client out of environment variable is possible """ client = KeycloakOpenIDConnect.from_configfile() auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == auth_config["keycloak"]["client_id"] def test_auth_KeycloakOpenIDConnect_from_configfile_override( auth_config_path, monkeypatch ): """Instantiating keycloak client out of environment variable is possible And caller can override the configuration at calling """ client = KeycloakOpenIDConnect.from_configfile(client_id="foobar") auth_config = read(auth_config_path) assert client.server_url == auth_config["keycloak"]["server_url"] assert client.realm_name == auth_config["keycloak"]["realm_name"] assert client.client_id == "foobar" diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py index 377da24..64954b7 100644 --- a/swh/auth/tests/test_utils.py +++ b/swh/auth/tests/test_utils.py @@ -1,41 +1,63 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy +from datetime import datetime -from swh.auth.django.utils import oidc_user_from_decoded_token -from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN +from swh.auth.django.utils import oidc_user_from_decoded_token, oidc_user_from_profile +from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is False assert user.permissions == set() assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) assert user.id == 338521271020811424925120118444075479552 assert user.username == "johndoe" assert user.password == "" assert user.first_name == "John" assert user.last_name == "Doe" assert user.email == "john.doe@example.com" assert user.is_staff is True assert user.permissions == {"read-api"} assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + + +def test_oidc_user_from_profile(keycloak_mock): + date_now = datetime.now() + + user = oidc_user_from_profile(keycloak_mock, OIDC_PROFILE) + + assert user.id == 338521271020811424925120118444075479552 + assert user.username == "johndoe" + assert user.password == "" + assert user.first_name == "John" + assert user.last_name == "Doe" + assert user.email == "john.doe@example.com" + assert user.is_staff is False + assert user.permissions == set() + assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + + assert isinstance(user.expires_at, datetime) + assert date_now <= user.expires_at + assert isinstance(user.refresh_expires_at, datetime) + assert date_now <= user.refresh_expires_at