diff --git a/mypy.ini b/mypy.ini index 0bfb45b..f2cab72 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,20 +1,25 @@ [mypy] namespace_packages = True warn_unused_ignores = True # support for django magic: https://github.com/typeddjango/django-stubs plugins = mypy_django_plugin.main, mypy_drf_plugin.main [mypy.plugins.django-stubs] django_settings_module = swh.auth.tests.django.app.apptest.settings # 3rd party libraries without stubs (yet) +[mypy-jose.*] +ignore_missing_imports = True + +[mypy-keycloak.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True -[mypy-keycloak.*] -ignore_missing_imports = True + diff --git a/pytest.ini b/pytest.ini index 0e4be09..2f49b76 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,4 @@ [pytest] +addopts = -p no:flask norecursedirs = docs .* DJANGO_SETTINGS_MODULE = swh.auth.tests.django.app.apptest.settings diff --git a/requirements-django.txt b/requirements-django.txt index 9d46956..6f7287b 100644 --- a/requirements-django.txt +++ b/requirements-django.txt @@ -1 +1,3 @@ -Django<3 +django < 3 +sentry-sdk + diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py new file mode 100644 index 0000000..8057e05 --- /dev/null +++ b/swh/auth/django/backends.py @@ -0,0 +1,112 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from django.core.cache import cache +from django.http import HttpRequest +from django.utils import timezone +import sentry_sdk + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import ( + keycloak_oidc_client, + oidc_profile_cache_key, + oidc_user_from_profile, +) +from swh.auth.keycloak import KeycloakOpenIDConnect + + +def _update_cached_oidc_profile( + oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any], user: OIDCUser +) -> None: + """ + Update cached OIDC profile associated to a user if needed: when the profile + is not stored in cache or when the authentication tokens have changed. + + Args: + oidc_client: KeycloakOpenID wrapper + oidc_profile: OIDC profile used to authenticate a user + user: django model representing the authenticated user + """ + # put OIDC profile in cache or update it after token renewal + cache_key = oidc_profile_cache_key(oidc_client, user.id) + if ( + cache.get(cache_key) is None + or user.access_token != oidc_profile["access_token"] + ): + # set cache key TTL as refresh token expiration time + assert user.refresh_expires_at + ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) + + # save oidc_profile in cache + cache.set(cache_key, user.oidc_profile, timeout=max(0, ttl)) + + +class OIDCAuthorizationCodePKCEBackend: + """ + Django authentication backend using Keycloak OpenID Connect authorization + code flow with PKCE ("Proof Key for Code Exchange"). + + To use that backend globally in your django application, proceed as follow: + + * add ``"swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend"`` + to the ``AUTHENTICATION_BACKENDS`` django setting + + * configure Keycloak URL, realm and client by adding + ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` + in django settings + + * add ``swh.auth.django.views.urlpatterns`` to your django application URLs + + * add an HTML link targeting the ``"oidc-login"`` django view in your + application views + + * once a user is logged in, add an HTML link targeting the ``"oidc-logout"`` + django view in your application views (a ``next_path`` query parameter + can be used to redirect to a view of choice once the user is logged out) + + """ + + def authenticate( + self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str + ) -> Optional[OIDCUser]: + + user = None + try: + oidc_client = keycloak_oidc_client() + # try to authenticate user with OIDC PKCE authorization code flow + oidc_profile = oidc_client.authorization_code( + code, redirect_uri, code_verifier=code_verifier + ) + + # create Django user + user = oidc_user_from_profile(oidc_client, oidc_profile) + + # update cached oidc profile if needed + _update_cached_oidc_profile(oidc_client, oidc_profile, user) + + except Exception as e: + sentry_sdk.capture_exception(e) + + return user + + def get_user(self, user_id: int) -> Optional[OIDCUser]: + # get oidc profile from cache + oidc_client = keycloak_oidc_client() + oidc_profile = cache.get(oidc_profile_cache_key(oidc_client, user_id)) + if oidc_profile: + try: + user = oidc_user_from_profile(oidc_client, oidc_profile) + # update cached oidc profile if needed + _update_cached_oidc_profile(oidc_client, oidc_profile, user) + # restore auth backend + setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") + return user + except Exception as e: + sentry_sdk.capture_exception(e) + return None + else: + return None diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py index b59073e..9697c12 100644 --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -1,129 +1,189 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional from django.conf import settings +from django.http import HttpRequest, QueryDict +from django.urls import reverse as django_reverse from swh.auth.django.models import OIDCUser -from swh.auth.keycloak import KeycloakOpenIDConnect +from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions = client_resource_access.get("roles", []) else: permissions = [] user.permissions = set(permissions) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token - decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) + try: + access_token = oidc_profile["access_token"] + decoded_token = oidc_client.decode_token(access_token) + # access token has expired + except ExpiredSignatureError: + # get a new access token from authentication provider + oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) + # decode access token + decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user +def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: + return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}" + + def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: - * KEYCLOAK_SERVER_URL - * KEYCLOAK_REALM_NAME - * KEYCLOAK_CLIENT_ID + * SWH_AUTH_SERVER_URL + * SWH_AUTH_REALM_NAME + * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server Raises: ValueError: at least one mandatory django setting is not set """ - server_url = getattr(settings, "KEYCLOAK_SERVER_URL", None) - realm_name = getattr(settings, "KEYCLOAK_REALM_NAME", None) - client_id = getattr(settings, "KEYCLOAK_CLIENT_ID", None) + server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) + realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) + client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( - "KEYCLOAK_SERVER_URL, KEYCLOAK_REALM_NAME and KEYCLOAK_CLIENT_ID django " + "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id ) + + +def reverse( + viewname: str, + url_args: Optional[Dict[str, Any]] = None, + query_params: Optional[Dict[str, Any]] = None, + current_app: Optional[str] = None, + urlconf: Optional[str] = None, + request: Optional[HttpRequest] = None, +) -> str: + """An override of django reverse function supporting query parameters. + + Args: + viewname: the name of the django view from which to compute a url + url_args: dictionary of url arguments indexed by their names + query_params: dictionary of query parameters to append to the + reversed url + current_app: the name of the django app tighten to the view + urlconf: url configuration module + request: build an absolute URI if provided + + Returns: + str: the url of the requested view with processed arguments and + query parameters + """ + + if url_args: + url_args = {k: v for k, v in url_args.items() if v is not None} + + url = django_reverse( + viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app + ) + + if query_params: + query_params = {k: v for k, v in query_params.items() if v is not None} + + if query_params and len(query_params) > 0: + query_dict = QueryDict("", mutable=True) + for k in sorted(query_params.keys()): + query_dict[k] = query_params[k] + url += "?" + query_dict.urlencode(safe="/;:") + + if request is not None: + url = request.build_absolute_uri(url) + + return url diff --git a/swh/auth/django/views.py b/swh/auth/django/views.py new file mode 100644 index 0000000..741f53e --- /dev/null +++ b/swh/auth/django/views.py @@ -0,0 +1,152 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, cast +import uuid + +from django.conf.urls import url +from django.contrib.auth import authenticate, login, logout +from django.core.cache import cache +from django.http import HttpRequest +from django.http.response import ( + HttpResponse, + HttpResponseBadRequest, + HttpResponseRedirect, + HttpResponseServerError, +) + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import keycloak_oidc_client, oidc_profile_cache_key, reverse +from swh.auth.keycloak import KeycloakError, keycloak_error_message +from swh.auth.utils import gen_oidc_pkce_codes + + +def oidc_login_view(request: HttpRequest, redirect_uri: str, scope: str = "openid"): + """ + Helper view function that initiates a login process using OIDC authorization + code flow with PKCE. + + OIDC session scope can be modified using the dedicated parameter. + """ + # generate a CSRF token + state = str(uuid.uuid4()) + + code_verifier, code_challenge = gen_oidc_pkce_codes() + + request.session["login_data"] = { + "code_verifier": code_verifier, + "state": state, + "redirect_uri": redirect_uri, + "next_path": request.GET.get("next_path", ""), + } + + authorization_url_params = { + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "scope": scope, + } + + try: + oidc_client = keycloak_oidc_client() + authorization_url = oidc_client.authorization_url( + redirect_uri, **authorization_url_params + ) + except KeycloakError as ke: + return HttpResponseServerError(keycloak_error_message(ke)) + + return HttpResponseRedirect(authorization_url) + + +def get_oidc_login_data(request: HttpRequest) -> Dict[str, Any]: + """ + Check and get login data stored in django session. + """ + if "login_data" not in request.session: + raise Exception("Login process has not been initialized.") + + login_data = request.session["login_data"] + + if "code" not in request.GET or "state" not in request.GET: + raise ValueError("Missing query parameters for authentication.") + + # get CSRF token returned by OIDC server + state = request.GET["state"] + + if state != login_data["state"]: + raise ValueError("Wrong CSRF token, aborting login process.") + + return login_data + + +def oidc_login(request: HttpRequest) -> HttpResponse: + """ + Django view to initiate login process using OpenID Connect authorization + code flow with PKCE. + """ + + redirect_uri = reverse("oidc-login-complete", request=request) + + return oidc_login_view(request, redirect_uri=redirect_uri) + + +def oidc_login_complete(request: HttpRequest) -> HttpResponse: + """ + Django view to finalize login process using OpenID Connect authorization + code flow with PKCE. + """ + if "error" in request.GET: + return HttpResponseServerError(request.GET["error"]) + + try: + login_data = get_oidc_login_data(request) + except ValueError as ve: + return HttpResponseBadRequest(str(ve)) + except Exception as e: + return HttpResponseServerError(str(e)) + + next_path = login_data["next_path"] or request.build_absolute_uri("/") + + user = authenticate( + request=request, + code=request.GET["code"], + code_verifier=login_data["code_verifier"], + redirect_uri=login_data["redirect_uri"], + ) + + if user is None: + return HttpResponseServerError("User authentication failed.") + + login(request, user) + + return HttpResponseRedirect(next_path) + + +def oidc_logout(request: HttpRequest) -> HttpResponse: + """ + Django view to logout using OpenID Connect. + """ + user = request.user + logout(request) + if hasattr(user, "refresh_token"): + user = cast(OIDCUser, user) + refresh_token = cast(str, user.refresh_token) + try: + # end OpenID Connect session + oidc_client = keycloak_oidc_client() + oidc_client.logout(refresh_token) + except KeycloakError as ke: + return HttpResponseServerError(keycloak_error_message(ke)) + # remove user data from cache + cache.delete(oidc_profile_cache_key(oidc_client, user.id)) + + return HttpResponseRedirect(request.GET.get("next_path", "/")) + + +urlpatterns = [ + url(r"^oidc/login/$", oidc_login, name="oidc-login"), + url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), + url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), +] diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index 370ba64..8bf627d 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,238 +1,241 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Optional from urllib.parse import urlencode +# add ExpiredSignatureError alias to avoid leaking jose import +# in swh-auth client code +from jose.jwt import ExpiredSignatureError # noqa from keycloak import KeycloakOpenID -# The next import is required to allow callers to catch on their own term the following -# exception +# add KeycloakError alias to avoid leaking keycloak import +# in swh-auth client code from keycloak.exceptions import KeycloakError # noqa from swh.core.config import load_from_envvar class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) self.server_url = server_url self.realm_public_key = realm_public_key @property def realm_name(self): return self._keycloak.realm_name @realm_name.setter def realm_name(self, value): self._keycloak.realm_name = value @property def client_id(self): return self._keycloak.client_id @client_id.setter def client_id(self, value): self._keycloak.client_id = value def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Raises: KeycloakError in case of authentication failures Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) def login( self, username: str, password: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Direct Access Grant flow. Raises: KeycloakError in case of authentication failures Args: username: an existing username in the realm password: password associated to username extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="password", scope="openid", username=username, password=password, **extra_params, ) def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) @classmethod def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from a configuration dict. Args: kwargs: configuration dict for the instance, with one keycloak key, whose value is a Dict with the following keys: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier Returns: the KeycloakOpenIDConnect instance """ cfg = kwargs["keycloak"] return cls( server_url=cfg["server_url"], realm_name=cfg["realm_name"], client_id=cfg["client_id"], ) @classmethod def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to instantiation call Returns: the KeycloakOpenIDConnect instance """ config = dict(load_from_envvar()).get("keycloak", {}) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(keycloak=config) def keycloak_error_message(keycloak_error: KeycloakError) -> str: """Transform a keycloak exception into an error message. """ msg_dict = json.loads(keycloak_error.error_message.decode()) error_msg = msg_dict["error"] error_desc = msg_dict.get("error_description") if error_desc: error_msg = f"{error_msg}: {error_desc}" return error_msg diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py index 71ebc67..e8581a1 100644 --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -1,209 +1,220 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone import json from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect from swh.auth.tests.sample_data import ( CLIENT_ID, OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, REALM_NAME, SERVER_URL, USER_INFO, ) class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: server_url: Server main auth url (cf. :py:data:`swh.auth.tests.sample_data.SERVER_URL`) realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure exp: expiration delay user_groups: user groups configuration (if any) user_permissions: user permissions configuration (if any) oidc_profile: Dict response from a call to a token authentication query (cf. :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) user_info: Dict response from a call to userinfo query (cf. :py:data:`swh.auth.tests.sample_data.USER_INFO`) raw_realm_public_key: A raw ascii text representing the realm public key (cf. :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration and audience checks as we use a static # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False options["verify_aud"] = False decoded = super().decode_token(token, options) # Merge the user info configured to be part of the decode token userinfo = self.userinfo() if userinfo is not None: decoded = {**decoded, **userinfo} # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["iat"] if self.exp is not None: decoded["exp"] = self.exp decoded["iat"] = self.exp - expire_in else: now = int(datetime.now(tz=timezone.utc).timestamp()) decoded["iat"] = now decoded["exp"] = now + expire_in decoded["groups"] = self.user_groups decoded["aud"] = [self.client_id, "account"] decoded["azp"] = self.client_id if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def set_auth_success( self, auth_success: bool, oidc_profile: Optional[Dict] = None, user_info: Optional[Dict] = None, ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.login.return_value = copy(oidc_profile) self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore error = { "error": "invalid_grant", "error_description": "Invalid user credentials", } error_message = json.dumps(error).encode() exception = KeycloakError(error_message=error_message, response_code=401) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception self.login.side_effect = exception def keycloak_oidc_factory( server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, user_groups: List[str] = [], user_permissions: List[str] = [], oidc_profile: Dict = OIDC_PROFILE, user_info: Dict = USER_INFO, raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): """Keycloak mock fixture factory. Report to :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture def keycloak_oidc(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, user_permissions=user_permissions, oidc_profile=oidc_profile, user_info=user_info, raw_realm_public_key=raw_realm_public_key, ) return keycloak_oidc # for backward compatibility # TODO: remove that alias once swh-deposit and swh-web use new function name keycloak_mock_factory = keycloak_oidc_factory # generic keycloak fixture that can be used within tests # (cf. test_keycloak.py, test_utils.py, django related tests) # or external modules using that pytest plugin -keycloak_oidc = keycloak_oidc_factory( +_keycloak_oidc = keycloak_oidc_factory( server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, ) + + +@pytest.fixture +def keycloak_oidc(_keycloak_oidc, mocker): + for oidc_client_factory in ( + "swh.auth.django.views.keycloak_oidc_client", + "swh.auth.django.backends.keycloak_oidc_client", + ): + keycloak_oidc_client = mocker.patch(oidc_client_factory) + keycloak_oidc_client.return_value = _keycloak_oidc + return _keycloak_oidc diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py new file mode 100644 index 0000000..ab201d6 --- /dev/null +++ b/swh/auth/tests/conftest.py @@ -0,0 +1,12 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + + +# Alias rf fixture from pytest-django +@pytest.fixture +def request_factory(rf): + return rf diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py index 543d0ac..08f16de 100644 --- a/swh/auth/tests/django/app/apptest/settings.py +++ b/swh/auth/tests/django/app/apptest/settings.py @@ -1,7 +1,41 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL + SECRET_KEY = "o+&ayiuk(y^wh4ijz5e=c2$$kjj7g^6r%z+8d*c0lbpfs##k#7" INSTALLED_APPS = [ "django.contrib.auth", "django.contrib.contenttypes", + "django.contrib.sessions", "swh.auth.tests.django.app.apptest", ] + +MIDDLEWARE = [ + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", +] + +ROOT_URLCONF = "swh.auth.tests.django.app.apptest.urls" + +DATABASES = { + "default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "swh-auth-test-db",} +} + +SESSION_ENGINE = "django.contrib.sessions.backends.cache" + +AUTHENTICATION_BACKENDS = [ + "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", +] + +SWH_AUTH_SERVER_URL = SERVER_URL +SWH_AUTH_REALM_NAME = REALM_NAME +SWH_AUTH_CLIENT_ID = CLIENT_ID diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py index 541ee7b..0235d1c 100644 --- a/swh/auth/tests/django/app/apptest/urls.py +++ b/swh/auth/tests/django/app/apptest/urls.py @@ -1,6 +1,16 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -urlpatterns = [] # type: ignore +from django.conf.urls import url +from django.http import HttpResponse + +from swh.auth.django.views import urlpatterns as auth_urlpatterns + + +def _root_view(request): + return HttpResponse("Hello World !") + + +urlpatterns = [url(r"^$", _root_view, name="root")] + auth_urlpatterns diff --git a/swh/auth/tests/django/django_asserts.py b/swh/auth/tests/django/django_asserts.py new file mode 100644 index 0000000..2c67cea --- /dev/null +++ b/swh/auth/tests/django/django_asserts.py @@ -0,0 +1,21 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# https://github.com/pytest-dev/pytest-django/pull/709 proposes a more +# generic way to expose all asserts but it makes mypy unhappy. +# So explicitly expose the assertions we need for swh-web tests to +# avoid mypy errors + +""" +Expose some Django assertions to be used with pytest +""" + +from django.test import TestCase + +_test_case = TestCase("run") + +assert_template_used = _test_case.assertTemplateUsed +assert_contains = _test_case.assertContains +assert_not_contains = _test_case.assertNotContains diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py new file mode 100644 index 0000000..dcda102 --- /dev/null +++ b/swh/auth/tests/django/test_backends.py @@ -0,0 +1,151 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime, timedelta +from unittest.mock import Mock + +from django.conf import settings +from django.contrib.auth import authenticate, get_backends +import pytest + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import reverse +from swh.auth.keycloak import ExpiredSignatureError + + +def _authenticate_user(request_factory): + request = request_factory.get(reverse("root")) + + return authenticate( + request=request, + code="some-code", + code_verifier="some-code-verifier", + redirect_uri="https://localhost:5004", + ) + + +def _check_authenticated_user(user, decoded_token, keycloak_oidc): + assert user is not None + assert isinstance(user, OIDCUser) + assert user.id != 0 + assert user.username == decoded_token["preferred_username"] + assert user.password == "" + assert user.first_name == decoded_token["given_name"] + assert user.last_name == decoded_token["family_name"] + assert user.email == decoded_token["email"] + assert user.is_staff == ("/staff" in decoded_token["groups"]) + assert user.sub == decoded_token["sub"] + resource_access = decoded_token.get("resource_access", {}) + resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) + assert user.permissions == set(resource_access_client.get("roles", [])) + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): + """ + Checks successful login based on OpenID Connect with PKCE extension + Django authentication backend (login from Web UI). + """ + keycloak_oidc.user_groups = ["/staff"] + + oidc_profile = keycloak_oidc.login() + user = _authenticate_user(request_factory) + + decoded_token = keycloak_oidc.decode_token(user.access_token) + _check_authenticated_user(user, decoded_token, keycloak_oidc) + + auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) + exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) + refresh_exp_datetime = auth_datetime + timedelta( + seconds=oidc_profile["refresh_expires_in"] + ) + + assert user.access_token == oidc_profile["access_token"] + assert user.expires_at == exp_datetime + assert user.id_token == oidc_profile["id_token"] + assert user.refresh_token == oidc_profile["refresh_token"] + assert user.refresh_expires_at == refresh_exp_datetime + assert user.scope == oidc_profile["scope"] + assert user.session_state == oidc_profile["session_state"] + + backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" + assert user.backend == backend_path + backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) + assert get_backends()[backend_idx].get_user(user.id) == user + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): + """ + Checks failed login based on OpenID Connect with PKCE extension Django + authentication backend (login from Web UI). + """ + keycloak_oidc.set_auth_success(False) + + user = _authenticate_user(request_factory) + + assert user is None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_success( + keycloak_oidc, request_factory +): + """ + Checks access token renewal success using refresh token. + """ + + oidc_profile = keycloak_oidc.login() + decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) + + keycloak_oidc.decode_token = Mock() + keycloak_oidc.decode_token.side_effect = [ + ExpiredSignatureError("access token token has expired"), + decoded_token, + ] + + user = _authenticate_user(request_factory) + + oidc_profile = keycloak_oidc.login() + keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) + + assert user is not None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_failure( + keycloak_oidc, request_factory +): + """ + Checks access token renewal failure using refresh token. + """ + keycloak_oidc.decode_token = Mock() + keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( + "access token token has expired" + ) + keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired") + + user = _authenticate_user(request_factory) + + oidc_profile = keycloak_oidc.login() + keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) + + assert user is None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): + """ + Checks that a permission defined with OpenID Connect is correctly mapped + to a Django one when logging from Web UI. + """ + permission = "webapp.some-permission" + keycloak_oidc.user_permissions = [permission] + user = _authenticate_user(request_factory) + assert user.has_perm(permission) + assert user.get_all_permissions() == {permission} + assert user.get_group_permissions() == {permission} + assert user.has_module_perms("webapp") + assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py index 3853851..a7878d4 100644 --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -1,117 +1,120 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime from django.test import override_settings import pytest from swh.auth.django.utils import ( keycloak_oidc_client, oidc_user_from_decoded_token, oidc_user_from_profile, ) from swh.auth.tests.sample_data import ( CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, REALM_NAME, SERVER_URL, ) def _check_user(user, is_staff=False, permissions=set()): assert user.id > 0 assert user.username == DECODED_TOKEN["preferred_username"] assert user.password == "" assert user.first_name == DECODED_TOKEN["given_name"] assert user.last_name == DECODED_TOKEN["family_name"] assert user.email == DECODED_TOKEN["email"] assert user.is_staff == is_staff assert user.permissions == permissions assert user.sub == DECODED_TOKEN["sub"] date_now = datetime.now() if user.expires_at is not None: assert isinstance(user.expires_at, datetime) assert date_now <= user.expires_at if user.refresh_expires_at is not None: assert isinstance(user.refresh_expires_at, datetime) assert date_now <= user.refresh_expires_at assert user.oidc_profile == { k: getattr(user, k) for k in ( "access_token", "expires_in", "expires_at", "id_token", "refresh_token", "refresh_expires_in", "refresh_expires_at", "scope", "session_state", ) } def test_oidc_user_from_decoded_token(): user = oidc_user_from_decoded_token(DECODED_TOKEN) _check_user(user) def test_oidc_user_from_decoded_token2(): decoded_token = copy(DECODED_TOKEN) decoded_token["groups"] = ["/staff", "api"] decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}} user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) _check_user(user, is_staff=True, permissions={"read-api"}) @pytest.mark.parametrize( "key,mapped_key", [ ("preferred_username", "username"), ("given_name", "first_name"), ("family_name", "last_name"), ("email", "email"), ], ) def test_oidc_user_from_decoded_token_empty_fields_ok(key, mapped_key): decoded_token = copy(DECODED_TOKEN) decoded_token.pop(key, None) user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID) # Ensure the missing field is mapped to an empty value assert getattr(user, mapped_key) == "" def test_oidc_user_from_profile(keycloak_oidc): user = oidc_user_from_profile(keycloak_oidc, OIDC_PROFILE) _check_user(user) +@override_settings( + SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None, +) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): keycloak_oidc_client() @override_settings( - KEYCLOAK_SERVER_URL=SERVER_URL, - KEYCLOAK_REALM_NAME=REALM_NAME, - KEYCLOAK_CLIENT_ID=CLIENT_ID, + SWH_AUTH_SERVER_URL=SERVER_URL, + SWH_AUTH_REALM_NAME=REALM_NAME, + SWH_AUTH_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(): kc_oidc_client = keycloak_oidc_client() assert kc_oidc_client.server_url == SERVER_URL assert kc_oidc_client.realm_name == REALM_NAME assert kc_oidc_client.client_id == CLIENT_ID diff --git a/swh/auth/tests/django/test_views.py b/swh/auth/tests/django/test_views.py new file mode 100644 index 0000000..b0df547 --- /dev/null +++ b/swh/auth/tests/django/test_views.py @@ -0,0 +1,282 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +from urllib.parse import urljoin, urlparse +import uuid + +from django.contrib.auth.models import AnonymousUser, User +from django.http import QueryDict +import pytest + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import reverse +from swh.auth.keycloak import KeycloakError +from swh.auth.tests.django.django_asserts import assert_contains +from swh.auth.tests.sample_data import CLIENT_ID + + +def _check_oidc_login_code_flow_data( + request, response, keycloak_oidc, redirect_uri, scope="openid" +): + parsed_url = urlparse(response["location"]) + + authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] + query_dict = QueryDict(parsed_url.query) + + # check redirect url is valid + assert urljoin(response["location"], parsed_url.path) == authorization_url + assert "client_id" in query_dict + assert query_dict["client_id"] == CLIENT_ID + assert "response_type" in query_dict + assert query_dict["response_type"] == "code" + assert "redirect_uri" in query_dict + assert query_dict["redirect_uri"] == redirect_uri + assert "code_challenge_method" in query_dict + assert query_dict["code_challenge_method"] == "S256" + assert "scope" in query_dict + assert query_dict["scope"] == scope + assert "state" in query_dict + assert "code_challenge" in query_dict + + # check a login_data has been registered in user session + assert "login_data" in request.session + login_data = request.session["login_data"] + assert "code_verifier" in login_data + assert "state" in login_data + assert "redirect_uri" in login_data + assert login_data["redirect_uri"] == query_dict["redirect_uri"] + return login_data + + +@pytest.mark.django_db +def test_oidc_login_views_success(client, keycloak_oidc): + """ + Simulate a successful login authentication with OpenID Connect + authorization code flow with PKCE. + """ + # user initiates login process + login_url = reverse("oidc-login") + + # should redirect to Keycloak authentication page in order + # for a user to login with its username / password + response = client.get(login_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert isinstance(request.user, AnonymousUser) + + login_data = _check_oidc_login_code_flow_data( + request, + response, + keycloak_oidc, + redirect_uri=reverse("oidc-login-complete", request=request), + ) + + # once a user has identified himself in Keycloak, he is + # redirected to the 'oidc-login-complete' view to + # login in Django. + + # generate authorization code / session state in the same + # manner as Keycloak + code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" + session_state = str(uuid.uuid4()) + + login_complete_url = reverse( + "oidc-login-complete", + query_params={ + "code": code, + "state": login_data["state"], + "session_state": session_state, + }, + ) + + # login process finalization, should redirect to root url by default + response = client.get(login_complete_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert response["location"] == request.build_absolute_uri("/") + + # user should be authenticated + assert isinstance(request.user, OIDCUser) + + # check remote user has not been saved to Django database + with pytest.raises(User.DoesNotExist): + User.objects.get(username=request.user.username) + + +@pytest.mark.django_db +def test_oidc_logout_view_success(client, keycloak_oidc): + """ + Simulate a successful logout operation with OpenID Connect. + """ + # login our test user + client.login(code="", code_verifier="", redirect_uri="") + keycloak_oidc.authorization_code.assert_called() + + # user initiates logout + next_path = reverse("root") + oidc_logout_url = reverse("oidc-logout", query_params={"next_path": next_path}) + + # should redirect to logout page + response = client.get(oidc_logout_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert response["location"] == next_path + + # should have been logged out in Keycloak + oidc_profile = keycloak_oidc.login() + keycloak_oidc.logout.assert_called_with(oidc_profile["refresh_token"]) + + # check effective logout in Django + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_login_view_failure(client, keycloak_oidc): + """ + Simulate a failed authentication with OpenID Connect. + """ + keycloak_oidc.set_auth_success(False) + + # user initiates login process + login_url = reverse("oidc-login") + # should render an error page + response = client.get(login_url) + assert response.status_code == 500 + request = response.wsgi_request + + # no users should be logged in + assert isinstance(request.user, AnonymousUser) + + +# Simulate possible errors with OpenID Connect in the login complete view. + + +def test_oidc_login_complete_view_no_login_data(client): + # user initiates login process + login_url = reverse("oidc-login-complete") + # should return with error + response = client.get(login_url) + assert response.status_code == 500 + + assert_contains( + response, "Login process has not been initialized.", status_code=500 + ) + + +def test_oidc_login_complete_view_missing_parameters(client): + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # user initiates login process + login_url = reverse("oidc-login-complete") + + # should return with error + response = client.get(login_url) + assert response.status_code == 400 + request = response.wsgi_request + assert_contains( + response, "Missing query parameters for authentication.", status_code=400 + ) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +def test_oidc_login_complete_wrong_csrf_token(client, keycloak_oidc): + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # user initiates login process + login_url = reverse( + "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} + ) + + # should render an error page + response = client.get(login_url) + assert response.status_code == 400 + request = response.wsgi_request + assert_contains( + response, "Wrong CSRF token, aborting login process.", status_code=400 + ) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_login_complete_wrong_code_verifier(client, keycloak_oidc): + keycloak_oidc.set_auth_success(False) + + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # check authentication error is reported + login_url = reverse( + "oidc-login-complete", + query_params={"code": "some-code", "state": session["login_data"]["state"]}, + ) + + # should render an error page + response = client.get(login_url) + print(response.content) + assert response.status_code == 500 + + request = response.wsgi_request + assert_contains(response, "User authentication failed.", status_code=500) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_logout_view_failure(client, keycloak_oidc): + """ + Simulate a failed logout operation with OpenID Connect. + """ + # login our test user + client.login(code="", code_verifier="", redirect_uri="") + + error = "unknown_error" + error_message = json.dumps({"error": error}).encode() + keycloak_oidc.logout.side_effect = KeycloakError( + error_message=error_message, response_code=401 + ) + + # user initiates logout process + logout_url = reverse("oidc-logout") + + # should return with error + response = client.get(logout_url) + assert response.status_code == 500 + request = response.wsgi_request + assert_contains(response, error, status_code=500) + + # user should be logged out from Django anyway + assert isinstance(request.user, AnonymousUser) diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py new file mode 100644 index 0000000..ce9485a --- /dev/null +++ b/swh/auth/tests/test_utils.py @@ -0,0 +1,36 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from base64 import urlsafe_b64encode +import hashlib +import re + +from swh.auth.utils import gen_oidc_pkce_codes + + +def test_gen_oidc_pkce_codes(): + """ + Check generated PKCE codes respect the specification + (see https://tools.ietf.org/html/rfc7636#section-4.1) + """ + code_verifier, code_challenge = gen_oidc_pkce_codes() + + # check the code verifier only contains allowed characters + assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier) + + # check minimum and maximum authorized length for the + # code verifier + assert len(code_verifier) >= 43 + assert len(code_verifier) <= 128 + + # compute code challenge from code verifier + challenge = hashlib.sha256(code_verifier.encode("ascii")).digest() + challenge = urlsafe_b64encode(challenge).decode("ascii") + challenge = challenge.replace("=", "") + + # check base64 padding is not present + assert not code_challenge[-1].endswith("=") + # check code challenge is valid + assert code_challenge == challenge diff --git a/swh/auth/utils.py b/swh/auth/utils.py new file mode 100644 index 0000000..2b2c67b --- /dev/null +++ b/swh/auth/utils.py @@ -0,0 +1,35 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from base64 import urlsafe_b64encode +import hashlib +import secrets +from typing import Tuple + + +def gen_oidc_pkce_codes() -> Tuple[str, str]: + """ + Generates a code verifier and a code challenge to be used + with the OpenID Connect authorization code flow with PKCE + ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636). + + PKCE replaces the static secret used in the standard authorization + code flow with a temporary one-time challenge, making it feasible + to use in public clients. + + The implementation is inspired from that blog post: + https://www.stefaanlippens.net/oauth-code-flow-pkce.html + """ + # generate a code verifier which is a long enough random alphanumeric + # string, only to be used "client side" + code_verifier_str = secrets.token_urlsafe(60) + + # create the PKCE code challenge by hashing the code verifier with SHA256 + # and encoding the result in URL-safe base64 (without padding) + code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest() + code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii") + code_challenge_str = code_challenge_str.replace("=", "") + + return code_verifier_str, code_challenge_str