diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py new file mode 100644 index 0000000..40bbf8c --- /dev/null +++ b/swh/auth/django/utils.py @@ -0,0 +1,57 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from swh.auth.django.models import OIDCUser + + +def oidc_user_from_decoded_token( + decoded_token: Dict[str, Any], client_id: Optional[str] = None +) -> OIDCUser: + """Create an OIDCUser out of a decoded token + + Args: + decoded_token: Decoded token Dict + client_id: Optional client id of the keycloak client instance used to decode + the token. If not provided, the permissions will be empty. + + Returns: + The OIDCUser instance + + """ + # compute an integer user identifier for Django User model + # by concatenating all groups of the UUID4 user identifier + # generated by Keycloak and converting it from hex to decimal + user_id = int("".join(decoded_token["sub"].split("-")), 16) + + # create a Django user that will not be saved to database + user = OIDCUser( + id=user_id, + username=decoded_token["preferred_username"], + password="", + first_name=decoded_token["given_name"], + last_name=decoded_token["family_name"], + email=decoded_token["email"], + ) + + # set is_staff user property based on groups + if "groups" in decoded_token: + user.is_staff = "/staff" in decoded_token["groups"] + + if client_id: + # extract user permissions if any + resource_access = decoded_token.get("resource_access", {}) + client_resource_access = resource_access.get(client_id, {}) + permissions = client_resource_access.get("roles", []) + else: + permissions = [] + + user.permissions = set(permissions) + + # add user sub to custom User proxy model + user.sub = decoded_token["sub"] + + return user diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py new file mode 100644 index 0000000..377da24 --- /dev/null +++ b/swh/auth/tests/test_utils.py @@ -0,0 +1,41 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from copy import copy + +from swh.auth.django.utils import oidc_user_from_decoded_token +from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN + + +def test_oidc_user_from_decoded_token(): + user = oidc_user_from_decoded_token(DECODED_TOKEN) + + assert user.id == 338521271020811424925120118444075479552 + assert user.username == "johndoe" + assert user.password == "" + assert user.first_name == "John" + assert user.last_name == "Doe" + assert user.email == "john.doe@example.com" + assert user.is_staff is False + assert user.permissions == set() + assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200" + + +def test_oidc_user_from_decoded_token2(): + decoded_token = copy(DECODED_TOKEN) + decoded_token["groups"] = ["/staff", "api"] + decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} + + user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) + + assert user.id == 338521271020811424925120118444075479552 + assert user.username == "johndoe" + assert user.password == "" + assert user.first_name == "John" + assert user.last_name == "Doe" + assert user.email == "john.doe@example.com" + assert user.is_staff is True + assert user.permissions == {"read-api"} + assert user.sub == "feacd344-b468-4a65-a236-14f61e6b7200"